Skip to main content

PostgreSQL Multi-table Import

Overview

Supports using Job to continuously synchronize full and incremental data from multiple tables in a PostgreSQL database to Doris via Stream Load. Suitable for scenarios requiring real-time multi-table data synchronization to Doris.

By integrating Flink CDC, Doris supports reading change logs from PostgreSQL databases, enabling full and incremental multi-table data synchronization. When synchronizing for the first time, Doris automatically creates downstream tables (primary key tables) and keeps the primary key consistent with the upstream.

Notes:

  1. Currently only at-least-once semantics are guaranteed.
  2. Only primary key tables are supported for synchronization.
  3. LOAD privilege is required. If the downstream table does not exist, CREATE privilege is also required.
  4. During automatic table creation, if the target table already exists, it will be skipped, and users can customize tables according to different scenarios.

Prerequisites

Enable logical replication on PostgreSQL by adding the following to postgresql.conf:

wal_level=logical

If you are using a cloud service, see the Setup Guide.

Quick Start

Creating an Import Job

Use CREATE STREAMING JOB to create a continuous import job:

CREATE JOB test_postgres_job
ON STREAMING
FROM POSTGRES (
"jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
"driver_url" = "postgresql-42.5.0.jar",
"driver_class" = "org.postgresql.Driver",
"user" = "postgres",
"password" = "postgres",
"database" = "postgres",
"schema" = "public",
"include_tables" = "test_tbls",
"offset" = "latest"
)
TO DATABASE target_test_db (
"table.create.properties.replication_num" = "1" -- Set to 1 for single BE deployment
)

Check Import Status

select * from jobs("type"="insert") where ExecuteType = "STREAMING";

For more common operations (pause, resume, delete, check Task, etc.), see Continuous Load Overview.

Source Parameters

ParameterDefaultDescription
jdbc_url-PostgreSQL JDBC connection string
driver_url-JDBC driver jar path
driver_class-JDBC driver class name
user-Database username
password-Database password
database-Database name
schema-Schema name
include_tables-Tables to synchronize, comma separated. If not specified, all tables will be synchronized by default.
offsetinitialinitial: full + incremental sync, latest: incremental only
snapshot_split_size8096Split size (in rows). During full sync, the table is divided into multiple splits
snapshot_parallelism1Parallelism during full sync phase, i.e., max splits per task

Reference

Import Command

Syntax for creating a multi-table sync job:

CREATE JOB <job_name>
ON STREAMING
[job_properties]
[ COMMENT <comment> ]
FROM POSTGRES (
[source_properties]
)
TO DATABASE <target_db> (
[target_properties]
)
ModuleDescription
job_nameJob name
job_propertiesGeneral import parameters
commentJob comment
source_propertiesPostgreSQL source parameters
target_propertiesDoris target DB parameters

Doris Target DB Parameters

ParameterDefaultDescription
table.create.properties.*-Table properties when creating, e.g. replication_num
load.strict_mode-Whether to enable strict mode. Disabled by default.
load.max_filter_ratio-The maximum allowed filtering ratio within a sampling window. Must be between 0 and 1 (inclusive). The default value is 0, indicating zero tolerance. The sampling window equals max_interval * 10. If, within this window, the ratio of erroneous rows to total rows exceeds max_filter_ratio, the scheduled job will be paused and requires manual intervention to address data quality issues.