Design Background

The official flink-connector-jdbc provided by Flink cannot meet our import performance requirements, so we added a new flink-connector-starrocks, implemented internally by caching incoming records and bulk-importing them into StarRocks via Stream Load.
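The cache-and-bulk-import idea can be sketched in isolation: rows are buffered in memory and flushed as one batch when a size or row-count threshold is hit. This is an illustrative sketch only; the class and field names are hypothetical, not the connector's actual internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative sketch of the cache-then-bulk-flush idea behind the sink.
// Names are hypothetical; thresholds mirror the sink.buffer-flush.* options.
class BufferingSink {
    private final List<String> cache = new ArrayList<>();
    private long cachedBytes = 0;
    private final long maxBytes;                     // cf. sink.buffer-flush.max-bytes
    private final int maxRows;                       // cf. sink.buffer-flush.max-rows
    private final Consumer<List<String>> streamLoad; // one Stream Load call per flush

    BufferingSink(long maxBytes, int maxRows, Consumer<List<String>> streamLoad) {
        this.maxBytes = maxBytes;
        this.maxRows = maxRows;
        this.streamLoad = streamLoad;
    }

    void write(String row) {
        cache.add(row);
        cachedBytes += row.getBytes().length;
        if (cache.size() >= maxRows || cachedBytes >= maxBytes) {
            flush();
        }
    }

    void flush() {
        if (cache.isEmpty()) return;
        streamLoad.accept(new ArrayList<>(cache)); // bulk import via one request
        cache.clear();
        cachedBytes = 0;
    }
}
```

A time-based trigger (cf. sink.buffer-flush.interval-ms) would flush the same cache from a timer thread.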


Add `com.starrocks.connector.flink.table.StarRocksDynamicTableSinkFactory` to: `src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory`.

Add one of the following `<version>` entries for the flink-connector-starrocks dependency in pom.xml, matching your Flink version:

    <version>1.0.32-SNAPSHOT</version>  <!-- for flink-1.11 ~ flink-1.12 -->
    <version>1.0.32_1.13-SNAPSHOT</version>  <!-- for flink-1.13 -->
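For reference, a complete dependency entry might look like the following; the `groupId` and `artifactId` shown here are assumed from the project name and should be checked against the actual repository:

```xml
<dependency>
    <!-- coordinates assumed from the project name; verify against the repository -->
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <version>1.0.32-SNAPSHOT</version>  <!-- or 1.0.32_1.13-SNAPSHOT for flink-1.13 -->
</dependency>
```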

Use as follows:

```java
// -------- sink with raw json string stream --------
fromElements(new String[]{
    "{\"score\": \"99\", \"name\": \"stephen\"}",
    "{\"score\": \"100\", \"name\": \"lebron\"}"
})
.addSink(
    StarRocksSink.sink(
        // the sink options
        StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", "jdbc:mysql://fe_ip:query_port,fe_ip:query_port?xxxxx")
            .withProperty("load-url", "fe_ip:http_port;fe_ip:http_port")
            .withProperty("username", "xxx")
            .withProperty("password", "xxx")
            .withProperty("table-name", "xxx")
            .withProperty("database-name", "xxx")
            .withProperty("sink.properties.format", "json")
            .withProperty("sink.properties.strip_outer_array", "true")
            .build()
    )
);
```
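Because the cached JSON rows are sent as one batch, the load body ends up as a JSON array, which is why the Stream Load property `strip_outer_array` is set to `true` in JSON mode. A minimal sketch of that batching step (the helper name is illustrative, not the connector's real code):

```java
import java.util.List;

// Hypothetical helper showing why strip_outer_array is needed: the sink
// joins the cached JSON rows into one array-shaped Stream Load body,
// and StarRocks must strip the outer array to recover the row objects.
class JsonBatch {
    static String toLoadBody(List<String> jsonRows) {
        return "[" + String.join(",", jsonRows) + "]";
    }
}
```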

```java
// -------- sink with stream transformation --------
class RowData {
    public int score;
    public String name;
    public RowData(int score, String name) {
        this.score = score;
        this.name = name;
    }
}
fromElements(
    new RowData[]{
        new RowData(99, "stephen"),
        new RowData(100, "lebron")
    }
)
.addSink(
    StarRocksSink.sink(
        // the table structure
        TableSchema.builder()
            .field("score", DataTypes.INT())
            .field("name", DataTypes.VARCHAR(20))
            .build(),
        // the sink options
        StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", "jdbc:mysql://fe_ip:query_port,fe_ip:query_port?xxxxx")
            .withProperty("load-url", "fe_ip:http_port;fe_ip:http_port")
            .withProperty("username", "xxx")
            .withProperty("password", "xxx")
            .withProperty("table-name", "xxx")
            .withProperty("database-name", "xxx")
            .withProperty("sink.properties.column_separator", "\\x01")
            .withProperty("sink.properties.row_delimiter", "\\x02")
            .build(),
        // set the slots with streamRowData
        (slots, streamRowData) -> {
            slots[0] = streamRowData.score;
            slots[1] = streamRowData.name;
        }
    )
);
```
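The last argument above is a slot-filling callback that copies each stream record's fields into positional slots matching the declared table structure. In isolation it is just a BiConsumer-style function; the sketch below uses hypothetical names to show the positional contract (slot 0 must correspond to the first declared field, and so on):

```java
import java.util.function.BiConsumer;

// Sketch of the slot-filling callback's contract: slot positions must match
// the declared table structure (slot 0 -> score, slot 1 -> name).
// Class and method names here are illustrative.
class SlotDemo {
    static class RowData {
        final int score;
        final String name;
        RowData(int score, String name) { this.score = score; this.name = name; }
    }

    // Allocate the slots and let the caller-supplied filler populate them.
    static Object[] fill(RowData row, BiConsumer<Object[], RowData> filler) {
        Object[] slots = new Object[2];
        filler.accept(slots, row);
        return slots;
    }
}
```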


```java
// -------- sink with flink sql --------
// create a table with `structure` and `properties`
// Needed: Add `com.starrocks.connector.flink.table.StarRocksDynamicTableSinkFactory` to: `src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory`
tEnv.executeSql(
    "CREATE TABLE USER_RESULT(" +
        "name VARCHAR," +
        "score BIGINT" +
    ") WITH ( " +
        "'connector' = 'starrocks'," +
        "'jdbc-url'='jdbc:mysql://fe_ip:query_port,fe_ip:query_port?xxxxx'," +
        "'load-url'='fe_ip:http_port;fe_ip:http_port'," +
        "'database-name' = 'xxx'," +
        "'table-name' = 'xxx'," +
        "'username' = 'xxx'," +
        "'password' = 'xxx'," +
        "'sink.properties.column_separator' = '\\x01'," +
        "'sink.properties.row_delimiter' = '\\x02'," +
        "'sink.properties.columns' = 'k1, v1'" +
    ")"
);
```
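In CSV mode, the Stream Load column separator and row delimiter ('\x01' and '\x02' above) control how the cached rows are serialized into one load body; the non-printable bytes avoid collisions with commas and newlines in the data. A sketch of that serialization, with illustrative helper names:

```java
import java.util.List;

// Sketch of CSV serialization with the non-printable separators used above:
// columns joined by byte 0x01, rows joined by byte 0x02.
// Class and constant names are illustrative, not the connector's real code.
class CsvBatch {
    static final String COLUMN_SEPARATOR = "\u0001"; // '\x01'
    static final String ROW_DELIMITER = "\u0002";    // '\x02'

    static String serialize(List<List<String>> rows) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < rows.size(); i++) {
            if (i > 0) sb.append(ROW_DELIMITER);
            sb.append(String.join(COLUMN_SEPARATOR, rows.get(i)));
        }
        return sb.toString();
    }
}
```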

The sink options are as follows.

| Option | Required | Default | Type | Description |
|--------|----------|---------|------|-------------|
| jdbc-url | YES | NONE | String | Used to execute queries in StarRocks. |
| load-url | YES | NONE | String | `fe_ip:http_port;fe_ip:http_port` separated with ';', used to do the batch sinking. |
| database-name | YES | NONE | String | StarRocks database name. |
| table-name | YES | NONE | String | StarRocks table name. |
| username | YES | NONE | String | StarRocks connecting username. |
| password | YES | NONE | String | StarRocks connecting password. |
| sink.semantic | NO | at-least-once | String | `at-least-once` or `exactly-once` (flush at checkpoint only; options like `sink.buffer-flush.*` won't work either). |
| sink.buffer-flush.max-bytes | NO | 94371840 (90M) | String | The max batching size of the serialized data, range: [64MB, 10GB]. |
| sink.buffer-flush.max-rows | NO | 500000 | String | The max batching rows, range: [64,000, 5,000,000]. |
| sink.buffer-flush.interval-ms | NO | 300000 | String | The flushing time interval, range: [1000ms, 3600000ms]. |
| sink.max-retries | NO | 1 | String | Max retry times of the Stream Load request, range: [0, 10]. |
| sink.connect.timeout-ms | NO | 1000 | String | Timeout in milliseconds for connecting to the load-url, range: [100, 60000]. |
| sink.properties.* | NO | NONE | String | The Stream Load properties, like `'sink.properties.columns' = 'k1, k2, k3'`. |
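The numeric options above all have hard ranges, which a sink would typically validate at startup rather than at flush time. An illustrative range check (the helper name is hypothetical; the ranges come from the table above):

```java
// Illustrative validation of the ranged options from the table above.
// The helper name is hypothetical, not the connector's real API.
class SinkOptionRanges {
    static long checkRange(String name, long value, long min, long max) {
        if (value < min || value > max) {
            throw new IllegalArgumentException(
                name + " must be in [" + min + ", " + max + "], got " + value);
        }
        return value;
    }
}
```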


  • To achieve exactly-once, a two-phase commit mechanism in the external system is required. Since StarRocks does not have such a mechanism, we rely on Flink's checkpoint interval: the batch data and its label are saved at each checkpoint, and all cached data is flushed (blocking) at the first invoke after the checkpoint completes. However, if StarRocks hangs, the user's Flink sink will block for a long time, triggering Flink's monitoring alerts or force kills.

  • CSV format is used for import by default. Users can customize the row delimiter and column separator by specifying 'sink.properties.row_delimiter' = '\\x02' (supported in StarRocks 1.15.0 and later) and 'sink.properties.column_separator' = '\\x01'.

  • If the import stalls, try increasing the memory of the Flink job.
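The checkpoint-labeled flush described in the first note can be sketched abstractly: each checkpoint snapshots the pending batch under a label, and the flush happens on the first invoke after the checkpoint completes. All names below are illustrative, not the connector's real classes:

```java
import java.util.ArrayList;
import java.util.List;

// Abstract sketch of the checkpoint-labeled flush described above:
// pending rows are snapshotted with a label at checkpoint time, then
// block-flushed on the first invoke after the checkpoint completes.
// Class and method names are illustrative, not the connector's real code.
class LabeledBatchSink {
    static class LabeledBatch {
        final String label;
        final List<String> rows;
        LabeledBatch(String label, List<String> rows) { this.label = label; this.rows = rows; }
    }

    private final List<String> pending = new ArrayList<>();
    private final List<LabeledBatch> committed = new ArrayList<>();
    private LabeledBatch snapshot; // batch saved at the last checkpoint

    void invoke(String row) {
        if (snapshot != null) {
            committed.add(snapshot); // block-flush the labeled batch via Stream Load
            snapshot = null;
        }
        pending.add(row);
    }

    void snapshotState(long checkpointId) {
        // save the batch data and its label at the checkpoint
        snapshot = new LabeledBatch("flink-" + checkpointId, new ArrayList<>(pending));
        pending.clear();
    }

    List<LabeledBatch> committedBatches() { return committed; }
}
```

The label makes a retried flush idempotent on the StarRocks side, which is what stands in for the missing two-phase commit.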