PostgreSQL Single-table Import
Overview
Doris can continuously synchronize full and incremental data from a single PostgreSQL table into a specified Doris table by combining a streaming Job with the cdc_stream table-valued function (TVF). This approach suits real-time synchronization scenarios that need flexible column mapping and data transformation on a single table.
Doris integrates Flink CDC's reading capabilities to read change logs from the PostgreSQL write-ahead log (WAL), so a single table can first be synchronized in full and then kept up to date incrementally.
Notes:
- Supports exactly-once semantics.
- Currently only primary key tables are supported for synchronization.
- LOAD privilege is required.
- Logical replication must be enabled on the PostgreSQL side. If you are using a cloud service, see the Setup Guide.
Prerequisites
Enable logical replication on PostgreSQL by adding the following to postgresql.conf, then restart PostgreSQL so that the change takes effect:
wal_level=logical
If you are using a cloud service, refer to the corresponding prerequisites guide.
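Before creating the job, it can help to confirm the setting from a PostgreSQL session. The following is a minimal sketch using standard PostgreSQL commands. The table name public.source_table is borrowed from the Quick Start example and is only illustrative.

```sql
-- Run in psql against the source PostgreSQL instance.

-- Should return "logical" once the setting is active.
SHOW wal_level;

-- Alternative to editing postgresql.conf by hand (requires superuser).
-- A server restart is still needed before the new value takes effect.
ALTER SYSTEM SET wal_level = 'logical';

-- Check that the source table has a primary key (returns one row if it does).
SELECT conname
FROM pg_constraint
WHERE conrelid = 'public.source_table'::regclass
  AND contype = 'p';
```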
Quick Start
Creating an Import Job
Use CREATE JOB ... ON STREAMING to create a continuous import job:
CREATE JOB pg_single_sync
ON STREAMING
DO
INSERT INTO db1.tbl1
SELECT * FROM cdc_stream(
    "type" = "postgres",
    "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
    "driver_url" = "postgresql-42.5.0.jar",
    "driver_class" = "org.postgresql.Driver",
    "user" = "postgres",
    "password" = "postgres",
    "database" = "postgres",
    "schema" = "public",
    "table" = "source_table",
    "offset" = "initial"
);
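Because the job body is an ordinary INSERT INTO ... SELECT, the SELECT list is where column mapping and transformation would typically be expressed. The sketch below assumes hypothetical columns id, name, and created_at on both sides, and a hypothetical job name. Adjust them to match your actual schemas.

```sql
-- Hypothetical job name and column names, for illustration only.
CREATE JOB pg_single_sync_mapped
ON STREAMING
DO
INSERT INTO db1.tbl1 (id, name, created_at)
SELECT
    id,
    UPPER(name) AS name,   -- example transformation
    created_at
FROM cdc_stream(
    "type" = "postgres",
    "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
    "driver_url" = "postgresql-42.5.0.jar",
    "driver_class" = "org.postgresql.Driver",
    "user" = "postgres",
    "password" = "postgres",
    "database" = "postgres",
    "schema" = "public",
    "table" = "source_table",
    "offset" = "initial"
);
```

Listing target columns explicitly keeps the mapping stable if either table later gains columns.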
Checking Import Status
select * from jobs("type"="insert") where ExecuteType = "STREAMING";
For more common operations (pause, resume, delete, check Task, etc.), see Continuous Load Overview.
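As a rough illustration of those operations, the statements below use the general Doris job management syntax with the job name from the Quick Start. Treat this as a sketch: the Name and JobName filter columns are assumptions about the jobs and tasks output, and the Continuous Load Overview remains the reference for streaming jobs.

```sql
-- Look at one job only (assumes the output has a Name column).
SELECT * FROM jobs("type"="insert") WHERE Name = "pg_single_sync";

-- List the tasks spawned by the job (assumes a JobName column).
SELECT * FROM tasks("type"="insert") WHERE JobName = "pg_single_sync";

-- Pause, resume, and finally remove the job.
PAUSE JOB WHERE jobname = 'pg_single_sync';
RESUME JOB WHERE jobname = 'pg_single_sync';
DROP JOB WHERE jobname = 'pg_single_sync';
```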
Source Parameters
| Parameter | Default | Description |
|---|---|---|
| type | - | Data source type, set to postgres |
| jdbc_url | - | PostgreSQL JDBC connection string |
| driver_url | - | JDBC driver jar path |
| driver_class | - | JDBC driver class name |
| user | - | Database username |
| password | - | Database password |
| database | - | Database name |
| schema | - | Schema name |
| table | - | Table name to synchronize |
| offset | initial | Starting position. initial: full sync followed by incremental sync. latest: incremental sync only |
| snapshot_split_size | 8096 | Split size (in rows). During full sync, the table is divided into multiple splits |
| snapshot_parallelism | 1 | Parallelism during full sync phase, i.e., max splits per task |
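The snapshot parameters are passed alongside the connection parameters in the same cdc_stream call. The sketch below tunes the full sync phase. The job name and the values 4096 and 4 are illustrative assumptions, not recommendations.

```sql
-- Hypothetical job name; the split size and parallelism values are examples only.
CREATE JOB pg_single_sync_tuned
ON STREAMING
DO
INSERT INTO db1.tbl1
SELECT * FROM cdc_stream(
    "type" = "postgres",
    "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
    "driver_url" = "postgresql-42.5.0.jar",
    "driver_class" = "org.postgresql.Driver",
    "user" = "postgres",
    "password" = "postgres",
    "database" = "postgres",
    "schema" = "public",
    "table" = "source_table",
    "offset" = "initial",                -- full sync first, then incremental
    "snapshot_split_size" = "4096",      -- rows per split during full sync
    "snapshot_parallelism" = "4"         -- max splits per task during full sync
);
```

With "offset" = "latest" there is no full sync phase, so the two snapshot parameters would have nothing to act on.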
Import Configuration Parameters
| Parameter | Default | Description |
|---|---|---|
| session.* | - | Any session variable can be set in job_properties by using the session. prefix. See Insert Into Select for import-related variables |
For more common parameters (such as max_interval), see Continuous Load Overview.
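A job-level configuration might look like the sketch below. The placement of the PROPERTIES clause, the job name, and the specific values are assumptions made for illustration. Confirm the exact grammar in the Continuous Load Overview before relying on it.

```sql
-- Assumed clause order: ON STREAMING, then PROPERTIES, then DO.
-- Hypothetical job name and property values.
CREATE JOB pg_single_sync_props
ON STREAMING
PROPERTIES (
    "max_interval" = "10s",                     -- parameter mentioned above
    "session.insert_max_filter_ratio" = "0.1"   -- a session variable, via the session. prefix
)
DO
INSERT INTO db1.tbl1
SELECT * FROM cdc_stream(
    "type" = "postgres",
    "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
    "driver_url" = "postgresql-42.5.0.jar",
    "driver_class" = "org.postgresql.Driver",
    "user" = "postgres",
    "password" = "postgres",
    "database" = "postgres",
    "schema" = "public",
    "table" = "source_table",
    "offset" = "initial"
);
```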