Routine Load

Routine Load continuously imports data from Kafka. A Routine Load job can be paused, resumed, and stopped with SQL statements. This section introduces the basic principles and usage of this method.

Terminology

  • RoutineLoadJob: A routine import job submitted by the user.
  • JobScheduler: The routine import job scheduler, which schedules a RoutineLoadJob and splits it into multiple subtasks.
  • Task: A subtask of a RoutineLoadJob, produced when the JobScheduler splits the job according to its rules.
  • TaskScheduler: The task scheduler, which schedules the execution of tasks.

Fundamentals

[Figure: routine load]

The import process is illustrated above.

  1. The user submits a Kafka import job to the FE via a client that supports the MySQL protocol.
  2. The FE splits the job into several tasks, each of which is responsible for importing a specified portion of the data.
  3. Each task is assigned to a specified BE for execution. On the BE, a task is treated as a normal import task and is executed using the Stream Load import mechanism.
  4. The BE reports to the FE when the import is completed.
  5. Based on the reported result, the FE either generates new tasks or retries the failed tasks.
  6. The FE keeps generating new tasks to complete the data import without interruption.
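
To observe this splitting on a live job, the subtasks of a job can be listed from the client. This is only a sketch: it assumes your version provides the SHOW ROUTINE LOAD TASK statement and that a job named routine_load_wikipedia (the job created later in this section) already exists. The output columns may differ between versions.

```sql
-- Assumption: SHOW ROUTINE LOAD TASK is available in your StarRocks version.
-- Lists the subtasks currently scheduled for the given routine load job.
SHOW ROUTINE LOAD TASK WHERE JobName = "routine_load_wikipedia";
```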

Import example

Environment requirements

  • Kafka clusters can be accessed without authentication or with SSL authentication.
  • The supported message format is CSV text, with one row per message and no trailing line break.
  • Only Kafka versions 0.10.0.0 and above are supported.

Create an import job

Syntax:

CREATE ROUTINE LOAD [database.][job_name] ON [table_name]
    [COLUMNS TERMINATED BY "column_separator" ,]
    [COLUMNS (col1, col2, ...) ,]
    [WHERE where_condition ,]
    [PARTITION (part1, part2, ...)]
    [PROPERTIES ("key" = "value", ...)]
    FROM [DATA_SOURCE]
    [(data_source_properties1 = 'value1', 
    data_source_properties2 = 'value2', 
    ...)]

Example:

An example of importing data from a local Kafka cluster:

CREATE ROUTINE LOAD routine_load_wikipedia ON routine_wiki_edit
COLUMNS TERMINATED BY ",",
COLUMNS (event_time, channel, user, is_anonymous, is_minor, is_new, is_robot, is_unpatrolled, delta, added, deleted)
PROPERTIES
(
    "desired_concurrent_number"="1",
    "max_error_number"="1000"
)
FROM KAFKA
(
    "kafka_broker_list"= "localhost:9092",
    "kafka_topic" = "starrocks-load"
);

Description:

  • job_name: Required. The name of the import job. The database name can be used as a prefix of job_name. A common naming convention is timestamp + table name.
  • table_name: Required. The name of the target table to import into.
  • COLUMNS TERMINATED BY clause: Optional. Used to specify the column separator in the source data. The separator defaults to \t (tab).
  • COLUMNS clause: Optional. Used to specify the mapping between the columns of the source data and the columns of the table.
    • Mapped columns: If the target table has three columns col1, col2, col3, and the source data has four columns, where columns 1, 2, and 4 correspond to col2, col1, col3 respectively, then it is written as follows: COLUMNS (col2, col1, temp, col3), where temp column is a non-existent column, which is used to skip the third column in the source data.
    • Derived columns: In addition to importing the columns of the source data directly, StarRocks can compute new columns from them. Suppose a fourth column col4 is added to the target table and its value is col1 + col2. The clause is then written as follows: COLUMNS (col2, col1, temp, col3, col4 = col1 + col2).
  • WHERE clause: Optional. Used to specify filtering conditions to filter out unwanted rows. The filter condition can specify mapped or derived columns. For example, to import only rows where k1 is greater than 100 and k2 is equal to 1000, it would be written as follows: WHERE k1 > 100 and k2 = 1000.
  • PARTITION clause: Optional. Used to specify which partitions of the target table to import into. If not specified, the data is automatically imported into the corresponding partitions.
  • PROPERTIES clause: Optional. Used to specify the general parameters of the import job.
    • desired_concurrent_number: Import concurrency. Specifies the maximum number of subtasks for one import job. The value must be greater than 0. The default value is 3.
    • max_batch_interval: The maximum execution time per subtask, in seconds. The value range is 5 to 60. The default value is 10. After version 1.15, this parameter is the scheduling interval of subtasks, i.e. how often a task is executed. The data consumption time of a task is set by routine_load_task_consume_second in fe.conf (default 3s), and the execution timeout of a task is set by routine_load_task_timeout_second in fe.conf (default 15s).
    • max_batch_rows: The maximum number of rows to be read per subtask. The value must be greater than or equal to 200000. The default value is 200000. After version 1.15, this parameter is only used to define the range of the error detection window, which is 10 * max_batch_rows.
    • max_batch_size: The maximum number of bytes to be read per subtask, in bytes. The value range is 100MB to 1GB. The default value is 100MB. After version 1.15, this parameter is deprecated. The data consumption time of a task is set by routine_load_task_consume_second in fe.conf (default 3s).
    • max_error_number: The maximum number of error rows allowed within the sampling window. The default value is 0, i.e. no error rows are allowed. Note: rows that are filtered out by the WHERE condition are not considered error rows.
    • strict_mode: Whether strict mode is enabled. It is enabled by default. When enabled, a row is filtered out if a non-null source value becomes NULL after column type conversion. Strict mode can be turned off with "strict_mode" = "false".
    • timezone: Specifies the time zone used by the import job. The default value is the session's timezone parameter. This parameter affects the results of all time zone-related functions involved in the import.
  • DATA_SOURCE: Specifies the data source. Use KAFKA.
    • data_source_properties: Specifies the information related to the data source.
    • kafka_broker_list: Kafka broker connection information, in the format ip:port. Multiple brokers are separated by commas.
    • kafka_topic: Specifies the Kafka topic to subscribe to.
    • kafka_partitions/kafka_offsets: Specify the Kafka partitions to subscribe to and the starting offset of each partition.
    • property: Kafka-related properties, functionally equivalent to the "--property" parameter in the Kafka shell.
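
The clauses described above can be combined in a single statement. The following is a minimal sketch, not a definitive template: the database example_db, the table example_tbl (assumed to have columns col1 to col4), the job example_job, the topic example_topic, the broker addresses, and all offsets are hypothetical placeholders.

```sql
-- Hypothetical names and values throughout; adjust them to your environment.
-- Source fields 1, 2 and 4 map to col2, col1 and col3; temp skips source field 3;
-- col4 is derived from the mapped columns; only rows matching WHERE are imported.
CREATE ROUTINE LOAD example_db.example_job ON example_tbl
COLUMNS TERMINATED BY ",",
COLUMNS (col2, col1, temp, col3, col4 = col1 + col2),
WHERE col1 > 100 AND col2 = 1000
PROPERTIES
(
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "20",
    "max_error_number" = "1000",
    "strict_mode" = "false"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092",
    "kafka_topic" = "example_topic",
    "kafka_partitions" = "0,1,2",
    "kafka_offsets" = "101,0,200"
);
```

In this sketch, each entry in kafka_offsets is the starting offset of the partition at the same position in kafka_partitions. Rows rejected by the WHERE condition are not counted against max_error_number, as noted in the parameter description.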

View job status

  • Displays all routine import jobs (including stopped or canceled jobs) under [database]. The result is one or more rows.

    USE [database];
    SHOW ALL ROUTINE LOAD;
  • Displays the currently running routine import job under [database] with the name job_name.

    SHOW ROUTINE LOAD FOR [database.][job_name];

Note: Only tasks that are currently running can be viewed. Tasks that have ended or have not yet started cannot be viewed.

Using the import task created above as an example, the following command allows you to view all running routine load jobs.

MySQL [load_test]> SHOW ROUTINE LOAD\G;

*************************** 1. row ***************************
                  Id: 14093
                Name: routine_load_wikipedia
          CreateTime: 2020-05-16 16:00:48
           PauseTime: N/A
             EndTime: N/A
              DbName: default_cluster:load_test
           TableName: routine_wiki_edit
               State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 1
       JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
    CustomProperties: {}
           Statistic: {"receivedBytes":150821770,"errorRows":122,"committedTaskNum":12,"loadedRows":2399878,"loadRowsRate":199000,"abortedTaskNum":1,"totalRows":2400000,"unselectedRows":0,"receivedBytesRate":12523000,"taskExecuteTimeMs":12043}
            Progress: {"0":"13634667"}
ReasonOfStateChanged:
        ErrorLogUrls: http://172.26.108.172:9122/api/_load_error_log?file=__shard_53/error_log_insert_stmt_47e8a1d107ed4932-8f1ddf7b01ad2fee_47e8a1d107ed4932_8f1ddf7b01ad2fee, http://172.26.108.172:9122/api/_load_error_log?file=__shard_54/error_log_insert_stmt_e0c0c6b040c044fd-a162b16f6bad53e6_e0c0c6b040c044fd_a162b16f6bad53e6, http://172.26.108.172:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8
            OtherMsg:
1 row in set (0.00 sec)

The example shows the import job named routine_load_wikipedia. The important fields are defined as follows:

  • State: The status of the import job. RUNNING indicates that the import job is running continuously.
  • Statistic: The progress statistics of the import job, with the following fields:
    • receivedBytes: The size of the received data, in bytes.
    • errorRows: The number of rows that failed to import.
    • committedTaskNum: The number of tasks committed by the FE.
    • loadedRows: The number of rows that have been imported.
    • loadRowsRate: The rate of importing data, in rows/s.
    • abortedTaskNum: The number of tasks that failed on the BE.
    • totalRows: The total number of rows received.
    • unselectedRows: The number of rows filtered out by the WHERE condition.
    • receivedBytesRate: The rate of receiving data, in bytes/s.
    • taskExecuteTimeMs: The time taken to import, in ms.
  • ErrorLogUrls: The error logs, accessible by URL.

Suspend the import job

After the PAUSE statement is executed, the import job enters the PAUSED state. The job is not terminated, however, and can be resumed with the RESUME statement.

  • Suspend the routine import job with the name job_name.
PAUSE ROUTINE LOAD FOR [job_name];
MySQL [load_test]> SHOW ROUTINE LOAD\G;
*************************** 1. row ***************************
                  Id: 14093
                Name: routine_load_wikipedia
          CreateTime: 2020-05-16 16:00:48
           PauseTime: 2020-05-16 16:03:39
             EndTime: N/A
              DbName: default_cluster:load_test
           TableName: routine_wiki_edit
               State: PAUSED
      DataSourceType: KAFKA
      CurrentTaskNum: 0
       JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
    CustomProperties: {}
      Statistic: {"receivedBytes":162767220,"errorRows":132,"committedTaskNum":13,"loadedRows":2589972,"loadRowsRate":115000,"abortedTaskNum":7,"totalRows":2590104,"unselectedRows":0,"receivedBytesRate":7279000,"taskExecuteTimeMs":22359}
      Progress: {"0":"13824771"}
ReasonOfStateChanged: ErrorReason{code=errCode = 100, msg='User root pauses routine load job'}
    ErrorLogUrls: http://172.26.108.172:9122/api/_load_error_log?file=__shard_54/error_log_insert_stmt_e0c0c6b040c044fd-a162b16f6bad53e6_e0c0c6b040c044fd_a162b16f6bad53e6, http://172.26.108.172:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8, http://172.26.108.172:9122/api/_load_error_log?file=__shard_56/error_log_insert_stmt_8753041cd5fb42d0-b5150367a5175391_8753041cd5fb42d0_b5150367a5175391
      OtherMsg:
1 row in set (0.01 sec)

After the import job is suspended, its state changes to PAUSED and the import information in Statistic stops updating. At this point, the job is not terminated, and you can still view the paused import job with the SHOW ROUTINE LOAD statement.

Resume the import job

After the RESUME statement is executed, the job briefly enters the NEED_SCHEDULE state, indicating that it is being rescheduled, and returns to the RUNNING state shortly afterwards.

  • Resume the routine import job with the name job_name.
RESUME ROUTINE LOAD FOR [job_name];
MySQL [load_test]> RESUME ROUTINE LOAD FOR routine_load_wikipedia;
Query OK, 0 rows affected (0.01 sec)
MySQL [load_test]> SHOW ROUTINE LOAD\G;
*************************** 1. row ***************************
                  Id: 14093
                Name: routine_load_wikipedia
          CreateTime: 2020-05-16 16:00:48
           PauseTime: N/A
             EndTime: N/A
              DbName: default_cluster:load_test
           TableName: routine_wiki_edit
               State: NEED_SCHEDULE
      DataSourceType: KAFKA
      CurrentTaskNum: 0
       JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
    CustomProperties: {}
      Statistic: {"receivedBytes":162767220,"errorRows":132,"committedTaskNum":13,"loadedRows":2589972,"loadRowsRate":115000,"abortedTaskNum":7,"totalRows":2590104,"unselectedRows":0,"receivedBytesRate":7279000,"taskExecuteTimeMs":22359}
      Progress: {"0":"13824771"}
ReasonOfStateChanged:
    ErrorLogUrls: http://172.26.108.172:9122/api/_load_error_log?file=__shard_54/error_log_insert_stmt_e0c0c6b040c044fd-a162b16f6bad53e6_e0c0c6b040c044fd_a162b16f6bad53e6, http://172.26.108.172:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8, http://172.26.108.172:9122/api/_load_error_log?file=__shard_56/error_log_insert_stmt_8753041cd5fb42d0-b5150367a5175391_8753041cd5fb42d0_b5150367a5175391
        OtherMsg:
1 row in set (0.00 sec)
MySQL [load_test]> SHOW ROUTINE LOAD\G;
*************************** 1. row ***************************
                  Id: 14093
                Name: routine_load_wikipedia
          CreateTime: 2020-05-16 16:00:48
           PauseTime: N/A
             EndTime: N/A
              DbName: default_cluster:load_test
           TableName: routine_wiki_edit
               State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 1
       JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
    CustomProperties: {}
      Statistic: {"receivedBytes":175337712,"errorRows":142,"committedTaskNum":14,"loadedRows":2789962,"loadRowsRate":118000,"abortedTaskNum":7,"totalRows":2790104,"unselectedRows":0,"receivedBytesRate":7422000,"taskExecuteTimeMs":23623}
      Progress: {"0":"14024771"}
ReasonOfStateChanged:
    ErrorLogUrls: http://172.26.108.172:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8, http://172.26.108.172:9122/api/_load_error_log?file=__shard_56/error_log_insert_stmt_8753041cd5fb42d0-b5150367a5175391_8753041cd5fb42d0_b5150367a5175391, http://172.26.108.172:9122/api/_load_error_log?file=__shard_57/error_log_insert_stmt_31304c87bb82431a-9f2baf7d5fd7f252_31304c87bb82431a_9f2baf7d5fd7f252
            OtherMsg:
1 row in set (0.00 sec)
ERROR: No query specified

After the import job is resumed, the first query shows the state NEED_SCHEDULE, indicating that the job is being rescheduled. The second query shows the state RUNNING, and the import information in Statistic starts to update again.

Stop the import job

Use the STOP statement to put the import job into the STOPPED state. Data import stops and the import job cannot be resumed.

  • Stop a routine import job with the name job_name.
STOP ROUTINE LOAD FOR [job_name];
MySQL [load_test]> STOP ROUTINE LOAD FOR routine_load_wikipedia;
Query OK, 0 rows affected (0.01 sec)
MySQL [load_test]> SHOW ALL ROUTINE LOAD\G;
*************************** 1. row ***************************
                  Id: 14093
                Name: routine_load_wikipedia
          CreateTime: 2020-05-16 16:00:48
           PauseTime: N/A
             EndTime: 2020-05-16 16:08:25
              DbName: default_cluster:load_test
           TableName: routine_wiki_edit
               State: STOPPED
      DataSourceType: KAFKA
      CurrentTaskNum: 0
       JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
    CustomProperties: {}
      Statistic: {"receivedBytes":325534440,"errorRows":264,"committedTaskNum":26,"loadedRows":5179944,"loadRowsRate":109000,"abortedTaskNum":18,"totalRows":5180208,"unselectedRows":0,"receivedBytesRate":6900000,"taskExecuteTimeMs":47173}
      Progress: {"0":"16414875"}
ReasonOfStateChanged:
        ErrorLogUrls: http://172.26.108.172:9122/api/_load_error_log?file=__shard_67/error_log_insert_stmt_79e9504cafee4fbd-b3981a65fb158cde_79e9504cafee4fbd_b3981a65fb158cde, http://172.26.108.172:9122/api/_load_error_log?file=__shard_68/error_log_insert_stmt_b6981319ce56421b-bf4486c2cd371353_b6981319ce56421b_bf4486c2cd371353, http://172.26.108.172:9122/api/_load_error_log?file=__shard_69/error_log_insert_stmt_1121400c1f6f4aed-866c381eb49c966e_1121400c1f6f4aed_866c381eb49c966e
            OtherMsg:

After the import job is stopped, its state changes to STOPPED and the import information in Statistic is never updated again. At this point, the stopped import job is no longer shown by the SHOW ROUTINE LOAD statement; use SHOW ALL ROUTINE LOAD to view it.

Frequently Asked Questions

  • Q: The import job is PAUSED with the error "Broker: Offset out of range".

    A: Check the latest offset with SHOW ROUTINE LOAD, then use a Kafka client to check whether there is any data at that offset.

    Possible causes:

    • A future offset (one that does not exist yet) was specified when importing.
    • Kafka had already cleaned up the data at this offset before it was imported. Take the import speed of StarRocks into account and set reasonable log cleanup parameters (e.g. log.retention.hours, log.retention.bytes).
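
If the cause is a stale or future offset, one possible recovery is to point the paused job at an offset that still exists in Kafka and then resume it. The statements below are only a sketch: they assume your StarRocks version supports ALTER ROUTINE LOAD (older releases may not). Partition 0 and offset 13900000 are placeholders, to be replaced with a valid offset found with your Kafka client.

```sql
-- Assumption: ALTER ROUTINE LOAD is available and the job is already in the PAUSED state.
-- Placeholder partition and offset; use an offset that currently exists in the topic.
ALTER ROUTINE LOAD FOR load_test.routine_load_wikipedia
FROM KAFKA
(
    "kafka_partitions" = "0",
    "kafka_offsets" = "13900000"
);

-- Resume consumption from the new offset.
RESUME ROUTINE LOAD FOR routine_load_wikipedia;
```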