S3

Overview

Doris lets you create a continuous import task by combining a Job with the S3 TVF (table-valued function). Once the Job is submitted, Doris runs it continuously, querying the S3 TVF in real time and writing the results into the Doris table.

Basic Principles

Doris iterates through the files in the specified S3 directory, splits them into a list of files, and writes them to the Doris table in small batches.

Incremental Read Method

After creating the task, Doris continuously reads data from the specified path and polls for new files at a fixed frequency.

Note: The name of a new file must be lexicographically greater than the name of the last imported file; otherwise, Doris will not treat it as a new file. For example, if files are named file1, file2, and file3, they will be imported sequentially; if a new file named file0 is added later, Doris will not import it because it is lexicographically less than the last imported file, file3.
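The ordering rule in the note can be modeled with a short sketch. This is not Doris code: `select_new_files` is a hypothetical helper, and it assumes the comparison behaves like a plain string comparison, as Python's `>` does on strings.

```python
def select_new_files(listed, last_imported):
    """Return the files that sort strictly after last_imported, in import order.

    Hypothetical model of the rule in the note above; it assumes a plain
    string comparison and is not taken from the Doris source.
    """
    return sorted(
        name for name in listed
        if last_imported is None or name > last_imported
    )


# First poll: nothing has been imported yet, so every file qualifies.
first_pass = select_new_files(["file1", "file2", "file3"], None)
last_imported = first_pass[-1]

# Later poll: file0 and file4 have appeared in the directory.
second_pass = select_new_files(
    ["file0", "file1", "file2", "file3", "file4"], last_imported
)

print(first_pass)   # ['file1', 'file2', 'file3']
print(second_pass)  # ['file4']
```

Under the same assumption, a name such as file10 would also be skipped after file3, because "file10" sorts before "file3" as a string. Zero-padded numbers or timestamps in file names avoid this.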

Quick Start

Creating an Import Job

Use CREATE STREAMING JOB to create a continuous import job.

Assume that files with the .csv extension are periodically generated in an S3 directory. You can then create a Job as follows:

CREATE JOB my_job
ON STREAMING
DO
INSERT INTO db1.tbl1
SELECT * FROM S3(
    "uri" = "s3://bucket/*.csv",
    "s3.access_key" = "<s3_access_key>",
    "s3.secret_key" = "<s3_secret_key>",
    "s3.region" = "<s3_region>",
    "s3.endpoint" = "<s3_endpoint>",
    "format" = "<format>"
);

Check Import Status

select * from jobs("type"="insert") where ExecuteType = "STREAMING"
Id: 1758538737484
Name: my_job1
Definer: root
ExecuteType: STREAMING
RecurringStrategy: \N
Status: RUNNING
ExecuteSql: INSERT INTO test.`student1`
SELECT * FROM S3
(
"uri" = "s3://bucket/s3/demo/*.csv",
"format" = "csv",
"column_separator" = ",",
"s3.endpoint" = "s3.ap-southeast-1.amazonaws.com",
"s3.region" = "ap-southeast-1",
"s3.access_key" = "",
"s3.secret_key" = ""
)
CreateTime: 2025-09-22 19:24:51
SucceedTaskCount: 1
FailedTaskCount: 0
CanceledTaskCount: 0
Comment: \N
Properties: \N
CurrentOffset: {"fileName":"s3/demo/test/1.csv"}
EndOffset: {"fileName":"s3/demo/test/1.csv"}
LoadStatistic: {"scannedRows":20,"loadBytes":425,"fileNumber":2,"fileSize":256}
ErrorMsg: \N
JobRuntimeMsg: \N

Modify Import Job

-- Supports modifying the Job properties and the INSERT statement
ALTER JOB <job_name>
PROPERTIES(
    "session.insert_max_filter_ratio" = "0.5"
)
INSERT INTO db1.tbl1
SELECT * FROM S3(
    "uri" = "s3://bucket/*.csv",
    "s3.access_key" = "<s3_access_key>",
    "s3.secret_key" = "<s3_secret_key>",
    "s3.region" = "<s3_region>",
    "s3.endpoint" = "<s3_endpoint>",
    "format" = "<format>"
);

For more common operations (pause, resume, delete, check Task, etc.), see Continuous Load Overview.

Reference

Import Command

Syntax for creating an S3 TVF continuous import job:

CREATE JOB <job_name>
ON STREAMING
[job_properties]
[ COMMENT <comment> ]
DO <Insert_Command>

Module | Description
job_name | Job name
job_properties | General import parameters for the Job
comment | Job comment
Insert_Command | SQL to execute, i.e., INSERT INTO table SELECT * FROM S3()

Import Configuration Parameters

Parameter | Default | Description
session.* | - | Supports all session variables in job_properties. See Insert Into Select for import variables
s3.max_batch_files | 256 | Triggers an import write when the accumulated file count reaches this value
s3.max_batch_bytes | 10G | Triggers an import write when the accumulated data size reaches this value
max_interval | 10s | Idle scheduling interval when there are no new files or data upstream
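
To illustrate how the two batch thresholds interact, here is a minimal sketch of a batching loop. It is an assumption-laden model, not the Doris implementation: `plan_batches` is a hypothetical function, 10G is read as 10 GiB, and the sketch assumes that any files left over at the end are flushed as a final, smaller batch.

```python
def plan_batches(files, max_batch_files=256, max_batch_bytes=10 * 1024**3):
    """Group (name, size_in_bytes) pairs into import batches.

    Hypothetical model of s3.max_batch_files and s3.max_batch_bytes:
    a batch is closed as soon as either accumulated value reaches its limit.
    """
    batches, current, current_bytes = [], [], 0
    for name, size in files:
        current.append(name)
        current_bytes += size
        if len(current) >= max_batch_files or current_bytes >= max_batch_bytes:
            batches.append(current)
            current, current_bytes = [], 0
    if current:
        # Assumption: remaining files are written as a final partial batch.
        batches.append(current)
    return batches


# Tiny limits so the effect of each threshold is visible.
files = [("a.csv", 4), ("b.csv", 4), ("c.csv", 4), ("d.csv", 1), ("e.csv", 1)]
batches = plan_batches(files, max_batch_files=3, max_batch_bytes=8)
print(batches)  # [['a.csv', 'b.csv'], ['c.csv', 'd.csv', 'e.csv']]
```

In this run the first batch closes on the byte limit (4 + 4 reaches 8) and the second on the file-count limit (three files), which is the "whichever is reached first" behavior the table describes.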