メインコンテンツまでスキップ

ストリーミングジョブを作成

説明

Doris Streaming JobはJobアプローチに基づいた継続的なインポートタスクです。Jobが送信された後、Dorisは継続的にインポートジョブを実行し、TVFまたは上流のデータソースをリアルタイムでクエリして、データをDorisテーブルに書き込みます。

構文

CREATE JOB <job_name>
ON STREAMING
[ PROPERTIES (
<job_property>
[ , ... ]
)
]
[ COMMENT <comment> ]
(
DO <Insert_Command>
|
(
FROM <sourceType> (
<source_property>
[ , ... ])
TO DATABASE <target_db>
[ PROPERTIES (
<target_property>
[ , ... ])
]
)

必須パラメータ

1. <job_name>

ジョブ名。データベース内のイベントを一意に識別します。ジョブ名はグローバルに一意である必要があります。同名のジョブが既に存在する場合、エラーが報告されます。

2. <Insert_Command>

DO句は、ジョブがトリガーされた際に実行される操作、つまりSQL文を指定します。現在、S3 TVFのみサポートされています。

3. <sourceType>

サポートされているデータソース:現在MySQLとPostgresのみ。

4. <source_property>

パラメータデフォルト説明
jdbc_url-JDBC接続文字列(MySQL/PG)
driver_url-JDBCドライバjarパス
driver_class-JDBCドライバクラス名
user-データベースユーザー名
password-データベースパスワード
database-データベース名
schema-スキーマ名
include_tables-同期対象テーブル、カンマ区切り
offsetinitialinitial:フル+増分同期、latest:増分のみ
snapshot_split_size8096各スプリットのサイズ(行数)。フル同期中、テーブルは同期のために複数のスプリットに分割されます。
snapshot_parallelism1フル同期フェーズ中の並列度レベル、つまり単一タスクが一度にスケジュールできるスプリットの最大数。

5. <target_db>

インポート先のDorisターゲットデータベース名。

6. <target_property>

パラメータデフォルト説明
table.create.properties.*-テーブル作成時のテーブルプロパティ、例:replication_num
load.strict_mode-厳密モードを有効にするかどうか。デフォルトでは無効。
load.max_filter_ratio-サンプリングウィンドウ内で許可される最大フィルタリング率。0と1の間(両端含む)である必要があります。デフォルト値は0で、ゼロトレラントを示します。サンプリングウィンドウはmax_interval * 10と等しくなります。このウィンドウ内で、エラー行の総行数に対する比率がmax_filter_ratioを超えると、スケジュールされたジョブは一時停止され、データ品質の問題に対処するための手動介入が必要になります。

オプションパラメータ

1. <job_property>

パラメータデフォルト説明
session.*Nonejob_propertiesでのすべてのセッション変数の設定をサポート
s3.max_batch_files256累積ファイル数がこの値に達するとインポート書き込みをトリガー
s3.max_batch_bytes10G累積データ量がこの値に達するとインポート書き込みをトリガー
max_interval10s上流に新しいファイルやデータがない場合のアイドルスケジューリング間隔

権限制御

このSQLコマンドを実行するユーザーは、最低でも以下の権限を持つ必要があります:

権限オブジェクト備考
LOAD_PRIVDatabase現在、この操作ではLOAD権限のみサポートされています

注意事項

  • TASKは最新の100件のレコードのみ保持します。
  • 現在、Insert_CommandはINSERT internal table Select * From S3(...)のみサポートしています。将来的により多くの操作がサポートされる予定です。

  • my_jobという名前のジョブを作成し、S3の指定されたディレクトリ内のファイルを継続的に監視し、.csvで終わるファイルからdb1.tbl1にデータをインポートします。

    CREATE JOB my_job
    ON STREAMING
    DO
    INSERT INTO db1.`tbl1`
    SELECT * FROM S3
    (
    "uri" = "s3://bucket/s3/demo/*.csv",
    "format" = "csv",
    "column_separator" = ",",
    "s3.endpoint" = "s3.ap-southeast-1.amazonaws.com",
    "s3.region" = "ap-southeast-1",
    "s3.access_key" = "",
    "s3.secret_key" = ""
    );
  • MySQL上流のuser_infoテーブルとorder_infoテーブルを最初からtarget_test_dbデータベースに同期するmulti_table_syncという名前のジョブを作成します。

    CREATE JOB multi_table_sync
    ON STREAMING
    FROM MYSQL (
    "jdbc_url" = "jdbc:mysql://127.0.0.1:3306",
    "driver_url" = "mysql-connector-j-8.0.31.jar",
    "driver_class" = "com.mysql.cj.jdbc.Driver",
    "user" = "root",
    "password" = "123456",
    "database" = "test",
    "include_tables" = "user_info,order_info",
    "offset" = "initial"
    )
    TO DATABASE target_test_db (
    "table.create.properties.replication_num" = "1"
    )
  • test_postgres_jobという名前のジョブを作成し、PostgreSQLアップストリームのtest_tblsテーブルから増分データを継続的にtarget_test_dbデータベースに同期します。

    CREATE JOB test_postgres_job
    ON STREAMING
    FROM POSTGRES (
    "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
    "driver_url" = "postgresql-42.5.0.jar",
    "driver_class" = "org.postgresql.Driver",
    "user" = "postgres",
    "password" = "postgres",
    "database" = "postgres",
    "schema" = "public",
    "include_tables" = "test_tbls",
    "offset" = "latest"
    )
    TO DATABASE target_test_db (
    "table.create.properties.replication_num" = "1"
    )

CONFIG

fe.conf

ParameterDefaultDescription
max_streaming_job_num1024Streamingジョブの最大数
job_streaming_task_exec_thread_num10StreamingTaskのスレッド数
max_streaming_task_show_count100メモリ内のStreamingTaskレコードの最大数