跳到主要内容

Spark Doris Connector

Spark Doris Connector 是 Apache Doris 与 Apache Spark 的连接器,支持通过 Spark 读取 Doris 中存储的数据,也支持通过 Spark 将数据写入 Doris。代码库地址:apache/doris-spark-connector

主要能力如下:

使用场景推荐方式说明
批量读取 Doris 数据DataFrame、Spark SQL也支持 RDD,推荐优先使用 DataFrame 或 Spark SQL。
批量写入 Doris 数据DataFrame、Spark SQL支持指定写入列,也支持从 1.3.0 版本开始的 Overwrite 模式。
流式写入 Doris 数据Structured Streaming支持普通结构化数据写入,也支持直接透传 DataFrame 第一列。
高速读取 Doris 数据Arrow Flight SQL从 24.0.0 版本开始支持,需要 Doris 版本 >= 2.1.0。
Catalog 方式访问 DorisSpark Doris Catalog从 24.0.0 版本开始支持,可通过 Spark Catalog 管理 Doris 数据库与表。

开始使用前

版本兼容

请先根据 Spark、Doris、Java 和 Scala 版本选择对应的 Connector 版本。

ConnectorSparkDorisJavaScala
26.0.03.5 - 3.1, 2.41.0 +82.12, 2.11
25.2.03.5 - 3.1, 2.41.0 +82.12, 2.11
25.1.03.5 - 3.1, 2.41.0 +82.12, 2.11
25.0.13.5 - 3.1, 2.41.0 +82.12, 2.11
25.0.03.5 - 3.1, 2.41.0 +82.12, 2.11
1.3.23.4 - 3.1, 2.4, 2.31.0 - 2.1.682.12, 2.11
1.3.13.4 - 3.1, 2.4, 2.31.0 - 2.1.082.12, 2.11
1.3.03.4 - 3.1, 2.4, 2.31.0 - 2.1.082.12, 2.11
1.2.03.2, 3.1, 2.31.0 - 2.0.282.12, 2.11
1.1.03.2, 3.1, 2.31.0 - 1.2.882.12, 2.11
1.0.13.1, 2.30.12 - 0.1582.12, 2.11

通过 Maven 添加依赖

在项目 pom.xml 中添加 Spark Doris Connector 依赖,并根据实际 Spark 和 Connector 版本替换 artifactIdversion

<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>spark-doris-connector-spark-3.5</artifactId>
<version>25.2.0</version>
</dependency>
提示

从 24.0.0 版本开始,Doris Connector 包命名规则发生调整:

  1. 不再包含 Scala 版本信息。
  2. 对于 Spark 2.x 版本,统一使用名称为 spark-doris-connector-spark-2 的包,并且默认只基于 Scala 2.11 版本编译。需要 Scala 2.12 版本时,请自行编译。
  3. 对于 Spark 3.x 版本,根据具体 Spark 版本使用名称为 spark-doris-connector-spark-3.x 的包,其中 Spark 3.0 版本可以使用 spark-doris-connector-spark-3.1 的包。

也可以从 Maven 仓库 下载对应版本的 Jar 包。

编译源码

如需自行编译,在源码目录下执行 sh build.sh,并根据提示输入需要的 Scala 与 Spark 版本。

编译成功后,目标 Jar 包会生成在 dist 目录下,例如 spark-doris-connector-spark-3.5-25.2.0.jar。将该文件复制到 Spark 的 classpath 中即可使用 Spark Doris Connector:

Spark 运行模式Jar 包放置方式
Local 模式将 Jar 包放入 jars/ 目录。
Yarn 集群模式将 Jar 包放入预部署包中。

例如,将 spark-doris-connector-spark-3.5-25.2.0.jar 上传到 HDFS,并通过 spark.yarn.jars 添加依赖:

# 1. 上传 spark-doris-connector-spark-3.5-25.2.0.jar 到 HDFS
hdfs dfs -mkdir /spark-jars/
hdfs dfs -put /your_local_path/spark-doris-connector-spark-3.5-25.2.0.jar /spark-jars/

# 2. 在集群中添加 spark-doris-connector-spark-3.5-25.2.0.jar 依赖
spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-spark-3.5-25.2.0.jar

场景一:批量读取 Doris 数据

Spark Doris Connector 支持通过 DataFrame、Spark SQL、RDD 和 PySpark 读取 Doris 数据。读取 Doris 数据时,推荐优先使用 DataFrame 或 Spark SQL。

通过 DataFrame 读取

val dorisSparkDF = spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_HTTP_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.load()

dorisSparkDF.show(5)

通过 Spark SQL 读取

