Load data into tables of Primary Key model
You can run load jobs to insert, update, or delete data in tables that use the Primary Key model. You can also run load jobs to update only specified columns of such tables. Support for partial updates is in preview.
Internal implementation
StarRocks supports the following data ingestion methods: stream load, broker load, and routine load.
Note:
You cannot insert, update, or delete data by performing a Spark load job.
You cannot insert, update, or delete data by executing the SQL DML statements INSERT and UPDATE. These operations will be supported in future StarRocks versions.
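The stream load and broker load examples in the following sections load data into a table named demo_tbl1 in a database named demo_db. This topic does not show how that table is created; the following statement is only a sketch of a schema that matches the sample data, modeled on the demo_tbl2 definition later in this topic (the pk and col0 column names are taken from the column mappings in Example 3):
-- Sketch only: a two-column Primary Key table that matches the sample CSV data.
-- The actual demo_tbl1 schema is not defined in this topic.
CREATE TABLE demo_db.demo_tbl1 (
    `pk` bigint(20) NOT NULL COMMENT "",
    `col0` varchar(65533) NULL COMMENT ""
) ENGINE = OLAP
PRIMARY KEY(`pk`)
DISTRIBUTED BY HASH(`pk`) BUCKETS 3;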
All load operations are UPSERT by default, and INSERT operations are not distinguished from UPDATE operations. To support both UPSERT and DELETE operations during a load job, StarRocks provides a field named __op in the syntax that is used to create a stream or broker load job. The __op field holds the operation type. You declare a column named __op when you start a load job. A value of 0 indicates an UPSERT operation, and a value of 1 indicates a DELETE operation.
Note: You do not need to add a column named __op when you create a table.
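For example, loading the row 4,dddd with __op set to 1 removes the row whose primary key is 4 from the table. This has roughly the same effect as the following DELETE statement, which StarRocks also supports on tables of the Primary Key model (see the References section):
-- Loading "4,dddd" with __op = 1 is roughly equivalent to deleting by primary key:
DELETE FROM demo_db.demo_tbl1 WHERE pk = 4;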
Update data by running a stream or broker load job
A stream load job runs in a similar way as a broker load job. The operations that a stream or broker load job performs depend on the data file that you want to load.
Example 1
If you perform only UPSERT operations on a data file, you do not need to add the __op column to the file. If you do add the __op column, you can either specify UPSERT as the operation type or leave the operation type unspecified. Suppose that you want to load the following data into a table named demo_tbl1 by running a stream or broker load job:
# The data that you want to load into the table is as follows:
0,aaaa
1,bbbb
2,\N
4,dddd
Run a stream load job.
# Load the data without an operation type specified for the __op field.
curl --location-trusted -u root: -H "label:lineorder" \
    -H "column_separator:," -T demo.csv \
    http://localhost:8030/api/demo_db/demo_tbl1/_stream_load

# Load the data with an operation type specified for the __op field.
curl --location-trusted -u root: -H "label:lineorder" \
    -H "column_separator:," -H "columns:__op='upsert'" -T demo.csv \
    http://localhost:8030/api/demo_db/demo_tbl1/_stream_load
Run a broker load job.
# Load the data without an operation type specified for the __op field.
load label demo_db.label1
(
    data infile("hdfs://localhost:9000/demo.csv")
    into table demo_tbl1
    columns terminated by ","
    format as "csv"
)
with broker "broker1";

# Load the data with an operation type specified for the __op field.
load label demo_db.label2
(
    data infile("hdfs://localhost:9000/demo.csv")
    into table demo_tbl1
    columns terminated by ","
    format as "csv"
    set (__op='upsert')
)
with broker "broker1";
Example 2
If you perform only DELETE operations on a data file, you only need to specify DELETE as the operation type for the __op column. Suppose that you want to delete the following data by running a stream or broker load job:
# The data that you want to delete is as follows:
1,bbbb
4,dddd
Note: A DELETE operation matches the rows to delete only by the column that is defined as the primary key of the table. However, you still need to provide all columns of the table, the same as for an UPSERT operation.
Run a stream load job.
curl --location-trusted -u root: -H "label:lineorder" -H "column_separator:," \ -H "columns:__op='delete'" -T demo.csv \ http://localhost:8030/api/demo_db/demo_tbl1/_stream_load
Run a broker load job.
load label demo_db.label3 ( data infile("hdfs://localhost:9000/demo.csv") into table demo_tbl1 columns terminated by "," format as "csv" set (__op ='delete') ) with broker "broker1";
Example 3
If you perform both UPSERT and DELETE operations on a data file, you must use the __op field to indicate the type of each operation. Suppose that you want to delete the rows whose primary keys are 1 and 4 and add the rows whose primary keys are 5 and 6:
1,bbbb,1
4,dddd,1
5,eeee,0
6,ffff,0
Note: A DELETE operation matches the rows to delete only by the column that is defined as the primary key of the table. However, you still need to provide all columns of the table, the same as for an UPSERT operation.
Run a stream load job.
curl --location-trusted -u root: -H "label:lineorder" -H "column_separator:," \
    -H "columns:c1,c2,c3,pk=c1,col0=c2,__op=c3" -T demo.csv \
    http://localhost:8030/api/demo_db/demo_tbl1/_stream_load
In the job, the third source column (c3) is specified as the __op column.
Run a broker load job.
load label demo_db.label4
(
    data infile("hdfs://localhost:9000/demo.csv")
    into table demo_tbl1
    columns terminated by ","
    format as "csv"
    (c1, c2, c3)
    set (pk=c1, col0=c2, __op=c3)
)
with broker "broker1";
In the job, the third source column (c3) is specified as the __op column.
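If the table previously contained the rows loaded in Example 1, you can check the combined effect of the UPSERT and DELETE operations with a simple query. This is only a sketch; it assumes the pk and col0 column names used in the load statements above:
-- After Example 3, the rows with primary keys 1 and 4 should be gone,
-- and the rows with primary keys 5 and 6 should be present.
select * from demo_db.demo_tbl1 order by pk;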
For more information about stream load and broker load, see Stream Load and Broker Load.
Update data by running a routine load job
You can add a column named __op to the COLUMNS clause in the statement that is used to create a routine load job. A value of 0 in the __op column indicates an UPSERT operation, and a value of 1 in the __op column indicates a DELETE operation.
Example 1
Load CSV data.
For example, you want to load the following data:
2020-06-23 2020-06-23 00:00:00 beijing haidian 1 -128 -32768 -2147483648 0
2020-06-23 2020-06-23 00:00:01 beijing haidian 0 -127 -32767 -2147483647 1
2020-06-23 2020-06-23 00:00:02 beijing haidian 1 -126 -32766 -2147483646 0
2020-06-23 2020-06-23 00:00:03 beijing haidian 0 -125 -32765 -2147483645 1
2020-06-23 2020-06-23 00:00:04 beijing haidian 1 -124 -32764 -2147483644 0
Execute the following statement to create and run a routine load job:
CREATE ROUTINE LOAD routine_load_basic_types_1631533306858 on primary_table_without_null
COLUMNS (k1, k2, k3, k4, k5, v1, v2, v3, __op),
COLUMNS TERMINATED BY '\t'
PROPERTIES (
"desired_concurrent_number" = "1",
"max_error_number" = "1000",
"max_batch_interval" = "5"
) FROM KAFKA (
"kafka_broker_list" = "localhgost:9092",
"kafka_topic" = "starrocks-data"
"kafka_offsets" = "OFFSET_BEGINNING"
);
Example 2
Load JSON data, which contains a field to specify whether an operation is UPSERT or DELETE.
For example, you want to synchronize the following data from Canal into Apache Kafka®, with the type field indicating the operation type, which can be INSERT, UPDATE, or DELETE.
Note: Data definition language (DDL) operations cannot be synchronized from Canal to Apache Kafka®.
{
"data": [{
"query_id": "3c7ebee321e94773-b4d79cc3f08ca2ac",
"conn_id": "34434",
"user": "zhaoheng",
"start_time": "2020-10-19 20:40:10.578",
"end_time": "2020-10-19 20:40:10"
}],
"database": "center_service_lihailei",
"es": 1603111211000,
"id": 122,
"isDdl": false,
"mysqlType": {
"query_id": "varchar(64)",
"conn_id": "int(11)",
"user": "varchar(32)",
"start_time": "datetime(3)",
"end_time": "datetime"
},
"old": null,
"pkNames": ["query_id"],
"sql": "",
"sqlType": {
"query_id": 12,
"conn_id": 4,
"user": 12,
"start_time": 93,
"end_time": 93
},
"table": "query_record",
"ts": 1603111212015,
"type": "INSERT"
}
Execute the following statement to create and run a routine load job:
CREATE ROUTINE LOAD cdc_db.label5 ON cdc_table
COLUMNS(pk, col0, temp, __op = (CASE temp WHEN "DELETE" THEN 1 ELSE 0 END))
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_error_number" = "1000",
"strict_mode" = "false",
"format" = "json",
"jsonpaths" = "[\"$.data[0].query_id\",\"$.data[0].conn_id\",\"$.data[0].user\",\"$.data[0].start_time\",\"$.data[0].end_time\",\"$.type\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "localhost:9092",
"kafka_topic" = "cdc-data",
"property.group.id" = "starrocks-group",
"property.client.id" = "starrocks-client",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
Example 3
Load JSON data that contains a field indicating whether an operation is an UPSERT or a DELETE. Because the field already uses 0 and 1 as its values, you can map it to the __op column directly instead of converting it with an expression.
For example, you want to load the following data. The valid values of the op_type field are 0 and 1. A value of 0 indicates an UPSERT operation, and a value of 1 indicates a DELETE operation.
{"pk": 1, "col0": "123", "op_type": 0}
{"pk": 2, "col0": "456", "op_type": 0}
{"pk": 1, "col0": "123", "op_type": 1}
Execute the following statement to create a table:
CREATE TABLE `demo_tbl2` (
`pk` bigint(20) NOT NULL COMMENT "",
`col0` varchar(65533) NULL COMMENT ""
) ENGINE = OLAP
PRIMARY KEY(`pk`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`pk`) BUCKETS 3;
Execute the following statement to create and run a routine load job:
CREATE ROUTINE LOAD demo_db.label6 ON demo_tbl2
COLUMNS(pk,col0,__op)
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_error_number" = "1000",
"strict_mode" = "false",
"format" = "json",
"jsonpaths" = "[\"$.pk\",\"$.col0\",\"$.op_type\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "localhost:9092",
"kafka_topic" = "pk-data",
"property.group.id" = "starrocks-group",
"property.client.id" = "starrocks-client",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
After the load job is complete, query the table. The row whose primary key is 1 was upserted and then deleted, so only the row whose primary key is 2 remains:
mysql> select * from demo_db.demo_tbl2;
+------+------+
| pk | col0 |
+------+------+
| 2 | 456 |
+------+------+
For more information about routine loads, see Routine Load.
[Preview] Update partial data
Note: Since v2.2, StarRocks supports updates to only specified columns of tables that use the Primary Key model.
In this section, a table named demo is used as an example. The demo table consists of three columns: id, name, and age.
Execute the following statement to create the demo table:
create table demo(
id int not null,
name string null default '',
age int not null default '0'
) primary key(id)
distributed by hash(id);
If you update only specified columns of the demo table, such as the id and name columns, you need to provide only the data for the id and name columns.
Note:
The columns that you specify must include the column that is defined as the primary key of the table. In this example, the primary key column is id.
Every row in the data file must provide a value for each of the specified columns. This requirement also applies to CSV data. In the following example, each line contains two columns, which are separated by a comma (,).
0,aaaa
1,bbbb
2,\N
4,dddd
Execute the following statements to update the data:
If you want to run a stream load job, execute the following statement:
curl --location-trusted -u root: \
    -H "label:lineorder" -H "column_separator:," \
    -H "partial_update:true" -H "columns:id,name" \
    -T demo.csv http://localhost:8030/api/demo/demo/_stream_load
Note: In the preceding statement, you must specify the -H "partial_update:true" setting, and specify the columns that you want to update in the "columns:id,name" format. For more information about the parameter settings for a stream load job, see Stream Load.
If you want to run a broker load job, execute the following statement:
load label demo.demo
(
    data infile("hdfs://localhost:9000/demo.csv")
    into table demo
    columns terminated by ","
    format as "csv"
    (c1, c2)
    set (id=c1, name=c2)
)
with broker "broker1"
properties
(
    "partial_update" = "true"
);
Note: In the preceding statement, you must specify "partial_update" = "true" in the properties clause. Additionally, you must specify the columns that you want to update. For example, to update the id and name columns, which are read from the source columns c1 and c2, specify set (id=c1, name=c2) in the statement. For more information about the parameter settings for a broker load job, see Broker Load.
If you want to run a routine load job, execute the following statement:
CREATE ROUTINE LOAD routine_load_demo on demo
COLUMNS (id, name),
COLUMNS TERMINATED BY ','
PROPERTIES
(
    "partial_update" = "true"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "kafka_partitions" = "0,1,2,3",
    "kafka_offsets" = "101,0,0,200"
);
Note: In the preceding statement, you must specify "partial_update" = "true" in the PROPERTIES clause, and specify the columns that you want to update in the COLUMNS (id, name) format. For more information about the parameter settings for a routine load job, see Routine Load.
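After a partial update finishes, only the columns that you specified (id and name) are written. The age column should keep its previous value for existing rows, or fall back to its default for rows that did not exist before. A quick way to check the result, assuming the demo database and table names used above:
-- Only id and name are written by the partial update; age is left unchanged.
select id, name, age from demo order by id;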
References
For more information about the usage of the DELETE statement in the Primary Key model, see DELETE.