# Spark Doris Connector

Spark Doris Connector supports reading data stored in Doris and writing data to Doris through Spark.

  • Support reading data from Doris.
  • Support batch and streaming writes from a Spark DataFrame to Doris.
  • You can map a Doris table to a DataFrame or an RDD; using a DataFrame is recommended.
  • Support filtering data on the Doris side to reduce the amount of data transferred (see the sketch after this list).
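
For example, a filter applied to a DataFrame read through the connector can be pushed down so that Doris, rather than Spark, evaluates it. A minimal sketch, assuming a hypothetical id column and using the same placeholder style as the examples below:

```scala
// Minimal sketch of Doris-side filtering (the id column is hypothetical).
// Per the predicate pushdown settings in the Configuration section, simple
// predicates like this one can be evaluated by Doris before rows are sent to Spark.
val filteredDF = spark.read.format("doris")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  .load()
  .filter("id > 100")   // candidate for pushdown to Doris

filteredDF.show()
```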

# Version Compatibility

| Connector | Spark | Doris | Java | Scala |
|-----------|-------|-------|------|-------|
| 1.0.0     | 2.x   | 0.12+ | 8    | 2.11  |
| 1.0.0     | 3.x   | 0.12+ | 8    | 2.12  |

# Build and Install

Execute the following command in the extension/spark-doris-connector/ directory:

Notice:

  1. If you have not compiled the Doris source code as a whole, you need to compile it first, otherwise the thrift command will not be found; that is, run sh build.sh in the incubator-doris directory first.
  2. It is recommended to compile in the Doris Docker build environment apache/incubator-doris:build-env-1.2, because images from build-env-1.3 onward use JDK 11 and will run into compilation problems.

```bash
sh build.sh 3  ## spark 3.x version, the default is 3.1.2
sh build.sh 2  ## spark 2.x version, the default is 2.3.4
```

After successful compilation, the file doris-spark-1.0.0-SNAPSHOT.jar will be generated in the output/ directory. Copy this file to the classpath of Spark to use Spark-Doris-Connector. For example, if Spark runs in Local mode, put this file in the jars/ folder; if Spark runs in Yarn cluster mode, put this file in the pre-deployment package.
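
As an alternative to copying the jar by hand, it can also be placed on the classpath through Spark's standard spark.jars setting when the session is created; a minimal sketch, assuming a hypothetical local path to the compiled jar:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch (the jar path is hypothetical): spark.jars ships the listed
// jars to the driver and executors, so the connector is available without
// copying it into the Spark installation manually.
val spark = SparkSession.builder()
  .appName("spark-doris-connector-example")
  .config("spark.jars", "/path/to/output/doris-spark-1.0.0-SNAPSHOT.jar")
  .getOrCreate()
```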

# Example

# Read

# SQL

```sql
CREATE TEMPORARY VIEW spark_doris
USING doris
OPTIONS(
  "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
  "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT",
  "user"="$YOUR_DORIS_USERNAME",
  "password"="$YOUR_DORIS_PASSWORD"
);

SELECT * FROM spark_doris;
```

# DataFrame

```scala
val dorisSparkDF = spark.read.format("doris")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  .load()

dorisSparkDF.show(5)
```

# RDD

```scala
import org.apache.doris.spark._

val dorisSparkRDD = sc.dorisRDD(
  tableIdentifier = Some("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME"),
  cfg = Some(Map(
    "doris.fenodes" -> "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT",
    "doris.request.auth.user" -> "$YOUR_DORIS_USERNAME",
    "doris.request.auth.password" -> "$YOUR_DORIS_PASSWORD"
  ))
)

dorisSparkRDD.collect()
```

# Write

# SQL

```sql
CREATE TEMPORARY VIEW spark_doris
USING doris
OPTIONS(
  "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
  "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT",
  "user"="$YOUR_DORIS_USERNAME",
  "password"="$YOUR_DORIS_PASSWORD"
);

INSERT INTO spark_doris VALUES ("VALUE1","VALUE2",...);
-- or
INSERT INTO spark_doris SELECT * FROM YOUR_TABLE;
```

# DataFrame (batch/stream)

## batch sink

```scala
// toDF on a local collection needs the session implicits (already available in spark-shell)
import spark.implicits._

val mockDataDF = List(
  (3, "440403001005", "21.cn"),
  (1, "4404030013005", "22.cn"),
  (33, null, "23.cn")
).toDF("id", "mi_code", "mi_name")
mockDataDF.show(5)

mockDataDF.write.format("doris")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  // other options
  // specify the fields to write
  .option("doris.write.fields", "$YOUR_FIELDS_TO_WRITE")
  .save()
```

## stream sink (StructuredStreaming)

```scala
val kafkaSource = spark.readStream
  .option("kafka.bootstrap.servers", "$YOUR_KAFKA_SERVERS")
  .option("startingOffsets", "latest")
  .option("subscribe", "$YOUR_KAFKA_TOPICS")
  .format("kafka")
  .load()

kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("doris")
  .option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  // other options
  // specify the fields to write
  .option("doris.write.fields", "$YOUR_FIELDS_TO_WRITE")
  .start()
  .awaitTermination()
```

# Configuration

# General

| Key | Default Value | Comment |
| --- | --- | --- |
| doris.fenodes | -- | Doris FE http address; multiple addresses are supported, separated by commas |
| doris.table.identifier | -- | Doris table identifier, e.g., db1.tbl1 |
| doris.request.retries | 3 | Number of retries when sending requests to Doris |
| doris.request.connect.timeout.ms | 30000 | Connection timeout for requests sent to Doris |
| doris.request.read.timeout.ms | 30000 | Read timeout for requests sent to Doris |
| doris.request.query.timeout.s | 3600 | Query timeout in Doris, in seconds; the default is 1 hour, -1 means no timeout limit |
| doris.request.tablet.size | Integer.MAX_VALUE | Number of Doris Tablets per RDD partition. The smaller this value, the more partitions are generated, which increases parallelism on the Spark side but also puts more pressure on Doris. |
| doris.batch.size | 1024 | Maximum number of rows read from BE at a time. Increasing this value reduces the number of connections between Spark and Doris, and thereby the extra overhead caused by network latency. |
| doris.exec.mem.limit | 2147483648 | Memory limit for a single query, in bytes. The default is 2GB. |
| doris.deserialize.arrow.async | false | Whether to asynchronously convert the Arrow format to the RowBatch needed for spark-doris-connector iteration |
| doris.deserialize.queue.size | 64 | Size of the internal processing queue for asynchronous Arrow conversion; takes effect when doris.deserialize.arrow.async is true |
| doris.write.fields | -- | Specifies the fields (and the order of the fields) to write to the Doris table, separated by commas. By default, all fields are written in the order of the Doris table schema. |
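
These keys are passed like the connection options in the examples above; a minimal sketch (the values are only illustrative) that tunes read parallelism, batch size, and the per-query memory limit:

```scala
// Minimal sketch (illustrative values): general options are supplied as extra
// .option(...) calls on the reader.
val tunedDF = spark.read.format("doris")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  .option("doris.request.tablet.size", "4")      // fewer tablets per partition => more Spark partitions
  .option("doris.batch.size", "4096")            // more rows per fetch => fewer round trips to BE
  .option("doris.exec.mem.limit", "4294967296")  // raise the per-query memory limit to 4GB (bytes)
  .load()
```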

# SQL & Dataframe Configuration

| Key | Default Value | Comment |
| --- | --- | --- |
| user | -- | Doris username |
| password | -- | Doris password |
| doris.filter.query.in.max.count | 100 | Maximum number of elements in the value list of an in expression for predicate pushdown. Beyond this number, the in-expression filtering is handled on the Spark side instead. |
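
For example, an in filter whose value list stays under this limit is a candidate for pushdown to Doris, while a longer list is evaluated in Spark; a minimal sketch (hypothetical column name) that also raises the threshold:

```scala
import org.apache.spark.sql.functions.col

// Minimal sketch (the mi_code column is hypothetical): a small IN list can be
// pushed down to Doris; raising doris.filter.query.in.max.count lets larger
// lists be pushed down as well instead of being filtered on the Spark side.
val inFilteredDF = spark.read.format("doris")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  .option("doris.filter.query.in.max.count", "1000")
  .load()
  .filter(col("mi_code").isin("440403001005", "4404030013005"))
```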

# RDD Configuration

| Key | Default Value | Comment |
| --- | --- | --- |
| doris.request.auth.user | -- | Doris username |
| doris.request.auth.password | -- | Doris password |
| doris.read.field | -- | List of column names to read from the Doris table, separated by commas |
| doris.filter.query | -- | Filter expression for the query, which is passed through to Doris. Doris uses this expression to complete source-side data filtering. |
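
A minimal sketch (hypothetical column names and filter expression) combining doris.read.field and doris.filter.query, so that Doris returns only the selected columns of the matching rows:

```scala
import org.apache.doris.spark._

// Minimal sketch (hypothetical column names and filter): read only two columns
// and let Doris evaluate the filter before sending rows to Spark.
val prunedRDD = sc.dorisRDD(
  tableIdentifier = Some("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME"),
  cfg = Some(Map(
    "doris.fenodes" -> "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT",
    "doris.request.auth.user" -> "$YOUR_DORIS_USERNAME",
    "doris.request.auth.password" -> "$YOUR_DORIS_PASSWORD",
    "doris.read.field" -> "id,mi_code",
    "doris.filter.query" -> "id > 100"
  ))
)

prunedRDD.collect()
```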

# Doris & Spark Column Type Mapping

| Doris Type | Spark Type |
| --- | --- |
| NULL_TYPE | DataTypes.NullType |
| BOOLEAN | DataTypes.BooleanType |
| TINYINT | DataTypes.ByteType |
| SMALLINT | DataTypes.ShortType |
| INT | DataTypes.IntegerType |
| BIGINT | DataTypes.LongType |
| FLOAT | DataTypes.FloatType |
| DOUBLE | DataTypes.DoubleType |
| DATE | DataTypes.StringType<sup>1</sup> |
| DATETIME | DataTypes.StringType<sup>1</sup> |
| BINARY | DataTypes.BinaryType |
| DECIMAL | DecimalType |
| CHAR | DataTypes.StringType |
| LARGEINT | DataTypes.StringType |
| VARCHAR | DataTypes.StringType |
| DECIMALV2 | DecimalType |
| TIME | DataTypes.DoubleType |
| HLL | Unsupported datatype |
  • Note 1: In the Connector, DATE and DATETIME are mapped to String. Due to the processing logic of the Doris underlying storage engine, the time range covered by the native time types cannot meet the demand when they are used directly, so String is used to return the corresponding human-readable time text instead.
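
Because DATE and DATETIME arrive as strings, they can be converted back to Spark date/timestamp types after the read when needed; a minimal sketch with hypothetical column names, reusing dorisSparkDF from the read example above:

```scala
import org.apache.spark.sql.functions.{col, to_date, to_timestamp}

// Minimal sketch (event_date / event_time are hypothetical columns): convert
// the string-typed DATE and DATETIME values into Spark date/timestamp columns.
val typedDF = dorisSparkDF
  .withColumn("event_date", to_date(col("event_date"), "yyyy-MM-dd"))
  .withColumn("event_time", to_timestamp(col("event_time"), "yyyy-MM-dd HH:mm:ss"))
```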