Continuous Load

Overview

Doris allows you to create a continuous import task using a Job + TVF approach. After submitting the Job, Doris continuously runs the import job, querying the TVF in real time and writing the data into the Doris table.

Supported TVFs

S3 TVF

Basic Principles

S3

Doris iterates over the files in the specified S3 directory, splits them into small batches, and writes each batch to the Doris table.

Incremental Read Method

After creating the task, Doris continuously reads data from the specified path and polls for new files at a fixed frequency.

Note: The name of a new file must be lexicographically greater than the name of the last imported file; otherwise, Doris will not treat it as a new file. For example, if files are named file1, file2, and file3, they will be imported sequentially; if a new file named file0 is added later, Doris will not import it because it is lexicographically less than the last imported file, file3.
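
For example, a zero-padded, time-ordered naming scheme keeps every new file lexicographically greater than the last one imported, so each new file is picked up. The names below are purely illustrative:

s3://bucket/events/2025-09-22-000001.csv
s3://bucket/events/2025-09-22-000002.csv
s3://bucket/events/2025-09-23-000001.csv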

Quick Start

Creating an Import Job

Assume that files ending in .csv are periodically generated in an S3 directory. You can then create a Job as follows.

CREATE JOB my_job
ON STREAMING
DO
INSERT INTO db1.tbl1
SELECT * FROM S3(
    "uri" = "s3://bucket/*.csv",
    "s3.access_key" = "<s3_access_key>",
    "s3.secret_key" = "<s3_secret_key>",
    "s3.region" = "<s3_region>",
    "s3.endpoint" = "<s3_endpoint>",
    "format" = "<format>"
);
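
The Job above assumes the target table db1.tbl1 already exists and that its columns match the layout of the CSV files. A minimal sketch of such a table, with a purely hypothetical schema, might look like this:

-- Hypothetical schema; the columns must match the CSV layout being loaded
CREATE TABLE db1.tbl1 (
    id INT,
    name VARCHAR(64),
    score INT
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES ("replication_num" = "1");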

Check import status

select * from job(type=insert) where ExecuteType = "streaming"
Id: 1758538737484
Name: my_job1
Definer: root
ExecuteType: STREAMING
RecurringStrategy: \N
Status: RUNNING
ExecuteSql: INSERT INTO test.`student1`
SELECT * FROM S3
(
"uri" = "s3://bucket/s3/demo/*.csv",
"format" = "csv",
"column_separator" = ",",
"s3.endpoint" = "s3.ap-southeast-1.amazonaws.com",
"s3.region" = "ap-southeast-1",
"s3.access_key" = "",
"s3.secret_key" = ""
)
CreateTime: 2025-09-22 19:24:51
SucceedTaskCount: 1
FailedTaskCount: 0
CanceledTaskCount: 0
Comment: \N
Properties: \N
CurrentOffset: {"fileName":"s3/demo/test/1.csv"}
EndOffset: {"fileName":"s3/demo/test/1.csv"}
LoadStatistic: {"scannedRows":20,"loadBytes":425,"fileNumber":2,"fileSize":256}
ErrorMsg: \N
JobRuntimeMsg: \N

Pause import job

PAUSE JOB WHERE jobName = <job_name>;

Resume import job

RESUME JOB WHERE jobName = <job_name>;

Modify import job

-- Supports modifying both Job properties and the INSERT statement
ALTER JOB <job_name>
PROPERTIES (
    "session.insert_max_filter_ratio" = "0.5"
)
INSERT INTO db1.tbl1
SELECT * FROM S3(
    "uri" = "s3://bucket/*.csv",
    "s3.access_key" = "<s3_access_key>",
    "s3.secret_key" = "<s3_secret_key>",
    "s3.region" = "<s3_region>",
    "s3.endpoint" = "<s3_endpoint>",
    "format" = "<format>"
);

Delete import job

DROP JOB WHERE jobName = <job_name>;
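
For example, for the Job created in the Quick Start (my_job), the management commands above would look like the following; quoting the job name as a string is an assumption about how <job_name> is supplied:

-- Illustrative lifecycle for the my_job created earlier
PAUSE JOB WHERE jobName = "my_job";
RESUME JOB WHERE jobName = "my_job";
DROP JOB WHERE jobName = "my_job";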

Reference

Import command

The syntax for creating a resident Job + TVF import job is as follows:

CREATE JOB <job_name>
ON STREAMING
[job_properties]
[ COMMENT <comment> ]
DO <Insert_Command>

The module description is as follows:

| Module | Description |
| -------------- | ------------------------------------------------------------ |
| job_name | Task name |
| job_properties | General import parameters used to specify the Job |
| comment | Remarks used to describe the Job |
| Insert_Command | The SQL to execute; currently only INSERT INTO table SELECT * FROM S3() is supported |
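
Putting the pieces together, a Job that sets import properties and a comment might look like the sketch below. The PROPERTIES(...) clause form is assumed from the ALTER JOB example above, and the property values are illustrative; see the parameter tables that follow:

-- Property values below are illustrative (see the Import Configuration Parameters table)
CREATE JOB my_streaming_job
ON STREAMING
PROPERTIES (
    "s3.max_batch_files" = "128",
    "session.insert_max_filter_ratio" = "0.1"
)
COMMENT "continuously load new csv files from S3"
DO
INSERT INTO db1.tbl1
SELECT * FROM S3(
    "uri" = "s3://bucket/*.csv",
    "s3.access_key" = "<s3_access_key>",
    "s3.secret_key" = "<s3_secret_key>",
    "s3.region" = "<s3_region>",
    "s3.endpoint" = "<s3_endpoint>",
    "format" = "csv"
);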

Import Parameters

FE Configuration Parameters

| Parameter | Default Value | Description |
| --------- | ------------- | ----------- |
| max_streaming_job_num | 1024 | Maximum number of Streaming jobs |
| job_streaming_task_exec_thread_num | 10 | Number of threads used to execute StreamingTasks |
| max_streaming_task_show_count | 100 | Maximum number of task execution records kept in memory for a StreamingTask |

Import Configuration Parameters

| Parameter | Default Value | Description |
| --------- | ------------- | ----------- |
| session.* | None | Supports configuring all session variables in job_properties. For import-related variables, please refer to [Insert Into Select](../../data-operate/import/import-way/insert-into-manual.md#Import Configuration Parameters) |
| s3.max_batch_files | 256 | Triggers an import write when the cumulative number of files reaches this value. |
| s3.max_batch_bytes | 10G | Triggers an import write when the cumulative data volume reaches this value. |
| max_interval | 10s | The idle scheduling interval when no new files or data are available upstream. |

Import Status

Job

After a job is successfully submitted, you can execute select * from job(type=insert) where ExecuteType = "streaming" to check the current status of the job.

select * from job(type=insert) where ExecuteType = "streaming"
Id: 1758538737484
Name: my_job1
Definer: root
ExecuteType: STREAMING
RecurringStrategy: \N
Status: RUNNING
ExecuteSql: INSERT INTO test.`student1`
SELECT * FROM S3
(
"uri" = "s3://wd-test123/s3/demo/*.csv",
"format" = "csv",
"column_separator" = ",",
"s3.endpoint" = "s3.ap-southeast-1.amazonaws.com",
"s3.region" = "ap-southeast-1",
"s3.access_key" = "",
"s3.secret_key" = ""
)
CreateTime: 2025-09-22 19:24:51
SucceedTaskCount: 5
FailedTaskCount: 0
CanceledTaskCount: 0
Comment:
Properties: {"s3.max_batch_files":"2","session.insert_max_filter_ratio":"0.5"}
CurrentOffset: {"fileName":"s3/demo/test/1.csv"}
EndOffset: {"fileName":"s3/demo/test/1.csv"}
LoadStatistic: {"scannedRows":0,"loadBytes":0,"fileNumber":0,"fileSize":0}
ErrorMsg: \N

The specific parameter results are displayed as follows:

| Result Column | Description |
| ------------- | ----------- |
| Id | Job ID |
| Name | Job name |
| Definer | Job definer |
| ExecuteType | Job scheduling type: ONE_TIME/RECURRING/STREAMING/MANUAL |
| RecurringStrategy | Recurring strategy. Used for normal Insert jobs; empty when ExecuteType=STREAMING |
| Status | Job status |
| ExecuteSql | The Job's INSERT SQL statement |
| CreateTime | Job creation time |
| SucceedTaskCount | Number of successful tasks |
| FailedTaskCount | Number of failed tasks |
| CanceledTaskCount | Number of canceled tasks |
| Comment | Job comment |
| Properties | Job properties |
| CurrentOffset | The offset the Job has currently completed. Only has a value when ExecuteType=STREAMING. |
| EndOffset | The maximum EndOffset obtained by the Job from the data source. Only has a value when ExecuteType=STREAMING. |
| LoadStatistic | Job statistics |
| ErrorMsg | Error messages during Job execution |
| JobRuntimeMsg | Runtime information for the Job |

Task

You can execute select * from tasks(type='insert') where jobId='1758534452459' to view the running status of each Task.

Note: Only the latest Task information will be retained.

mysql> select * from tasks(type='insert') where jobId='1758534452459'\G
*************************** 1. row ***************************
TaskId: 1758534723330
JobId: 1758534452459
JobName: test_streaming_insert_job_name
Label: 1758534452459_1758534723330
Status: SUCCESS
ErrorMsg: \N
CreateTime: 2025-09-22 17:52:55
StartTime: \N
FinishTime: \N
TrackingUrl: \N
LoadStatistic: {"scannedRows":20,"loadBytes":425,"fileNumber":2,"fileSize":256}
User: root
FirstErrorMsg: \N
RunningOffset: {"startFileName":"s3/demo/1.csv","endFileName":"s3/demo/8.csv"}

| Result Column | Description |
| ------------- | ----------- |
| TaskId | Task ID |
| JobId | Job ID |
| JobName | Job name |
| Label | Label of the Insert |
| Status | Task status |
| ErrorMsg | Task failure information |
| CreateTime | Task creation time |
| StartTime | Task start time |
| FinishTime | Task completion time |
| TrackingUrl | Error URL of the Insert |
| LoadStatistic | Task statistics |
| User | Executor of the task |
| FirstErrorMsg | Information about the first data quality error in a normal Insert task |
| RunningOffset | Offset information of the current Task synchronization. Only has a value when Job.ExecuteType=STREAMING |
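
When a task fails, a filtered query over tasks() can surface the error details directly. A sketch, assuming the TVF output can be projected and filtered like a regular table (the jobId is the one from the example above):

-- Assumption: tasks() output supports column projection and extra filters
select TaskId, Status, ErrorMsg, FirstErrorMsg, TrackingUrl
from tasks(type='insert')
where jobId = '1758534452459' and Status != 'SUCCESS';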