Spark StarRocks Connector

The Spark StarRocks Connector reads data stored in StarRocks via Spark.

  • The current version only supports reading data from StarRocks.
  • Supports mapping StarRocks tables to a DataFrame or an RDD. DataFrame is recommended.
  • Supports filtering data on the StarRocks side to reduce the amount of data transferred.

Version Requirements

Connector | Spark | Java | Scala
1.0.0 | 2.x | 8 | 2.11

Usage examples

Code Reference: https://github.com/StarRocks/demo/tree/master/SparkDemo

SQL

CREATE TEMPORARY VIEW spark_starrocks
USING starrocks
OPTIONS(
  "table.identifier"="$YOUR_STARROCKS_DATABASE_NAME.$YOUR_STARROCKS_TABLE_NAME",
  "fenodes"="$YOUR_STARROCKS_FE_HOSTNAME:$YOUR_STARROCKS_FE_RESFUL_PORT",
  "user"="$YOUR_STARROCKS_USERNAME",
  "password"="$YOUR_STARROCKS_PASSWORD"
);

SELECT * FROM spark_starrocks;

DataFrame

val starrocksSparkDF = spark.read.format("starrocks")
  .option("starrocks.table.identifier", "$YOUR_STARROCKS_DATABASE_NAME.$YOUR_STARROCKS_TABLE_NAME")
  .option("starrocks.fenodes", "$YOUR_STARROCKS_FE_HOSTNAME:$YOUR_STARROCKS_FE_RESFUL_PORT")
  .option("user", "$YOUR_STARROCKS_USERNAME")
  .option("password", "$YOUR_STARROCKS_PASSWORD")
  .load()

starrocksSparkDF.show(5)

RDD

import org.apache.starrocks.spark._
val starrocksSparkRDD = sc.starrocksRDD(
  tableIdentifier = Some("$YOUR_STARROCKS_DATABASE_NAME.$YOUR_STARROCKS_TABLE_NAME"),
  cfg = Some(Map(
    "starrocks.fenodes" -> "$YOUR_STARROCKS_FE_HOSTNAME:$YOUR_STARROCKS_FE_RESFUL_PORT",
    "starrocks.request.auth.user" -> "$YOUR_STARROCKS_USERNAME",
    "starrocks.request.auth.password" -> "$YOUR_STARROCKS_PASSWORD"
  ))
)

starrocksSparkRDD.collect()

Configuration

General configuration

Key | Default Value | Comment
starrocks.fenodes | -- | HTTP address of the StarRocks FE. Multiple addresses are supported, separated by commas.
starrocks.table.identifier | -- | Name of the StarRocks table (e.g. db1.tbl1).
starrocks.request.retries | 3 | Number of times a request to StarRocks is retried.
starrocks.request.connect.timeout.ms | 30000 | Connection timeout for requests sent to StarRocks, in milliseconds.
starrocks.request.read.timeout.ms | 30000 | Read timeout for requests sent to StarRocks, in milliseconds.
starrocks.request.query.timeout.s | 3600 | Query timeout for StarRocks, in seconds. The default is 1 hour; -1 means no timeout limit.
starrocks.request.tablet.size | Integer.MAX_VALUE | The number of StarRocks tablets per RDD partition. The smaller this value, the more partitions are generated, which increases Spark's parallelism but also puts more pressure on StarRocks.
starrocks.batch.size | 1024 | The maximum number of rows to read from a BE at a time. Increasing this value reduces the number of connections established between Spark and StarRocks and therefore mitigates overhead caused by network latency.
starrocks.exec.mem.limit | 2147483648 | Memory limit for a single query, in bytes. Defaults to 2 GB.
starrocks.deserialize.arrow.async | false | Whether to asynchronously convert the Arrow format to the RowBatch required for spark-starrocks-connector iteration.
starrocks.deserialize.queue.size | 64 | Size of the internal processing queue for asynchronous conversion of the Arrow format. Takes effect when starrocks.deserialize.arrow.async is true.
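
As a rough illustration of how starrocks.request.tablet.size and starrocks.batch.size influence a read, the sketch below estimates partition and batch counts with plain ceiling division. The object and method names are hypothetical, and the arithmetic is a simplification: it assumes tablets and rows are split evenly, so the connector's actual planning may produce different numbers.

```scala
// Hypothetical helper, not part of the connector: back-of-the-envelope sizing only.
object ReadSizingSketch {
  // Ceiling division on Long, so a divisor of Integer.MAX_VALUE cannot overflow.
  private def ceilDiv(total: Long, per: Long): Long = {
    require(total >= 0 && per > 0, "total must be >= 0 and per must be > 0")
    (total + per - 1) / per
  }

  // Estimated number of RDD partitions for a table with `tablets` tablets,
  // assuming each partition holds at most `tabletSize` tablets.
  def estimatePartitions(tablets: Long, tabletSize: Long = Int.MaxValue.toLong): Long =
    ceilDiv(tablets, tabletSize)

  // Estimated number of batches needed to read `rows` rows,
  // assuming each batch returns at most `batchSize` rows.
  def estimateBatches(rows: Long, batchSize: Long = 1024L): Long =
    ceilDiv(rows, batchSize)
}
```

Under these assumptions, a table with 96 tablets maps to a single partition with the default tablet size and to 12 partitions when starrocks.request.tablet.size is set to 8.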

SQL and DataFrame Configuration

Key | Default Value | Comment
user | -- | StarRocks username
password | -- | StarRocks password
starrocks.filter.query.in.max.count | 100 | The maximum number of elements in the value list of an "in" expression for predicate pushdown. Beyond this number, the in expression is filtered in Spark.
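
The threshold described for starrocks.filter.query.in.max.count can be modelled as a simple size check. The following is a minimal sketch of that rule, not the connector's code: the object, method, and column names are hypothetical, and treating a list of exactly the limit as still eligible for pushdown is an assumption.

```scala
// Hypothetical model of the IN-list pushdown rule; not the connector's actual code.
object InPushdownSketch {
  // Default of starrocks.filter.query.in.max.count, as listed in the table above.
  val DefaultInMaxCount: Int = 100

  // Returns Some(predicate) if the IN list is small enough to push down to StarRocks,
  // or None if the filter would be left for Spark to evaluate.
  // Assumption: a list of exactly `maxCount` elements is still pushed down.
  def pushdownPredicate(
      column: String,
      values: Seq[Int],
      maxCount: Int = DefaultInMaxCount
  ): Option[String] =
    if (values.nonEmpty && values.size <= maxCount) {
      val list = values.mkString(", ")
      Some(s"$column in ($list)")
    } else {
      None
    }
}
```

Under this model, a three-element list yields a predicate for StarRocks, while a list of 101 elements yields None and would be filtered in Spark.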

RDD Configuration

Key | Default Value | Comment
starrocks.request.auth.user | -- | StarRocks username
starrocks.request.auth.password | -- | StarRocks password
starrocks.read.field | -- | List of columns to read from the StarRocks table, with multiple columns separated by commas.
starrocks.filter.query | -- | Filter expression. StarRocks uses this expression to filter data on the source side.
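
To show how the RDD keys fit together, here is a small sketch that assembles the cfg map used in the RDD example. The helper name and its parameters are hypothetical; only the configuration keys come from the tables in this section.

```scala
// Hypothetical helper that assembles the cfg map passed to sc.starrocksRDD.
object RddConfigSketch {
  def rddConfig(
      feNodes: Seq[String],
      user: String,
      password: String,
      readFields: Seq[String] = Seq.empty,
      filterQuery: Option[String] = None
  ): Map[String, String] = {
    // Keys that every RDD read needs: FE addresses and credentials.
    val required = Map(
      "starrocks.fenodes" -> feNodes.mkString(","),
      "starrocks.request.auth.user" -> user,
      "starrocks.request.auth.password" -> password
    )
    // Optional column list, comma-separated as the table above describes.
    val fields: Map[String, String] =
      if (readFields.nonEmpty) Map("starrocks.read.field" -> readFields.mkString(","))
      else Map.empty
    // Optional source-side filter expression.
    val filter: Map[String, String] =
      filterQuery.map(q => Map("starrocks.filter.query" -> q)).getOrElse(Map.empty)
    required ++ fields ++ filter
  }
}
```

A call such as RddConfigSketch.rddConfig(Seq("fe1:8030", "fe2:8030"), "root", "", Seq("k1", "k2"), Some("k1 > 10")) uses placeholder hosts, credentials, and column names; the resulting map would be wrapped in Some(...) and passed as the cfg argument of sc.starrocksRDD.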

StarRocks and Spark column type mapping

StarRocks Type | Spark Type
NULL_TYPE | DataTypes.NullType
BOOLEAN | DataTypes.BooleanType
TINYINT | DataTypes.ByteType
SMALLINT | DataTypes.ShortType
INT | DataTypes.IntegerType
BIGINT | DataTypes.LongType
FLOAT | DataTypes.FloatType
DOUBLE | DataTypes.DoubleType
DATE | DataTypes.StringType
DATETIME | DataTypes.StringType
BINARY | DataTypes.BinaryType
DECIMAL | DecimalType
CHAR | DataTypes.StringType
LARGEINT | DataTypes.StringType
VARCHAR | DataTypes.StringType
DECIMALV2 | DecimalType
TIME | DataTypes.DoubleType
HLL | Unsupported datatype
  • Note: The connector maps DATE and DATETIME to String. Because of how the StarRocks storage engine works, the time range covered when the time types are used directly cannot meet the demand. It is recommended to read these columns as String to obtain the corresponding time values.
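
Because DATE, DATETIME, and LARGEINT arrive in Spark as strings, an application may want to convert them after reading. The sketch below shows one way to do so with the Java time API and BigInt. The helper names are hypothetical, and the text layouts ("yyyy-MM-dd" for DATE, "yyyy-MM-dd HH:mm:ss" for DATETIME) are assumptions that should be checked against the actual data.

```scala
import java.time.{LocalDate, LocalDateTime}
import java.time.format.DateTimeFormatter

// Hypothetical helpers for converting String values read through the connector.
object StringColumnSketch {
  // Assumed text layout for DATETIME values; adjust if your data differs.
  private val dateTimeFormat: DateTimeFormatter =
    DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // DATE values are assumed to arrive as ISO dates such as "2021-06-30".
  def parseDate(s: String): LocalDate = LocalDate.parse(s)

  // DATETIME values are parsed with the assumed pattern above.
  def parseDateTime(s: String): LocalDateTime = LocalDateTime.parse(s, dateTimeFormat)

  // LARGEINT is a 128-bit integer, so BigInt is used instead of Long.
  def parseLargeInt(s: String): BigInt = BigInt(s)
}
```

Each helper throws an exception on malformed input, so wrapping the calls in scala.util.Try is a reasonable choice when the column may contain unexpected values.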