メインコンテンツまでスキップ

TVF連続負荷

概要

DorisではJob + TVFアプローチを使用して継続的なインポートタスクを作成できます。Jobを送信すると、Dorisは継続的にインポートジョブを実行し、リアルタイムでTVFにクエリを実行してデータをDorisテーブルに書き込みます。

サポートされているTVF

S3 TVF

基本原理

S3

S3の指定されたディレクトリ内のファイルを反復処理し、各ファイルをリストに分割して小さなバッチでDorisテーブルに書き込みます。

インクリメンタル読み込み方法

タスクの作成後、Dorisは指定されたパスから継続的にデータを読み込み、固定頻度で新しいファイルをポーリングします。

注意:新しいファイルの名前は最後にインポートされたファイルの名前よりも辞書順で大きくなければなりません。そうでない場合、Dorisは新しいファイルとして扱いません。例えば、ファイルがfile1、file2、file3と命名されている場合、これらは順次インポートされます。後からfile0という名前の新しいファイルが追加された場合、最後にインポートされたファイルfile3よりも辞書順で小さいため、Dorisはインポートしません。

クイックスタート

インポートジョブの作成

S3ディレクトリでCSVで終わるファイルが定期的に生成されると仮定します。その後、Jobを作成できます。

CREATE JOB my_job 
ON STREAMING
DO
INSERT INTO db1.tbl1
select * from S3(
"uri" = "s3://bucket/*.csv",
"s3.access_key" = "<s3_access_key>",
"s3.secret_key" = "<s3_secret_key>",
"s3.region" = "<s3_region>",
"s3.endpoint" = "<s3_endpoint>",
"format" = "<format>"
)

インポートステータスの確認

select * from job(type=insert) where ExecuteType = "streaming"
Id: 1758538737484
Name: my_job1
Definer: root
ExecuteType: STREAMING
RecurringStrategy: \N
Status: RUNNING
ExecuteSql: INSERT INTO test.`student1`
SELECT * FROM S3
(
"uri" = "s3://bucket/s3/demo/*.csv",
"format" = "csv",
"column_separator" = ",",
"s3.endpoint" = "s3.ap-southeast-1.amazonaws.com",
"s3.region" = "ap-southeast-1",
"s3.access_key" = "",
"s3.secret_key" = ""
)
CreateTime: 2025-09-22 19:24:51
SucceedTaskCount: 1
FailedTaskCount: 0
CanceledTaskCount: 0
Comment: \N
Properties: \N
CurrentOffset: {"fileName":"s3/demo/test/1.csv"}
EndOffset: {"fileName":"s3/demo/test/1.csv"}
LoadStatistic: {"scannedRows":20,"loadBytes":425,"fileNumber":2,"fileSize":256}
ErrorMsg: \N
JobRuntimeMsg: \N

インポートジョブの一時停止

PAUSE JOB WHERE jobname = <job_name> ;

インポートジョブの再開

RESUME JOB where jobName = <job_name> ;

import ジョブの変更

-- -- Supports modifying Job properties and insert statements
Alter Job jobName
PROPERTIES(
"session.insert_max_filter_ratio"="0.5"
)
INSERT INTO db1.tbl1
select * from S3(
"uri" = "s3://bucket/*.csv",
"s3.access_key" = "<s3_access_key>",
"s3.secret_key" = "<s3_secret_key>",
"s3.region" = "<s3_region>",
"s3.endpoint" = "<s3_endpoint>",
"format" = "<format>"
)

インポートされたジョブを削除する

DROP JOB where jobName = <job_name> ;

Reference

Import command

Job + TVF 常駐インポートジョブを作成する構文は以下の通りです:

CREATE JOB <job_name>
ON STREAMING
[job_properties]
[ COMMENT <comment> ]
DO <Insert_Command>

モジュールの説明は以下の通りです:

| モジュール | 説明 |

| -------------- | ------------------------------------------------------------ | | job_name | タスク名 | | job_properties | Jobを指定するために使用される一般的なインポートパラメータ | | comment | Jobを説明するために使用される備考 | | Insert_Command | 実行するSQL;現在はInsert into table select * from s3()のみサポート |

インポートパラメータ

FE設定パラメータ

パラメータデフォルト値
max_streaming_job_num1024Streamingジョブの最大数
job_streaming_task_exec_thread_num10StreamingTaskを実行するために使用されるスレッド数
max_streaming_task_show_count100StreamingTaskのメモリ内に保持されるタスク実行記録の最大数

