跳到主要内容

持续导入概览

概述

Doris 支持通过 Streaming Job 的方式,从多种数据源持续导入数据到 Doris 表中。提交 Job 后,Doris 会持续运行导入作业,实时读取数据源中的数据并写入到 Doris 表中。

持续导入支持以下数据源和导入模式:

数据源支持版本表级同步库级同步配置指南
MySQL5.6、5.7、8.0.xMySQL 表级同步MySQL 库级同步Amazon RDS MySQL · Amazon Aurora MySQL
PostgreSQL14、15、16、17PostgreSQL 表级同步PostgreSQL 库级同步Amazon RDS PostgreSQL · Amazon Aurora PostgreSQL
S3-S3 持续导入--

如何选择

表级同步和库级同步是两种实现机制完全不同的持续导入方式,并非"表数量"的区别。库级同步也支持通过 include_tables 只同步一张表,因此选型应以能力需求为准:

能力维度表级同步库级同步
底层机制Job + TVF(INSERT INTO tbl SELECT * FROM tvf()Job + 原生整库 DDL(FROM src TO DATABASE db
目标层级一张已存在的 Doris 表一个 Doris database 容器
同步范围单张表一张到多张到整库(由 include_tables 控制)
自动建表❌ 需预建✅ 首次同步自动创建主键表
SQL 灵活表达✅ 支持列映射、过滤、转换(SELECT 子句)❌ 原样复制,不支持 ETL
语义保证exactly-onceat-least-once
所需权限LoadLoad + Create(自动建表时)
典型适用场景需要列裁剪、字段重命名、类型转换、条件过滤的实时同步整库或一组表的镜像复制,希望下游表结构自动跟随上游
  • 需要对数据做 SQL 加工,或对精确一次语义有严格要求 → 选 表级同步
  • 希望 Doris 自动建表、一次配置同步一组表 → 选 库级同步
  • 数据源是 S3 对象存储 → 只支持表级同步(S3 TVF 方式)

作业状态流转

Streaming Job 在运行过程中会在以下状态之间迁移,表级同步和库级同步遵循同一套状态机:

stateDiagram-v2
[*] --> PENDING: create job
PENDING --> RUNNING: createStreamingTask()
RUNNING --> FINISHED: 源消费完成
RUNNING --> PAUSED: 执行失败(记录 failReason)
PAUSED --> PENDING: autoResume 指数退避到期
FINISHED --> [*]
状态含义
PENDING作业已创建但尚未调度出子任务;等待下一次调度创建 StreamingTask
RUNNING已派生子任务并在执行中,从源端读取增量数据并写入 Doris
FINISHED源消费完成,作业终止。S3 TVF 文件全部导入完成后会进入该状态
PAUSED子任务执行失败,作业自动暂停并记录 failReason;可通过 select * from jobs(...)ErrorMsg 字段查看原因

自动恢复(autoResume): 作业进入 PAUSED 后,调度器会按指数退避策略定时尝试恢复,恢复时回到 PENDING 继续创建子任务。无需人工介入临时故障(网络抖动、上游短暂不可用等)会被自动消化。 若需立即恢复或排查故障后手动启动,使用 RESUME JOB;需要彻底停止不再调度则使用 PAUSE JOB(手动暂停不会被 autoResume 唤醒)或 DROP JOB

通用操作

查看导入状态

select * from jobs("type"="insert") where ExecuteType = "STREAMING";
结果列说明
IDJob ID
NAMEJob 名称
DefinerJob 定义者
ExecuteTypeJob 调度的类型:ONE_TIME/RECURRING/STREAMING/MANUAL
RecurringStrategy循环策略。普通的 Insert 会用到,ExecuteType=Streaming 时为空
StatusJob 状态
ExecuteSqlJob 的 Insert SQL 语句
CreateTimeJob 创建时间
SucceedTaskCount成功任务数量
FailedTaskCount失败任务数量
CanceledTaskCount取消任务数量
CommentJob 注释
PropertiesJob 的属性
CurrentOffsetJob 当前处理完成的 Offset。只有 ExecuteType=Streaming 才有值
EndOffsetJob 获取到数据源端最大的 EndOffset。只有 ExecuteType=Streaming 才有值
LoadStatisticJob 的统计信息
ErrorMsgJob 执行的错误信息
JobRuntimeMsgJob 运行时的一些提示信息

查看 Task 状态

select * from tasks("type"="insert") where jobId='<job_id>';
结果列说明
TaskId任务 ID
JobIDJobID
JobNameJob 名称
LabelTask 导入的 Label
StatusTask 的状态
ErrorMsgTask 失败信息
CreateTimeTask 的创建时间
StartTimeTask 的开始时间
FinishTimeTask 的完成时间
LoadStatisticTask 的统计信息
UserTask 的执行者
RunningOffset当前 Task 同步的 Offset 信息。只有 Job.ExecuteType=Streaming 才有值

暂停导入作业

PAUSE JOB WHERE jobname = <job_name>;

恢复导入作业

RESUME JOB WHERE jobName = <job_name>;

删除导入作业

DROP JOB WHERE jobName = <job_name>;

通用参数

FE 配置参数

参数默认值说明
max_streaming_job_num1024最大的 Streaming 作业数量
job_streaming_task_exec_thread_num10用于执行 StreamingTask 的线程数
max_streaming_task_show_count100StreamingTask 在内存中最多保留的 task 执行记录

Job 通用导入配置参数

参数默认值说明
max_interval10s当上游没有新增数据时,空闲的调度间隔。

FAQ

MySQL 连接报错 Public Key Retrieval is not allowed

原因: 配置的 MySQL 用户使用 SHA256 密码认证方式,需要通过 TLS 等协议传输密码。

解决方案一: 在 JDBC URL 中添加 allowPublicKeyRetrieval=true 参数:

jdbc:mysql://127.0.0.1:3306?allowPublicKeyRetrieval=true

解决方案二: 将 MySQL 用户的认证方式改为 mysql_native_password

ALTER USER 'username'@'%' IDENTIFIED WITH mysql_native_password BY 'password';
FLUSH PRIVILEGES;