
PostgreSQL Database-level Sync

Overview

Database-level Sync uses the native FROM POSTGRES (...) TO DATABASE (...) DDL. The sync unit is a database, and a Doris database serves as the target container. You can sync one, several, or all tables via include_tables. On the first sync, Doris automatically creates the downstream primary-key tables and keeps their primary keys consistent with the upstream. This mode suits mirror-replication scenarios where the downstream schema should track the upstream automatically and no SQL processing is needed.

By integrating Flink CDC, Doris reads change logs from PostgreSQL and continuously writes full + incremental data of a group of tables into Doris via Stream Load. If you need column mapping, filtering, or data transformation during sync, see PostgreSQL Table-level Sync.

Notes:

  1. Currently only at-least-once semantics are guaranteed.
  2. Only primary key tables are supported for synchronization.
  3. LOAD privilege is required. If the downstream table does not exist, CREATE privilege is also required.
  4. If a target table already exists, automatic table creation skips it. You can therefore create a table in advance with a customized definition to suit your scenario.
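
Notes 3 and 4 can be illustrated with a minimal sketch. It assumes a hypothetical Doris user sync_user, a hypothetical upstream table test_tbls with columns id, name, and updated_at, and that a pre-created table is matched to the upstream table by name. None of these names or columns come from this page; adjust them to your own schema.

```sql
-- Assumption: 'sync_user' is a hypothetical Doris account that will own the job.
-- LOAD_PRIV covers writing data; CREATE_PRIV is only needed when Doris must create the tables.
GRANT LOAD_PRIV, CREATE_PRIV ON target_test_db.* TO 'sync_user'@'%';

-- Assumption: the column list is illustrative and must mirror your upstream table.
-- Because the table already exists, automatic creation would skip it.
CREATE TABLE IF NOT EXISTS target_test_db.test_tbls (
    id         BIGINT NOT NULL,
    name       VARCHAR(255),
    updated_at DATETIME
)
UNIQUE KEY (id)                       -- primary-key (Unique Key) model, keyed like the upstream
DISTRIBUTED BY HASH (id) BUCKETS 8    -- custom bucketing chosen for this table
PROPERTIES (
    "replication_num" = "1"           -- single-BE deployment
);
```

Pre-creating a table this way is one option when the automatically created definition (for example its bucketing or properties) does not fit your workload.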

Quick Start

Creating an Import Job

Use CREATE JOB ... ON STREAMING to create a continuous import job:

CREATE JOB test_postgres_job
ON STREAMING
FROM POSTGRES (
"jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
"driver_url" = "postgresql-42.5.1.jar",
"driver_class" = "org.postgresql.Driver",
"user" = "postgres",
"password" = "postgres",
"database" = "postgres",
"schema" = "public",
"include_tables" = "test_tbls",
"offset" = "latest"
)
TO DATABASE target_test_db (
"table.create.properties.replication_num" = "1" -- Set to 1 for single BE deployment
)

Check Import Status

select * from jobs("type"="insert") where ExecuteType = "STREAMING";

For more common operations (pause, resume, delete, check Task, etc.), see Continuous Load Overview.
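
As a rough sketch of those operations for the Quick Start job, the statements below assume that the general Doris job-management syntax (PAUSE JOB, RESUME JOB, DROP JOB with a jobName filter) and the Name column of the jobs function apply to streaming jobs as well. Treat Continuous Load Overview as the authoritative reference.

```sql
-- Narrow the status query to a single job (assumes the jobs() result exposes a Name column).
select * from jobs("type"="insert") where Name = "test_postgres_job";

-- Pause the job, for example while fixing upstream data quality issues.
PAUSE JOB WHERE jobName = 'test_postgres_job';

-- Resume the paused job.
RESUME JOB WHERE jobName = 'test_postgres_job';

-- Remove the job once it is no longer needed.
DROP JOB WHERE jobName = 'test_postgres_job';
```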

Source Parameters

| Parameter | Default | Description |
| --- | --- | --- |
| jdbc_url | - | PostgreSQL JDBC connection string |
| driver_url | - | JDBC driver jar path. Supports file name, local absolute path, and HTTP URL. See JDBC Catalog Overview for details. |
| driver_class | - | JDBC driver class name |
| user | - | Database username |
| password | - | Database password |
| database | - | Database name |
| schema | - | Schema name |
| include_tables | - | Tables to synchronize, comma separated. If not specified, all tables are synchronized. |
| offset | initial | initial: full + incremental sync; latest: incremental only |
| snapshot_split_size | 8096 | Split size (in rows). During full sync, the table is divided into multiple splits. |
| snapshot_parallelism | 1 | Parallelism during the full sync phase, i.e., the maximum number of splits per task |
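
The following sketch shows how these source parameters might be combined for a full + incremental sync of several tables. The job name, the table names orders and customers, and the split and parallelism values are hypothetical and are not tuning recommendations. The connection settings reuse the Quick Start values.

```sql
CREATE JOB orders_full_sync_job
ON STREAMING
FROM POSTGRES (
    "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
    "driver_url" = "postgresql-42.5.1.jar",
    "driver_class" = "org.postgresql.Driver",
    "user" = "postgres",
    "password" = "postgres",
    "database" = "postgres",
    "schema" = "public",
    "include_tables" = "orders,customers",  -- hypothetical tables, comma separated
    "offset" = "initial",                   -- full snapshot first, then incremental changes
    "snapshot_split_size" = "20000",        -- rows per split during full sync (default 8096)
    "snapshot_parallelism" = "4"            -- max splits per task during full sync (default 1)
)
TO DATABASE target_test_db (
    "table.create.properties.replication_num" = "1"
)
```

Larger splits mean fewer, heavier snapshot reads, while higher parallelism lets each task handle more splits at once; which combination works best depends on table size and upstream load.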

Reference

Import Command

Syntax for creating a database-level sync job:

CREATE JOB <job_name>
ON STREAMING
[job_properties]
[ COMMENT <comment> ]
FROM POSTGRES (
[source_properties]
)
TO DATABASE <target_db> (
[target_properties]
)

| Module | Description |
| --- | --- |
| job_name | Job name |
| job_properties | General import parameters |
| comment | Job comment |
| source_properties | PostgreSQL source parameters |
| target_properties | Doris target DB parameters |
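
As an illustration of the syntax above, the sketch below attaches a comment to the job and omits include_tables so that every table in the schema is synchronized. The job name and comment text are hypothetical, and quoting the comment as a string literal is an assumption about how <comment> is written.

```sql
CREATE JOB mirror_public_schema_job
ON STREAMING
COMMENT "Mirror of the public schema"       -- assumed string-literal form of <comment>
FROM POSTGRES (
    "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
    "driver_url" = "postgresql-42.5.1.jar",
    "driver_class" = "org.postgresql.Driver",
    "user" = "postgres",
    "password" = "postgres",
    "database" = "postgres",
    "schema" = "public"
    -- include_tables is omitted, so all tables are synchronized;
    -- offset is omitted, so the default (initial) applies
)
TO DATABASE target_test_db (
    "table.create.properties.replication_num" = "1"
)
```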

Doris Target DB Parameters

| Parameter | Default | Description |
| --- | --- | --- |
| table.create.properties.* | - | Table properties applied when creating tables, e.g. replication_num |
| load.strict_mode | - | Whether to enable strict mode. Disabled by default. |
| load.max_filter_ratio | - | The maximum allowed filtering ratio within a sampling window. Must be between 0 and 1 (inclusive). The default value is 0, indicating zero tolerance. The sampling window equals max_interval * 10. If, within this window, the ratio of erroneous rows to total rows exceeds max_filter_ratio, the scheduled job is paused and requires manual intervention to address data quality issues. |
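
A minimal sketch of setting these target parameters follows. The job name is hypothetical, the "true" value format for load.strict_mode is an assumption, and the 10-second max_interval used in the comments is only an example value for the arithmetic, not a documented default.

```sql
CREATE JOB tolerant_postgres_job
ON STREAMING
FROM POSTGRES (
    "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
    "driver_url" = "postgresql-42.5.1.jar",
    "driver_class" = "org.postgresql.Driver",
    "user" = "postgres",
    "password" = "postgres",
    "database" = "postgres",
    "schema" = "public",
    "include_tables" = "test_tbls"
)
TO DATABASE target_test_db (
    "table.create.properties.replication_num" = "1",
    "load.strict_mode" = "true",        -- assumed value format; disabled when not set
    "load.max_filter_ratio" = "0.1"     -- tolerate up to 10% erroneous rows per sampling window
)
-- Worked example with an assumed max_interval of 10 seconds:
--   sampling window = max_interval * 10 = 100 seconds
--   1,000 rows in the window with 100 erroneous rows -> ratio 0.10, does not exceed 0.1, job keeps running
--   1,000 rows in the window with 101 erroneous rows -> ratio 0.101, exceeds 0.1, job is paused
```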