跳到主要内容

SeaTunnel

SeaTunnel 是一个非常简单易用的超高性能分布式数据集成平台,支持海量数据的实时同步,可以每天稳定高效地同步数百亿条数据。

本文面向需要将 SeaTunnel 数据同步到 Doris 的用户,按使用场景介绍 Doris Sink 的选择方式、参数配置和使用示例。

如何选择连接器

使用场景推荐方式适用版本或引擎说明
需要 exactly-once 写入或 CDC 数据同步Connector-V2 Doris SinkSeaTunnel 2.3.1 及以上支持 Doris Sink、exactly-once 精准一次写入和 CDC 数据同步。
通过 Flink 引擎同步数据到 DorisConnector-V1 Flink Doris SinkSeaTunnel 2.1.0,Flink 引擎适用于已有 Flink 引擎链路的 SeaTunnel 任务。
通过 Spark 引擎同步数据到 DorisConnector-V1 Spark Sink DorisSeaTunnel 2.1.0,Spark 引擎适用于已有 Spark 引擎链路,例如从 Hive 迁移数据到 Doris。

场景一:使用 Connector-V2 写入 Doris

SeaTunnel 2.3.1 版本的 Apache SeaTunnel Connector-V2 支持 Doris Sink,并支持 exactly-once 精准一次写入和 CDC 数据同步。

插件代码请参考 SeaTunnel Doris Sink 插件代码

参数配置

参数类型是否必填默认值说明
fenodesstring-Doris 集群 FE 节点地址,格式为 fe_ip:fe_http_port,...
usernamestring-Doris 用户名。
passwordstring-Doris 用户密码。
table.identifierstring-Doris 表名称,格式为 DBName.TableName
sink.label-prefixstring-Stream Load 导入使用的标签前缀。在 2PC 场景下,该前缀需要全局唯一,以保证 SeaTunnel 的 EOS 语义。
sink.enable-2pcbooltrue是否启用两阶段提交(2PC)。默认值为 true,用于保证 exactly-once 语义。两阶段提交请参考 Stream Load 手册
sink.enable-deleteboolfalse是否启用删除。该选项要求 Doris 表开启批量删除功能,Doris 0.15+ 版本默认开启该功能,并且只支持 Unique 表模型。更多信息请参考 批量删除
doris.configmap-Stream Load data_desc 参数。更多参数请参考 Stream Load 手册

使用 JSON 格式导入数据

sink {
Doris {
fenodes = "doris_fe:8030"
username = root
password = ""
table.identifier = "test.table_sink"
sink.enable-2pc = "true"
sink.label-prefix = "test_json"
doris.config = {
format = "json"
read_json_by_line = "true"
}
}
}

使用 CSV 格式导入数据

sink {
Doris {
fenodes = "doris_fe:8030"
username = root
password = ""
table.identifier = "test.table_sink"
sink.enable-2pc = "true"
sink.label-prefix = "test_csv"
doris.config = {
format = "csv"
column_separator = ","
line_delimiter = "\n"
}
}
}

场景二:使用 Connector-V1 写入 Doris

Apache SeaTunnel 2.1.0 支持 Doris 连接器,可以通过 Flink 引擎和 Spark 引擎将数据同步到 Doris。

插件代码请参考 SeaTunnel Flink Sink Doris 插件代码

参数配置

参数类型是否必填默认值支持引擎说明
fenodesstring-FlinkDoris FE HTTP 访问地址,例如 127.0.0.1:8030
databasestring-Flink写入 Doris 的库名。
tablestring-Flink写入 Doris 的表名。
userstring-FlinkDoris 访问用户。
passwordstring-FlinkDoris 访问用户密码。
batch_sizeint100Flink单次写入 Doris 的最大行数。
intervalint1000Flinkflush 间隔时间,单位为毫秒。超过该时间后,异步线程会将缓存中的数据写入 Doris。设置为 0 表示关闭定期写入。
max_retriesint1Flink写入 Doris 失败后的重试次数。
doris.*string-FlinkStream Load 导入参数,例如 doris.column_separator = ','。更多参数请参考 Stream Load 手册

示例:Socket 数据写入 Doris

env {
execution.parallelism = 1
}

source {
SocketStream {
host = 127.0.0.1
port = 9999
result_table_name = "socket"
field_name = "info"
}
}

transform {
}

sink {
DorisSink {
fenodes = "127.0.0.1:8030"
user = root
password = 123456
database = test
table = test_tbl
batch_size = 5
max_retries = 1
interval = 5000
}
}

启动任务:

sh bin/start-seatunnel-flink.sh --config config/flink.streaming.conf

使用 Spark 引擎写入 Doris

插件代码请参考 SeaTunnel Spark Sink Doris 插件代码

参数配置

参数类型是否必填默认值支持引擎说明
fenodesstring-SparkDoris FE 节点地址,例如 127.0.0.1:8030
databasestring-Spark写入 Doris 的库名。
tablestring-Spark写入 Doris 的表名。
userstring-SparkDoris 访问用户。
passwordstring-SparkDoris 访问用户密码。
batch_sizeint100SparkSpark 通过 Stream Load 写入 Doris 时,每个批次提交的行数。
doris.*string-SparkStream Load HTTP 参数。在 Stream Load 参数前增加 doris. 前缀即可使用,例如 doris.column_separator。更多参数请参考 Stream Load 手册

示例:Hive 迁移数据至 Doris

env {
spark.app.name = "hive2doris-template"
}

spark {
spark.sql.catalogImplementation = "hive"
}

source {
hive {
preSql = "select * from tmp.test"
result_table_name = "test"
}
}

transform {
}

sink {
Console {
}

Doris {
fenodes = "xxxx:8030"
database = "tmp"
table = "test"
user = "root"
password = "root"
batch_size = 1000
doris.column_separator = "\t"
doris.columns = "date_key,date_value,day_in_year,day_in_month"
}
}

启动任务:

sh bin/start-waterdrop-spark.sh --master local[4] --deploy-mode client --config ./config/spark.conf