- 产品简介
- 快速开始
- 表设计
- 数据导入
- 数据提取
- 使用StarRocks
- 管理手册
- 参考手册
- SQL参考
- 用户账户管理
- 集群管理
- ADMIN CANCEL REPAIR
- ADMIN CHECK TABLET
- ADMIN REPAIR
- ADMIN SET CONFIG
- ADMIN SET REPLICA STATUS
- ADMIN SHOW CONFIG
- ADMIN SHOW REPLICA DISTRIBUTION
- ADMIN SHOW REPLICA STATUS
- ALTER CLUSTER
- ALTER SYSTEM
- CANCEL DECOMMISSION
- CREATE CLUSTER
- CREATE FILE
- CREATE RESOURCE GROUP
- DROP CLUSTER
- DROP FILE
- ENTER
- INSTALL PLUGIN
- LINK DATABASE
- MIGRATE DATABASE
- SHOW BACKENDS
- SHOW BROKER
- SHOW FILE
- SHOW FRONTENDS
- SHOW FULL COLUMNS
- SHOW INDEX
- SHOW MIGRATIONS
- SHOW PLUGINS
- SHOW TABLE STATUS
- UNINSTALL PLUGIN
- DDL
- ALTER DATABASE
- ALTER TABLE
- ALTER VIEW
- BACKUP
- CANCEL ALTER
- CANCEL BACKUP
- CANCEL RESTORE
- CREATE DATABASE
- CREATE INDEX
- CREATE MATERIALIZED VIEW
- CREATE REPOSITORY
- CREATE RESOURCE
- CREATE TABLE AS SELECT
- CREATE TABLE LIKE
- CREATE TABLE
- CREATE VIEW
- CREATE FUNCTION
- DROP DATABASE
- DROP INDEX
- DROP MATERIALIZED VIEW
- DROP REPOSITORY
- DROP RESOURCE
- DROP TABLE
- DROP VIEW
- DROP FUNCTION
- HLL
- RECOVER
- RESTORE
- SHOW RESOURCES
- SHOW FUNCTION
- TRUNCATE TABLE
- DML
- ALTER ROUTINE LOAD
- BROKER LOAD
- CANCEL LOAD
- DELETE
- EXPORT
- GROUP BY
- INSERT
- PAUSE ROUTINE LOAD
- RESUME ROUTINE LOAD
- CREATE ROUTINE LOAD
- SELECT
- SHOW ALTER
- SHOW BACKUP
- SHOW DATA
- SHOW DATABASES
- SHOW DELETE
- SHOW DYNAMIC PARTITION TABLES
- SHOW EXPORT
- SHOW LOAD
- SHOW PARTITIONS
- SHOW PROPERTY
- SHOW REPOSITORIES
- SHOW RESTORE
- SHOW ROUTINE LOAD
- SHOW ROUTINE LOAD TASK
- SHOW SNAPSHOT
- SHOW TABLES
- SHOW TABLET
- SHOW TRANSACTION
- SPARK LOAD
- STOP ROUTINE LOAD
- STREAM LOAD
- 数据类型
- 辅助命令
- 函数参考
- 日期函数
- convert_tz
- curdate
- current_timestamp
- curtime
- datediff
- date_add
- date_format
- date_sub
- date_trunc
- day
- dayname
- dayofmonth
- dayofweek
- dayofyear
- from_days
- from_unixtime
- hour
- minute
- month
- monthname
- now
- second
- str_to_date
- timediff
- timestampadd
- timestampdiff
- to_date
- to_days
- unix_timestamp
- utc_timestamp
- weekofyear
- year
- hours_diff
- minutes_diff
- months_diff
- seconds_diff
- weeks_diff
- years_diff
- year
- quarter
- timestamp
- time_to_sec
- str2date
- microseconds_add
- microseconds_sub
- 加密函数
- 地理位置函数
- 字符串函数
- append_trailing_char_if_absent
- ascii
- char_length
- concat
- concat_ws
- ends_with
- find_in_set
- get_json_double
- get_json_int
- get_json_string
- group_concat
- instr
- lcase
- left
- length
- locate
- lower
- lpad
- ltrim
- rtrim
- money_format
- null_or_empty
- regexp_extract
- regexp_replace
- repeat
- reverse
- right
- rpad
- split
- split_part
- starts_with
- strleft
- strright
- hex
- unhex
- substr
- space
- parse_url
- JSON 函数
- JSON 函数和运算符
- JSON 构造函数
- JSON 查询和处理函数
- JSON 运算符
- 模糊/正则匹配函数
- 工具函数
- 聚合函数
- Bitmap函数
- 数组函数
- bit函数
- cast函数
- hash函数
- 条件函数
- 百分位函数
- 数学函数
- 日期函数
- 系统变量
- 错误码
- 系统限制
- SQL参考
- 常见问题解答
- 性能测试
- Release Notes
主键模型导入
StarRocks 支持通过导入任务,对主键模型的表进行数据变更(插入、更新和删除数据),并且支持部分更新。
内部实现
目前支持的导入数据方式有 Stream Load、Broker Load、Routine Load。
- 暂不支持通过 Spark Load 插入、更新和删除数据。
- 暂不支持通过 SQL DML 语句(INSERT、UPDATE、DELETE)插入、更新和删除数据,将在未来版本中支持。
导入时,所有操作默认为 UPSERT 操作,暂不支持区分 INSERT 和 UPDATE 操作。
值得注意的是,导入时,为同时支持 UPSERT 和 DELETE 操作,StarRocks 在 Stream Load、Broker Load 的创建任务语法中增加op字段,用于存储操作类型。在导入时,可以新增一列__op
,用于存储操作类型,取值为 0 时,代表 UPSERT 操作,取值为 1 时,代表 DELETE 操作。
建表时无需添加列
__op
。
通过 Stream Load 或 Broker Load 变更数据
Stream Load 和 Broker Load 导入数据的操作方式类似,根据导入的数据文件的操作形式有如下几种情况。这里通过一些例子来展示具体的导入操作:
1. 当导入的数据文件只有 UPSERT 操作时可以不添加 __op
列。可以指定 __op
为 UPSERT 操作,也可以不做任何指定,StarRocks 会默认导入为 UPSERT 操作。例如想要向表 t 中导入如下内容:
# 导入内容
0,aaaa
1,bbbb
2,\N
4,dddd
Stream Load 导入语句:
#不指定__op
curl --location-trusted -u root: -H "label:lineorder" \
-H "column_separator:," -T demo.csv \
http://localhost:8030/api/demo_db/demo_tbl1/_stream_load
#指定__op
curl --location-trusted -u root: -H "label:lineorder" \
-H "column_separator:," -H " columns:__op ='upsert'" -T demo.csv \
http://localhost:8030/api/demo_db/demo_tbl1/_stream_load
Broker Load 导入语句:
#不指定__op
load label demo_db.label1 (
data infile("hdfs://localhost:9000/demo.csv")
into table demo_tbl1
format as "csv"
) with broker "broker1";
#指定__op
load label demo_db.label2 (
data infile("hdfs://localhost:9000/demo.csv")
into table demo_tbl1
format as "csv"
set (__op ='upsert')
) with broker "broker1";
2. 当导入的数据文件只有 DELETE 操作时,只需指定__op
为 DELETE 操作。例如想要删除如下内容:
#导入内容
1, bbbb
4, dddd
注意:DELETE 操作虽然只用到 Primary Key 列,但同样要提供全部的列,与 UPSERT 操作保持一致。
Stream Load 导入语句:
curl --location-trusted -u root: -H "label:lineorder" -H "column_separator:," \
-H "columns:__op='delete'" -T demo.csv \
http://localhost:8030/api/demo_db/demo_tbl1/_stream_load
Broker Load 导入语句:
load label demo_db.label3 (
data infile("hdfs://localhost:9000/demo.csv")
into table demo_tbl1
format as "csv"
set (__op ='delete')
) with broker "broker1";
3. 当导入的数据文件中同时包含 UPSERT 和 DELETE 操作时,需要指定额外的 __op
来表明操作类型。例如想要导入如下内容:
1,bbbb,1
4,dddd,1
5,eeee,0
6,ffff,0
注意:
- DELETE 操作虽然只用到 primary key 列,但同样要提供全部的列,与 UPSERT 操作保持一致。
- 上述导入内容表示删除 id 为 1、4 的行,添加 id 为 5、6 的行。
Stream Load 导入语句:
curl --location-trusted -u root: -H "label:lineorder" -H "column_separator:," \
-H " columns: c1,c2,c3,pk=c1,col0=c2,__op=c3 " -T demo.csv \
http://localhost:8030/api/demo_db/demo_tbl1/_stream_load
其中,指定了 __op
为第三列。
Broker Load 导入语句:
load label demo_db.label4 (
data infile("hdfs://localhost:9000/demo.csv")
into table demo_tbl1
format as "csv"
(c1,c2,c3)
set (pk=c1,col0=c2,__op=c3)
) with broker "broker1";
其中,指定了 __op
为第三列。
更多关于 Stream Load 和 Broker Load 使用方法,请参考 STREAM LOAD 和 BROKER LOAD。
通过 Routine Load 变更数据
可以在创建 Routine Load 的语句中,在 columns 最后增加一列,指定为 __op
。在真实导入中,__op
为 0 则表示 UPSERT 操作,为 1 则表示 DELETE 操作。例如导入如下内容:
示例 1 导入 CSV 数据。
2020-06-23 2020-06-23 00: 00: 00 beijing haidian 1 -128 -32768 -2147483648 0
2020-06-23 2020-06-23 00: 00: 01 beijing haidian 0 -127 -32767 -2147483647 1
2020-06-23 2020-06-23 00: 00: 02 beijing haidian 1 -126 -32766 -2147483646 0
2020-06-23 2020-06-23 00: 00: 03 beijing haidian 0 -125 -32765 -2147483645 1
2020-06-23 2020-06-23 00: 00: 04 beijing haidian 1 -124 -32764 -2147483644 0
Routine Load 导入语句:
CREATE ROUTINE LOAD routine_load_basic_types_1631533306858 on primary_table_without_null
COLUMNS (k1, k2, k3, k4, k5, v1, v2, v3, __op),
COLUMNS TERMINATED BY '\t'
PROPERTIES (
"desired_concurrent_number" = "1",
"max_error_number" = "1000",
"max_batch_interval" = "5"
) FROM KAFKA (
"kafka_broker_list" = "localhgost:9092",
"kafka_topic" = "starrocks-data"
"kafka_offsets" = "OFFSET_BEGINNING"
);
示例 2 导入 JSON 数据,源数据中有字段表示 UPSERT 或者 DELETE 操作,比如下面常见的 Canal 同步到 Kafka 的数据样例(暂不支持同步 DDL 语句),type
可以表示本次操作的类型(支持取值为INSERT、UPDATE、DELETE)。
数据样例:
{
"data": [{
"query_id": "3c7ebee321e94773-b4d79cc3f08ca2ac",
"conn_id": "34434",
"user": "zhaoheng",
"start_time": "2020-10-19 20:40:10.578",
"end_time": "2020-10-19 20:40:10"
}],
"database": "center_service_lihailei",
"es": 1603111211000,
"id": 122,
"isDdl": false,
"mysqlType": {
"query_id": "varchar(64)",
"conn_id": "int(11)",
"user": "varchar(32)",
"start_time": "datetime(3)",
"end_time": "datetime"
},
"old": null,
"pkNames": ["query_id"],
"sql": "",
"sqlType": {
"query_id": 12,
"conn_id": 4,
"user": 12,
"start_time": 93,
"end_time": 93
},
"table": "query_record",
"ts": 1603111212015,
"type": "INSERT"
}
导入语句:
CREATE ROUTINE LOAD cdc_db.label5 ON cdc_table
COLUMNS(pk, col0, temp,__op =(CASE temp WHEN "DELETE" THEN 1 ELSE 0 END))
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_error_number" = "1000",
"strict_mode" = "false",
"format" = "json",
"jsonpaths" = "[\"$.data[0].query_id\",\"$.data[0].conn_id\",\"$.data[0].user\",\"$.data[0].start_time\",\"$.data[0].end_time\",\"$.type\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "localhost:9092",
"kafka_topic" = "cdc-data",
"property.group.id" = "starrocks-group",
"property.client.id" = "starrocks-client",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
示例 3 导入 JSON 数据,源数据中有字段可以分别表示 UPSERT 和 DELETE 操作,可以不指定__op
。
数据样例:op_type 字段的取值只有 0 和 1,分别表示 UPSERT 和 DELETE 操作。
{"pk": 1, "col0": "123", "op_type": 0}
{"pk": 2, "col0": "456", "op_type": 0}
{"pk": 1, "col0": "123", "op_type": 1}
建表语句:
CREATE TABLE `demo_tbl2` (
`pk` bigint(20) NOT NULL COMMENT "",
`col0` varchar(65533) NULL COMMENT ""
) ENGINE = OLAP
PRIMARY KEY(`pk`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`pk`) BUCKETS 3
导入语句:
CREATE ROUTINE LOAD demo_db.label6 ON demo_tbl2
COLUMNS(pk,col0,__op)
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_error_number" = "1000",
"strict_mode" = "false",
"format" = "json",
"jsonpaths" = "[\"$.pk\",\"$.col0\",\"$.op_type\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "localhost:9092",
"kafka_topic" = "pk-data",
"property.group.id" = "starrocks-group",
"property.client.id" = "starrocks-client",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
查询结果
mysql > select * from demo_db.demo_tbl2;
+------+------+
| pk | col0 |
+------+------+
| 2 | 456 |
+------+------+
Routine Load 更多使用方法请参考 ROUTINE LOAD。
部分更新【公测中】
自 StarRocks 2.2 起,主键模型的表支持部分更新,即只需要更新部分列。
本文以表demo为例进行说明,表demo包含id,name,age三列。
建表语句如下:
create table demo(
id int not null,
name string null default '',
age int not null default '0'
) primary key(id)
更新表 demo 中部分列时,比如只更新 id,name 两列(而保留 age 列不更新),则只需要给出如下表的两列的数据即可。
不过需要注意:
- 所更新的列必须包含主键列,这里是指 id 列。
- 所有行的列数必须相同(同普通 CSV 格式文件的要求),如下表中每行都是用逗号分割的2列数据。
0,aaaa
1,bbbb
2,\N
4,dddd
根据导入方式,执行相关命令。
- 如果通过Stream Load的方式导入,请执行如下命令。注意,需要设置
-H "partial_update:true"
,以指定为部分列更新,并且指定所需更新的列名"columns:id,name"
。Stream Load的具体设置方式,请参见STREAM LOAD。
curl --location-trusted -u root: \
-H "label:lineorder" -H "column_separator:," \
-H "partial_update:true" -H "columns:id,name" \
-T demo.csv http://localhost:8030/api/demo/demo/_stream_load
- 如果通过Broker Load的方式导入,请执行如下命令。注意,在 properties 中设置
"partial_update" = "true"
,指定为部分列更新,并且指定所需更新的列名set (id=c1, name=c2)
。Broker Load的具体设置方式,请参见BROKER LOAD。
load label demo.demo (
data infile("hdfs://localhost:9000/demo.csv")
into table t
format as "csv"
(c1, c2)
set (id=c1, name=c2)
) with broker "broker1"
properties (
"partial_update" = "true"
);
- 如果通过Routine Load的方式导入,请执行如下命令。注意,在 properties 中设置
"partial_update" = "true"
,指定为部分列更新,并且指定所需更新的列名COLUMNS (id, name)
。Routine Load的具体设置方式,请参见 ROUTINE LOAD。
CREATE ROUTINE LOAD routine_load_demo on demo
COLUMNS (id, name),
COLUMNS TERMINATED BY ','
PROPERTIES (
"partial_update" = "true"
) FROM KAFKA (
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,0,200"
);