Flink
The Flink Doris Connector lets you import data produced by Flink (such as data Flink reads from Kafka or MySQL) into Doris in real time. It is suitable for real-time data ingestion and streaming ETL scenarios.
Applicable scenarios
| Scenario | Description |
|---|---|
| Real-time data ingestion | Write data from message queues such as Kafka or Pulsar into Doris in real time |
| Database synchronization | Synchronize data from databases such as MySQL or Oracle to Doris through Flink CDC |
| Streaming ETL | Perform real-time computation with Flink and write the results to Doris |
Limitations
- Requires a Flink cluster that you have already deployed.
- Requires the corresponding version of the Flink Doris Connector to be deployed in Flink.
Procedure
For complete instructions on importing data with Flink, see Flink-Doris-Connector. The following minimal example shows how to quickly complete an import through Flink.
The overall workflow consists of the following three steps:
- Create the target table in Doris
- Write data through FlinkSQL in Flink
- Verify in Doris that the data has been imported successfully
Step 1: Create a table in Doris
Create the target table students in Doris to receive data from Flink:
CREATE TABLE `students` (
`id` INT NULL,
`name` VARCHAR(256) NULL,
`age` INT NULL
) ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
Step 2: Import data with Flink
Run bin/sql-client.sh to open the FlinkSQL console, and execute the following statements to create a sink table and write data:
CREATE TABLE student_sink (
id INT,
name STRING,
age INT
)
WITH (
'connector' = 'doris',
'fenodes' = '10.16.10.6:28737',
'table.identifier' = 'test.students',
'username' = 'root',
'password' = '',
'sink.label-prefix' = 'doris_label'
);
INSERT INTO student_sink values(1,'zhangsan',123)
The key parameters are described below:
| Parameter | Description |
|---|---|
connector | Fixed as doris, indicating that the Flink Doris Connector is used |
fenodes | The HTTP address of the Doris FE, in the format host:http_port |
table.identifier | The identifier of the target table, in the format database.table |
username | The Doris login username |
password | The Doris login password |
sink.label-prefix | The Label prefix for the Stream Load import task. It must be globally unique |
Step 3: Check the imported data
Query the target table in Doris to confirm that the data has been imported successfully:
select * from test.students;
+------+----------+------+
| id | name | age |
+------+----------+------+
| 1 | zhangsan | 123 |
+------+----------+------+
FAQ
Q1: Which data sources does the Flink Doris Connector support?
In theory, all data sources supported by Flink (Kafka, MySQL CDC, file systems, message queues, etc.) can be used as upstream sources, processed by Flink, and then written to Doris.
Q2: Why does sink.label-prefix need to be unique?
The Flink Doris Connector implements imports based on Doris Stream Load. Each transaction requires a unique Label to guarantee Exactly-Once semantics. Duplicate Labels cause import conflicts.
Q3: Which port should fenodes use?
fenodes takes the HTTP port of the Doris FE (default 8030), not the MySQL protocol port (default 9030).
Q4: How can I synchronize data from databases such as MySQL or Oracle to Doris?
You can use it together with Flink CDC. For details, see the Flink-Doris-Connector documentation.