CREATE TEMPORARY VIEW spark_doris
USING doris
OPTIONS(
"table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
"fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_HTTP_PORT",
"user"="$YOUR_DORIS_USERNAME",
"password"="$YOUR_DORIS_PASSWORD"
);

SELECT * FROM spark_doris;

通过 RDD 读取

import org.apache.doris.spark._

val dorisSparkRDD = sc.dorisRDD(
tableIdentifier = Some("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME"),
cfg = Some(Map(
"doris.fenodes" -> "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_HTTP_PORT",
"doris.request.auth.user" -> "$YOUR_DORIS_USERNAME",
"doris.request.auth.password" -> "$YOUR_DORIS_PASSWORD"
))
)

dorisSparkRDD.collect()

通过 PySpark 读取

dorisSparkDF = spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_HTTP_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.load()

# 展示 5 行数据
dorisSparkDF.show(5)

通过 Arrow Flight SQL 高速读取

从 24.0.0 版本开始,Spark Doris Connector 支持通过 Arrow Flight SQL 读取数据。该方式需要 Doris 版本 >= 2.1.0。

使用时需要设置以下参数:

参数说明
doris.read.mode设置为 arrow,表示使用 Arrow Flight SQL 方式读取。
doris.read.arrow-flight-sql.port设置为 FE 配置的 Arrow Flight SQL 端口。

服务端配置方式请参考 基于 Arrow Flight SQL 的高速数据传输链路

val df = spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_HTTP_PORT")
.option("doris.user", "$YOUR_DORIS_USERNAME")
.option("doris.password", "$YOUR_DORIS_PASSWORD")
.option("doris.read.mode", "arrow")
.option("doris.read.arrow-flight-sql.port", "12345")
.load()

df.show()

场景二:批量写入 Doris 数据

Spark Doris Connector 支持通过 DataFrame 和 Spark SQL 批量写入 Doris 数据。

通过 DataFrame 写入

val mockDataDF = List(
(3, "440403001005", "21.cn"),
(1, "4404030013005", "22.cn"),
(33, null, "23.cn")
).toDF("id", "mi_code", "mi_name")

mockDataDF.show(5)

mockDataDF.write.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_HTTP_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
// 指定要写入的列
.option("doris.write.fields", "$YOUR_FIELDS_TO_WRITE")
// 从 1.3.0 版本开始,支持覆盖写入
// .mode(SaveMode.Overwrite)
.save()

通过 Spark SQL 写入

CREATE TEMPORARY VIEW spark_doris
USING doris
OPTIONS(
"table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
"fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_HTTP_PORT",
"user"="$YOUR_DORIS_USERNAME",
"password"="$YOUR_DORIS_PASSWORD"
);

INSERT INTO spark_doris VALUES ("VALUE1", "VALUE2", ...);

-- insert into select
INSERT INTO spark_doris SELECT * FROM YOUR_TABLE;

-- insert overwrite
INSERT OVERWRITE spark_doris SELECT * FROM YOUR_TABLE;

场景三:流式写入 Doris 数据

Spark Doris Connector 支持通过 Structured Streaming 写入 Doris。根据数据是否已经符合 Doris 表结构,可以选择结构化数据写入或直接透传第一列数据。

结构化数据写入

val df = spark.readStream.format("your_own_stream_source").load()

df.writeStream
.format("doris")
.option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_HTTP_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.start()
.awaitTermination()

直接写入数据流第一列

如果数据流的第一列数据符合 Doris 表结构,例如列顺序相同的 CSV 数据,或者字段名一致的 JSON 数据,可以将 doris.sink.streaming.passthrough 设置为 true,直接写入第一列数据,无需再将内容展开为多列 DataFrame 字段。

以下示例以 Kafka 源为例。假设要写入的 Doris 表结构如下:

CREATE TABLE `t2` (
`c0` int NULL,
`c1` varchar(10) NULL,
`c2` date NULL
) ENGINE=OLAP
DUPLICATE KEY(`c0`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`c0`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

Kafka 消息的 value{"c0":1,"c1":"a","c2":"2024-01-01"} 格式的 JSON 数据。

val kafkaSource = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "$YOUR_KAFKA_SERVERS")
.option("startingOffsets", "latest")
.option("subscribe", "$YOUR_KAFKA_TOPICS")
.load()

// 选择 value 为 DataFrame 的第一列
kafkaSource.selectExpr("CAST(value as STRING)")
.writeStream
.format("doris")
.option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_HTTP_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
// 设置此选项为 true,会将 DataFrame 的第一列直接写入
.option("doris.sink.streaming.passthrough", "true")
.option("doris.sink.properties.format", "json")
.start()
.awaitTermination()

