Job Scheduler
背景
データ管理の精度向上に対する需要が高まる中、スケジュール型のスケジューリングは重要な役割を果たしています。以下のようなシナリオで主に適用されます:
-
定期的なデータ更新。定期的なデータインポートやETL操作など、手動介入を減らし、データ処理の効率性と正確性を向上させます。
-
外部データソースとカタログの同期。多源データを効率的かつ正確にターゲットシステムに統合し、複雑なビジネス分析ニーズを満たします。
-
期限切れ・無効データの定期的なクリーンアップ。ストレージ容量を解放し、過剰な期限切れ・無効データがシステムパフォーマンスに与える影響を防止します。
Apache Doris 2.1より前のバージョンでは、上記の要件を満たすために、ビジネスコードによるスケジューリングやサードパーティスケジューリングツールおよび分散スケジューリングプラットフォームの導入など、外部スケジューリングシステムに依存する必要がありました。しかし、外部システム自体の制約により、Dorisのスケジューリング戦略とリソース管理の柔軟性要件を満たせない場合があります。また、外部スケジューリングシステムに障害が発生すると、ビジネスリスクが増大するだけでなく、対処のための追加の運用時間と人的リソースが必要になります。
Job Scheduler
上記の問題を解決するため、Apache Dorisはバージョン2.1でJob Scheduler機能を導入し、秒レベルの精度に到達するスケジューリング精度で自律的なタスクスケジューリング機能を実現しました。この機能の導入により、データインポートの完全性と一貫性が確保されるだけでなく、ユーザーがスケジューリング戦略を柔軟かつ便利に調整できるようになります。同時に、外部システムへの依存を減らすことで、システム障害のリスクと運用コストを削減し、コミュニティユーザーにより統一され信頼性の高いユーザーエクスペリエンスを提供します。
Doris Job Schedulerは、事前設定されたスケジュールに基づくタスク管理システムであり、特定の時点や指定された時間間隔で事前定義された操作をトリガーし、自動化されたタスク実行を実現します。Job Schedulerには以下の特徴があります:
- 効率的なスケジューリング: Job Schedulerは指定された時間間隔内でタスクとイベントを配置し、データ処理の効率性を確保します。時間輪アルゴリズムを使用して、イベントが秒レベルで正確にトリガーされることを保証します。
- 柔軟なスケジューリング: Job Schedulerは分、時間、日、週間隔でのスケジューリングなど、複数のスケジューリングオプションを提供します。また、一回限りのスケジューリングと繰り返し(定期的)イベントスケジューリングをサポートし、定期的なスケジューリングでは開始時刻と終了時刻を指定できます。
- イベントプールと高性能処理キュー: Job SchedulerはDisruptorを使用して高性能なプロデューサー・コンシューマーモデルを実装し、タスク実行の過負荷を最大限回避します。
- 追跡可能なスケジューリング記録: Job Schedulerは最新のTask実行記録を保存します(設定可能)。Task実行記録は簡単なコマンドで確認でき、プロセスの追跡可能性を確保します。
- 高可用性: Doris自体の高可用性メカニズムを活用し、Job Schedulerは自己回復と高可用性を簡単に実現できます。
関連ドキュメント: CREATE-JOB
構文説明
有効なJob文には以下が含まれている必要があります:
-
キーワード CREATE JOB の後にジョブ名を続ける必要があります。これはデータベース内でイベントを一意に識別します。
-
ON SCHEDULE 句はJobの種類、トリガー時刻、頻度を指定するために使用されます。
-
AT timestamp は一回限りのイベントに使用されます。指定された日時にJOBが一度だけ実行されることを指定し、AT current_timestamp は現在の日時を指定します。JOBが作成されると即座に実行され、非同期タスク作成にも使用できます。
-
EVERY: 定期的なジョブに使用され、ジョブの実行頻度を指定します。キーワードの後に時間間隔(週、日、時間、分)を指定する必要があります。
-
Interval: ジョブ実行の頻度を指定します。1 DAY はジョブが1日に1回実行されることを意味し、1 HOUR は1時間に1回、1 MINUTE は1分に1回、1 WEEK は1週間に1回を意味します。
-
EVERY 句には任意の STARTS 句が含まれます。STARTS の後はタイムスタンプ値で、繰り返しの開始時刻を定義し、CURRENT_TIMESTAMP は現在の日時を指定します。JOBが作成されると即座に実行されます。
-
EVERY 句には任意の ENDS 句が含まれます。ENDS キーワードの後はタイムスタンプ値で、JOBイベントが実行を停止する時刻を定義します。
-
-
-
DO 句はJobがトリガーされたときに実行される操作を指定するために使用されます。現在は INSERT 文のみサポートされています。
CREATE
JOB
job_name
ON SCHEDULE schedule
[COMMENT 'string']
DO execute_sql;
schedule: {
AT timestamp
| EVERY interval
[STARTS timestamp ]
[ENDS timestamp ]
}
interval:
quantity { WEEK |DAY | HOUR | MINUTE}
以下の例:
CREATE JOB my_job ON SCHEDULE EVERY 1 MINUTE DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2;
このステートメントはmy_jobという名前のジョブを作成し、毎分一度実行されます。実行される操作はdb2.tbl2からdb1.tbl1にデータをインポートすることです。
使用例
一回限りのJobを作成する:2025-01-01 00:00:00に一度実行し、db2.tbl2からdb1.tbl1にデータをインポートします。
CREATE JOB my_job ON SCHEDULE AT '2025-01-01 00:00:00' DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2;
終了時刻を指定しない定期Jobを作成する:2025-01-01 00:00:00から開始し、1日に1回実行し、db2.tbl2からdb1.tbl1にデータをインポートする。
CREATE JOB my_job ON SCHEDULE EVERY 1 DAY STARTS '2025-01-01 00:00:00' DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2 WHERE create_time >= days_add(now(),-1);
指定された終了時間を持つ定期Jobを作成:2025-01-01 00:00:00から開始し、1日に1回実行して、db2.tbl2からdb1.tbl1にデータをインポートし、2026-01-01 00:10:00に終了する。
CREATE JOB my_job ON SCHEDULE EVERY 1 DAY STARTS '2025-01-01 00:00:00' ENDS '2026-01-01 00:10:00' DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2 WHERE create_time >= days_add(now(),-1);
Jobを使用した非同期実行の実装: DorisのJobは同期タスクとして作成されますが、その実行プロセスは非同期です。この機能により、Jobは一般的なINSERT INTO SELECT操作などの非同期タスクの実装に非常に適しています。
例えば、db2.tbl2からdb1.tbl1にデータをインポートする必要がある場合、JOBを1回限りのタスクとして指定し、開始時間を現在時刻に設定するだけです。
CREATE JOB my_job ON SCHEDULE AT current_timestamp DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2;
CatalogとJob Schedulerに基づくデータ自動同期
eコマースシナリオを例に取ると、ユーザーはMySQLからビジネスデータを抽出し、このデータをDorisに同期してデータ分析を行い、それによって精密なマーケティング活動をサポートする必要がよくあります。Job SchedulerはMulti Catalogデータレイク機能と連携して、データソース間の定期的なデータ同期を効率的に完了することができます。
CREATE TABLE IF NOT EXISTS user.activity (
`user_id` INT NOT NULL,
`date` DATE NOT NULL,
`city` VARCHAR(20),
`age` SMALLINT,
`sex` TINYINT,
`last_visit_date` DATETIME DEFAULT '1970-01-01 00:00:00',
`cost` BIGINT DEFAULT '0',
`max_dwell_time` INT DEFAULT '0',
`min_dwell_time` INT DEFAULT '99999'
);
INSERT INTO user.activity VALUES
(10000, '2017-10-01', 'BeiJing', 20, 0, '2017-10-01 06:00:00', 20, 10, 10),
(10000, '2017-10-01', 'BeiJing', 20, 0, '2017-10-01 07:00:00', 15, 2, 2),
(10001, '2017-10-01', 'BeiJing', 30, 1, '2017-10-01 17:05:00', 2, 22, 22),
(10002, '2017-10-02', 'ShangHai', 20, 1, '2017-10-02 12:59:00', 200, 5, 5),
(10003, '2017-10-02', 'GuangZhou', 32, 0, '2017-10-02 11:20:00', 30, 11, 11),
(10004, '2017-10-01', 'ShenZhen', 35, 0, '2017-10-01 10:00:00', 100, 3, 3),
(10004, '2017-10-03', 'ShenZhen', 35, 0, '2017-10-03 10:20:00', 11, 6, 6);
| user_id | date | city | age | sex | last_visit_date | cost | max_dwell_time | min_dwell_time |
|---|---|---|---|---|---|---|---|---|
| 10000 | 2017/10/1 | BeiJing | 20 | 0 | 2017/10/1 6:00 | 20 | 10 | 10 |
| 10000 | 2017/10/1 | BeiJing | 20 | 0 | 2017/10/1 7:00 | 15 | 2 | 2 |
| 10001 | 2017/10/1 | BeiJing | 30 | 1 | 2017/10/1 17:05 | 2 | 22 | 22 |
| 10002 | 2017/10/2 | ShangHai | 20 | 1 | 2017/10/2 12:59 | 200 | 5 | 5 |
| 10003 | 2017/10/2 | GuangZhou | 32 | 0 | 2017/10/2 11:20 | 30 | 11 | 11 |
| 10004 | 2017/10/1 | ShenZhen | 35 | 0 | 2017/10/1 10:00 | 100 | 3 | 3 |
| 10004 | 2017/10/3 | ShenZhen | 35 | 0 | 2017/10/3 10:20 | 11 | 6 | 6 |
上記のテーブルを例として、ユーザーは総支出金額、最終訪問時間、性別、都市などの特定の数値条件を満たすユーザーをクエリし、これらの条件を満たすユーザーの情報をDorisにインポートして、後続のターゲット広告に使用したいと考えています。
-
まず、Dorisテーブルを作成します
CREATE TABLE IF NOT EXISTS user_activity
(
`user_id` LARGEINT NOT NULL,
`date` DATE NOT NULL,
`city` VARCHAR(20),
`age` SMALLINT,
`sex` TINYINT,
`last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00",
`cost` BIGINT SUM DEFAULT "0",
`max_dwell_time` INT MAX DEFAULT "0",
`min_dwell_time` INT MIN DEFAULT "99999"
)
AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
); -
次に、MySQLデータベースに対応するCatalogを作成します。
CREATE CATALOG activity PROPERTIES (
"type"="jdbc",
"user"="root",
"password"="123456",
"jdbc_url" = "jdbc:mysql://127.0.0.1:3306/user?useSSL=false",
"driver_url" = "mysql-connector-java-5.1.49.jar",
"driver_class" = "com.mysql.jdbc.Driver"
); -
最後に、MySQLデータをDorisにインポートします。Catalog + Insert Into方式を使用して全データセットをインポートします。フルインポートはシステムサービスの変動を引き起こす可能性があるため、通常はビジネスのオフピーク時間帯にこの操作を実行することが推奨されます。
-
ワンタイムスケジューリング:以下のコードに示すように、ワンタイムタスクを使用してスケジュールされた時刻にフルインポートタスクをトリガーし、トリガーは午前3:00に設定されています。
CREATE JOB one_time_load_job
ON SCHEDULE
AT '2024-8-10 03:00:00'
DO
INSERT INTO user_activity SELECT * FROM activity.user.activity -
定期スケジューリング: ユーザーは定期的に最新データを更新するための定期スケジューリングタスクを作成することもできます。
CREATE JOB schedule_load
ON SCHEDULE EVERY 1 DAY
DO
INSERT INTO user_activity SELECT * FROM activity.user.activity where last_visit_date >= days_add(now(),-1)
設計と実装
効率的なスケジューリングは多くの場合、大幅なリソース消費を伴い、高精度なスケジューリングはさらに要求が厳しくなります。従来のアプローチでは、Javaの組み込みスケジューリング機能(定期的にスレッドにアクセスするスケジュールタスク)や、さまざまなスケジューリングユーティリティライブラリを使用することが含まれます。しかし、これらの方法は精度とメモリ使用量の面で大きな問題があります。パフォーマンスをより良く保証しながらリソース消費を削減するために、私たちはTimingWheelアルゴリズムとDisruptorを組み合わせて秒レベルのタスクスケジューリングを実現することを選択しました。
具体的には、NettyのHashedWheelTimerを使用してTiming Wheelアルゴリズムを実装します。Job Managerは定期的に(デフォルトでは10分ごと)、将来のイベントをタイミングホイールに配置してスケジューリングを行います。効率的なタスクトリガーを確保し、過度なリソース使用を避けるために、Disruptorを使用してシングルプロデューサー、マルチコンシューマーモデルを構築します。タイミングホイールはイベントをトリガーするだけで、タスクを直接実行しません。期限切れ時にトリガーする必要があるタスクについては、Dispatchスレッドに配置され、このスレッドが適切な実行スレッドプールへのタスク分散を担当します。即座に実行する必要があるタスクについては、対応するタスク実行スレッドプールに直接配信されます。
1回限りのイベントの場合、タスクがスケジュールされた後にイベント定義が削除されます。定期的なイベントの場合、タイミングホイール内のシステムイベントが次の実行サイクルのタスクを定期的に取得します。これにより、大量のタスクが単一のバケットに集中することを避け、不要なトラバーサルを削減し、処理効率を向上させます。
トランザクショナルタスクについては、Job Schedulerはトランザクションとの強い関連付けとトランザクションコールバック機構により、トランザクショナルタスクの実行結果が期待値と一致することを保証し、データの整合性と一貫性を確保できます。
将来の計画
Doris Job Schedulerは強力で柔軟なタスクスケジューリングツールであり、データ処理における必須機能です。データレイク分析や内部ETLなどの一般的な使用例に加えて、Job Schedulerは非同期マテリアライズドビューの実装においても重要な役割を果たします。非同期マテリアライズドビューは事前計算され保存された結果セットであり、データ更新の頻度はソーステーブルの変更と密接に関連しています。ソーステーブルのデータが頻繁に更新される場合、マテリアライズドビューのデータを最新に保つために定期的な更新が必要です。バージョン2.1では、JOBスケジューリング機能を巧妙に利用してマテリアライズドビューとソーステーブルデータ間の一貫性を確保し、手動介入のコストを大幅に削減しました。
将来、Doris Job Schedulerは以下の機能もサポート予定です:
- UI経由で異なる時間帯に実行されたタスクの分布を表示する機能のサポート。
- JOBワークフローオーケストレーション、すなわちDAG JOBのサポート。これにより、内部データウェアハウスタスクオーケストレーションを実装でき、Catalog機能と合わせて、より効率的にデータ処理と分析タスクを完了できます。
- インポートタスク、UPDATE、DELETE操作のスケジューリングのサポート。