跳到主要内容

从其他 TP 系统迁移数据

本文介绍如何将 MySQL、SQL Server、Oracle、PostgreSQL 等传统事务型(OLTP / TP)数据库中的数据迁移到 Apache Doris。Doris 提供了多种数据接入方案,可以覆盖一次性迁移、离线批量同步以及实时增量同步等不同场景。

迁移方式概览

下表汇总了 Doris 支持的几种主要迁移方案,可根据数据量、同步时效性以及是否需要 CDC(变更数据捕获)等需求选择。

方案适用场景同步模式是否支持整库是否支持增量 / CDC
Multi-Catalog一次性迁移、即席查询、CTAS 建表离线 / 按需通过 SQL 自行实现
Flink Doris Connector(JDBC)Flink 体系下的离线批量同步离线
Flink Doris Connector(CDC)实时全量 + 增量同步实时支持支持(INSERT / UPDATE / DELETE)
Spark ConnectorSpark 体系下的批量数据写入离线
Streaming JobDoris 内置的 MySQL / PostgreSQL 持续同步流式(持续)支持支持
第三方工具(DataX / SeaTunnel / CloudCanal)已有同步平台、可视化运维离线 / 实时支持视工具而定

说明:如果需要在不引入外部计算引擎的情况下完成迁移,推荐使用 Multi-CatalogStreaming Job;如果需要实时增量同步,推荐使用 Flink Doris Connector + CDCStreaming Job


方案一:Multi-Catalog(推荐用于一次性迁移)

通过 JDBC Catalog 将 TP 数据库中的表映射为 Doris 的外表,再使用 INSERT INTOCREATE TABLE AS SELECT (CTAS) 完成数据导入。

操作步骤

  1. 创建指向源 TP 数据库的 JDBC Catalog。
  2. 使用 INSERT INTO 写入已有的 Doris 表,或使用 CTAS 直接建表并写入。
  3. 验证数据条数与字段一致性。

示例:从 MySQL 迁移数据

CREATE CATALOG mysql_catalog properties(
'type' = 'jdbc',
'user' = 'root',
'password' = '123456',
'jdbc_url' = 'jdbc:mysql://host:3306/mysql_db',
'driver_url' = 'mysql-connector-java-8.0.25.jar',
'driver_class' = 'com.mysql.cj.jdbc.Driver'
);

-- 通过 INSERT INTO 导入已存在的表
INSERT INTO internal.doris_db.tbl1
SELECT * FROM iceberg_catalog.iceberg_db.table1;

-- 通过 CTAS 一步建表并导入
CREATE TABLE internal.doris_db.tbl1
PROPERTIES('replication_num' = '1')
AS
SELECT * FROM iceberg_catalog.iceberg_db.table1;

更多细节参见 Catalog 数据导入


借助 Flink,可以实现 TP 系统的离线批量同步与实时增量同步两种模式。

适用于 Flink 体系下的离线数据搬运。以 Flink SQL 为例:

CREATE TABLE student_source (
id INT,
name STRING,
age INT
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
'table-name' = 'students',
'username' = 'username',
'password' = 'password',
);

CREATE TABLE student_sink (
id INT,
name STRING,
age INT
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'test.students',
'username' = 'root',
'password' = 'password',
'sink.label-prefix' = 'doris_label'
);

INSERT into student_sink select * from student_source;

更多细节参见 Flink JDBC

借助 Flink CDC,可以一次性完成全量数据读取 + 增量数据捕获,并支持 INSERT / UPDATE / DELETE 事件的同步。

单表实时同步示例

SET 'execution.checkpointing.interval' = '10s';

CREATE TABLE cdc_mysql_source (
id int
,name VARCHAR
,PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = 'password',
'database-name' = 'database',
'table-name' = 'table'
);

-- 支持同步 INSERT / UPDATE / DELETE 事件
CREATE TABLE doris_sink (
id INT,
name STRING
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'database.table',
'username' = 'root',
'password' = '',
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true',
'sink.enable-delete' = 'true', -- 同步删除事件
'sink.label-prefix' = 'doris_label'
);

insert into doris_sink select id,name from cdc_mysql_source;

整库 / 多表同步

对于 TP 数据库的整库或多表同步,Flink Doris Connector 提供了开箱即用的整库同步功能,可一键完成 schema 同步与数据写入:

<FLINK_HOME>bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-24.0.1.jar \
mysql-sync-database \
--database test_db \
--mysql-conf hostname=127.0.0.1 \
--mysql-conf port=3306 \
--mysql-conf username=root \
--mysql-conf password=123456 \
--mysql-conf database-name=mysql_db \
--including-tables "tbl1|test.*" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1

更多细节参见 整库同步


方案三:Spark Connector

适合已经使用 Spark 进行批处理的场景。可以通过 Spark Connector 的 JDBC Source 读取 TP 数据库,并通过 Doris Sink 写入 Doris。

val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()

jdbcDF.write.format("doris")
.option("doris.table.identifier", "db.table")
.option("doris.fenodes", "127.0.0.1:8030")
.option("user", "root")
.option("password", "")
.save()

相关文档:


方案四:Streaming Job(Doris 内置持续同步)

Doris 提供了内置的 Streaming Job 能力,支持以持续运行的方式从 MySQL 或 PostgreSQL 同步数据,无需依赖外部计算引擎。

多表同步示例

CREATE JOB multi_table_sync
ON STREAMING
FROM MYSQL (
"jdbc_url" = "jdbc:mysql://127.0.0.1:3306",
"driver_url" = "mysql-connector-j-8.0.31.jar",
"driver_class" = "com.mysql.cj.jdbc.Driver",
"user" = "root",
"password" = "123456",
"database" = "test",
"include_tables" = "user_info,order_info",
"offset" = "initial"
)
TO DATABASE target_test_db (
"table.create.properties.replication_num" = "1"
)

更多细节参见 Postgres / MySQL Continuous Load


方案五:第三方同步工具

除上述原生方案外,也可以使用社区生态中常用的第三方同步工具完成数据迁移,适合已有同步平台或希望通过可视化方式管理任务的场景:


FAQ

Q1:一次性把 MySQL 里的几张表迁到 Doris,应该选哪种方案?

推荐使用 Multi-Catalog。创建 JDBC Catalog 后,直接使用 INSERT INTOCTAS 即可完成迁移,无需额外组件。

Q2:需要从 MySQL 实时同步增量数据(含 UPDATE / DELETE)到 Doris,怎么做?

可以选择 Flink Doris Connector + Flink CDC,并在 Doris Sink 中开启 sink.enable-delete = true;或使用 Doris 内置的 Streaming Job 实现持续同步。

Q3:如何一次性同步整个 MySQL 库到 Doris?

使用 Flink Doris Connector 的 mysql-sync-database 整库同步功能,或使用 Streaming Job 的多表同步能力,均可一键完成 schema 与数据的同步。

Q4:源端是 Oracle / SQL Server / PostgreSQL,是否同样支持?

是的。Multi-Catalog 与 Flink / Spark Connector 均支持通过对应的 JDBC 驱动连接 Oracle、SQL Server、PostgreSQL 等 TP 数据库;Streaming Job 当前主要支持 MySQL 与 PostgreSQL。

Q5:迁移过程中如何指定 Doris 表的副本数?

CTAS 语句中通过 PROPERTIES('replication_num' = 'N') 指定,或在 Flink 整库同步命令中通过 --table-conf replication_num=N 指定。