跳到主要内容

PostgreSQL

Doris 提供以下方式从 PostgreSQL 导入数据:

  • 使用 JDBC Catalog 导入 PostgreSQL 数据

Doris 通过 JDBC Catalog 将 PostgreSQL 映射为外部 Catalog,可以直接以 SQL 的方式查询 PostgreSQL 中的数据,并通过 INSERT INTOCREATE TABLE AS SELECT 完成数据导入,适用于一次性迁移或周期性批量导入场景。

  • 使用 Streaming Job 持续同步 PostgreSQL 数据

Doris 通过 Streaming Job 将 PostgreSQL 的全量与增量数据持续同步到 Doris 中。Streaming Job 集成了 Flink CDC 的读取能力,提交作业后 Doris 会持续运行任务,从 PostgreSQL 读取 WAL 并写入到 Doris 表中,支持 exactly-once 语义,分为表级同步和库级同步两种模式。该方式自 Doris 4.1.0 起支持。

  • 使用 Flink Doris Connector 导入 PostgreSQL 数据

可以通过 Flink Doris Connector 配合 Flink Postgres CDC 实现实时同步,适用于需要在 Flink 中对数据进行额外流式处理的场景。Connector 同时提供一键整库同步工具,更多信息请参考 Flink Doris Connector

  • 使用第三方工具导入 PostgreSQL 数据

DataXSeaTunnelCloudCanal 等数据集成工具同样支持将 PostgreSQL 数据同步到 Doris。

在大多数场景下,可以直接使用 JDBC Catalog 进行一次性的数据迁移;当需要持续同步全量+增量数据时,推荐使用 Streaming Job。

使用 JDBC Catalog 导入 PostgreSQL 数据

通过 JDBC Catalog 把 PostgreSQL 映射为 Doris 的外部 Catalog,再使用 INSERT INTOCREATE TABLE AS SELECT 完成数据导入。详细语法请参考 JDBC PostgreSQL Catalog

第 1 步:在 PostgreSQL 中准备数据

CREATE TABLE public.students (
id INT PRIMARY KEY,
name VARCHAR(64),
age INT
);

INSERT INTO public.students VALUES (1, 'Emily', 25), (2, 'Bob', 30);

第 2 步:在 Doris 中创建 Catalog

CREATE CATALOG pg_catalog PROPERTIES (
"type" = "jdbc",
"user" = "postgres",
"password" = "postgres",
"jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
"driver_url" = "postgresql-42.5.1.jar",
"driver_class" = "org.postgresql.Driver"
);

第 3 步:在 Doris 中创建目标表

CREATE DATABASE IF NOT EXISTS doris_db;

CREATE TABLE doris_db.students (
id INT,
name VARCHAR(64),
age INT
)
UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES ("replication_num" = "1");

第 4 步:通过 INSERT INTO 导入数据

INSERT INTO doris_db.students
SELECT id, name, age FROM pg_catalog.postgres.public.students;

如果目标表尚未创建,也可以使用 CREATE TABLE AS SELECT 一步完成建表与导入:

CREATE TABLE doris_db.students
PROPERTIES ("replication_num" = "1")
AS
SELECT * FROM pg_catalog.postgres.public.students;

第 5 步:检查导入数据

SELECT * FROM doris_db.students;
+----+-------+------+
| id | name | age |
+----+-------+------+
| 1 | Emily | 25 |
| 2 | Bob | 30 |
+----+-------+------+

使用 Streaming Job 持续同步 PostgreSQL 数据

Streaming Job 通过集成 Flink CDC 持续读取 PostgreSQL 的 WAL 并写入 Doris,支持以下两种模式:

  • PostgreSQL 库级同步:以库为单位同步(通过 include_tables 可控制同步一张、多张或全部表),首次同步时由 Doris 自动建表,提供 at-least-once 语义。
  • PostgreSQL 表级同步:以表为单位同步,目标表需预先在 Doris 中创建,支持灵活的列映射和数据转换,提供 exactly-once 语义。

使用限制

  1. 仅支持主键表(Unique Key)同步。
  2. 需要 Load 权限,库级同步首次自动建表时还需 Create 权限。
  3. 该功能自 Doris 4.1.0 起支持。

前置配置

提交 Streaming Job 之前,需要在 PostgreSQL 端开启逻辑复制(wal_level=logical),并授予用户相应的复制(REPLICATION)权限。不同部署环境的具体配置步骤请参考:

操作示例:库级同步

库级同步使用 FROM POSTGRES ... TO DATABASE ... 语法,目标是一个 Doris database,首次同步时由 Doris 自动创建下游表。

第 1 步:在 PostgreSQL 中准备数据

CREATE TABLE public.students (
id INT PRIMARY KEY,
name VARCHAR(64),
age INT
);

INSERT INTO public.students VALUES (1, 'Emily', 25), (2, 'Bob', 30);

第 2 步:在 Doris 中创建目标 database

库级同步不需要预建表,但需要先创建用于承接同步表的 database:

CREATE DATABASE IF NOT EXISTS doris_db;

第 3 步:创建 Streaming Job

下面的示例通过 include_tables 仅同步 students 一张表(多张表用逗号分隔,留空则同步整库):

CREATE JOB pg_db_sync
ON STREAMING
FROM POSTGRES (
"jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
"driver_url" = "postgresql-42.5.1.jar",
"driver_class" = "org.postgresql.Driver",
"user" = "postgres",
"password" = "postgres",
"database" = "postgres",
"schema" = "public",
"include_tables" = "students",
"offset" = "initial"
)
TO DATABASE doris_db (
"table.create.properties.replication_num" = "1" -- 单 BE 部署时设置为 1
);

第 4 步:查看导入状态

SELECT * FROM jobs("type"="insert") WHERE ExecuteType = "STREAMING";

第 5 步:检查自动创建的 Doris 表与导入数据

SHOW TABLES FROM doris_db;
SELECT * FROM doris_db.students;

更多通用操作和完整参数说明,请参考 PostgreSQL 库级同步

操作示例:表级同步

第 1 步:在 PostgreSQL 中准备数据

CREATE TABLE public.students (
id INT PRIMARY KEY,
name VARCHAR(64),
age INT
);

INSERT INTO public.students VALUES (1, 'Emily', 25), (2, 'Bob', 30);

第 2 步:在 Doris 中创建目标表

表级同步要求目标表预先存在:

CREATE DATABASE IF NOT EXISTS doris_db;

CREATE TABLE doris_db.students (
id INT,
name VARCHAR(64),
age INT
)
UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES ("replication_num" = "1");

第 3 步:创建 Streaming Job

通过 CREATE STREAMING JOB 创建表级同步作业,使用 INSERT INTO ... SELECT * FROM cdc_stream(...) 语法:

CREATE JOB pg_students_sync
ON STREAMING
DO
INSERT INTO doris_db.students
SELECT * FROM cdc_stream(
"type" = "postgres",
"jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
"driver_url" = "postgresql-42.5.1.jar",
"driver_class" = "org.postgresql.Driver",
"user" = "postgres",
"password" = "postgres",
"database" = "postgres",
"schema" = "public",
"table" = "students",
"offset" = "initial"
);

第 4 步:查看导入状态

SELECT * FROM jobs("type"="insert") WHERE ExecuteType = "STREAMING";

第 5 步:检查导入数据

SELECT * FROM doris_db.students;

更多通用操作(暂停、恢复、删除、查看 Task 等)以及完整参数说明,请参考 PostgreSQL 表级同步