- StarRocks
- 产品简介
- 快速开始
- 表设计
- 数据导入
- 数据提取
- 使用StarRocks
- 管理手册
- 参考手册
- SQL参考
- 用户账户管理
- 集群管理
- ADMIN CANCEL REPAIR
- ADMIN CHECK TABLET
- ADMIN REPAIR
- ADMIN SET CONFIG
- ADMIN SET REPLICA STATUS
- ADMIN SHOW CONFIG
- ADMIN SHOW REPLICA DISTRIBUTION
- ADMIN SHOW REPLICA STATUS
- ALTER SYSTEM
- CANCEL DECOMMISSION
- CREATE RESOURCE GROUP
- CREATE FILE
- DROP FILE
- INSTALL PLUGIN
- SHOW BACKENDS
- SHOW BROKER
- SHOW FRONTENDS
- SHOW FULL COLUMNS
- SHOW INDEX
- SHOW PLUGINS
- SHOW TABLE STATUS
- SHOW FILE
- UNINSTALL PLUGIN
- DDL
- ALTER DATABASE
- ALTER TABLE
- ALTER VIEW
- ALTER RESOURCE
- BACKUP
- CANCEL BACKUP
- CANCEL RESTORE
- CREATE DATABASE
- CREATE INDEX
- CREATE MATERIALIZED VIEW
- CREATE REPOSITORY
- CREATE RESOURCE
- CREATE TABLE AS SELECT
- CREATE TABLE LIKE
- CREATE TABLE
- CREATE VIEW
- CREATE FUNCTION
- DROP DATABASE
- DROP INDEX
- DROP MATERIALIZED VIEW
- DROP REPOSITORY
- DROP RESOURCE
- DROP TABLE
- DROP VIEW
- DROP FUNCTION
- HLL
- RECOVER
- RESTORE
- SHOW RESOURCES
- SHOW FUNCTION
- TRUNCATE TABLE
- DML
- ALTER ROUTINE LOAD
- BROKER LOAD
- CANCEL LOAD
- CANCEL EXPORT
- DELETE
- EXPORT
- GROUP BY
- INSERT
- PAUSE ROUTINE LOAD
- RESUME ROUTINE LOAD
- CREATE ROUTINE LOAD
- SELECT
- SHOW ALTER
- SHOW BACKUP
- SHOW DATA
- SHOW DATABASES
- SHOW DELETE
- SHOW DYNAMIC PARTITION TABLES
- SHOW EXPORT
- SHOW LOAD
- SHOW PARTITIONS
- SHOW PROPERTY
- SHOW REPOSITORIES
- SHOW RESTORE
- SHOW ROUTINE LOAD
- SHOW ROUTINE LOAD TASK
- SHOW SNAPSHOT
- SHOW TABLES
- SHOW TABLET
- SHOW TRANSACTION
- SPARK LOAD
- STOP ROUTINE LOAD
- STREAM LOAD
- 数据类型
- 辅助命令
- 函数参考
- 日期函数
- convert_tz
- curdate
- current_timestamp
- curtime
- datediff
- date_add
- date_format
- date_sub
- date_trunc
- day
- dayname
- dayofmonth
- dayofweek
- dayofyear
- from_days
- from_unixtime
- hour
- minute
- month
- monthname
- now
- second
- str_to_date
- timediff
- time_slice
- timestampadd
- timestampdiff
- to_date
- to_days
- unix_timestamp
- utc_timestamp
- week
- weekofyear
- year
- hours_diff
- minutes_diff
- months_diff
- seconds_diff
- weeks_diff
- years_diff
- quarter
- timestamp
- time_to_sec
- str2date
- microseconds_add
- microseconds_sub
- 加密函数
- 地理位置函数
- 字符串函数
- JSON 函数
- 模糊/正则匹配函数
- 工具函数
- 聚合函数
- Bitmap函数
- 数组函数
- bit函数
- cast函数
- hash函数
- 条件函数
- 百分位函数
- 数学函数
- 日期函数
- 系统变量
- 错误码
- 系统限制
- SQL参考
- 常见问题解答
- 性能测试
- Release Notes
MySQL 实时同步至StarRocks
功能简介
StarRocks 提供 Flink CDC connector、flink-connector-starrocks 和 StarRocks-migrate-tools(简称smt),实现 MySQL 数据实时同步至 StarRocks,满足业务实时场景的数据分析。
基本原理
通过 Flink CDC connector、flink-connector-starrocks 和 smt 可以实现 MySQL 数据的秒级同步至StarRocks。
如图所示,Smt 可以根据 MySQL 和 StarRocks 的集群信息和表结构自动生成 source table 和 sink table 的建表语句。
通过 Flink-cdc-connector 消费 MySQL 的 binlog,然后通过 Flink-connector-starrocks 写入 StarRocks。
操作步骤
修改/etc/my.cnf,开启 MySQL binlog
#开启 binlog 日志 log-bin =/var/lib/mysql/mysql-bin #log_bin = ON ##binlog 日志的基本文件名 #log_bin_basename =/var/lib/mysql/mysql-bin ##binlog 文件的索引文件,管理所有 binlog 文件 #log_bin_index =/var/lib/mysql/mysql-bin.index #配置 serverid server-id = 1 binlog_format = row
重启mysqld,然后可以通过 SHOW VARIABLES LIKE 'log_bin'; 确认是否已经打开。
下载 Flink, 推荐使用 1.13,最低支持版本 1.11。
下载 Flink CDC connector,请注意下载对应 Flink 版本的 Flink-MySQL-CDC。
下载 Flink-connector-starrocks,请注意 1.13 版本和 1.11/1.12 版本使用不同的 connector.
复制
flink-sql-connector-mysql-cdc-xxx.jar
,flink-connector-starrocks-xxx.jar
到flink-xxx/lib/
下载并解压 smt.tar.gz
解压并修改配置文件
Db
需要修改成 MySQL 的连接信息。be_num
需要配置成 StarRocks 集群的节点数(这个能帮助更合理的设置 bucket 数量)。[table-rule.1]
是匹配规则,可以根据正则表达式匹配数据库和表名生成建表的 SQL,也可以配置多个规则。flink.starrocks.*
是 StarRocks 的集群配置信息,参考 Flink-connector-starrocks 配置。[db] host = 192.168.1.1 port = 3306 user = root password = [other] # number of backends in StarRocks be_num = 3 # `decimal_v3` is supported since StarRocks-1.18.1 use_decimal_v3 = false # file to save the converted DDL SQL output_dir = ./result [table-rule.1] # pattern to match databases for setting properties database = ^console_19321.*$ # pattern to match tables for setting properties table = ^.*$ ############################################ ### flink sink configurations ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated ############################################ flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030 flink.starrocks.load-url= 192.168.1.1:8030 flink.starrocks.username=root flink.starrocks.password= flink.starrocks.sink.properties.column_separator=\x01 flink.starrocks.sink.properties.row_delimiter=\x02 flink.starrocks.sink.buffer-flush.interval-ms=15000
执行 starrocks-migrate-tool,flink 和 starrocks 建表语句都生成在 result 目录下
$./starrocks-migrate-tool $ls result flink-create.1.sql smt.tar.gz starrocks-create.all.sql flink-create.all.sql starrocks-create.1.sql
利用上一步生成的 StarRocks 的建表语句在 StarRocks 中建表
Mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.1.sql
执行如下语句,生成Flink table 并开始同步,同步任务会持续执行
需要确保flink集群已经启动,未启动可以使用flink/bin/start-cluster.sh启动
bin/sql-client.sh -f flink-create.1.sql
这个执行以后同步任务会持续执行
如果是 Flink 1.13 之前的版本可能无法直接执行脚本,需要逐行提交
观察任务状况
bin/flink list
上述命令可以查看 flink 同步任务状态,如果有任务请查看 log 日志,或者调整 conf/flink-conf.yaml 中的系统配置中内存和 slot,具体配置请参考 Flink 配置参数。
注意事项
如果有多组规则,需要给每一组规则匹配 database,table 和 flink-connector 的配置
[table-rule.1] # pattern to match databases for setting properties database = ^console_19321.*$ # pattern to match tables for setting properties table = ^.*$ ############################################ ### flink sink configurations ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated ############################################ flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030 flink.starrocks.load-url= 192.168.1.1:8030 flink.starrocks.username=root flink.starrocks.password= flink.starrocks.sink.properties.column_separator=\x01 flink.starrocks.sink.properties.row_delimiter=\x02 flink.starrocks.sink.buffer-flush.interval-ms=15000 [table-rule.2] # pattern to match databases for setting properties database = ^database2.*$ # pattern to match tables for setting properties table = ^.*$ ############################################ ### flink sink configurations ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated ############################################ flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030 flink.starrocks.load-url= 192.168.1.1:8030 flink.starrocks.username=root flink.starrocks.password= # 如果导入数据不方便选出合适的分隔符可以考虑使用 Json 格式,但是会有一定的性能损失, 使用方法:用以下参数替换 flink.starrocks.sink.properties.column_separator 和 flink.starrocks.sink.properties.row_delimiter 参数 flink.starrocks.sink.properties.strip_outer_array=true flink.starrocks.sink.properties.format=json
Flink.starrocks.sink 的参数可以参考上文,比如可以给不同的规则配置不同的导入频率等参数。
针对分库分表的大表可以单独配置一个规则,比如:有两个数据库 edu_db_1,edu_db_2,每个数据库下面分别有course_1,course_2 两张表,并且所有表的数据结构都是相同的,通过如下配置把他们导入StarRocks的一张表中进行分析。
[table-rule.3] # pattern to match databases for setting properties database = ^edu_db_[0-9]*$ # pattern to match tables for setting properties table = ^course_[0-9]*$ ############################################ ### flink sink configurations ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated ############################################ flink.starrocks.jdbc-url = jdbc: mysql://192.168.1.1:9030 flink.starrocks.load-url = 192.168.1.1:8030 flink.starrocks.username = root flink.starrocks.password = flink.starrocks.sink.properties.column_separator =\x01 flink.starrocks.sink.properties.row_delimiter =\x02 flink.starrocks.sink.buffer-flush.interval-ms = 5000
这样会自动生成一个多对一的导入关系,在StarRocks默认生成的表名是 course__auto_shard,也可以自行在生成的配置文件中修改。
如果在sql-client中命令行执行建表和同步任务,需要做对'\'字符进行转义
'sink.properties.column_separator' = '\\x01' 'sink.properties.row_delimiter' = '\\x02'