PostgreSQL Single-table Import

Overview

Doris supports continuously synchronizing full and incremental data from a single PostgreSQL table into a specified Doris table using Job + CDC Stream TVF. This is suitable for real-time synchronization scenarios that require flexible column mapping and data transformation on a single table.

By integrating Flink CDC reading capabilities, Doris supports reading change logs (WAL) from PostgreSQL databases, enabling full and incremental data synchronization for a single table.

Notes:

  1. Supports exactly-once semantics.
  2. Currently only primary key tables are supported for synchronization.
  3. LOAD privilege is required.
  4. Logical replication must be enabled on the PostgreSQL side. If you are using a cloud service, see the Setup Guide.
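
The LOAD privilege from note 3 can be granted with the Doris GRANT statement. The following is a minimal sketch, assuming a hypothetical Doris account named sync_user, the default internal catalog, and the target table db1.tbl1 used in the Quick Start; adjust the privilege level to your own setup.

```sql
-- Run on Doris as an administrator.
-- 'sync_user' is a hypothetical account name, not one defined by this guide.
GRANT LOAD_PRIV ON internal.db1.tbl1 TO 'sync_user'@'%';

-- Confirm that the grant is in place.
SHOW GRANTS FOR 'sync_user'@'%';
```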

Prerequisites

Enable logical replication on PostgreSQL by adding the following to postgresql.conf:

wal_level=logical

If you are using a cloud service, refer to the corresponding prerequisites guide.
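
To confirm the setting on a self-managed instance, you can query it from any PostgreSQL session. The sketch below uses only standard PostgreSQL statements. Checking max_replication_slots and max_wal_senders is an assumption about what a CDC reader typically needs, not a requirement stated in this guide.

```sql
-- Run on PostgreSQL (for example via psql) as a superuser.

-- Alternative to editing postgresql.conf by hand.
-- A change to wal_level only takes effect after a server restart.
ALTER SYSTEM SET wal_level = logical;

-- After the restart, verify the active value.
SHOW wal_level;

-- Assumption: logical decoding uses a replication slot and a WAL sender,
-- so both limits should be greater than zero.
SHOW max_replication_slots;
SHOW max_wal_senders;
```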

Quick Start

Creating an Import Job

Use CREATE JOB with the ON STREAMING clause to create a continuous import job:

CREATE JOB pg_single_sync
ON STREAMING
DO
INSERT INTO db1.tbl1
SELECT * FROM cdc_stream(
    "type" = "postgres",
    "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
    "driver_url" = "postgresql-42.5.0.jar",
    "driver_class" = "org.postgresql.Driver",
    "user" = "postgres",
    "password" = "postgres",
    "database" = "postgres",
    "schema" = "public",
    "table" = "source_table",
    "offset" = "initial"
);
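
Because the job body is an ordinary INSERT INTO ... SELECT, column mapping and transformation can be expressed in the select list. The following is a sketch only: the job name pg_single_sync_mapped, the source columns id, name, and created_at, and the target columns id, name_upper, and created_date are all hypothetical, and which expressions a streaming job accepts should be checked against your Doris version.

```sql
-- Hypothetical job: maps three source columns onto differently named target columns.
CREATE JOB pg_single_sync_mapped
ON STREAMING
DO
INSERT INTO db1.tbl1 (id, name_upper, created_date)
SELECT
    id,                        -- primary key, passed through unchanged
    UPPER(name),               -- simple transformation
    CAST(created_at AS DATE)   -- type conversion
FROM cdc_stream(
    "type" = "postgres",
    "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
    "driver_url" = "postgresql-42.5.0.jar",
    "driver_class" = "org.postgresql.Driver",
    "user" = "postgres",
    "password" = "postgres",
    "database" = "postgres",
    "schema" = "public",
    "table" = "source_table",
    "offset" = "initial"
);
```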

Check Import Status

SELECT * FROM jobs("type"="insert") WHERE ExecuteType = "STREAMING";

For more common operations (pause, resume, delete, check Task, etc.), see Continuous Load Overview.
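
As a quick reference, the operations mentioned above might look like the following for the pg_single_sync job. This sketch is based on the general Doris job management statements; the JobName filter on the tasks function is an assumption, and Continuous Load Overview remains the authoritative source.

```sql
-- Temporarily stop the job, then continue it.
PAUSE JOB WHERE jobname = 'pg_single_sync';
RESUME JOB WHERE jobname = 'pg_single_sync';

-- Inspect the tasks spawned by the job (the column name JobName is assumed).
SELECT * FROM tasks("type"="insert") WHERE JobName = 'pg_single_sync';

-- Remove the job permanently.
DROP JOB WHERE jobname = 'pg_single_sync';
```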

Source Parameters

Parameter | Default | Description
type | - | Data source type; set to postgres
jdbc_url | - | PostgreSQL JDBC connection string
driver_url | - | JDBC driver jar path
driver_class | - | JDBC driver class name
user | - | Database username
password | - | Database password
database | - | Database name
schema | - | Schema name
table | - | Name of the table to synchronize
offset | initial | initial: full + incremental sync; latest: incremental only
snapshot_split_size | 8096 | Split size (in rows). During full sync, the table is divided into multiple splits
snapshot_parallelism | 1 | Parallelism during the full sync phase, i.e., the maximum number of splits per task
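
The optional parameters are passed to cdc_stream in the same way as the required ones. The following sketch raises the split size and parallelism for a large initial snapshot; the job name pg_single_sync_tuned and the values 20000 and 4 are illustrative assumptions, not recommendations. Setting "offset" to "latest" instead skips the full sync phase, in which case the two snapshot parameters should have no effect.

```sql
-- Hypothetical job: same source as the Quick Start, with snapshot tuning.
CREATE JOB pg_single_sync_tuned
ON STREAMING
DO
INSERT INTO db1.tbl1
SELECT * FROM cdc_stream(
    "type" = "postgres",
    "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
    "driver_url" = "postgresql-42.5.0.jar",
    "driver_class" = "org.postgresql.Driver",
    "user" = "postgres",
    "password" = "postgres",
    "database" = "postgres",
    "schema" = "public",
    "table" = "source_table",
    "offset" = "initial",               -- full + incremental sync
    "snapshot_split_size" = "20000",    -- rows per split (illustrative value)
    "snapshot_parallelism" = "4"        -- max splits per task (illustrative value)
);
```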

Import Configuration Parameters

Parameter | Default | Description
session.* | - | Any session variable can be set in job_properties. See Insert Into Select for the import-related variables

For more common parameters (such as max_interval), see Continuous Load Overview.
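
A session variable is set by prefixing its name with session. in the job properties. The sketch below uses the existing Doris session variable insert_max_filter_ratio; the job name pg_single_sync_props, the value 0.1, and the placement of the PROPERTIES clause before ON STREAMING are assumptions to verify against the CREATE JOB reference for your version.

```sql
-- Hypothetical job: tolerates up to 10% filtered rows per import (illustrative value).
CREATE JOB pg_single_sync_props
PROPERTIES (
    "session.insert_max_filter_ratio" = "0.1"
)
ON STREAMING
DO
INSERT INTO db1.tbl1
SELECT * FROM cdc_stream(
    "type" = "postgres",
    "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
    "driver_url" = "postgresql-42.5.0.jar",
    "driver_class" = "org.postgresql.Driver",
    "user" = "postgres",
    "password" = "postgres",
    "database" = "postgres",
    "schema" = "public",
    "table" = "source_table",
    "offset" = "initial"
);
```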