SeaTunnel

SeaTunnel is an easy-to-use, ultra-high-performance distributed data integration platform that supports real-time synchronization of massive data and can stably and efficiently synchronize tens of billions of records every day.

This document is intended for users who need to synchronize data from SeaTunnel into Doris. It introduces, by use case, how to choose a Doris Sink, how to configure its parameters, and how to use it.

How to Choose a Connector

| Use case | Recommended approach | Applicable version or engine | Description |
| --- | --- | --- | --- |
| Need exactly-once writes or CDC data synchronization | Connector-V2 Doris Sink | SeaTunnel 2.3.1 and above | Supports Doris Sink, exactly-once writes, and CDC data synchronization. |
| Synchronize data into Doris through the Flink engine | Connector-V1 Flink Doris Sink | SeaTunnel 2.1.0, Flink engine | Suitable for SeaTunnel jobs that already run on a Flink engine. |
| Synchronize data into Doris through the Spark engine | Connector-V1 Spark Sink Doris | SeaTunnel 2.1.0, Spark engine | Suitable for jobs that already run on a Spark engine, for example migrating data from Hive into Doris. |

Scenario 1: Use Connector-V2 to Write to Doris

Starting with version 2.3.1, Apache SeaTunnel Connector-V2 provides a Doris Sink that supports exactly-once writes and CDC data synchronization.

For the plugin source code, see the SeaTunnel Doris Sink plugin code.

Parameter Configuration

| Parameter | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| fenodes | string | Yes | - | Doris cluster FE node addresses, in the format fe_ip:fe_http_port,.... |
| username | string | Yes | - | Doris username. |
| password | string | Yes | - | Password for the Doris user. |
| table.identifier | string | Yes | - | Doris table name, in the format DBName.TableName. |
| sink.label-prefix | string | Yes | - | Label prefix used by Stream Load. In a 2PC scenario, this prefix must be globally unique to guarantee SeaTunnel's EOS semantics. |
| sink.enable-2pc | bool | No | true | Whether to enable two-phase commit (2PC). The default is true, which guarantees exactly-once semantics. For two-phase commit, see the Stream Load manual. |
| sink.enable-delete | bool | No | false | Whether to enable deletes. This option requires the Doris table to have batch delete enabled. Doris 0.15+ enables this feature by default, and only the Unique table model is supported. For more information, see Batch Delete. |
| doris.config | map | Yes | - | Stream Load data_desc parameters. For more parameters, see the Stream Load manual. |
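
To illustrate the fenodes format and the label-prefix requirement described in the table, the fragment below lists several FE nodes and uses a job-specific prefix. It is only a sketch: the host names fe1, fe2, and fe3 and the prefix orders_sync_job_01 are placeholders, not values from a real cluster.

```hocon
sink {
  Doris {
    # Comma-separated FE HTTP addresses (hypothetical hosts)
    fenodes = "fe1:8030,fe2:8030,fe3:8030"
    username = root
    password = ""
    table.identifier = "test.table_sink"
    # 2PC is enabled by default; shown explicitly here
    sink.enable-2pc = "true"
    # Must be globally unique when 2PC is enabled (hypothetical value)
    sink.label-prefix = "orders_sync_job_01"
    doris.config = {
      format = "json"
      read_json_by_line = "true"
    }
  }
}
```

Giving each job its own prefix is one simple way to keep Stream Load labels from colliding between jobs that write to the same cluster.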

Import Data in JSON Format

sink {
  Doris {
    fenodes = "doris_fe:8030"
    username = root
    password = ""
    table.identifier = "test.table_sink"
    sink.enable-2pc = "true"
    sink.label-prefix = "test_json"
    doris.config = {
      format = "json"
      read_json_by_line = "true"
    }
  }
}

Import Data in CSV Format

sink {
  Doris {
    fenodes = "doris_fe:8030"
    username = root
    password = ""
    table.identifier = "test.table_sink"
    sink.enable-2pc = "true"
    sink.label-prefix = "test_csv"
    doris.config = {
      format = "csv"
      column_separator = ","
      line_delimiter = "\n"
    }
  }
}
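
For CDC synchronization, the sink.enable-delete parameter from the parameter table can be added to the JSON configuration so that delete events from upstream are applied in Doris. The following is a minimal sketch, assuming the target table uses the Unique model with batch delete enabled; the label prefix test_cdc is a placeholder.

```hocon
sink {
  Doris {
    fenodes = "doris_fe:8030"
    username = root
    password = ""
    table.identifier = "test.table_sink"
    sink.enable-2pc = "true"
    # Placeholder prefix; must be globally unique when 2PC is enabled
    sink.label-prefix = "test_cdc"
    # Apply upstream deletes; requires a Unique-model table with batch delete
    sink.enable-delete = "true"
    doris.config = {
      format = "json"
      read_json_by_line = "true"
    }
  }
}
```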

Scenario 2: Use Connector-V1 to Write to Doris

Apache SeaTunnel 2.1.0 supports the Doris connector, which can synchronize data into Doris through either the Flink engine or the Spark engine.

Use the Flink Engine to Write to Doris

For the plugin source code, see the SeaTunnel Flink Sink Doris plugin code.

Parameter Configuration

| Parameter | Type | Required | Default | Supported engine | Description |
| --- | --- | --- | --- | --- | --- |
| fenodes | string | Yes | - | Flink | Doris FE HTTP address, for example 127.0.0.1:8030. |
| database | string | Yes | - | Flink | Name of the database to write to in Doris. |
| table | string | Yes | - | Flink | Name of the table to write to in Doris. |
| user | string | Yes | - | Flink | Doris access user. |
| password | string | Yes | - | Flink | Password for the Doris access user. |
| batch_size | int | No | 100 | Flink | Maximum number of rows written to Doris in a single batch. |
| interval | int | No | 1000 | Flink | Flush interval, in milliseconds. After this interval elapses, an asynchronous thread writes the buffered data into Doris. Set to 0 to disable periodic writes. |
| max_retries | int | No | 1 | Flink | Number of retries after a write to Doris fails. |
| doris.* | string | No | - | Flink | Stream Load import parameters, for example doris.column_separator = ','. For more parameters, see the Stream Load manual. |

Example: Write Socket Data to Doris

env {
  execution.parallelism = 1
}

source {
  SocketStream {
    host = 127.0.0.1
    port = 9999
    result_table_name = "socket"
    field_name = "info"
  }
}

transform {
}

sink {
  DorisSink {
    fenodes = "127.0.0.1:8030"
    user = root
    password = 123456
    database = test
    table = test_tbl
    batch_size = 5
    max_retries = 1
    interval = 5000
  }
}

Start the job:

sh bin/start-seatunnel-flink.sh --config config/flink.streaming.conf
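
The doris.* passthrough parameters from the parameter table can be added to the same sink block. The fragment below is a minimal sketch, assuming the incoming rows are comma-separated; only the doris.column_separator line differs from the sink in the job above.

```hocon
sink {
  DorisSink {
    fenodes = "127.0.0.1:8030"
    user = root
    password = 123456
    database = test
    table = test_tbl
    batch_size = 5
    max_retries = 1
    interval = 5000
    # Passed through to Stream Load (assumes comma-separated input)
    doris.column_separator = ","
  }
}
```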

Use the Spark Engine to Write to Doris

For the plugin source code, see the SeaTunnel Spark Sink Doris plugin code.

Parameter Configuration

| Parameter | Type | Required | Default | Supported engine | Description |
| --- | --- | --- | --- | --- | --- |
| fenodes | string | Yes | - | Spark | Doris FE node address, for example 127.0.0.1:8030. |
| database | string | Yes | - | Spark | Name of the database to write to in Doris. |
| table | string | Yes | - | Spark | Name of the table to write to in Doris. |
| user | string | Yes | - | Spark | Doris access user. |
| password | string | Yes | - | Spark | Password for the Doris access user. |
| batch_size | int | Yes | 100 | Spark | Number of rows submitted per batch when Spark writes to Doris through Stream Load. |
| doris.* | string | No | - | Spark | Stream Load HTTP parameters. Add the doris. prefix to any Stream Load parameter to use it, for example doris.column_separator. For more parameters, see the Stream Load manual. |

Example: Migrate Data from Hive to Doris

env {
  spark.app.name = "hive2doris-template"
}

spark {
  spark.sql.catalogImplementation = "hive"
}

source {
  hive {
    preSql = "select * from tmp.test"
    result_table_name = "test"
  }
}

transform {
}

sink {
  Console {
  }

  Doris {
    fenodes = "xxxx:8030"
    database = "tmp"
    table = "test"
    user = "root"
    password = "root"
    batch_size = 1000
    doris.column_separator = "\t"
    doris.columns = "date_key,date_value,day_in_year,day_in_month"
  }
}

Start the job:

sh bin/start-seatunnel-spark.sh --master local[4] --deploy-mode client --config ./config/spark.conf