# Flink Doris Connector

Flink Doris Connector 可以支持通过 Flink 读写 Doris 中存储的数据。

  • 可以将Doris表映射为DataStream或者Table

# 版本兼容

Connector Flink Doris Java Scala
1.0.0 1.11.x , 1.12.x 0.13+ 8 2.12
1.0.0 1.13.x 0.13.+ 8 2.12

针对Flink 1.13.x版本适配问题

    <properties>
        <scala.version>2.12</scala.version>
        <flink.version>1.11.2</flink.version>
        <libthrift.version>0.9.3</libthrift.version>
        <arrow.version>0.15.1</arrow.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <doris.home>${basedir}/../../</doris.home>
        <doris.thirdparty>${basedir}/../../thirdparty</doris.thirdparty>
    </properties>

只需要将这里的 flink.version 改成和你 Flink 集群版本一致,重新编译即可。

# 编译与安装

extension/flink-doris-connector/ 源码目录下执行:

注意:

  1. 这里如果你没有整体编译过 doris 源码,需要首先编译一次 Doris 源码,不然会出现 thrift 命令找不到的情况,需要到 incubator-doris 目录下执行 sh build.sh
  2. 建议在 doris 的 docker 编译环境 apache/incubator-doris:build-env-1.2 下进行编译,因为 1.3 下面的JDK 版本是 11,会存在编译问题。
sh build.sh

编译成功后,会在 output/ 目录下生成文件 doris-flink-1.0.0-SNAPSHOT.jar。将此文件复制到 FlinkClassPath 中即可使用 Flink-Doris-Connector。例如,Local 模式运行的 Flink,将此文件放入 jars/ 文件夹下。Yarn集群模式运行的Flink,则将此文件放入预部署包中。:

备注:

  1. doris FE 要在配置中配置启用http v2
  2. Scala版本目前只支持2.12.x版本

conf/fe.conf

enable_http_server_v2 = true

# 使用示例

此步骤的目的是在Flink上注册Doris数据源。 此步骤在Flink上进行。 有两种使用sql和java的方法。 以下是示例说明

# SQL

此步骤的目的是在Flink上注册Doris数据源。 此步骤在Flink上进行。

CREATE TABLE flink_doris_source (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
    ) 
    WITH (
      'connector' = 'doris',
      'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT',
      'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',
      'username' = '$YOUR_DORIS_USERNAME',
      'password' = '$YOUR_DORIS_PASSWORD'
);

CREATE TABLE flink_doris_sink (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
    ) 
    WITH (
      'connector' = 'doris',
      'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT',
      'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',
      'username' = '$YOUR_DORIS_USERNAME',
      'password' = '$YOUR_DORIS_PASSWORD'
);

INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source

# DataStreamSource

 Properties properties = new Properties();
 properties.put("fenodes","FE_IP:8030");
 properties.put("username","root");
 properties.put("password","");
 properties.put("table.identifier","db.table");
 env.addSource(new DorisSourceFunction(new DorisStreamOptions(properties),new SimpleListDeserializationSchema())).print();

# DataStreamSink

// -------- sink with raw json string stream --------
Properties pro = new Properties();
pro.setProperty("format", "json");
pro.setProperty("strip_outer_array", "true");
env.fromElements( "{\"longitude\": \"116.405419\", \"city\": \"北京\", \"latitude\": \"39.916927\"}")
     .addSink(
     	DorisSink.sink(
            DorisReadOptions.builder().build(),
         	DorisExecutionOptions.builder()
                    .setBatchSize(3)
                    .setBatchIntervalMs(0l)
                    .setMaxRetries(3)
                    .setStreamLoadProp(pro).build(),
         	DorisOptions.builder()
                    .setFenodes("FE_IP:8030")
                    .setTableIdentifier("db.table")
                    .setUsername("root")
                    .setPassword("").build()
     	));

OR
env.fromElements("{\"longitude\": \"116.405419\", \"city\": \"北京\", \"latitude\": \"39.916927\"}")
    .addSink(
    	DorisSink.sink(
        	DorisOptions.builder()
                    .setFenodes("FE_IP:8030")
                    .setTableIdentifier("db.table")
                    .setUsername("root")
                    .setPassword("").build()
    	));


// -------- sink with RowData stream --------
DataStream<RowData> source = env.fromElements("")
    .map(new MapFunction<String, RowData>() {
        @Override
        public RowData map(String value) throws Exception {
            GenericRowData genericRowData = new GenericRowData(3);
            genericRowData.setField(0, StringData.fromString("北京"));
            genericRowData.setField(1, 116.405419);
            genericRowData.setField(2, 39.916927);
            return genericRowData;
        }
    });

