跳到主要内容
跳到主要内容

Spark Doris Connector

Spark Doris Connector

Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据,也支持通过Spark写入数据到Doris。

代码库地址:https://github.com/apache/doris-spark-connector

  • 支持从Doris中读取数据
  • 支持Spark DataFrame批量/流式 写入Doris
  • 可以将Doris表映射为DataFrame或者RDD,推荐使用DataFrame
  • 支持在Doris端完成数据过滤,减少数据传输量。

版本兼容

ConnectorSparkDorisJavaScala
1.2.03.2, 3.1, 2.31.0 +82.12, 2.11
1.1.03.2, 3.1, 2.31.0 +82.12, 2.11
1.0.13.1, 2.30.12 - 0.1582.12, 2.11

编译与安装

准备工作

  1. 修改custom_env.sh.tpl文件,重命名为custom_env.sh

  2. 在源码目录下执行: sh build.sh 根据提示输入你需要的 Scala 与 Spark 版本进行编译。

编译成功后,会在 dist 目录生成目标jar包,如:spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar。 将此文件复制到 SparkClassPath 中即可使用 Spark-Doris-Connector

例如,Local 模式运行的 Spark,将此文件放入 jars/ 文件夹下。Yarn集群模式运行的Spark,则将此文件放入预部署包中。

例如将 spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar 上传到 hdfs 并在 spark.yarn.jars 参数上添加 hdfs 上的 Jar 包路径

  1. 上传 spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar 到hdfs。
hdfs dfs -mkdir /spark-jars/
hdfs dfs -put /your_local_path/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar /spark-jars/
  1. 在集群中添加 spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar 依赖。
spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar

使用Maven管理

<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>spark-doris-connector-3.2_2.12</artifactId>
<version>1.2.0</version>
</dependency>

注意

请根据不同的 Spark 和 Scala 版本替换相应的 Connector 版本。

使用示例

读取

SQL

CREATE TEMPORARY VIEW spark_doris
USING doris
OPTIONS(
"table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
"fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
"user"="$YOUR_DORIS_USERNAME",
"password"="$YOUR_DORIS_PASSWORD"
);

SELECT * FROM spark_doris;

DataFrame

val dorisSparkDF = spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.load()

dorisSparkDF.show(5)

RDD

import org.apache.doris.spark._
val dorisSparkRDD = sc.dorisRDD(
tableIdentifier = Some("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME"),
cfg = Some(Map(
"doris.fenodes" -> "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
"doris.request.auth.user" -> "$YOUR_DORIS_USERNAME",
"doris.request.auth.password" -> "$YOUR_DORIS_PASSWORD"
))
)

dorisSparkRDD.collect()

pySpark

dorisSparkDF = spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.load()
// show 5 lines data
dorisSparkDF.show(5)

写入

SQL

CREATE TEMPORARY VIEW spark_doris
USING doris
OPTIONS(
"table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
"fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
"user"="$YOUR_DORIS_USERNAME",
"password"="$YOUR_DORIS_PASSWORD"
);

INSERT INTO spark_doris VALUES ("VALUE1","VALUE2",...);
# or
INSERT INTO spark_doris SELECT * FROM YOUR_TABLE

DataFrame(batch/stream)

## batch sink
val mockDataDF = List(
(3, "440403001005", "21.cn"),
(1, "4404030013005", "22.cn"),
(33, null, "23.cn")
).toDF("id", "mi_code", "mi_name")
mockDataDF.show(5)

mockDataDF.write.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
//其它选项
//指定你要写入的字段
.option("doris.write.fields","$YOUR_FIELDS_TO_WRITE")
.save()

## stream sink(StructuredStreaming)
val kafkaSource = spark.readStream
.option("kafka.bootstrap.servers", "$YOUR_KAFKA_SERVERS")
.option("startingOffsets", "latest")
.option("subscribe", "$YOUR_KAFKA_TOPICS")
.format("kafka")
.load()
kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
.writeStream
.format("doris")
.option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
//其它选项
//指定你要写入的字段
.option("doris.write.fields","$YOUR_FIELDS_TO_WRITE")
.start()
.awaitTermination()

Java示例

samples/doris-demo/spark-demo/ 下提供了 Java 版本的示例,可供参考,这里

