PostgreSQL Table-Level Sync

Table-level sync is implemented with a streaming job plus the cdc_stream table-valued function (TVF). The target is an existing Doris table, written to with INSERT INTO tbl SELECT * FROM cdc_stream(...). Because the load is expressed in Doris SQL, it can apply column mapping, filtering, and data transformation while still guaranteeing exactly-once semantics. This approach suits real-time sync scenarios where data must be processed before it is written.

By integrating the read capability of Flink CDC, Doris reads change logs (WAL) from PostgreSQL to perform full + incremental sync from the source table to the target table. If you want Doris to automatically create downstream tables and sync a group of tables on a per-database basis, see PostgreSQL Database-Level Sync.

Use Cases

  • Real-time ingestion of a single table into the warehouse, with column pruning, filtering, or expression-based transformation required before write.
  • The downstream Doris table already exists, and you want SQL to explicitly control field mapping.
  • High consistency requirements that demand exactly-once semantics.

Prerequisites

Before creating a job, confirm the following:

  1. Logical replication is enabled on the PostgreSQL side. See the Configuration Guide for details.
  2. The current user has the Load privilege.
  3. The source table is a primary key table (only primary key tables are currently supported for sync).
  4. The target table has been created in Doris in advance, and its schema is compatible with the mapping SQL.
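The first three prerequisites can be checked with a few statements. The following is a minimal sketch rather than an official checklist: the first two statements run on PostgreSQL, the last one on Doris. The account name sync_user is hypothetical, and scoping the grant to internal.db1.tbl1 is an assumption about how privileges are organized in your cluster.

```sql
-- On PostgreSQL: logical replication requires wal_level to be 'logical'.
SHOW wal_level;

-- On PostgreSQL: confirm the source table has a primary key.
-- A row in the result means a primary key constraint exists.
SELECT conname
FROM pg_constraint
WHERE conrelid = 'public.source_table'::regclass
  AND contype = 'p';

-- On Doris: grant the Load privilege to the user that will create the job.
-- 'sync_user' is a hypothetical account name.
GRANT LOAD_PRIV ON internal.db1.tbl1 TO 'sync_user'@'%';
```

If SHOW wal_level returns anything other than logical, follow the Configuration Guide before creating the job.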

Notes

  • Exactly-once semantics is supported.
  • Only primary key table sync is currently supported.
  • The Load privilege is required.
  • Logical replication must be enabled on the PostgreSQL side. See the Configuration Guide.

Quick Start

The overall workflow has two steps: create the continuous load job, then check the job status.

Step 1: Create the Load Job

Use CREATE JOB ... ON STREAMING to create a continuous load job that syncs public.source_table from PostgreSQL to db1.tbl1 in Doris:

CREATE JOB pg_single_sync
ON STREAMING
DO
INSERT INTO db1.tbl1
SELECT * FROM cdc_stream(
    "type" = "postgres",
    "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
    "driver_url" = "postgresql-42.5.1.jar",
    "driver_class" = "org.postgresql.Driver",
    "user" = "postgres",
    "password" = "postgres",
    "database" = "postgres",
    "schema" = "public",
    "table" = "source_table",
    "offset" = "initial"
);
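As a variation on the basic job above, the SELECT can prune columns, filter rows, and transform values before the write. The sketch below is illustrative only: the job name pg_single_sync_mapped, the source columns (id, name, amount, status), and the target columns (id, name_upper, amount_cents) are all hypothetical and must be replaced with your own schema.

```sql
-- Hypothetical schema: public.source_table has id, name, amount, status;
-- db1.tbl1 has id, name_upper, amount_cents.
CREATE JOB pg_single_sync_mapped
ON STREAMING
DO
INSERT INTO db1.tbl1 (id, name_upper, amount_cents)
SELECT
    id,                              -- column mapping: pass the key through
    upper(name),                     -- transformation: normalize to upper case
    cast(amount * 100 AS BIGINT)     -- transformation: convert to integer cents
FROM cdc_stream(
    "type" = "postgres",
    "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
    "driver_url" = "postgresql-42.5.1.jar",
    "driver_class" = "org.postgresql.Driver",
    "user" = "postgres",
    "password" = "postgres",
    "database" = "postgres",
    "schema" = "public",
    "table" = "source_table",
    "offset" = "initial"
)
WHERE status = 'active';             -- filtering: keep only matching rows
```

Listing the target columns explicitly keeps the mapping visible in the job definition instead of relying on column order.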

Step 2: Check the Load Status

Use the jobs table function to check the running status of Streaming-type jobs:

select * from jobs("type"="insert") where ExecuteType = "STREAMING";

For other common operations (pause, resume, delete, view tasks, and so on), see the Continuous Load Overview.
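For orientation, the operations mentioned above might look like the following for the pg_single_sync job. This is a sketch based on the general Doris job statements; the Name and JobName column names and the exact statement forms are assumptions here, and the Continuous Load Overview remains the authoritative reference.

```sql
-- Narrow the status query to a single job (assumes the result has a Name column).
select * from jobs("type"="insert")
where ExecuteType = "STREAMING" and Name = "pg_single_sync";

-- View the tasks scheduled for the job (assumes the result has a JobName column).
select * from tasks("type"="insert") where JobName = "pg_single_sync";

-- Pause the job, then resume it.
PAUSE JOB WHERE jobname = 'pg_single_sync';
RESUME JOB WHERE jobname = 'pg_single_sync';

-- Delete the job once it is no longer needed.
DROP JOB WHERE jobname = 'pg_single_sync';
```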

Parameter Reference

Data Source Parameters

Data source parameters are configured through the cdc_stream(...) TVF. They describe the PostgreSQL table to be synced and the read behavior.

Parameter | Default | Description
--- | --- | ---
type | - | Data source type. Set to postgres.
jdbc_url | - | PostgreSQL JDBC connection string.
driver_url | - | Path to the JDBC driver jar. Supports a file name, a local absolute path, or an HTTP URL. See the JDBC Catalog Overview for details.
driver_class | - | JDBC driver class name.
user | - | Database username.
password | - | Database password.
database | - | Database name.
schema | - | Schema name.
table | - | Name of the table to sync.
offset | initial | initial: full + incremental sync. latest: incremental sync only.
snapshot_split_size | 8096 | Split size in rows. During full sync, the table is divided into multiple splits for syncing.
snapshot_parallelism | 1 | Parallelism for the full sync stage, that is, the maximum number of splits scheduled per Task.
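The snapshot parameters are passed alongside the connection parameters in the same cdc_stream(...) call. The sketch below shows a full + incremental job with a larger split size and higher snapshot parallelism; the job name pg_single_sync_tuned and the values 16384 and 4 are arbitrary illustrations, not recommendations.

```sql
CREATE JOB pg_single_sync_tuned
ON STREAMING
DO
INSERT INTO db1.tbl1
SELECT * FROM cdc_stream(
    "type" = "postgres",
    "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
    "driver_url" = "postgresql-42.5.1.jar",
    "driver_class" = "org.postgresql.Driver",
    "user" = "postgres",
    "password" = "postgres",
    "database" = "postgres",
    "schema" = "public",
    "table" = "source_table",
    "offset" = "initial",               -- full + incremental sync
    "snapshot_split_size" = "16384",    -- rows per split during full sync (illustrative value)
    "snapshot_parallelism" = "4"        -- max splits scheduled per Task (illustrative value)
);
```

Setting "offset" = "latest" instead skips the full sync stage, in which case the two snapshot parameters should have no effect.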

Load Configuration Parameters

Load configuration parameters are set through the job's job_properties. They control load behavior and session variables.

Parameter | Default | Description
--- | --- | ---
session.* | None | All session variables can be configured in job_properties. For load-related variables, see Insert Into Select.

For more common parameters (such as max_interval), see the Common Parameters section in the Continuous Load Overview.
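A job with load configuration might be declared as sketched below. The placement of the PROPERTIES clause between ON STREAMING and DO is an assumption about the CREATE JOB grammar, and insert_max_filter_ratio with the value 0.1 is only an example of a session variable; check the Continuous Load Overview for the exact syntax. The job name pg_single_sync_props is hypothetical.

```sql
CREATE JOB pg_single_sync_props
ON STREAMING
PROPERTIES (
    -- Assumed placement of job_properties; session.* sets a session variable for the load.
    "session.insert_max_filter_ratio" = "0.1"
)
DO
INSERT INTO db1.tbl1
SELECT * FROM cdc_stream(
    "type" = "postgres",
    "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
    "driver_url" = "postgresql-42.5.1.jar",
    "driver_class" = "org.postgresql.Driver",
    "user" = "postgres",
    "password" = "postgres",
    "database" = "postgres",
    "schema" = "public",
    "table" = "source_table",
    "offset" = "initial"
);
```

Common parameters such as max_interval would go in the same PROPERTIES clause.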