Job Scheduler
背景
精密なデータ管理への要求が高まる中、スケジュールされた調度は重要な役割を果たしています。一般的に以下のシナリオで適用されます:
-
定期的なデータインポートやETL操作などの定期的なデータ更新により、手動介入を削減し、データ処理の効率性と正確性を向上させる。
-
外部データソースとカタログを同期し、マルチソースデータの効率的で正確な統合をターゲットシステムに実現し、複雑なビジネス分析ニーズを満たす。
-
期限切れ/無効なデータを定期的にクリーンアップして、ストレージ容量を解放し、過剰な期限切れ/無効なデータがシステムパフォーマンスに影響を与えることを防ぐ。
Apache Doris 2.1以前のバージョンでは、上記の要件を満たすために、ビジネスコードでのスケジューリングやサードパーティのスケジューリングツールや分散スケジューリングプラットフォームの導入など、外部スケジューリングシステムに依存することが一般的に必要でした。しかし、外部システム自体の制限により、Dorisのスケジューリング戦略とリソース管理の柔軟性の要件を満たせない場合があります。さらに、外部スケジューリングシステムが故障した場合、ビジネスリスクが増加するだけでなく、対処するために追加の運用時間と人員が必要となります。
Job Scheduler
上記の問題を解決するため、Apache Dorisはバージョン2.1でJob Scheduler機能を導入し、秒レベルに達するスケジューリング精度で自律的なタスクスケジューリング機能を実現しました。この機能の導入により、データインポートの整合性と一貫性を保証するだけでなく、ユーザーがスケジューリング戦略を柔軟かつ便利に調整できるようになります。同時に、外部システムへの依存を減らすことで、システム故障のリスクと運用コストも削減し、コミュニティユーザーにより統一された信頼性の高いユーザー体験を提供します。
Doris Job Schedulerは、事前設定されたスケジュールに基づくタスク管理システムで、特定の時点や指定された時間間隔で事前定義された操作をトリガーし、自動化されたタスク実行を実現できます。Job Schedulerは以下の機能を持ちます:
- 効率的なスケジューリング:Job Schedulerは指定された時間間隔内でタスクとイベントを配置し、データ処理の効率性を確保できます。タイムホイールアルゴリズムを使用して、秒レベルでイベントを正確にトリガーできることを保証します。
- 柔軟なスケジューリング:Job Schedulerは分、時間、日、または週の間隔でのスケジューリングなど、複数のスケジューリングオプションを提供します。また、一回限りのスケジューリングや定期的(周期的)なイベントスケジューリングをサポートし、定期スケジューリングでは開始時刻と終了時刻を指定できます。
- イベントプールと高性能処理キュー:Job SchedulerはDisruptorを使用して高性能プロデューサー・コンシューマーモデルを実装し、タスク実行の過負荷を最大限回避します。
- 追跡可能なスケジューリング記録:Job Schedulerは最新のTask実行記録を保存します(設定可能)。Task実行記録は簡単なコマンドで確認でき、プロセスの追跡可能性を確保します。
- 高可用性:Doris自体の高可用性メカニズムを活用し、Job Schedulerは自己回復と高可用性を簡単に実現できます。
関連ドキュメント: CREATE-JOB
構文説明
有効なJob文には以下を含める必要があります:
-
キーワードCREATE JOBの後にジョブ名を続け、これによりデータベース内のイベントを一意に識別します。
-
ON SCHEDULE句は、Jobのタイプ、トリガー時刻、および頻度を指定するために使用されます。
-
AT timestampは一回限りのイベントに使用されます。JOBが指定された日時に一度だけ実行されることを指定し、AT current_timestampは現在の日時を指定します。JOBが作成されると、すぐに実行され、非同期タスクの作成にも使用できます。
-
EVERY:定期的なジョブに使用し、ジョブの実行頻度を指定します。キーワードの後に時間間隔(週、日、時間、分)を指定する必要があります。
-
Interval:ジョブ実行の頻度を指定します。1 DAYはジョブが1日に1回実行されることを意味し、1 HOURは1時間に1回、1 MINUTEは1分に1回、1 WEEKは1週間に1回を意味します。
-
EVERY句には、オプションのSTARTS句が含まれます。STARTSの後はtimestamp値で、繰り返しの開始時刻を定義し、CURRENT_TIMESTAMPは現在の日時を指定します。JOBが作成されると、すぐに実行されます。
-
EVERY句には、オプションのENDS句が含まれます。ENDSキーワードの後はtimestamp値で、JOBイベントが実行を停止する時刻を定義します。
-
-
-
DO句は、Jobがトリガーされたときに実行される操作を指定するために使用されます。現在、INSERT文のみがサポートされています。
CREATE
JOB
job_name
ON SCHEDULE schedule
[COMMENT 'string']
DO execute_sql;
schedule: {
AT timestamp
| EVERY interval
[STARTS timestamp ]
[ENDS timestamp ]
}
interval:
quantity { WEEK |DAY | HOUR | MINUTE}
以下の例:
CREATE JOB my_job ON SCHEDULE EVERY 1 MINUTE DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2;
この文は、毎分1回実行されるmy_jobという名前のジョブを作成します。実行される操作は、db2.tbl2からdb1.tbl1にデータをインポートすることです。
使用例
ワンタイムJobの作成:2025-01-01 00:00:00に1回実行し、db2.tbl2からdb1.tbl1にデータをインポートします。
CREATE JOB my_job ON SCHEDULE AT '2025-01-01 00:00:00' DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2;
終了時刻を指定しない定期Jobを作成:2025-01-01 00:00:00から開始し、1日に1回実行して、db2.tbl2からdb1.tbl1にデータをインポートする。
CREATE JOB my_job ON SCHEDULE EVERY 1 DAY STARTS '2025-01-01 00:00:00' DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2 WHERE create_time >= days_add(now(),-1);
指定された終了時刻を持つ定期Jobの作成:2025-01-01 00:00:00から開始し、1日に1回実行して、db2.tbl2からdb1.tbl1へデータをインポートし、2026-01-01 00:10:00で終了する。
CREATE JOB my_job ON SCHEDULE EVERY 1 DAY STARTS '2025-01-01 00:00:00' ENDS '2026-01-01 00:10:00' DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2 WHERE create_time >= days_add(now(),-1);
Jobによる非同期実行の実装:DorisのJobは同期タスクとして作成されますが、その実行プロセスは非同期です。この機能により、Jobは一般的なINSERT INTO SELECT操作などの非同期タスクの実装に非常に適しています。
例えば、db2.tbl2からdb1.tbl1にデータをインポートする必要がある場合、JOBを1回限りのタスクとして指定し、開始時刻を現在時刻に設定するだけです。
CREATE JOB my_job ON SCHEDULE AT current_timestamp DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2;
Catalogとジョブスケジューラーに基づくデータ自動同期
Eコマースシナリオを例にすると、ユーザーはしばしばMySQLからビジネスデータを抽出し、このデータをDorisに同期してデータ分析を行い、精密なマーケティング活動をサポートする必要があります。ジョブスケジューラーはMulti Catalogデータレイク機能と連携して、データソース間の定期的なデータ同期を効率的に完了することができます。
CREATE TABLE IF NOT EXISTS user.activity (
`user_id` INT NOT NULL,
`date` DATE NOT NULL,
`city` VARCHAR(20),
`age` SMALLINT,
`sex` TINYINT,
`last_visit_date` DATETIME DEFAULT '1970-01-01 00:00:00',
`cost` BIGINT DEFAULT '0',
`max_dwell_time` INT DEFAULT '0',
`min_dwell_time` INT DEFAULT '99999'
);
INSERT INTO user.activity VALUES
(10000, '2017-10-01', 'BeiJing', 20, 0, '2017-10-01 06:00:00', 20, 10, 10),
(10000, '2017-10-01', 'BeiJing', 20, 0, '2017-10-01 07:00:00', 15, 2, 2),
(10001, '2017-10-01', 'BeiJing', 30, 1, '2017-10-01 17:05:00', 2, 22, 22),
(10002, '2017-10-02', 'ShangHai', 20, 1, '2017-10-02 12:59:00', 200, 5, 5),
(10003, '2017-10-02', 'GuangZhou', 32, 0, '2017-10-02 11:20:00', 30, 11, 11),
(10004, '2017-10-01', 'ShenZhen', 35, 0, '2017-10-01 10:00:00', 100, 3, 3),
(10004, '2017-10-03', 'ShenZhen', 35, 0, '2017-10-03 10:20:00', 11, 6, 6);
| user_id | date | city | age | sex | last_visit_date | cost | max_dwell_time | min_dwell_time |
|---|---|---|---|---|---|---|---|---|
| 10000 | 2017/10/1 | BeiJing | 20 | 0 | 2017/10/1 6:00 | 20 | 10 | 10 |
| 10000 | 2017/10/1 | BeiJing | 20 | 0 | 2017/10/1 7:00 | 15 | 2 | 2 |
| 10001 | 2017/10/1 | BeiJing | 30 | 1 | 2017/10/1 17:05 | 2 | 22 | 22 |
| 10002 | 2017/10/2 | ShangHai | 20 | 1 | 2017/10/2 12:59 | 200 | 5 | 5 |
| 10003 | 2017/10/2 | GuangZhou | 32 | 0 | 2017/10/2 11:20 | 30 | 11 | 11 |
| 10004 | 2017/10/1 | ShenZhen | 35 | 0 | 2017/10/1 10:00 | 100 | 3 | 3 |
| 10004 | 2017/10/3 | ShenZhen | 35 | 0 | 2017/10/3 10:20 | 11 | 6 | 6 |
上記の表を例として、ユーザーは総支出金額、最終訪問時間、性別、都市などの特定の数値条件を満たすユーザーをクエリし、これらの条件を満たすユーザーの情報をDorisにインポートして、後続のターゲット型プロモーションに使用したいと考えています。
-
まず、Dorisテーブルを作成します
CREATE TABLE IF NOT EXISTS user_activity
(
`user_id` LARGEINT NOT NULL,
`date` DATE NOT NULL,
`city` VARCHAR(20),
`age` SMALLINT,
`sex` TINYINT,
`last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00",
`cost` BIGINT SUM DEFAULT "0",
`max_dwell_time` INT MAX DEFAULT "0",
`min_dwell_time` INT MIN DEFAULT "99999"
)
AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
); -
次に、MySQLデータベースに対応するCatalogを作成します。
CREATE CATALOG activity PROPERTIES (
"type"="jdbc",
"user"="root",
"password"="123456",
"jdbc_url" = "jdbc:mysql://127.0.0.1:3306/user?useSSL=false",
"driver_url" = "mysql-connector-java-5.1.49.jar",
"driver_class" = "com.mysql.jdbc.Driver"
); -
最後に、MySQLデータをDorisにインポートします。Catalog + Insert Intoメソッドを使用して完全なデータセットをインポートします。完全インポートはシステムサービスの変動を引き起こす可能性があるため、通常はビジネスのオフピーク時間にこの操作を実行することが推奨されます。
-
一回限りのスケジューリング: 以下のコードに示すように、一回限りのタスクを使用して予定された時間に完全インポートタスクをトリガーし、トリガーを午前3:00に設定します。
CREATE JOB one_time_load_job
ON SCHEDULE
AT '2024-8-10 03:00:00'
DO
INSERT INTO user_activity SELECT * FROM activity.user.activity -
定期スケジューリング: ユーザーは定期的に最新データを更新するための定期スケジューリングタスクを作成することもできます。
CREATE JOB schedule_load
ON SCHEDULE EVERY 1 DAY
DO
INSERT INTO user_activity SELECT * FROM activity.user.activity where last_visit_date >= days_add(now(),-1)
設計と実装
効率的なスケジューリングには大きなリソース消費が伴うことが多く、高精度スケジューリングはさらに多くのリソースを要求します。従来のアプローチでは、Javaの組み込みスケジューリング機能(定期的にスレッドにアクセスするスケジュールタスク)や、さまざまなスケジューリングユーティリティライブラリを使用していました。しかし、これらの方法は精度とメモリ使用量の面で重大な問題があります。パフォーマンスをより適切に保証しながらリソース消費を削減するため、TimingWheelアルゴリズムとDisruptorを組み合わせて秒レベルのタスクスケジューリングを実現することを選択しました。
具体的には、NettyのHashedWheelTimerを使用してTiming Wheelアルゴリズムを実装しています。Job Managerは定期的に(デフォルトでは10分ごと)、将来のイベントをスケジューリング用のタイミングホイールに配置します。効率的なタスクトリガーを確保し、過度なリソース使用を回避するため、Disruptorを使用して単一プロデューサー、複数コンシューマーモデルを構築しています。タイミングホイールはイベントをトリガーするのみで、タスクを直接実行しません。期限切れ時にトリガーする必要があるタスクについては、Dispatchスレッドに配置され、適切な実行スレッドプールにタスクを分散する責任を持ちます。即座に実行する必要があるタスクについては、対応するタスク実行スレッドプールに直接送信されます。
一回限りのイベントについては、タスクがスケジュールされた後にイベント定義が削除されます。定期的なイベントについては、タイミングホイール内のシステムイベントが次の実行サイクルのタスクを定期的に取得します。これにより、大量のタスクが単一のバケットに集中することを回避し、不要なトラバーサルを削減し、処理効率を向上させます。
トランザクショナルタスクについては、Job Schedulerはトランザクションとの強い関連付けとトランザクションコールバックメカニズムを通じて、トランザクショナルタスクの実行結果が期待に一致することを保証し、データの整合性と一貫性を確保できます。
今後の計画
Doris Job Schedulerは、データ処理において不可欠な機能である、強力で柔軟なタスクスケジューリングツールです。データレイク分析や内部ETLなどの一般的な使用例に加えて、Job Schedulerは非同期マテリアライズドビューの実装においても重要な役割を果たします。非同期マテリアライズドビューは事前計算されて保存された結果セットであり、データ更新の頻度はソーステーブルの変更と密接に関連しています。ソーステーブルのデータが頻繁に更新される場合、マテリアライズドビューのデータを最新に保つために定期的な更新が必要です。バージョン2.1では、JOBスケジューリング機能を巧妙に活用して、マテリアライズドビューとソーステーブルデータ間の一貫性を確保し、手動介入のコストを大幅に削減しました。
将来、Doris Job Schedulerは以下の機能もサポートする予定です:
- UIを通じて異なる時間帯に実行されたタスクの分散を表示する機能のサポート。
- JOBワークフローオーケストレーション、すなわちDAG JOBのサポート。これにより、内部データウェアハウスタスクのオーケストレーションを実装でき、Catalog機能と組み合わせることで、データ処理と分析タスクをより効率的に完了できます。
- インポートタスク、UPDATE、DELETE操作のスケジューリングのサポート。