跳到主要内容

CDC_STREAM

描述

CDC Stream 表函数(table-valued-function, tvf)可以让用户通过 CDC 方式读取关系型数据库(如 MySQL、PostgreSQL)的增量变更数据。通过集成 Flink CDC 的读取能力,持续读取数据库的变更日志(Binlog/WAL)并写入 Doris。

通常与 CREATE JOB ON STREAMING 配合使用,实现持续的单表增量数据同步。详细使用方式请参考 MySQL 单表导入PostgreSQL 单表导入

备注

CDC Stream TVF 单独使用时仅支持增量数据同步,不支持全量快照读取。配合 CREATE STREAMING JOB 使用时支持全量 + 增量同步。

语法

cdc_stream(
"type" = "<source_type>",
"jdbc_url" = "<jdbc_url>",
"driver_url" = "<driver_url>",
"driver_class" = "<driver_class>",
"user" = "<user>",
"password" = "<password>",
"database" = "<database>",
"table" = "<table>"
[, "<optional_property_key>" = "<optional_property_value>" [, ...] ]
)

必选参数

参数描述
type数据源类型,目前支持 mysqlpostgres
jdbc_urlJDBC 连接串,如 jdbc:mysql://127.0.0.1:3306jdbc:postgresql://127.0.0.1:5432/postgres
driver_urlJDBC 驱动 jar 包路径
driver_classJDBC 驱动类名,MySQL 为 com.mysql.cj.jdbc.Driver,PostgreSQL 为 org.postgresql.Driver
user数据库用户名
password数据库密码
database数据库名
table需要同步的表名

可选参数

参数默认值描述
schema-Schema 名称,PostgreSQL 必填

注意事项

  1. CDC Stream TVF 单独使用时仅支持增量数据同步;配合 CREATE JOB ON STREAMING 使用时支持全量 + 增量同步。
  2. 通常需要配合 CREATE JOB ON STREAMING 使用,不建议直接在普通查询中使用。
  3. 使用 MySQL 类型时,需要在 MySQL 端开启 Binlog(binlog_format=ROW)。
  4. 使用 PostgreSQL 类型时,需要开启逻辑复制(wal_level=logical)。
  5. 支持 exactly-once 语义。

示例

  • 从 MySQL 持续同步单表数据

    CREATE JOB mysql_cdc_job
    ON STREAMING
    DO
    INSERT INTO db1.target_table
    SELECT * FROM cdc_stream(
    "type" = "mysql",
    "jdbc_url" = "jdbc:mysql://127.0.0.1:3306",
    "driver_url" = "mysql-connector-j-8.0.31.jar",
    "driver_class" = "com.mysql.cj.jdbc.Driver",
    "user" = "root",
    "password" = "123456",
    "database" = "source_db",
    "table" = "source_table",
    )
  • 从 PostgreSQL 持续同步单表数据

    CREATE JOB pg_cdc_job
    ON STREAMING
    DO
    INSERT INTO db1.target_table
    SELECT * FROM cdc_stream(
    "type" = "postgres",
    "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
    "driver_url" = "postgresql-42.5.0.jar",
    "driver_class" = "org.postgresql.Driver",
    "user" = "postgres",
    "password" = "postgres",
    "database" = "postgres",
    "schema" = "public",
    "table" = "source_table",
    )
  • 指定列映射和数据转换

    CREATE JOB mysql_cdc_transform_job
    ON STREAMING
    DO
    INSERT INTO db1.target_table (id, name, age)
    SELECT id, name, cast(age as INT) as age
    FROM cdc_stream(
    "type" = "mysql",
    "jdbc_url" = "jdbc:mysql://127.0.0.1:3306",
    "driver_url" = "mysql-connector-j-8.0.31.jar",
    "driver_class" = "com.mysql.cj.jdbc.Driver",
    "user" = "root",
    "password" = "123456",
    "database" = "source_db",
    "table" = "source_table",
    )