以 JSON 格式写入

设置 doris.sink.properties.formatjson 后,Connector 会按 JSON 格式将数据写入 Doris。

val df = spark.readStream.format("your_own_stream_source").load()

df.writeStream
.format("doris")
.option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_HTTP_PORT")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.option("doris.sink.properties.format", "json")
.start()
.awaitTermination()

场景四:通过 Spark Doris Catalog 访问 Doris

从 24.0.0 版本开始,Spark Doris Connector 支持通过 Spark Catalog 方式访问 Doris。

Catalog 配置项

选项名称是否必须说明
spark.sql.catalog.your_catalog_name设置 Catalog 提供者的类名。对于 Doris,唯一有效值为 org.apache.doris.spark.catalog.DorisTableCatalog
spark.sql.catalog.your_catalog_name.doris.fenodes设置 Doris FE 节点,格式为 fe_ip:fe_http_port
spark.sql.catalog.your_catalog_name.doris.query.port设置 Doris FE 查询端口。当 spark.sql.catalog.your_catalog_name.doris.fe.auto.fetchtrue 时,可以不设置该选项。
spark.sql.catalog.your_catalog_name.doris.user设置 Doris 用户。
spark.sql.catalog.your_catalog_name.doris.password设置 Doris 密码。
spark.sql.defaultCatalog设置 Spark SQL 默认 Catalog。
提示

所有适用于 DataFrame 和 Spark SQL 的连接器参数都可以设置到 Catalog 上。例如,如需以 JSON 格式写入数据,可以将 spark.sql.catalog.your_catalog_name.doris.sink.properties.format 设置为 json

在 DataFrame 程序中使用 Catalog

val conf = new SparkConf()
conf.set("spark.sql.catalog.your_catalog_name", "org.apache.doris.spark.catalog.DorisTableCatalog")
conf.set("spark.sql.catalog.your_catalog_name.doris.fenodes", "192.168.0.1:8030")
conf.set("spark.sql.catalog.your_catalog_name.doris.query.port", "9030")
conf.set("spark.sql.catalog.your_catalog_name.doris.user", "root")
conf.set("spark.sql.catalog.your_catalog_name.doris.password", "")
val spark = builder.config(conf).getOrCreate()
spark.sessionState.catalogManager.setCurrentCatalog("your_catalog_name")

// show all databases
spark.sql("show databases")

// use databases
spark.sql("use your_doris_db")

// show tables in test
spark.sql("show tables")

// query table
spark.sql("select * from your_doris_table")

// write data
spark.sql("insert into your_doris_table values(xxx)")

在 Spark SQL CLI 中使用 Catalog

设置必要参数并启动 Spark SQL CLI:

spark-sql \
--conf "spark.sql.catalog.your_catalog_name=org.apache.doris.spark.catalog.DorisTableCatalog" \
--conf "spark.sql.catalog.your_catalog_name.doris.fenodes=192.168.0.1:8030" \
--conf "spark.sql.catalog.your_catalog_name.doris.query.port=9030" \
--conf "spark.sql.catalog.your_catalog_name.doris.user=root" \
--conf "spark.sql.catalog.your_catalog_name.doris.password=" \
--conf "spark.sql.defaultCatalog=your_catalog_name"

在 Spark SQL CLI 中执行查询:

-- show all databases
show databases;

-- use databases
use your_doris_db;

-- show tables in test
show tables;

-- query table
select * from your_doris_table;

-- write data
insert into your_doris_table values(xxx);
insert into your_doris_table select * from your_source_table;

-- access table with full name
select * from your_catalog_name.your_doris_db.your_doris_table;
insert into your_catalog_name.your_doris_db.your_doris_table values(xxx);
insert into your_catalog_name.your_doris_db.your_doris_table select * from your_source_table;

Java 示例

samples/doris-demo/spark-demo/ 下提供了 Java 版本的示例,可参考 apache/doris 示例目录

配置参考

通用配置项

