Postgres/MySQL Continuous Load

Overview

Doris can use a Streaming Job to continuously synchronize full and incremental data from multiple tables in MySQL or Postgres into Doris. This suits scenarios that require real-time, multi-table data synchronization.

Supported Data Sources

  • MySQL
  • Postgres

Basic Principles

Doris integrates Flink CDC to read change logs from MySQL and Postgres, which enables full and incremental multi-table data synchronization. On the first synchronization, Doris automatically creates the downstream tables as primary key tables and keeps their primary keys consistent with the upstream tables.

Notes:

  1. Currently only at-least-once semantics are guaranteed.
  2. Only primary key tables are supported for synchronization.
  3. LOAD privilege is required. If the downstream table does not exist, CREATE privilege is also required.
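
Notes 1 and 2 fit together: under at-least-once delivery a change event may be applied more than once, and a table keyed by primary key can absorb such replays. The sketch below is a conceptual model only, not Doris's implementation. The `apply_events` function and the event tuples are hypothetical names introduced for illustration.

```python
def apply_events(table, events):
    """Apply change events to a dict keyed by primary key (upsert or delete)."""
    for op, key, row in events:
        if op == "delete":
            table.pop(key, None)
        else:
            # Inserts and updates both behave as an upsert on the primary key.
            table[key] = row
    return table


events = [
    ("insert", 1, {"name": "alice"}),
    ("update", 1, {"name": "alice2"}),
    ("insert", 2, {"name": "bob"}),
    ("delete", 2, None),
]

once = apply_events({}, events)
# Simulate a retry that redelivers the last two events.
replayed = apply_events(dict(once), events[2:])
print(once == replayed)  # True
```

Because each event overwrites or removes the row for its key, redelivering a suffix of the stream leaves the final state unchanged in this model.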

Quick Start

Prerequisites

MySQL

Enable Binlog on MySQL by adding the following to my.cnf:

log-bin=mysql-bin
binlog_format=ROW
server-id=1
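
Before creating a job, it can help to confirm that a configuration file really contains these settings. The following is a minimal sketch that checks my.cnf text for the three options above. The function name `missing_binlog_settings` is hypothetical, and the check only looks at `key=value` lines. It treats `-` and `_` in option names as equivalent, as MySQL does.

```python
# Required options; None means any value is accepted.
REQUIRED = {"log_bin": None, "binlog_format": "ROW", "server_id": None}


def missing_binlog_settings(cnf_text):
    """Return required options that are absent or have an unexpected value."""
    found = {}
    for raw in cnf_text.splitlines():
        line = raw.split("#", 1)[0].strip()  # drop trailing comments
        if not line or line.startswith("[") or "=" not in line:
            continue  # skip blanks, section headers, and bare flags
        key, value = (part.strip() for part in line.split("=", 1))
        found[key.replace("-", "_").lower()] = value
    problems = []
    for key, expected in REQUIRED.items():
        if key not in found:
            problems.append(key)
        elif expected is not None and found[key].upper() != expected:
            problems.append(key)
    return problems


sample = """
[mysqld]
log-bin=mysql-bin
binlog_format=ROW
server-id=1
"""
print(missing_binlog_settings(sample))  # []
```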

Postgres

Enable logical replication on Postgres by adding the following to postgresql.conf:

wal_level=logical

Creating an Import Job

MySQL

CREATE JOB multi_table_sync
ON STREAMING
FROM MYSQL (
"jdbc_url" = "jdbc:mysql://127.0.0.1:3306",
"driver_url" = "mysql-connector-j-8.0.31.jar",
"driver_class" = "com.mysql.cj.jdbc.Driver",
"user" = "root",
"password" = "123456",
"database" = "test",
"include_tables" = "user_info,order_info",
"offset" = "initial"
)
TO DATABASE target_test_db (
"table.create.properties.replication_num" = "1"
)

Postgres

CREATE JOB test_postgres_job
ON STREAMING
FROM POSTGRES (
"jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
"driver_url" = "postgresql-42.5.0.jar",
"driver_class" = "org.postgresql.Driver",
"user" = "postgres",
"password" = "postgres",
"database" = "postgres",
"schema" = "public",
"include_tables" = "test_tbls",
"offset" = "latest"
)
TO DATABASE target_test_db (
"table.create.properties.replication_num" = "1"
)

Check Import Status

select * from jobs("type"="insert") where ExecuteType = "STREAMING"
*************************** 1. row ***************************
Id: 1765332859199
Name: mysql_db_sync
Definer: root
ExecuteType: STREAMING
RecurringStrategy: \N
Status: RUNNING
ExecuteSql: FROM MYSQL('include_tables'='user_info','database'='test','driver_class'='com.mysql.cj.jdbc.Driver','driver_url'='mysql-connector-j-8.0.31.jar','offset'='initial','jdbc_url'='jdbc:mysql://127.0.0.1:3306','user'='root' ) TO DATABASE target_test_db ('table.create.properties.replication_num'='1')
CreateTime: 2025-12-10 10:19:35
SucceedTaskCount: 1
FailedTaskCount: 0
CanceledTaskCount: 0
Comment:
Properties: \N
CurrentOffset: {"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","splitId":"binlog-split","row":"1","event":"2","server_id":"1"}
EndOffset: \N
LoadStatistic: {"scannedRows":24,"loadBytes":1146,"fileNumber":0,"fileSize":0}
ErrorMsg: \N

Pause Import Job

PAUSE JOB WHERE jobName = <job_name>;

Resume Import Job

RESUME JOB WHERE jobName = <job_name>;

Modify Import Job

ALTER JOB <job_name>
FROM MYSQL (
"user" = "root",
"password" = "123456"
)
TO DATABASE target_test_db

Delete Import Job

DROP JOB WHERE jobName = <job_name>;

Reference Manual

Import Command

Syntax for creating a multi-table synchronization job:

CREATE JOB <job_name>
ON STREAMING
[job_properties]
[ COMMENT <comment> ]
FROM <MYSQL|POSTGRES> (
[source_properties]
)
TO DATABASE <target_db> (
[target_properties]
)

| Module | Description |
| --- | --- |
| job_name | Job name |
| job_properties | General import parameters |
| comment | Job comment |
| source_properties | Source (MySQL/PG) parameters |
| target_properties | Doris target DB parameters |
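
When jobs are created from scripts, the statement can be assembled from property maps. The sketch below renders only the parts of the syntax shown above. The helpers `render_props` and `render_create_job` are hypothetical, they do not escape quotes in values, and they omit `job_properties` and `COMMENT`.

```python
def render_props(props):
    """Render a dict as the "key" = "value" list used inside the parentheses."""
    return ",\n".join(f'"{key}" = "{value}"' for key, value in props.items())


def render_create_job(job_name, source_type, source_props, target_db, target_props):
    """Build a CREATE JOB ... ON STREAMING statement as a string."""
    if source_type not in ("MYSQL", "POSTGRES"):
        raise ValueError("source_type must be MYSQL or POSTGRES")
    return (
        f"CREATE JOB {job_name}\n"
        "ON STREAMING\n"
        f"FROM {source_type} (\n{render_props(source_props)}\n)\n"
        f"TO DATABASE {target_db} (\n{render_props(target_props)}\n)"
    )


sql = render_create_job(
    "multi_table_sync",
    "MYSQL",
    {
        "jdbc_url": "jdbc:mysql://127.0.0.1:3306",
        "driver_url": "mysql-connector-j-8.0.31.jar",
        "driver_class": "com.mysql.cj.jdbc.Driver",
        "user": "root",
        "password": "123456",
        "database": "test",
        "include_tables": "user_info,order_info",
        "offset": "initial",
    },
    "target_test_db",
    {"table.create.properties.replication_num": "1"},
)
print(sql)
```

The printed statement has the same shape as the MySQL example in Quick Start.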

Import Parameters

FE Configuration Parameters

| Parameter | Default | Description |
| --- | --- | --- |
| max_streaming_job_num | 1024 | Maximum number of Streaming jobs |
| job_streaming_task_exec_thread_num | 10 | Number of threads used to execute StreamingTask |
| max_streaming_task_show_count | 100 | Maximum number of StreamingTask records kept in memory |

General Job Import Parameters

| Parameter | Default | Description |
| --- | --- | --- |
| max_interval | 10s | Idle scheduling interval when there is no new data |

Source Configuration Parameters

| Parameter | Default | Description |
| --- | --- | --- |
| jdbc_url | - | JDBC connection string (MySQL/PG) |
| driver_url | - | JDBC driver jar path |
| driver_class | - | JDBC driver class name |
| user | - | Database username |
| password | - | Database password |
| database | - | Database name |
| schema | - | Schema name |
| include_tables | - | Tables to synchronize, comma separated |
| offset | initial | initial: full + incremental; latest: incremental only |
| snapshot_split_size | 8096 | Size of each split, in rows. During full synchronization, a table is divided into multiple splits. |
| snapshot_parallelism | 1 | Parallelism during the full synchronization phase, i.e., the maximum number of splits a single task can schedule at once. |
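
To get a feel for how `snapshot_split_size` and `snapshot_parallelism` interact, the sketch below estimates how many splits a table produces and how many scheduling rounds they need. This is back-of-the-envelope arithmetic under the assumption that splits are filled evenly by row count. The function `snapshot_plan` is hypothetical and actual split boundaries may differ.

```python
import math


def snapshot_plan(row_count, split_size=8096, parallelism=1):
    """Estimate (number of splits, scheduling rounds) for a full sync."""
    splits = math.ceil(row_count / split_size)
    rounds = math.ceil(splits / parallelism)
    return splits, rounds


print(snapshot_plan(100_000))                 # (13, 13)
print(snapshot_plan(100_000, parallelism=4))  # (13, 4)
```

With the defaults, a table of 100,000 rows yields about 13 splits that are scheduled one at a time. Raising the parallelism to 4 cuts the estimate to 4 rounds.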

Doris Target DB Parameters

| Parameter | Default | Description |
| --- | --- | --- |
| table.create.properties.* | - | Table properties applied when creating tables, e.g. replication_num |
| load.strict_mode | - | Whether to enable strict mode. Disabled by default. |
| load.max_filter_ratio | - | Maximum allowed ratio of filtered rows within a sampling window. Must be between 0 and 1 (inclusive). The default value is 0, meaning zero tolerance. The sampling window equals max_interval * 10. If the ratio of erroneous rows to total rows within this window exceeds max_filter_ratio, the job is paused and requires manual intervention to address the data quality issue. |
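
The `load.max_filter_ratio` rule can be worked through with a small calculation. The sketch below assumes `max_interval` is written in seconds (such as `10s`) and models only the comparison described above. The function names are hypothetical and this is not the scheduler's actual code.

```python
def sampling_window_seconds(max_interval="10s"):
    """Sampling window = max_interval * 10 (seconds-only parsing, by assumption)."""
    if not max_interval.endswith("s"):
        raise ValueError("this sketch only handles intervals given in seconds")
    return int(max_interval[:-1]) * 10


def should_pause(error_rows, total_rows, max_filter_ratio=0.0):
    """Return True when the error ratio in the window exceeds the limit."""
    if not 0 <= max_filter_ratio <= 1:
        raise ValueError("max_filter_ratio must be between 0 and 1")
    if total_rows == 0:
        return False  # nothing loaded in this window
    return error_rows / total_rows > max_filter_ratio


print(sampling_window_seconds())       # 100
print(should_pause(1, 1000))           # True: the default of 0 tolerates no errors
print(should_pause(1, 1000, 0.01))     # False: 0.1% is within 1%
print(should_pause(20, 1000, 0.01))    # True: 2% exceeds 1%
```

With the default `max_interval` of 10s, the window is 100 seconds, and a single erroneous row in that window is enough to pause a job that uses the default ratio.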

Import Status

Job

After submitting a job, you can run the following SQL to check the job status:

select * from jobs("type"="insert") where ExecuteType = "STREAMING"
*************************** 1. row ***************************
Id: 1765332859199
Name: mysql_db_sync
Definer: root
ExecuteType: STREAMING
RecurringStrategy: \N
Status: RUNNING
ExecuteSql: FROM MYSQL('include_tables'='user_info','database'='test','driver_class'='com.mysql.cj.jdbc.Driver','driver_url'='mysql-connector-j-8.0.31.jar','offset'='initial','jdbc_url'='jdbc:mysql://127.0.0.1:3306','user'='root' ) TO DATABASE target_test_db ('table.create.properties.replication_num'='1')
CreateTime: 2025-12-10 10:19:35
SucceedTaskCount: 2
FailedTaskCount: 0
CanceledTaskCount: 0
Comment:
Properties: \N
CurrentOffset: {"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","splitId":"binlog-split","row":"1","event":"2","server_id":"1"}
EndOffset: {"ts_sec":"0","file":"binlog.000003","pos":"157","kind":"SPECIFIC","gtids":"","row":"0","event":"0"}
LoadStatistic: {"scannedRows":3,"loadBytes":232,"fileNumber":0,"fileSize":0}
ErrorMsg: \N

| Result Column | Description |
| --- | --- |
| Id | Job ID |
| Name | Job name |
| Definer | Job definer |
| ExecuteType | Job type: ONE_TIME/RECURRING/STREAMING/MANUAL |
| RecurringStrategy | Recurring strategy; empty for Streaming jobs |
| Status | Job status |
| ExecuteSql | Job's Insert SQL statement |
| CreateTime | Job creation time |
| SucceedTaskCount | Number of successful tasks |
| FailedTaskCount | Number of failed tasks |
| CanceledTaskCount | Number of canceled tasks |
| Comment | Job comment |
| Properties | Job properties |
| CurrentOffset | Current offset; only for Streaming jobs |
| EndOffset | Maximum end offset from the source; only for Streaming jobs |
| LoadStatistic | Job statistics |
| ErrorMsg | Job error message |
| JobRuntimeMsg | Job runtime info |
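
`CurrentOffset` and `EndOffset` are JSON strings, so a monitoring script can parse them to see whether a MySQL job has caught up. The sketch below uses the values from the sample output above. It assumes MySQL binlog file names carry a fixed-width numeric suffix, so that comparing `(file, pos)` pairs reflects binlog order. The helper names are hypothetical, and Postgres offsets are not covered.

```python
import json

current_offset = (
    '{"ts_sec":"1765284495","file":"binlog.000002","pos":"9350",'
    '"kind":"SPECIFIC","splitId":"binlog-split","row":"1","event":"2","server_id":"1"}'
)
end_offset = (
    '{"ts_sec":"0","file":"binlog.000003","pos":"157",'
    '"kind":"SPECIFIC","gtids":"","row":"0","event":"0"}'
)


def binlog_position(offset_json):
    """Extract (binlog file, byte position) from an offset JSON string."""
    offset = json.loads(offset_json)
    return offset["file"], int(offset["pos"])


def is_caught_up(current_json, end_json):
    """True when the current position is at or past the source's end position."""
    return binlog_position(current_json) >= binlog_position(end_json)


print(binlog_position(current_offset))           # ('binlog.000002', 9350)
print(is_caught_up(current_offset, end_offset))  # False
```

In the sample, the job is still reading `binlog.000002` while the source has advanced to `binlog.000003`, so the check reports that the job has not caught up.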

Task

You can run the following SQL to check the status of each Task:

select * from tasks("type"="insert") where jobId='1765332859199'
*************************** 1. row ***************************
TaskId: 1765336137066
JobId: 1765332859199
JobName: mysql_db_sync
Label: 1765332859199_1765336137066
Status: SUCCESS
ErrorMsg: \N
CreateTime: 2025-12-10 11:09:06
StartTime: 2025-12-10 11:09:16
FinishTime: 2025-12-10 11:09:18
TrackingUrl: \N
LoadStatistic: {"scannedRows":1,"loadBytes":333}
User: root
FirstErrorMsg:
RunningOffset: {"endOffset":{"ts_sec":"1765284495","file":"binlog.000002","pos":"9521","kind":"SPECIFIC","row":"1","event":"2","server_id":"1"},"startOffset":{"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","row":"1","splitId":"binlog-split","event":"2","server_id":"1"},"splitId":"binlog-split"}

| Result Column | Description |
| --- | --- |
| TaskId | Task ID |
| JobId | Job ID |
| JobName | Job name |
| Label | Task label |
| Status | Task status |
| ErrorMsg | Task error message |
| CreateTime | Task creation time |
| StartTime | Task start time |
| FinishTime | Task finish time |
| LoadStatistic | Task statistics |
| User | Task executor |
| RunningOffset | Current offset; only for Streaming jobs |