SeaTunnel Doris Sink

About SeaTunnel

SeaTunnel is an easy-to-use, high-performance distributed data integration platform that supports real-time synchronization of massive data volumes. It can synchronize tens of billions of records stably and efficiently every day.

Connector-V2

Connector-V2 for SeaTunnel has supported the Doris Sink since version 2.3.1, including exactly-once writes and CDC data synchronization.

Plugin Code

SeaTunnel Doris Sink Plugin Code

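Options

| name | type | required | default value |
| --- | --- | --- | --- |
| fenodes | string | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| table.identifier | string | yes | - |
| sink.label-prefix | string | yes | - |
| sink.enable-2pc | bool | no | true |
| sink.enable-delete | bool | no | false |
| doris.config | map | yes | - |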

fenodes [string]

Addresses of the Doris cluster FE nodes, in the format "fe_ip:fe_http_port, ..."

username [string]

Doris username

password [string]

Doris user password

table.identifier [string]

The name of the Doris table, in the format DBName.TableName

sink.label-prefix [string]

The label prefix used for Stream Load imports. In the 2PC scenario, the prefix must be globally unique to guarantee SeaTunnel's exactly-once semantics (EOS).
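As a minimal sketch of what global uniqueness means in practice, two jobs that write to the same table would each carry their own prefix. The job file names and prefix values below are hypothetical; the option names are the ones listed in the table above.

```
# orders_batch.conf (hypothetical job file)
sink {
    Doris {
        fenodes = "doris_fe:8030"
        username = root
        password = ""
        table.identifier = "test.table_sink"
        sink.enable-2pc = "true"
        # Prefix used only by this job
        sink.label-prefix = "orders_batch"
        doris.config = {
            format = "json"
            read_json_by_line = "true"
        }
    }
}

# orders_stream.conf (hypothetical job file): same table, different prefix
sink {
    Doris {
        fenodes = "doris_fe:8030"
        username = root
        password = ""
        table.identifier = "test.table_sink"
        sink.enable-2pc = "true"
        # Must not collide with the prefix of any other job
        sink.label-prefix = "orders_stream"
        doris.config = {
            format = "json"
            read_json_by_line = "true"
        }
    }
}
```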

sink.enable-2pc [bool]

Whether to enable two-phase commit (2PC) to ensure exactly-once semantics. The default is true. For details, see the Doris two-phase commit documentation.

sink.enable-delete [bool]

Whether to enable deletion. This option requires the batch delete feature to be enabled on the Doris table (it is enabled by default in version 0.15 and later) and supports only the Unique model. For more detail, see:

batch delete
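A sink that propagates deletes might look like the following sketch. It assumes the upstream source emits change events and that the target table uses the Unique model with batch delete enabled. The table name and label prefix are hypothetical.

```
sink {
    Doris {
        fenodes = "doris_fe:8030"
        username = root
        password = ""
        # Hypothetical target table; assumed to be a Unique model table
        table.identifier = "test.table_cdc"
        sink.label-prefix = "test_cdc"
        sink.enable-2pc = "true"
        # Apply upstream delete events to the Doris table
        sink.enable-delete = "true"
        doris.config = {
            format = "json"
            read_json_by_line = "true"
        }
    }
}
```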

doris.config [map]

Parameters for the Stream Load data_desc. For more detail, see:

More Stream Load parameters

Example

Use JSON format to import data

sink {
    Doris {
        fenodes = "doris_fe:8030"
        username = root
        password = ""
        table.identifier = "test.table_sink"
        sink.enable-2pc = "true"
        sink.label-prefix = "test_json"
        doris.config = {
            format = "json"
            read_json_by_line = "true"
        }
    }
}

Use CSV format to import data

sink {
    Doris {
        fenodes = "doris_fe:8030"
        username = root
        password = ""
        table.identifier = "test.table_sink"
        sink.enable-2pc = "true"
        sink.label-prefix = "test_csv"
        doris.config = {
            format = "csv"
            column_separator = ","
        }
    }
}

Connector-V1

Plugin Code

SeaTunnel Flink Sink Doris plugin code

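Options

| name | type | required | default value | engine |
| --- | --- | --- | --- | --- |
| fenodes | string | yes | - | Flink |
| database | string | yes | - | Flink |
| table | string | yes | - | Flink |
| user | string | yes | - | Flink |
| password | string | yes | - | Flink |
| batch_size | int | no | 100 | Flink |
| interval | int | no | 1000 | Flink |
| max_retries | int | no | 1 | Flink |
| doris.* | - | no | - | Flink |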

fenodes [string]

Doris FE HTTP address, e.g. 127.0.0.1:8030

database [string]

Doris database

table [string]

Doris table

user [string]

Doris user

password [string]

Doris password

batch_size [int]

The maximum number of rows to write to Doris at a time. The default value is 100.

interval [int]

The flush interval (in milliseconds), after which the asynchronous thread writes the data in the cache to Doris. Set to 0 to turn off periodic writes.

max_retries [int]

Number of retries after writing to Doris fails
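
The batching options above can be combined. As a rough sketch, the sink below turns off periodic writes with interval = 0, so flushing is presumably driven by batch_size alone. The values 500 and 3 are illustrative assumptions, not recommendations from the connector.

```
sink {
    DorisSink {
        fenodes = "127.0.0.1:8030"
        user = root
        password = 123456
        database = test
        table = test_tbl
        # Illustrative value: write up to 500 rows per request
        batch_size = 500
        # 0 turns off periodic writes
        interval = 0
        # Illustrative value: retry a failed write up to 3 times
        max_retries = 3
    }
}
```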

doris.* [string]

Import parameters for Stream Load, for example 'doris.column_separator' = ', '.

More Stream Load parameter configuration
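
In a job file, a doris.* parameter sits next to the other sink options. The sketch below reuses the connection values from the Flink example in this section and sets only doris.column_separator; the comma separator is an illustrative choice.

```
sink {
    DorisSink {
        fenodes = "127.0.0.1:8030"
        user = root
        password = 123456
        database = test
        table = test_tbl
        # Passed to Stream Load as the column_separator import parameter
        doris.column_separator = ","
    }
}
```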

Examples

Socket To Doris

env {
    execution.parallelism = 1
}
source {
    SocketStream {
        host = 127.0.0.1
        port = 9999
        result_table_name = "socket"
        field_name = "info"
    }
}
transform {
}
sink {
    DorisSink {
        fenodes = "127.0.0.1:8030"
        user = root
        password = 123456
        database = test
        table = test_tbl
        batch_size = 5
        max_retries = 1
        interval = 5000
    }
}

Start command

sh bin/start-seatunnel-flink.sh --config config/flink.streaming.conf

Spark Sink Doris

Plugin Code

SeaTunnel Spark Sink Doris plugin code

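Options

| name | type | required | default value | engine |
| --- | --- | --- | --- | --- |
| fenodes | string | yes | - | Spark |
| database | string | yes | - | Spark |
| table | string | yes | - | Spark |
| user | string | yes | - | Spark |
| password | string | yes | - | Spark |
| batch_size | int | yes | 100 | Spark |
| doris.* | string | no | - | Spark |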

fenodes [string]

Doris FE address with its HTTP port, e.g. 127.0.0.1:8030

database [string]

Doris target database name

table [string]

Doris target table name

user [string]

Doris user name

password [string]

Doris user's password

batch_size [int]

Number of rows submitted to Doris per batch

doris.* [string]

Doris stream_load properties. Use the 'doris.' prefix followed by the stream_load property name.

More Doris stream_load Configurations

Examples

Hive to Doris

Config properties

env {
    spark.app.name = "hive2doris-template"
}

spark {
    spark.sql.catalogImplementation = "hive"
}

source {
    hive {
        preSql = "select * from tmp.test"
        result_table_name = "test"
    }
}

transform {
}

sink {
    Console {
    }

    Doris {
        fenodes = "xxxx:8030"
        database = "gl_mint_dim"
        table = "dim_date"
        user = "root"
        password = "root"
        batch_size = 1000
        doris.column_separator = "\t"
        doris.columns = "date_key,date_value,day_in_year,day_in_month"
    }
}

Start command

sh bin/start-waterdrop-spark.sh --master local[4] --deploy-mode client --config ./config/spark.conf