KeyDefault ValueComment
doris.fenodes--Doris FE HTTP 地址,支持多个地址,使用逗号分隔。
doris.table.identifier--Doris 表名,如 db1.tbl1
doris.user--访问 Doris 的用户名。
doris.password空字符串访问 Doris 的密码。
doris.request.retries3向 Doris 发送请求的重试次数。
doris.request.connect.timeout.ms30000向 Doris 发送请求的连接超时时间。
doris.request.read.timeout.ms30000向 Doris 发送请求的读取超时时间。
doris.request.query.timeout.s21600查询 Doris 的超时时间,默认值为 6 小时,-1 表示无超时限制。
doris.request.tablet.size1一个 RDD Partition 对应的 Doris Tablet 个数。此数值设置越小,则会生成越多 Partition,从而提升 Spark 侧并行度,但同时会对 Doris 造成更大压力。
doris.read.field--读取 Doris 表的列名列表,多列之间使用逗号分隔。
doris.batch.size4064一次从 BE 读取数据的最大行数。增大此数值可减少 Spark 与 Doris 之间建立连接的次数,从而减轻网络延迟带来的额外时间开销。
doris.exec.mem.limit8589934592单个查询的内存限制,默认为 8 GB,单位为字节。
doris.write.fields--指定写入 Doris 表的字段或者字段顺序,多列之间使用逗号分隔。默认写入时要按照 Doris 表字段顺序写入全部字段。
doris.sink.batch.size500000单次写 BE 的最大行数。
doris.sink.max-retries0写 BE 失败之后的重试次数。从 1.3.0 版本开始,默认值为 0,即默认不进行重试。当该参数大于 0 时,会进行批次级别的失败重试,并在 Spark Executor 内存中缓存 doris.sink.batch.size 所配置大小的数据,可能需要适当增大内存分配。
doris.sink.retry.interval.ms10000配置重试次数之后,每次重试的间隔,单位为 ms。
doris.sink.properties.formatcsvStream Load 的数据格式,共支持 csvjsonarrow 3 种格式。更多参数请参考 Stream Load 手册
doris.sink.properties.*--Stream Load 的导入参数。例如,通过 'doris.sink.properties.column_separator' = ',' 指定列分隔符。更多参数请参考 Stream Load 手册
doris.sink.task.partition.size--Doris 写入任务对应的 Partition 个数。Spark RDD 经过过滤等操作后,最后写入的 Partition 数可能较大,但每个 Partition 对应的记录数较少,导致写入频率增加和计算资源浪费。此数值设置越小,可以降低 Doris 写入频率,减少 Doris 合并压力。该参数配合 doris.sink.task.use.repartition 使用。
doris.sink.task.use.repartitionfalse是否采用 repartition 方式控制 Doris 写入 Partition 数。默认值为 false,采用 coalesce 方式控制。注意,如果在写入之前没有 Spark action 算子,可能会导致整个计算并行度降低。如果设置为 true,则采用 repartition 方式。注意,可设置最后 Partition 数,但会额外增加 shuffle 开销。
doris.sink.batch.interval.ms0每个批次 Sink 的间隔时间,单位为 ms。
doris.sink.enable-2pcfalse是否开启两阶段提交。开启后将在作业结束时提交事务,而部分任务失败时会将所有预提交状态的事务回滚。
doris.sink.auto-redirecttrue是否重定向 Stream Load 请求。开启后 Stream Load 将通过 FE 写入,不再显式获取 BE 信息。
doris.enable.httpsfalse是否开启 FE HTTPS 请求。
doris.https.key-store-path-HTTPS key store 路径。
doris.https.key-store-typeJKSHTTPS key store 类型。
doris.https.key-store-password-HTTPS key store 密码。
doris.read.modethriftDoris 读取模式,可选项为 thriftarrow
doris.read.arrow-flight-sql.port-Doris FE 的 Arrow Flight SQL 端口。当 doris.read.modearrow 时,用于通过 Arrow Flight SQL 方式读取数据。服务端配置方式请参考 基于 Arrow Flight SQL 的高速数据传输链路
doris.sink.label.prefixspark-dorisStream Load 方式写入时的导入标签前缀。
doris.thrift.max.message.size2147483647通过 Thrift 方式读取数据时,消息的最大尺寸。
doris.fe.auto.fetchfalse是否自动获取 FE 信息。当设置为 true 时,会根据 doris.fenodes 配置的节点请求所有 FE 节点信息,无需额外配置多个节点以及单独配置 doris.read.arrow-flight-sql.portdoris.query.port
doris.read.bitmap-to-stringfalse是否将 Bitmap 类型转换为数组索引组成的字符串读取。具体结果形式参考函数定义 BITMAP_TO_STRING
doris.read.bitmap-to-base64false是否将 Bitmap 类型转换为 Base64 编码后的字符串读取。具体结果形式参考函数定义 BITMAP_TO_BASE64
doris.query.port-Doris FE 查询端口,用于覆盖写入以及 Catalog 的元数据获取。

SQL 和 DataFrame 专有配置

KeyDefault ValueComment
doris.filter.query.in.max.count10000谓词下推中,in 表达式 value 列表元素最大数量。超过此数量后,in 表达式条件过滤在 Spark 侧处理。

Structured Streaming 专有配置