配置

通用配置项

KeyDefault ValueComment
doris.fenodes--Doris FE http 地址,支持多个地址,使用逗号分隔
doris.table.identifier--Doris 表名,如:db1.tbl1
doris.request.retries3向Doris发送请求的重试次数
doris.request.connect.timeout.ms30000向Doris发送请求的连接超时时间
doris.request.read.timeout.ms30000向Doris发送请求的读取超时时间
doris.request.query.timeout.s3600查询doris的超时时间,默认值为1小时,-1表示无超时限制
doris.request.tablet.sizeInteger.MAX_VALUE一个RDD Partition对应的Doris Tablet个数。
此数值设置越小,则会生成越多的Partition。从而提升Spark侧的并行度,但同时会对Doris造成更大的压力。
doris.batch.size1024一次从BE读取数据的最大行数。增大此数值可减少Spark与Doris之间建立连接的次数。
从而减轻网络延迟所带来的额外时间开销。
doris.exec.mem.limit2147483648单个查询的内存限制。默认为 2GB,单位为字节
doris.deserialize.arrow.asyncfalse是否支持异步转换Arrow格式到spark-doris-connector迭代所需的RowBatch
doris.deserialize.queue.size64异步转换Arrow格式的内部处理队列,当doris.deserialize.arrow.async为true时生效
doris.write.fields--指定写入Doris表的字段或者字段顺序,多列之间使用逗号分隔。
默认写入时要按照Doris表字段顺序写入全部字段。
sink.batch.size10000单次写BE的最大行数
sink.max-retries1写BE失败之后的重试次数
sink.properties.*--Stream Load 的导入参数。
例如: 'sink.properties.column_separator' = ', '
doris.sink.task.partition.size--Doris写入任务对应的 Partition 个数。Spark RDD 经过过滤等操作,最后写入的 Partition 数可能会比较大,但每个 Partition 对应的记录数比较少,导致写入频率增加和计算资源浪费。
此数值设置越小,可以降低 Doris 写入频率,减少 Doris 合并压力。该参数配合 doris.sink.task.use.repartition 使用。
doris.sink.task.use.repartitionfalse是否采用 repartition 方式控制 Doris写入 Partition数。默认值为 false,采用 coalesce 方式控制(注意: 如果在写入之前没有 Spark action 算子,可能会导致整个计算并行度降低)。
如果设置为 true,则采用 repartition 方式(注意: 可设置最后 Partition 数,但会额外增加 shuffle 开销)。
doris.sink.batch.interval.ms50每个批次sink的间隔时间,单位 ms。

SQL 和 Dataframe 专有配置

KeyDefault ValueComment
user--访问Doris的用户名
password--访问Doris的密码
doris.filter.query.in.max.count100谓词下推中,in表达式value列表元素最大数量。超过此数量,则in表达式条件过滤在Spark侧处理。

RDD 专有配置

KeyDefault ValueComment
doris.request.auth.user--访问Doris的用户名
doris.request.auth.password--访问Doris的密码
doris.read.field--读取Doris表的列名列表,多列之间使用逗号分隔
doris.filter.query--过滤读取数据的表达式,此表达式透传给Doris。Doris使用此表达式完成源端数据过滤。

Doris 和 Spark 列类型映射关系

Doris TypeSpark Type
NULL_TYPEDataTypes.NullType
BOOLEANDataTypes.BooleanType
TINYINTDataTypes.ByteType
SMALLINTDataTypes.ShortType
INTDataTypes.IntegerType
BIGINTDataTypes.LongType
FLOATDataTypes.FloatType
DOUBLEDataTypes.DoubleType
DATEDataTypes.StringType1
DATETIMEDataTypes.StringType1
BINARYDataTypes.BinaryType
DECIMALDecimalType
CHARDataTypes.StringType
LARGEINTDataTypes.StringType
VARCHARDataTypes.StringType
DECIMALV2DecimalType
TIMEDataTypes.DoubleType
HLLUnsupported datatype
  • 注:Connector中,将DATEDATETIME映射为String。由于Doris底层存储引擎处理逻辑,直接使用时间类型时,覆盖的时间范围无法满足需求。所以使用 String 类型直接返回对应的时间可读文本。