インポート設定パラメータ

パラメータデフォルト値説明
session.*なしjob_propertiesでのすべてのsession変数の設定をサポート。インポート変数については、[Insert Into Select](../../../data-operate/import/import-way/insert-into-manual.md#Import Configuration Parameters)を参照してください
s3.max_batch_files256累積ファイル数がこの値に達したときにインポート書き込みをトリガーします。
s3.max_batch_bytes10G累積データ量がこの値に達したときにインポート書き込みをトリガーします。
max_interval10s上流で新しいファイルやデータが追加されていない場合のアイドルスケジューリング間隔。

インポート状況

Job

ジョブが正常に送信された後、select * from job("insert") where ExecuteType = 'Streaming' を実行してジョブの現在の状況を確認できます。

select * from job(type=insert) where ExecuteType = "streaming"
Id: 1758538737484
Name: my_job1
Definer: root
ExecuteType: STREAMING
RecurringStrategy: \N
Status: RUNNING
ExecuteSql: INSERT INTO test.`student1`
SELECT * FROM S3
(
"uri" = "s3://wd-test123/s3/demo/*.csv",
"format" = "csv",
"column_separator" = ",",
"s3.endpoint" = "s3.ap-southeast-1.amazonaws.com",
"s3.region" = "ap-southeast-1",
"s3.access_key" = "",
"s3.secret_key" = ""
)
CreateTime: 2025-09-22 19:24:51
SucceedTaskCount: 5
FailedTaskCount: 0
CanceledTaskCount: 0
Comment:
Properties: {"s3.max_batch_files":"2","session.insert_max_filter_ratio":"0.5"}
CurrentOffset: {"fileName":"s3/demo/test/1.csv"}
EndOffset: {"fileName":"s3/demo/test/1.csv"}
LoadStatistic: {"scannedRows":0,"loadBytes":0,"fileNumber":0,"fileSize":0}
ErrorMsg: \N

具体的なパラメータ結果は以下のように表示されます:

Result ColumnsDescription
IDJob ID
NAMEJob Name
DefinerJob Definer
ExecuteTypeJobスケジューリングタイプ: ONE_TIME/RECURRING/STREAMING/MANUAL
RecurringStrategy繰り返し戦略。通常のInsert操作で使用される;ExecuteType=Streamingの場合は空
StatusJobステータス
ExecuteSqlJobのInsert SQL文
CreateTimeJob作成時刻
SucceedTaskCount成功したタスク数
FailedTaskCount失敗したタスク数
CanceledTaskCountキャンセルされたタスク数
CommentJobコメント
PropertiesJobプロパティ
CurrentOffsetJobの現在の完了オフセット。ExecuteType=Streamingの場合のみ値を持つ。
EndOffsetJobがデータソースから取得した最大EndOffset。ExecuteType=Streamingの場合のみ値を持つ。
LoadStatisticJob統計。
ErrorMsgJob実行中のエラーメッセージ。
JobRuntimeMsgJobのランタイム情報。

Task

select \* from tasks(type='insert') where jobId='1758534452459'を実行して、各Taskの実行状況を確認できます。

注意:最新のTask情報のみが保持されます。

mysql> select * from tasks(type='insert') where jobId='1758534452459'\G
*************************** 1. row ***************************
TaskId: 1758534723330
JobId: 1758534452459
JobName: test_streaming_insert_job_name
Label: 1758534452459_1758534723330
Status: SUCCESS
ErrorMsg: \N
CreateTime: 2025-09-22 17:52:55
StartTime: \N
FinishTime: \N
TrackingUrl: \N
LoadStatistic: {"scannedRows":20,"loadBytes":425,"fileNumber":2,"fileSize":256}
User: root
FirstErrorMsg: \N
RunningOffset: {"startFileName":"s3/demo/1.csv","endFileName":"s3/demo/8.csv"}
Results ColumnsDescription
TaskIdTask ID
JobIDJobID
JobNameJob名
LabelInsertのラベル
StatusTaskのステータス
ErrorMsgTask失敗情報
CreateTimeTask作成時刻
StartTimeTask開始時刻
FinishTimeTask完了時刻
TrackingUrlInsertのエラーURL
LoadStatisticTask統計
Usertaskの実行者
FirstErrorMsg通常のInsertTaskにおける最初のデータ品質エラーに関する情報
RunningOffset現在のTask同期のオフセット情報。Job.ExecuteType=Streamingの場合のみ値を持つ