KeyDefault ValueComment
doris.sink.streaming.passthroughfalse将第一列的值不经过处理直接写入。

RDD 专有配置

KeyDefault ValueComment
doris.request.auth.user--访问 Doris 的用户名。
doris.request.auth.password--访问 Doris 的密码。
doris.filter.query--过滤读取数据的表达式,此表达式会透传给 Doris。Doris 使用该表达式完成源端数据过滤。

类型映射

Doris 到 Spark 列类型映射关系

Doris TypeSpark Type
NULL_TYPEDataTypes.NullType
BOOLEANDataTypes.BooleanType
TINYINTDataTypes.ByteType
SMALLINTDataTypes.ShortType
INTDataTypes.IntegerType
BIGINTDataTypes.LongType
FLOATDataTypes.FloatType
DOUBLEDataTypes.DoubleType
DATEDataTypes.DateType
DATETIMEDataTypes.TimestampType
DECIMALDecimalType
CHARDataTypes.StringType
LARGEINTDecimalType
VARCHARDataTypes.StringType
STRINGDataTypes.StringType
JSONDataTypes.StringType
VARIANTDataTypes.StringType
TIMEDataTypes.DoubleType
HLLDataTypes.StringType
BitmapDataTypes.StringType

Spark 到 Doris 的数据类型映射

Spark TypeDoris Type
BooleanTypeBOOLEAN
ShortTypeSMALLINT
IntegerTypeINT
LongTypeBIGINT
FloatTypeFLOAT
DoubleTypeDOUBLE
DecimalTypeDECIMAL
StringTypeVARCHAR/STRING
DateTypeDATE
TimestampTypeDATETIME
ArrayTypeARRAY
MapTypeMAP/JSON
StructTypeSTRUCT/JSON
提示

从 24.0.0 版本开始,Bitmap 类型读取返回类型为字符串,默认返回字符串值 Read unsupported

常见问题与故障处理

如何写入 Bitmap 类型?

在 Spark SQL 中,通过 INSERT INTO 方式写入数据时,如果 Doris 的目标表中包含 BITMAPHLL 类型的数据,需要设置参数 doris.ignore-type 为对应类型,并通过 doris.write.fields 对列进行映射转换。

BITMAP

CREATE TEMPORARY VIEW spark_doris
USING doris
OPTIONS(
"table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
"fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_HTTP_PORT",
"user"="$YOUR_DORIS_USERNAME",
"password"="$YOUR_DORIS_PASSWORD",
"doris.ignore-type"="bitmap",
"doris.write.fields"="col1,col2,col3,bitmap_col2=to_bitmap(col2),bitmap_col3=bitmap_hash(col3)"
);

HLL

CREATE TEMPORARY VIEW spark_doris
USING doris
OPTIONS(
"table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
"fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_HTTP_PORT",
"user"="$YOUR_DORIS_USERNAME",
"password"="$YOUR_DORIS_PASSWORD",
"doris.ignore-type"="hll",
"doris.write.fields"="col1,hll_col1=hll_hash(col1)"
);
提示

从 24.0.0 版本开始,doris.ignore-type 被废弃,写入时无需添加该参数。

如何使用 Overwrite 写入?

从 1.3.0 版本开始,Connector 支持 Overwrite 模式写入。Overwrite 只支持全表级别的数据覆盖,使用方式如下。

DataFrame

resultDf.write.format("doris")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_HTTP_PORT")
// your own options
.mode(SaveMode.Overwrite)
.save()

SQL

INSERT OVERWRITE your_target_table SELECT * FROM your_source_table;

如何读取 Bitmap 类型?

从 24.0.0 版本开始,Connector 支持通过 Arrow Flight SQL 方式读取转换后的 Bitmap 数据。该能力需要 Doris 版本 >= 2.1.0。

Bitmap to String

以 DataFrame 方式为例,设置 doris.read.bitmap-to-stringtrue,具体结果格式见选项定义。

spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_HTTP_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.option("doris.read.bitmap-to-string", "true")
.load()

Bitmap to Base64

以 DataFrame 方式为例,设置 doris.read.bitmap-to-base64true,具体结果格式见选项定义。

spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_HTTP_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.option("doris.read.bitmap-to-base64", "true")
.load()

DataFrame 写入时报 ErrorIfExists 怎么处理?

如果 DataFrame 写入时报错 org.apache.spark.sql.AnalysisException: TableProvider implementation doris cannot be written with ErrorIfExists mode, please use Append or Overwrite modes instead.,需要将 save mode 设置为 Append

resultDf.write.format("doris")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_HTTP_PORT")
// your own options
.mode(SaveMode.Append)
.save()