String[] fields = {"city", "longitude", "latitude"};
LogicalType[] types = {new VarCharType(), new DoubleType(), new DoubleType()};

source.addSink(
    DorisSink.sink(
        fields,
        types,
        DorisReadOptions.builder().build(),
        DorisExecutionOptions.builder()
            .setBatchSize(3)
            .setBatchIntervalMs(0L)
            .setMaxRetries(3)
            .build(),
        DorisOptions.builder()
            .setFenodes("FE_IP:8030")
            .setTableIdentifier("db.table")
            .setUsername("root")
            .setPassword("").build()
    ));

# DataSetSink

MapOperator<String, RowData> data = env.fromElements("")
    .map(new MapFunction<String, RowData>() {
        @Override
        public RowData map(String value) throws Exception {
            GenericRowData genericRowData = new GenericRowData(3);
            genericRowData.setField(0, StringData.fromString("北京"));
            genericRowData.setField(1, 116.405419);
            genericRowData.setField(2, 39.916927);
            return genericRowData;
        }
    });

DorisOptions dorisOptions = DorisOptions.builder()
    .setFenodes("FE_IP:8030")
    .setTableIdentifier("db.table")
    .setUsername("root")
    .setPassword("").build();
DorisReadOptions readOptions = DorisReadOptions.defaults();
DorisExecutionOptions executionOptions = DorisExecutionOptions.defaults();

LogicalType[] types = {new VarCharType(), new DoubleType(), new DoubleType()};
String[] fiels = {"city", "longitude", "latitude"};

DorisDynamicOutputFormat outputFormat =
    new DorisDynamicOutputFormat(dorisOptions, readOptions, executionOptions, types, fiels);

outputFormat.open(0, 1);
data.output(outputFormat);
outputFormat.close();

# 配置

# 通用配置项

Key Default Value Comment
fenodes -- Doris FE http 地址
table.identifier -- Doris 表名,如:db1.tbl1
username -- 访问Doris的用户名
password -- 访问Doris的密码
doris.request.retries 3 向Doris发送请求的重试次数
doris.request.connect.timeout.ms 30000 向Doris发送请求的连接超时时间
doris.request.read.timeout.ms 30000 向Doris发送请求的读取超时时间
doris.request.query.timeout.s 3600 查询doris的超时时间,默认值为1小时,-1表示无超时限制
doris.request.tablet.size Integer.MAX_VALUE 一个Partition对应的Doris Tablet个数。
此数值设置越小,则会生成越多的Partition。从而提升Flink侧的并行度,但同时会对Doris造成更大的压力。
doris.batch.size 1024 一次从BE读取数据的最大行数。增大此数值可减少flink与Doris之间建立连接的次数。
从而减轻网络延迟所带来的的额外时间开销。
doris.exec.mem.limit 2147483648 单个查询的内存限制。默认为 2GB,单位为字节
doris.deserialize.arrow.async false 是否支持异步转换Arrow格式到flink-doris-connector迭代所需的RowBatch
doris.deserialize.queue.size 64 异步转换Arrow格式的内部处理队列,当doris.deserialize.arrow.async为true时生效
doris.read.field -- 读取Doris表的列名列表,多列之间使用逗号分隔
doris.filter.query -- 过滤读取数据的表达式,此表达式透传给Doris。Doris使用此表达式完成源端数据过滤。
sink.batch.size 100 单次写BE的最大行数
sink.max-retries 1 写BE失败之后的重试次数
sink.batch.interval 1s flush 间隔时间,超过该时间后异步线程将 缓存中数据写入BE。 默认值为1秒,支持时间单位ms、s、min、h和d。设置为0表示关闭定期写入。
sink.properties.* -- Stream load 的导入参数。例如:'sink.properties.column_separator' = ','等。
支持JSON格式导入,需要同时开启'sink.properties.format' = 'json'和'sink.properties.strip_outer_array' = 'true'
Doris Type Flink Type
NULL_TYPE NULL
BOOLEAN BOOLEAN
TINYINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
DATE STRING
DATETIME STRING
DECIMAL DECIMAL
CHAR STRING
LARGEINT STRING
VARCHAR STRING
DECIMALV2 DECIMAL
TIME DOUBLE
HLL Unsupported datatype