# Flink Doris Connector

The Flink Doris Connector supports reading and writing data stored in Doris through Flink.

  • You can map a Doris table to a `DataStream` or `Table`.

# Version Compatibility

| Connector | Flink  | Doris | Java | Scala |
| --------- | ------ | ----- | ---- | ----- |
| 1.0.0     | 1.11.2 | 0.13+ | 8    | 2.12  |
| 1.0.0     | 1.13.x | 0.13+ | 8    | 2.12  |

For Flink 1.13.x version adaptation, edit the following properties in the connector's `pom.xml`:

```xml
<properties>
    <scala.version>2.12</scala.version>
    <flink.version>1.11.2</flink.version>
    <libthrift.version>0.9.3</libthrift.version>
    <arrow.version>0.15.1</arrow.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <doris.home>${basedir}/../../</doris.home>
    <doris.thirdparty>${basedir}/../../thirdparty</doris.thirdparty>
</properties>
```

Just change `flink.version` here to match your Flink cluster version, then recompile.

# Build and Install

Execute the following command in the `extension/flink-doris-connector/` directory:

```bash
sh build.sh
```

Notice:

  1. If you have not compiled the Doris source code as a whole, you need to compile it first; otherwise the `thrift` command will not be found. In that case, execute `sh build.sh` in the `incubator-doris` root directory first.
  2. It is recommended to compile in the Doris docker build environment `apache/incubator-doris:build-env-1.2`, because `build-env-1.3` and later use JDK 11, which causes compilation problems.

After successful compilation, the file `doris-flink-1.0.0-SNAPSHOT.jar` will be generated in the `output/` directory. Copy this file to the classpath of Flink to use Flink-Doris-Connector. For example, if Flink runs in Local mode, put this file in the `jars/` folder; if Flink runs in Yarn cluster mode, put this file in the pre-deployment package.

Remarks:

  1. Doris FE should have HTTP server v2 enabled in its configuration (see `conf/fe.conf` below)
  2. Only Scala 2.12.x is currently supported

conf/fe.conf:

```properties
enable_http_server_v2 = true
```

# How to use

The purpose of this step is to register the Doris data source in Flink. This step is performed on the Flink side. The connector can be used in two ways, SQL and the Java API; the following examples illustrate both.

# SQL


```sql
CREATE TABLE flink_doris_source (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
    )
    WITH (
      'connector' = 'doris',
      'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT',
      'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',
      'username' = '$YOUR_DORIS_USERNAME',
      'password' = '$YOUR_DORIS_PASSWORD'
);
```

```sql
CREATE TABLE flink_doris_sink (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
    )
    WITH (
      'connector' = 'doris',
      'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT',
      'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',
      'username' = '$YOUR_DORIS_USERNAME',
      'password' = '$YOUR_DORIS_PASSWORD'
);
```

```sql
INSERT INTO flink_doris_sink SELECT name, age, price, sale FROM flink_doris_source;
```
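If you are not using the SQL client, the same DDL can be submitted from a Java program through Flink's Table API. A minimal sketch (the class name and the elided DDL strings are illustrative; fill in the statements above):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Minimal sketch: register the Doris source and sink tables, then run the insert.
public class DorisSqlJob {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        tEnv.executeSql("CREATE TABLE flink_doris_source (...) WITH (...)"); // DDL from above
        tEnv.executeSql("CREATE TABLE flink_doris_sink (...) WITH (...)");   // DDL from above

        // executeSql on an INSERT statement submits the job.
        tEnv.executeSql(
                "INSERT INTO flink_doris_sink SELECT name, age, price, sale FROM flink_doris_source");
    }
}
```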

# DataStreamSource

```java
Properties properties = new Properties();
properties.put("fenodes", "FE_IP:8030");
properties.put("username", "root");
properties.put("password", "");
properties.put("table.identifier", "db.table");

// Read the Doris table as a DataStream and print each row.
env.addSource(new DorisSourceFunction(
        new DorisStreamOptions(properties),
        new SimpleListDeserializationSchema()))
   .print();
```
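The same `Properties` object also accepts the read options listed in the General table below; for example, column pruning and predicate pushdown can be configured before `addSource` (a sketch, with option keys taken from that table):

```java
// Optional read tuning; the keys come from the "General" options table below.
properties.put("doris.read.field", "name,age");    // read only these columns
properties.put("doris.filter.query", "age > 18");  // filter evaluated on the Doris side
properties.put("doris.request.tablet.size", "2");  // fewer tablets per partition -> more Flink parallelism
```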

# DataStreamSink

```java
// -------- sink with raw json string stream --------
Properties pro = new Properties();
pro.setProperty("format", "json");
pro.setProperty("strip_outer_array", "true");

env.fromElements("{\"longitude\": \"116.405419\", \"city\": \"北京\", \"latitude\": \"39.916927\"}")
    .addSink(
        DorisSink.sink(
            DorisReadOptions.builder().build(),
            DorisExecutionOptions.builder()
                .setBatchSize(3)
                .setBatchIntervalMs(0L)
                .setMaxRetries(3)
                .setStreamLoadProp(pro)
                .build(),
            DorisOptions.builder()
                .setFenodes("FE_IP:8030")
                .setTableIdentifier("db.table")
                .setUsername("root")
                .setPassword("")
                .build()
        ));
```

OR

```java
env.fromElements("{\"longitude\": \"116.405419\", \"city\": \"北京\", \"latitude\": \"39.916927\"}")
    .addSink(
        DorisSink.sink(
            DorisOptions.builder()
                .setFenodes("FE_IP:8030")
                .setTableIdentifier("db.table")
                .setUsername("root")
                .setPassword("")
                .build()
        ));
```
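Note that the shorter form after `OR` passes only `DorisOptions`; it does not receive the `DorisReadOptions`, `DorisExecutionOptions`, or the stream-load properties in `pro`, so it runs with the connector defaults, and the JSON format settings above apply only to the first form.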


```java
// -------- sink with RowData stream --------
DataStream<RowData> source = env.fromElements("")
    .map(new MapFunction<String, RowData>() {
        @Override
        public RowData map(String value) throws Exception {
            GenericRowData genericRowData = new GenericRowData(3);
            genericRowData.setField(0, StringData.fromString("北京"));
            genericRowData.setField(1, 116.405419);
            genericRowData.setField(2, 39.916927);
            return genericRowData;
        }
    });

String[] fields = {"city", "longitude", "latitude"};
LogicalType[] types = {new VarCharType(), new DoubleType(), new DoubleType()};

source.addSink(
    DorisSink.sink(
        fields,
        types,
        DorisReadOptions.builder().build(),
        DorisExecutionOptions.builder()
            .setBatchSize(3)
            .setBatchIntervalMs(0L)
            .setMaxRetries(3)
            .build(),
        DorisOptions.builder()
            .setFenodes("FE_IP:8030")
            .setTableIdentifier("db.table")
            .setUsername("root")
            .setPassword("")
            .build()
    ));
```
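The `fields` and `types` arrays must line up positionally with the fields set on each `GenericRowData`: index 0 is the VARCHAR `city`, and indexes 1 and 2 are the DOUBLE `longitude` and `latitude`, matching the columns of the target Doris table.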

# DataSetSink

```java
MapOperator<String, RowData> data = env.fromElements("")
    .map(new MapFunction<String, RowData>() {
        @Override
        public RowData map(String value) throws Exception {
            GenericRowData genericRowData = new GenericRowData(3);
            genericRowData.setField(0, StringData.fromString("北京"));
            genericRowData.setField(1, 116.405419);
            genericRowData.setField(2, 39.916927);
            return genericRowData;
        }
    });

DorisOptions dorisOptions = DorisOptions.builder()
    .setFenodes("FE_IP:8030")
    .setTableIdentifier("db.table")
    .setUsername("root")
    .setPassword("")
    .build();
DorisReadOptions readOptions = DorisReadOptions.defaults();
DorisExecutionOptions executionOptions = DorisExecutionOptions.defaults();

LogicalType[] types = {new VarCharType(), new DoubleType(), new DoubleType()};
String[] fields = {"city", "longitude", "latitude"};

DorisDynamicOutputFormat outputFormat =
    new DorisDynamicOutputFormat(dorisOptions, readOptions, executionOptions, types, fields);

outputFormat.open(0, 1);
data.output(outputFormat);
outputFormat.close();
```
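For context, `env` here is Flink's batch `ExecutionEnvironment`, and the batch job only runs once it is triggered. A minimal sketch of the surrounding setup (the job name is arbitrary):

```java
// Batch environment for the DataSet API example above.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// ... build `data` and `outputFormat`, wire them up as shown above, then:
env.execute("flink-doris-dataset-sink"); // arbitrary job name; triggers the batch job
```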

# General

| Key | Default Value | Comment |
| --- | --- | --- |
| fenodes | -- | Doris FE http address; supports multiple addresses separated by commas |
| table.identifier | -- | Doris table identifier, e.g. db1.tbl1 |
| username | -- | Doris username |
| password | -- | Doris password |
| doris.request.retries | 3 | Number of retries to send requests to Doris |
| doris.request.connect.timeout.ms | 30000 | Connection timeout for sending requests to Doris |
| doris.request.read.timeout.ms | 30000 | Read timeout for sending requests to Doris |
| doris.request.query.timeout.s | 3600 | Query timeout for Doris; the default is 1 hour, and -1 means no timeout limit |
| doris.request.tablet.size | Integer.MAX_VALUE | The number of Doris Tablets corresponding to one Partition. The smaller this value, the more partitions are generated, which increases parallelism on the Flink side but puts more pressure on Doris. |
| doris.batch.size | 1024 | Maximum number of rows read from BE at one time. Increasing this value reduces the number of connections between Flink and Doris, thereby reducing the extra overhead caused by network latency. |
| doris.exec.mem.limit | 2147483648 | Memory limit for a single query, in bytes. The default is 2GB. |
| doris.deserialize.arrow.async | false | Whether to support asynchronous conversion of the Arrow format to the RowBatch required for flink-doris-connector iteration |
| doris.deserialize.queue.size | 64 | Internal processing queue size for asynchronous Arrow conversion; takes effect when doris.deserialize.arrow.async is true |
| doris.read.field | -- | List of column names to read from the Doris table, separated by commas |
| doris.filter.query | -- | Filter expression for the query, passed through to Doris; Doris uses it to filter data on the source side |
| sink.batch.size | 100 | Maximum number of rows in a single write to BE |
| sink.max-retries | 1 | Number of retries after a write to BE fails |
| sink.batch.interval | 1s | Flush interval, after which the asynchronous thread writes cached data to BE. The default is 1 second; supported time units are ms, s, min, h, and d. Set to 0 to disable periodic writing. |
| sink.properties.* | -- | Stream load parameters, e.g. 'sink.properties.column_separator' = ','. Set 'sink.properties.escape_delimiters' = 'true' to use a control character as separator, so that e.g. '\x01' is translated to binary 0x01. To import data in JSON format, enable both 'sink.properties.format' = 'json' and 'sink.properties.strip_outer_array' = 'true'. |
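In the DataStream examples above, the same stream load parameters are supplied through `setStreamLoadProp`; a sketch of using a control character as the column separator, assuming the `sink.properties.*` keys map one-to-one onto this `Properties` object:

```java
// Stream load properties mirroring 'sink.properties.*' from the table above.
Properties loadProps = new Properties();
loadProps.setProperty("column_separator", "\\x01");  // the literal text \x01
loadProps.setProperty("escape_delimiters", "true");  // have it translated to binary 0x01

DorisExecutionOptions execOptions = DorisExecutionOptions.builder()
        .setBatchSize(100)        // defaults from the table above
        .setBatchIntervalMs(1000L)
        .setMaxRetries(1)
        .setStreamLoadProp(loadProps)
        .build();
```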
# Doris & Flink Column Type Mapping

| Doris Type | Flink Type |
| --- | --- |
| NULL_TYPE | NULL |
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DATE | STRING |
| DATETIME | STRING |
| DECIMAL | DECIMAL |
| CHAR | STRING |
| LARGEINT | STRING |
| VARCHAR | STRING |
| DECIMALV2 | DECIMAL |
| TIME | DOUBLE |
| HLL | Unsupported datatype |
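Per this mapping, DATE, DATETIME, CHAR, VARCHAR, and LARGEINT columns should be declared as STRING on the Flink side (as in the DDL above), TIME maps to DOUBLE, and HLL columns cannot be read through the connector.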