跳到主要内容

持续导入

概述

Doris 可以通过 Job + TVF 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 中的数据写入到 Doris 表中。

支持的 TVF

S3 TVF

基本原理

S3

遍历 S3 指定目录的文件,对文件进行拆分成文件列表,以小批次的文件列表的方式写入到 Doris 表中。

增量读取方式

创建任务后,Doris 会持续从指定路径中读取数据,并以固定频率轮询是否有新文件。

注意:新文件的名称必须按字典序大于上一次已导入的文件名,否则 Doris 不会将其作为新文件处理。比如,文件命名为 file1、file2、file3 时会按顺序导入;如果随后新增一个 file0,由于它在字典序上小于最后已导入的文件 file3,Doris 将不会导入该文件。

快速上手

创建导入作业

假设 S3 的目录下,会定期的产生以 CSV 结尾的文件。此时可以创建 Job

CREATE JOB my_job 
ON STREAMING
DO
INSERT INTO db1.tbl1
select * from S3(
"uri" = "s3://bucket/*.csv",
"s3.access_key" = "<s3_access_key>",
"s3.secret_key" = "<s3_secret_key>",
"s3.region" = "<s3_region>",
"s3.endpoint" = "<s3_endpoint>",
"format" = "<format>"
)

查看导入状态

select * from job(type=insert) where ExecuteType = "streaming"
Id: 1758538737484
Name: my_job1
Definer: root
ExecuteType: STREAMING
RecurringStrategy: \N
Status: RUNNING
ExecuteSql: INSERT INTO test.`student1`
SELECT * FROM S3
(
"uri" = "s3://bucket/s3/demo/*.csv",
"format" = "csv",
"column_separator" = ",",
"s3.endpoint" = "s3.ap-southeast-1.amazonaws.com",
"s3.region" = "ap-southeast-1",
"s3.access_key" = "",
"s3.secret_key" = ""
)
CreateTime: 2025-09-22 19:24:51
SucceedTaskCount: 1
FailedTaskCount: 0
CanceledTaskCount: 0
Comment: \N
Properties: \N
CurrentOffset: {"fileName":"s3/demo/test/1.csv"}
EndOffset: {"fileName":"s3/demo/test/1.csv"}
LoadStatistic: {"scannedRows":20,"loadBytes":425,"fileNumber":2,"fileSize":256}
ErrorMsg: \N
JobRuntimeMsg: \N

暂停导入作业

PAUSE JOB WHERE jobname = <job_name> ;

恢复导入作业

RESUME JOB where jobName = <job_name> ;

修改导入作业

-- 支持修改Job的properties和insert语句
Alter Job jobName
PROPERTIES(
"session.insert_max_filter_ratio"="0.5"
)
INSERT INTO db1.tbl1
select * from S3(
"uri" = "s3://bucket/*.csv",
"s3.access_key" = "<s3_access_key>",
"s3.secret_key" = "<s3_secret_key>",
"s3.region" = "<s3_region>",
"s3.endpoint" = "<s3_endpoint>",
"format" = "<format>"
)

删除导入作业

DROP JOB where jobName = <job_name> ;

参考手册

导入命令

创建一个 Job + TVF 常驻导入作业语法如下:

CREATE JOB <job_name>
ON STREAMING
[job_properties]
[ COMMENT <comment> ]
DO <Insert_Command>

创建的模块说明如下:

模块说明
job_name任务名
job_properties用于指定 Job 的通用导入参数
comment用于描述 Job 作业的备注信息
Insert_Command用于执行的 SQL,目前只支持 Insert into table select * from s3()

导入参数

FE 配置参数

参数默认值
max_streaming_job_num1024最大的 Streaming 作业数量
job_streaming_task_exec_thread_num10用于执行 StreamingTask 的线程数
max_streaming_task_show_count100StreamingTask 在内存中最多保留的 task 执行记录

导入配置参数

