- StarRocks
- 产品简介
- 快速开始
- 表设计
- 数据导入
- 数据提取
- 使用StarRocks
- 管理手册
- 参考手册
- SQL参考
- 用户账户管理
- 集群管理
- ADMIN CANCEL REPAIR
- ADMIN CHECK TABLET
- ADMIN REPAIR
- ADMIN SET CONFIG
- ADMIN SET REPLICA STATUS
- ADMIN SHOW CONFIG
- ADMIN SHOW REPLICA DISTRIBUTION
- ADMIN SHOW REPLICA STATUS
- ALTER SYSTEM
- CANCEL DECOMMISSION
- CREATE RESOURCE GROUP
- CREATE FILE
- DROP FILE
- INSTALL PLUGIN
- SHOW BACKENDS
- SHOW BROKER
- SHOW FRONTENDS
- SHOW FULL COLUMNS
- SHOW INDEX
- SHOW PLUGINS
- SHOW TABLE STATUS
- SHOW FILE
- UNINSTALL PLUGIN
- DDL
- ALTER DATABASE
- ALTER TABLE
- ALTER VIEW
- ALTER RESOURCE
- BACKUP
- CANCEL BACKUP
- CANCEL RESTORE
- CREATE DATABASE
- CREATE INDEX
- CREATE MATERIALIZED VIEW
- CREATE REPOSITORY
- CREATE RESOURCE
- CREATE TABLE AS SELECT
- CREATE TABLE LIKE
- CREATE TABLE
- CREATE VIEW
- CREATE FUNCTION
- DROP DATABASE
- DROP INDEX
- DROP MATERIALIZED VIEW
- DROP REPOSITORY
- DROP RESOURCE
- DROP TABLE
- DROP VIEW
- DROP FUNCTION
- HLL
- RECOVER
- RESTORE
- SHOW RESOURCES
- SHOW FUNCTION
- TRUNCATE TABLE
- DML
- ALTER ROUTINE LOAD
- BROKER LOAD
- CANCEL LOAD
- CANCEL EXPORT
- DELETE
- EXPORT
- GROUP BY
- INSERT
- PAUSE ROUTINE LOAD
- RESUME ROUTINE LOAD
- CREATE ROUTINE LOAD
- SELECT
- SHOW ALTER
- SHOW BACKUP
- SHOW DATA
- SHOW DATABASES
- SHOW DELETE
- SHOW DYNAMIC PARTITION TABLES
- SHOW EXPORT
- SHOW LOAD
- SHOW PARTITIONS
- SHOW PROPERTY
- SHOW REPOSITORIES
- SHOW RESTORE
- SHOW ROUTINE LOAD
- SHOW ROUTINE LOAD TASK
- SHOW SNAPSHOT
- SHOW TABLES
- SHOW TABLET
- SHOW TRANSACTION
- SPARK LOAD
- STOP ROUTINE LOAD
- STREAM LOAD
- 数据类型
- 辅助命令
- 函数参考
- 日期函数
- convert_tz
- curdate
- current_timestamp
- curtime
- datediff
- date_add
- date_format
- date_sub
- date_trunc
- day
- dayname
- dayofmonth
- dayofweek
- dayofyear
- from_days
- from_unixtime
- hour
- minute
- month
- monthname
- now
- second
- str_to_date
- timediff
- time_slice
- timestampadd
- timestampdiff
- to_date
- to_days
- unix_timestamp
- utc_timestamp
- week
- weekofyear
- year
- hours_diff
- minutes_diff
- months_diff
- seconds_diff
- weeks_diff
- years_diff
- quarter
- timestamp
- time_to_sec
- str2date
- microseconds_add
- microseconds_sub
- 加密函数
- 地理位置函数
- 字符串函数
- JSON 函数
- 模糊/正则匹配函数
- 工具函数
- 聚合函数
- Bitmap函数
- 数组函数
- bit函数
- cast函数
- hash函数
- 条件函数
- 百分位函数
- 数学函数
- 日期函数
- 系统变量
- 错误码
- 系统限制
- SQL参考
- 常见问题解答
- 性能测试
- Release Notes
Spark Load
Spark Load 通过外部的 Spark 资源实现对导入数据的预处理,提高 StarRocks 大数据量的导入性能并且节省 StarRocks 集群的计算资源。主要用于 初次迁移、大数据量导入 StarRocks 的场景(数据量可到 TB 级别)。
本文介绍导入任务的操作流程(包括相关客户端配置、创建和查看任务等)、系统配置、最佳实践和常见问题。
支持的数据格式
- CSV
- ORC(2.0 版本之后支持)
- PARQUET(2.0 版本之后支持)
基本原理
用户通过 MySQL 客户端提交 Spark 类型导入任务,FE记录元数据并返回用户提交成功。
Spark Load 任务的执行主要分为以下几个阶段:
- 用户向 FE 提交 Spark Load 任务;
- FE 调度提交 ETL 任务到 Spark 集群执行。
- Spark 集群执行 ETL 完成对导入数据的预处理。包括全局字典构建(BITMAP类型)、分区、排序、聚合等。
- ETL 任务完成后,FE 获取预处理过的每个分片的数据路径,并调度相关的 BE 执行 Push 任务。
- BE 通过 Broker 读取数据,转化为 StarRocks 存储格式。
- FE 调度生效版本,完成导入任务。
下图展示了 Spark Load 的主要流程:
基本操作
使用 Spark Load导入数据,需要按照 创建资源 -> 配置 Spark 客户端 -> 配置 YARN 客户端 -> 创建 Spark Load 导入任务
流程执行,具体的各个部分介绍请参考下问描述。
配置 ETL 集群
Spark 作为一种外部计算资源在 StarRocks 中用来完成 ETL 工作,因此我们引入 Resource Management 来管理 StarRocks 使用的外部资源。
提交 Spark 导入任务之前,需要配置执行 ETL 任务的 Spark 集群。操作语法:
创建资源
示例:
-- yarn cluster 模式
CREATE EXTERNAL RESOURCE "spark0"
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
"spark.jars" = "xxx.jar,yyy.jar",
"spark.files" = "/tmp/aaa,/tmp/bbb",
"spark.executor.memory" = "1g",
"spark.yarn.queue" = "queue0",
"spark.hadoop.yarn.resourcemanager.address" = "resourcemanager_host:8032",
"spark.hadoop.fs.defaultFS" = "hdfs://namenode_host:9000",
"working_dir" = "hdfs://namenode_host:9000/tmp/starrocks",
"broker" = "broker0",
"broker.username" = "user0",
"broker.password" = "password0"
);
-- yarn HA cluster 模式
CREATE EXTERNAL RESOURCE "spark1"
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
"spark.hadoop.yarn.resourcemanager.ha.enabled" = "true",
"spark.hadoop.yarn.resourcemanager.ha.rm-ids" = "rm1,rm2",
"spark.hadoop.yarn.resourcemanager.hostname.rm1" = "host1",
"spark.hadoop.yarn.resourcemanager.hostname.rm2" = "host2",
"spark.hadoop.fs.defaultFS" = "hdfs://namenode_host:9000",
"working_dir" = "hdfs://namenode_host:9000/tmp/starrocks",
"broker" = "broker1"
);
-- HDFS HA cluster 模式
CREATE EXTERNAL RESOURCE "spark2"
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.hadoop.yarn.resourcemanager.address" = "resourcemanager_host:8032",
"spark.hadoop.fs.defaultFS" = "hdfs://myha",
"spark.hadoop.dfs.nameservices" = "myha",
"spark.hadoop.dfs.ha.namenodes.myha" = "mynamenode1,mynamenode2",
"spark.hadoop.dfs.namenode.rpc-address.myha.mynamenode1" = "nn1_host:rpc_port",
"spark.hadoop.dfs.namenode.rpc-address.myha.mynamenode2" = "nn2_host:rpc_port",
"spark.hadoop.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"working_dir" = "hdfs://myha/tmp/starrocks",
"broker" = "broker2",
"broker.dfs.nameservices" = "myha",
"broker.dfs.ha.namenodes.myha" = "mynamenode1,mynamenode2",
"broker.dfs.namenode.rpc-address.myha.mynamenode1" = "nn1_host:rpc_port",
"broker.dfs.namenode.rpc-address.myha.mynamenode2" = "nn2_host:rpc_port",
"broker.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);
spark0,spark1,spark2
为 StarRocks 中配置的 Spark 资源的名字。
PROPERTIES 是 Spark 资源相关参数,如下:
- type:资源类型,必填,目前仅支持 spark。
spark.master
: 必填,目前支持 yarn。spark.submit.deployMode
: Spark 程序的部署模式,必填,支持 cluster,client 两种。spark.hadoop.fs.defaultFS
: master 为 yarn 时必填。- yarn resource manager 相关参数,master 为 yarn 时需要填写。
- 单点 resource manager 需要配置
spark.hadoop.yarn.resourcemanager.address
: 单点 resource manager 地址。- HA resource manager 需要配置,其中 hostname 和 address 任选一个配置。
spark.hadoop.yarn.resourcemanager.ha.enabled
: resource manager 启用 HA,设置为 true。spark.hadoop.yarn.resourcemanager.ha.rm-ids
: resource manager 逻辑 id 列表。spark.hadoop.yarn.resourcemanager.hostname.rm-id
: 对于每个 rm-id,指定 resource manager 对应的主机名。spark.hadoop.yarn.resourcemanager.address.rm-id
: 对于每个 rm-id,指定 host: port 以供客户端提交作业。- 其他参数为可选,参考 Spark Configuration
- working_dir: ETL 使用的目录。spark 作为 ETL 资源使用时必填。例如:hdfs://host: port/tmp/starrocks。
- broker: broker 名字。spark 作为 ETL 资源使用时必填。需要使用
ALTER SYSTEM ADD BROKER
命令提前完成配置。 broker.property_key
: broker 读取 ETL 生成的中间文件时需要指定的认证信息等,详细可参考 BROKER LOAD。
其他 Resource 详细参数请参考 CREATE RESOURCE
查看资源
show resources;
普通账户只能看到自己有 USAGE-PRIV 使用权限的资源。root 和 admin 账户可以看到所有的资源。
资源权限通过 GRANT REVOKE 来管理,目前仅支持 USAGE_PRIV 使用权限。可以将 USAGE-PRIV 权限赋予某个用户或者某个角色,角色的使用与之前一致。请参考 GRANT USAGE_PRIV。
配置 Spark 客户端
FE 底层通过执行 spark-submit
的命令去提交 spark 任务,因此需要为 FE 配置 spark 客户端,建议使用 2.4.5 或以上的 spark2 官方版本,spark 下载地址,下载完成后,请按步骤完成以下配置:
配置 SPARK-HOME 环境变量
spark_home_default_dir: FE 配置,Spark 客户端目录,此配置项默认为 FE 根目录下的
lib/spark2x
路径,此项不可为空。配置 SPARK 依赖包
spark_resource_path: Spark 客户端 jar 包压缩包。默认值 fe/lib/spark2x/jars/spark-2x.zip 文件,需要将 Spark 客户端下的 jars 文件夹内所有 jar 包归档打包成一个 zip 文件 spark-2x.zip
(默认以 spark-2x.zip 命名)
,若没有找到则会报文件不存在的错误。
当提交 spark load 任务时,会将归档好的依赖文件上传至远端仓库,默认仓库路径挂在 working_dir/{cluster_id}
目录下,并以--spark-repository--{resource-name}命名,表示集群内的一个 resource 对应一个远端仓库,远端仓库目录结构参考如下:
---spark-repository--spark0/
|---archive-1.0.0/
| |---lib-990325d2c0d1d5e45bf675e54e44fb16-spark-dpp-1.0.0-jar-with-dependencies.jar
| |---lib-7670c29daf535efe3c9b923f778f61fc-spark-2x.zip
|---archive-1.1.0/
| |---lib-64d5696f99c379af2bee28c1c84271d5-spark-dpp-1.1.0-jar-with-dependencies.jar
| |---lib-1bbb74bb6b264a270bc7fca3e964160f-spark-2x.zip
|---archive-1.2.0/
| |-...
除了 spark 依赖,FE 还会上传 DPP 的依赖包至远端仓库,若此次 spark load 提交的所有依赖文件都已存在远端仓库,那么就不需要再上传依赖,省下原来每次重复上传大量文件的时间。
配置 YARN 客户端
FE 底层通过执行 yarn 命令去获取正在运行的 application 的状态,以及杀死 application,因此需要为 FE 配置 yarn 客户端,建议使用 2.5.2 或以上的 hadoop2 官方版本(hadoop 下载地址),下载完成后,请按步骤完成以下配置:
- 配置 YARN 可执行文件路径
yarn_client_path: FE 配置,默认为 FE 根目录下的 lib/yarn-client/hadoop/bin/yarn
路径。
- 配置生成 YARN 所需的配置文件的路径(可选)
yarn_config_dir: FE 配置,默认会在 FE 根目录下的 lib/yarn-config
路径生成执行 yarn 命令所需的配置文件,目前生成的配置文件包括 core-site.xml
和 yarn-site.xml
。
创建导入任务
示例 1:上游数据源为 hdfs 文件的情况
LOAD LABEL db1.label1 #此处的label建议记录,可用于任务查询
(
DATA INFILE("hdfs://abc.com:8888/user/starRocks/test/ml/file1")
INTO TABLE tbl1
COLUMNS TERMINATED BY ","
(tmp_c1,tmp_c2)
SET
(
id=tmp_c2,
name=tmp_c1
),
DATA INFILE("hdfs://abc.com:8888/user/starRocks/test/ml/file2")
INTO TABLE tbl2
COLUMNS TERMINATED BY ","
(col1, col2)
where col1 > 1
)
WITH RESOURCE 'spark0'
(
"spark.executor.memory" = "2g",
"spark.shuffle.compress" = "true"
)
PROPERTIES
(
"timeout" = "3600"
);
示例 2:上游数据源是 hdfs orc 文件的情况
LOAD LABEL db1.label2
(
DATA INFILE("hdfs://abc.com:8888/user/starRocks/test/ml/file3")
INTO TABLE tbl3
COLUMNS TERMINATED BY ","
FORMAT AS "orc"
(col1, col2)
where col1 > 1
)
WITH RESOURCE 'spark0'
(
"spark.executor.memory" = "2g",
"spark.shuffle.compress" = "true"
)
PROPERTIES
(
"timeout" = "3600"
);
示例 3:上游数据源是 hive 表的情况
step 1: 新建 hive 资源
CREATE EXTERNAL RESOURCE hive0 properties ( "type" = "hive", "hive.metastore.uris" = "thrift://0.0.0.0:8080" );
step 2: 新建 hive 外部表
CREATE EXTERNAL TABLE hive_t1 ( k1 INT, K2 SMALLINT, k3 varchar(50), uuid varchar(100) ) ENGINE=hive properties ( "resource" = "hive0", "database" = "tmp", "table" = "t1" );
step 3: 提交 load 命令,要求导入的 StarRocks 表中的列必须在 hive 外部表中存在。
LOAD LABEL db1.label3 ( DATA FROM TABLE hive_t1 INTO TABLE tbl1 SET ( uuid=bitmap_dict(uuid) ) ) WITH RESOURCE 'spark0' ( "spark.executor.memory" = "2g", "spark.shuffle.compress" = "true" ) PROPERTIES ( "timeout" = "3600" );
创建导入的详细语法请参考 SPARK LOAD。这里主要介绍 Spark load 的创建导入语法中参数意义和注意事项。
- Label
导入任务的标识。每个导入任务,都有一个在单 database 内部唯一的 Label。具体规则与 Broker Load 一致。
- 导入作业参数
导入作业参数主要指的是 Spark load 创建导入语句中的属于 opt_properties
部分的参数。导入作业参数是作用于整个导入作业的。规则与 Broker Load 一致。
- Spark 资源参数
Spark 资源需要提前配置到 StarRocks 系统中并且赋予用户 USAGE-PRIV 权限后才能使用 Spark Load。 当用户有临时性的需求,比如增加任务使用的资源而修改 Spark configs,可以在这里设置,设置仅对本次任务生效,并不影响 StarRocks 集群中已有的配置。
WITH RESOURCE 'spark0'
(
"spark.driver.memory" = "1g",
"spark.executor.memory" = "3g"
)
- 数据源为 hive 表时的导入
目前如果期望在导入流程中将 hive 表作为数据源,那么需要先新建一张类型为 hive 的外部表,然后提交导入命令时指定外部表的表名即可。
- 导入流程构建全局字典
适用于 StarRocks 表聚合列的数据类型为 bitmap 类型。 在 load 命令中指定需要构建全局字典的字段即可,格式为:StarRocks字段名称=bitmap_dict(hive表字段名称)
需要注意的是目前 只有在上游数据源为 hive 表 时才支持全局字典的构建。
查看导入任务
Spark Load 导入方式同 Broker Load 一样都是异步的,用户必须将创建导入的 Label 记录下来,并且在 SHOW LOAD
命令中使用 Label 来查看导入结果。查看导入的命令在所有导入方式中是通用的,具体语法可参考 SHOW LOAD。示例如下:
mysql > show load where label="label1"\G
*************************** 1. row ***************************
JobId: 76391
Label: label1
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: SPARK
EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
TaskInfo: cluster:cluster0; timeout(s):10800; max_filter_ratio:5.0E-5
ErrorMsg: N/A
CreateTime: 2019-07-27 11:46:42
EtlStartTime: 2019-07-27 11:46:44
EtlFinishTime: 2019-07-27 11:49:44
LoadStartTime: 2019-07-27 11:49:44
LoadFinishTime: 2019-07-27 11:50:16
URL: http://1.1.1.1:8089/proxy/application_1586619723848_0035/
JobDetails: {"ScannedRows":28133395, "TaskNumber":1, "FileNumber":1,"FileSize":200000}
返回结果集中参数的意义可参考 查看导入状态。
取消导入
当 Spark load 作业状态不为 CANCELLED 或 FINISHED 时,可以被用户手动取消。取消时需要指定待取消导入任务的 Label 。取消导入命令语法可参考 CANCEL LOAD 。示例如下:
CANCEL LOAD FROM db1 WHERE LABEL = "label1";
查看 Spark Launcher 提交日志
有时用户需要查看 spark 任务提交过程中产生的详细日志,日志默认保存在 FE 根目录下 log/spark_launcher_log
路径下,并以 spark-launcher-{load-job-id}-{label}.log 命名,日志会在此目录下保存一段时间,当 FE 元数据中的导入信息被清理时,相应的日志也会被清理,默认保存时间为 3 天。
相关系统配置
FE 配置: 下面配置属于 Spark Load 的系统级别配置,也就是作用于所有 Spark Load 导入任务的配置。主要通过修改 fe.conf 来调整配置值。
- enable-spark-load:开启 Spark load 和创建 resource 功能。默认为 false,关闭此功能。
- spark-load-default-timeout-second:任务默认超时时间为 86400 秒(1 天)。
- spark-home-default-dir:spark客户端路径 (fe/lib/spark2x) 。
- spark-resource-path:打包好的spark依赖文件路径(默认为空)。
- spark-launcher-log-dir:spark客户端的提交日志存放的目录(fe/log/spark-launcher-log)。
- yarn-client-path:yarn二进制可执行文件路径 (fe/lib/yarn-client/hadoop/bin/yarn) 。
- yarn-config-dir:yarn配置文件生成路径 (fe/lib/yarn-config) 。
最佳实践
全局字典
适用场景
目前StarRocks中BITMAP列是使用类库Roaringbitmap实现的,而Roaringbitmap的输入数据类型只能是整型,因此如果要在导入流程中实现对于BITMAP列的预计算,那么就需要将输入数据的类型转换成整型。
在StarRocks现有的导入流程中,全局字典的数据结构是基于Hive表实现的,保存了原始值到编码值的映射。
构建流程
1. 读取上游数据源的数据,生成一张 Hive 临时表,记为 hive-table。
2. 从 hive-table 中抽取待去重字段的去重值,生成一张新的Hive表,记为distinct-value-table。
3. 新建一张全局字典表,记为dict-table;一列为原始值,一列为编码后的值。
4. 将distinct-value-table与dict-table做left join,计算出新增的去重值集合,然后对这个集合使用窗口函数进行编码,此时去重列原始值就多了一列编码后的值,最后将这两列的数据写回dict-table。
5. 将dict-table与hive-table做join,完成hive-table中原始值替换成整型编码值的工作。
6. hive-table会被下一步数据预处理的流程所读取,经过计算后导入到StarRocks中。
Spark 程序导入
完整 spark load 导入示例,参考 github 上的 demo: sparkLoad2StarRocks
常见问题
Q:报错 When running with master 'yarn' either HADOOP-CONF-DIR or YARN-CONF-DIR must be set in the environment.
A:使用 Spark Load 时没有在 Spark 客户端的 spark-env.sh 配置 HADOOP-CONF-DIR 环境变量。
Q:提交 Spark job 时用到 spark-submit 命令,报错:Cannot run program "xxx/bin/spark-submit": error = 2, No such file or directory
A:使用 Spark Load 时
spark_home_default_dir
配置项没有指定或者指定了错误的 spark 客户端根目录。Q:报错 File xxx/jars/spark-2x.zip does not exist 错误。
A:使用 Spark Load 时 spark-resource-path 配置项没有指向打包好的 zip 文件,检查指向文件路径和文件名词是否一致。
Q:报错 yarn client does not exist in path: xxx/yarn-client/hadoop/bin/yarn
A:使用 Spark Load 时 yarn-client-path 配置项没有指定 yarn 的可执行文件。
Q:报错 Cannot execute hadoop-yarn/bin/../libexec/yarn-config.sh
A:使用 CDH 的 Hadoop 时,需要配置 HADOOP_LIBEXEC_DIR 环境变量,由于 hadoop-yarn 和 hadoop 目录不同,默认 libexec 目录会找 hadoop-yarn/bin/../libexec,而 libexec 在 hadoop 目录下。
yarn application status
命令获取Spark任务状态报错导致导入作业失败。