参数默认值说明
session.*支持在 job_properties 上配置所有的 session 变量,导入变量可参考 Insert Into Select
s3.max_batch_files256当累计文件数达到该值时触发一次导入写入
s3.max_batch_bytes10G当累计数据量达到该值时触发一次导入写入
max_interval10s当上游没有新增文件或数据时,空闲的调度间隔。

导入状态

Job

Job 提交成功后,可以执行 select * from job("insert") where ExecuteType = 'Streaming' 来查看 Job 当前的状态

select * from job(type=insert) where ExecuteType = "streaming"
Id: 1758538737484
Name: my_job1
Definer: root
ExecuteType: STREAMING
RecurringStrategy: \N
Status: RUNNING
ExecuteSql: INSERT INTO test.`student1`
SELECT * FROM S3
(
"uri" = "s3://wd-test123/s3/demo/*.csv",
"format" = "csv",
"column_separator" = ",",
"s3.endpoint" = "s3.ap-southeast-1.amazonaws.com",
"s3.region" = "ap-southeast-1",
"s3.access_key" = "",
"s3.secret_key" = ""
)
CreateTime: 2025-09-22 19:24:51
SucceedTaskCount: 5
FailedTaskCount: 0
CanceledTaskCount: 0
Comment:
Properties: {"s3.max_batch_files":"2","session.insert_max_filter_ratio":"0.5"}
CurrentOffset: {"fileName":"s3/demo/test/1.csv"}
EndOffset: {"fileName":"s3/demo/test/1.csv"}
LoadStatistic: {"scannedRows":0,"loadBytes":0,"fileNumber":0,"fileSize":0}
ErrorMsg: \N

具体显示参数结果如下

结果列说明
IDJob ID
NAMEJob 名称
Definerjob 定义者
ExecuteTypeJob 调度的类型:ONE_TIME/RECURRING/STREAMING/MANUAL
RecurringStrategy循环策略。普通的 Insert 会用到,ExecuteType=Streaming 时为空
StatusJob 状态
ExecuteSqlJob 的 Insert SQL 语句
CreateTimejob 创建时间
SucceedTaskCount成功任务数量
FailedTaskCount失败任务数量
CanceledTaskCount取消任务数量
Commentjob 注释
Propertiesjob 的属性
CurrentOffsetJob 当前处理完成的 Offset。只有 ExecuteType=Streaming 才有值
EndOffsetJob 获取到数据源端最大的 EndOffset。只有 ExecuteType=Streaming 才有值
LoadStatisticJob 的统计信息
ErrorMsgJob 执行的错误信息
JobRuntimeMsgJob 运行时的一些提示信息

Task

可以执行select * from tasks(type='insert') where jobId='1758534452459' 来查看每次 Task 的运行状态。

注:只会保留当前最新的一次 Task 信息。

mysql> select * from tasks(type='insert') where jobId='1758534452459'\G
*************************** 1. row ***************************
TaskId: 1758534723330
JobId: 1758534452459
JobName: test_streaming_insert_job_name
Label: 1758534452459_1758534723330
Status: SUCCESS
ErrorMsg: \N
CreateTime: 2025-09-22 17:52:55
StartTime: \N
FinishTime: \N
TrackingUrl: \N
LoadStatistic: {"scannedRows":20,"loadBytes":425,"fileNumber":2,"fileSize":256}
User: root
FirstErrorMsg: \N
RunningOffset: {"startFileName":"s3/demo/1.csv","endFileName":"s3/demo/8.csv"}
结果列说明
TaskId任务 ID
JobIDJobID
JobNameJob 名称
LabelInsert 的 Label
StatusTask 的状态
ErrorMsgtask 失败信息
CreateTimeTask 的创建时间
StartTimeTask 的开始时间
FinishTimeTask 的完成时间
TrackingUrlInsert 的错误 URL
LoadStatisticTask 的统计信息
Usertask 的执行者
FirstErrorMsg普通的 InsertTask 第一次数据质量错误的信息
RunningOffset当前 Task 同步的 Offset 信息。只有 Job.ExecuteType=Streaming 才有值