メインコンテンツまでスキップ

Job Scheduler

背景

データ管理の高度化に対する需要が増加する中、スケジュール調整は重要な役割を果たしています。これは通常、以下のシナリオで適用されます:

  • 定期的なデータ更新。定期的なデータインポートやETL操作など、手動介入を削減し、データ処理の効率性と精度を向上させます。

  • 外部データソースとカタログの同期により、マルチソースデータの効率的で正確な統合を対象システムに実現し、複雑なビジネス分析ニーズを満たします。

  • 期限切れ/無効なデータの定期的なクリーンアップにより、ストレージ領域を解放し、過剰な期限切れ/無効なデータがシステムパフォーマンスに与える影響を防止します。

Apache Doris 2.1より前のバージョンでは、通常、外部スケジューリングシステムに依存する必要がありました。ビジネスコードによるスケジューリングやサードパーティスケジューリングツール、分散スケジューリングプラットフォームの導入により、上記の要件を満たしていました。しかし、外部システム自体の制限により、Dorisのスケジューリング戦略とリソース管理の柔軟性に対する要件を満たせない場合があります。さらに、外部スケジューリングシステムが障害を起こした場合、ビジネスリスクが増加するだけでなく、対処するための追加の運用時間と人材が必要になります。

Job Scheduler

上記の問題を解決するため、Apache Dorisはバージョン2.1でJob Scheduler機能を導入し、秒レベルまでの精度でスケジューリングする自律的なタスクスケジューリング機能を実現しました。この機能の導入により、データインポートの整合性と一貫性が保証されるだけでなく、ユーザーは柔軟で便利にスケジューリング戦略を調整できるようになります。同時に、外部システムへの依存を減らすことで、システム障害のリスクと運用コストも削減され、コミュニティユーザーにより統一された信頼性の高いユーザーエクスペリエンスを提供します。

Doris Job Schedulerは、事前設定されたスケジュールに基づくタスク管理システムで、特定の時点または指定された時間間隔で事前定義された操作をトリガーし、自動化されたタスク実行を実現できます。Job Schedulerには以下の機能があります:

  • 効率的なスケジューリング:Job Schedulerは指定された時間間隔内でタスクとイベントを配置し、データ処理の効率性を保証します。タイムホイールアルゴリズムを使用して、イベントが秒レベルまで正確にトリガーできることを保証します。
  • 柔軟なスケジューリング:Job Schedulerは、分、時間、日、週間隔でのスケジューリングなど、複数のスケジューリングオプションを提供します。また、一回限りのスケジューリングと繰り返し(周期的)イベントスケジューリングをサポートし、周期的スケジューリングでは開始時刻と終了時刻を指定できます。
  • イベントプールと高性能処理キュー:Job SchedulerはDisruptorを使用して高性能なプロデューサー・コンシューマーモデルを実装し、タスク実行の過負荷を最大限回避します。
  • 追跡可能なスケジューリングレコード:Job Schedulerは最新のTask実行レコードを保存します(設定可能)。Task実行レコードは簡単なコマンドで確認でき、プロセスの追跡可能性を保証します。
  • 高可用性:Doris自体の高可用性メカニズムを活用することで、Job Schedulerは簡単に自己回復と高可用性を実現できます。

関連ドキュメント: CREATE-JOB

構文の説明

有効なJob文には以下を含める必要があります:

  • キーワードCREATE JOBの後にジョブ名を続ける必要があり、これによりデータベース内でイベントを一意に識別します。

  • ON SCHEDULE句は、Jobのタイプ、トリガー時刻、頻度を指定するために使用されます。

    • AT timestampは一回限りのイベントに使用されます。JOBが指定された日時に一度だけ実行されることを指定し、AT current_timestampは現在の日時を指定します。JOBが作成されると、すぐに実行され、非同期タスク作成にも使用できます。

    • EVERY:周期的なジョブに使用され、ジョブの実行頻度を指定します。キーワードの後に時間間隔(週、日、時間、分)を指定する必要があります。

      • Interval:ジョブ実行の頻度を指定します。1 DAYはジョブが1日に1回実行されることを意味し、1 HOURは1時間に1回、1 MINUTEは1分に1回、1 WEEKは1週間に1回を意味します。

      • EVERY句にはオプションのSTARTS句が含まれます。STARTSの後にはタイムスタンプ値があり、反復の開始時刻を定義し、CURRENT_TIMESTAMPは現在の日時を指定します。JOBが作成されると、すぐに実行されます。

      • EVERY句にはオプションのENDS句が含まれます。ENDSキーワードの後にはタイムスタンプ値があり、JOBイベントが実行を停止する時刻を定義します。

  • DO句は、Jobがトリガーされた時に実行される操作を指定するために使用されます。現在、INSERT文のみがサポートされています。

    CREATE
    JOB
    job_name
    ON SCHEDULE schedule
    [COMMENT 'string']
    DO execute_sql;

    schedule: {
    AT timestamp
    | EVERY interval
    [STARTS timestamp ]
    [ENDS timestamp ]
    }
    interval:
    quantity { WEEK |DAY | HOUR | MINUTE}

以下の例:

CREATE JOB my_job ON SCHEDULE EVERY 1 MINUTE DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2;

このステートメントは、毎分一度実行されるmy_jobという名前のジョブを作成します。実行される操作は、db2.tbl2からdb1.tbl1にデータをインポートすることです。

使用例

ワンタイムJobの作成: 2025-01-01 00:00:00に一度実行し、db2.tbl2からdb1.tbl1にデータをインポートします。

CREATE JOB my_job ON SCHEDULE AT '2025-01-01 00:00:00' DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2;

指定された終了時間なしで定期的なJobを作成する:2025-01-01 00:00:00から開始し、1日に1回実行し、db2.tbl2からdb1.tbl1にデータをインポートする。

CREATE JOB my_job ON SCHEDULE EVERY 1 DAY STARTS '2025-01-01 00:00:00' DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2 WHERE  create_time >=  days_add(now(),-1);

指定された終了時刻を持つ定期的なJobを作成:2025-01-01 00:00:00から開始し、1日に1回実行して、db2.tbl2からdb1.tbl1にデータをインポートし、2026-01-01 00:10:00に終了する。

CREATE JOB my_job ON SCHEDULE EVERY 1 DAY STARTS '2025-01-01 00:00:00' ENDS '2026-01-01 00:10:00' DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2 WHERE create_time >=  days_add(now(),-1);

Jobを使用した非同期実行の実装: DorisのJobは同期タスクとして作成されますが、その実行プロセスは非同期です。この機能により、Jobは一般的なINSERT INTO SELECT操作などの非同期タスクの実装に非常に適しています。

例えば、db2.tbl2からdb1.tbl1にデータをインポートする必要がある場合、JOBを一回限りのタスクとして指定し、開始時刻を現在時刻に設定するだけです。

CREATE JOB my_job ON SCHEDULE AT current_timestamp DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2;

Catalog と Job Scheduler に基づくデータ自動同期

eコマースシナリオを例に、ユーザーは MySQL からビジネスデータを抽出し、このデータを Doris に同期してデータ分析を行い、精密なマーケティング活動をサポートする必要があることがよくあります。Job Scheduler は Multi Catalog データレイク機能と連携して、データソース間の定期的なデータ同期を効率的に完了することができます。

CREATE TABLE IF NOT EXISTS user.activity (
`user_id` INT NOT NULL,
`date` DATE NOT NULL,
`city` VARCHAR(20),
`age` SMALLINT,
`sex` TINYINT,
`last_visit_date` DATETIME DEFAULT '1970-01-01 00:00:00',
`cost` BIGINT DEFAULT '0',
`max_dwell_time` INT DEFAULT '0',
`min_dwell_time` INT DEFAULT '99999'
);
INSERT INTO user.activity VALUES
(10000, '2017-10-01', 'BeiJing', 20, 0, '2017-10-01 06:00:00', 20, 10, 10),
(10000, '2017-10-01', 'BeiJing', 20, 0, '2017-10-01 07:00:00', 15, 2, 2),
(10001, '2017-10-01', 'BeiJing', 30, 1, '2017-10-01 17:05:00', 2, 22, 22),
(10002, '2017-10-02', 'ShangHai', 20, 1, '2017-10-02 12:59:00', 200, 5, 5),
(10003, '2017-10-02', 'GuangZhou', 32, 0, '2017-10-02 11:20:00', 30, 11, 11),
(10004, '2017-10-01', 'ShenZhen', 35, 0, '2017-10-01 10:00:00', 100, 3, 3),
(10004, '2017-10-03', 'ShenZhen', 35, 0, '2017-10-03 10:20:00', 11, 6, 6);
user_iddatecityagesexlast_visit_datecostmax_dwell_timemin_dwell_time
100002017/10/1BeiJing2002017/10/1 6:00201010
100002017/10/1BeiJing2002017/10/1 7:001522
100012017/10/1BeiJing3012017/10/1 17:0522222
100022017/10/2ShangHai2012017/10/2 12:5920055
100032017/10/2GuangZhou3202017/10/2 11:20301111
100042017/10/1ShenZhen3502017/10/1 10:0010033
100042017/10/3ShenZhen3502017/10/3 10:201166

上記のテーブルを例として、ユーザーは総消費金額、最終訪問時刻、性別、都市などの特定の数値条件を満たすユーザーをクエリし、これらの条件を満たすユーザーの情報をDorisにインポートして、その後のターゲティング広告に使用したいと考えています。

  1. まず、Dorisテーブルを作成します

    CREATE TABLE IF NOT EXISTS user_activity
    (
    `user_id` LARGEINT NOT NULL,
    `date` DATE NOT NULL,
    `city` VARCHAR(20),
    `age` SMALLINT,
    `sex` TINYINT,
    `last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00",
    `cost` BIGINT SUM DEFAULT "0",
    `max_dwell_time` INT MAX DEFAULT "0",
    `min_dwell_time` INT MIN DEFAULT "99999"
    )
    AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`)
    DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
    PROPERTIES (
    "replication_allocation" = "tag.location.default: 1"
    );
  2. 次に、MySQLデータベースに対応するCatalogを作成します。

    CREATE CATALOG activity PROPERTIES (
    "type"="jdbc",
    "user"="root",
    "password"="123456",
    "jdbc_url" = "jdbc:mysql://127.0.0.1:3306/user?useSSL=false",
    "driver_url" = "mysql-connector-java-5.1.49.jar",
    "driver_class" = "com.mysql.jdbc.Driver"
    );
  3. 最後に、MySQLデータをDorisにインポートします。Catalog + Insert Into方式を使用して、全データセットをインポートします。フルインポートはシステムサービスの変動を引き起こす可能性があるため、通常はビジネスオフピーク時間帯にこの操作を実行することが推奨されます。

  • ワンタイムスケジューリング:以下のコードに示すように、ワンタイムタスクを使用して予定時刻にフルインポートタスクをトリガーし、トリガーを午前3:00に設定します。

    CREATE JOB one_time_load_job
    ON SCHEDULE
    AT '2024-8-10 03:00:00'
    DO
    INSERT INTO user_activity SELECT * FROM activity.user.activity
  • 定期スケジューリング:ユーザーは最新データを定期的に更新するための定期スケジューリングタスクも作成できます。

    CREATE JOB schedule_load
    ON SCHEDULE EVERY 1 DAY
    DO
    INSERT INTO user_activity SELECT * FROM activity.user.activity where last_visit_date >= days_add(now(),-1)

設計と実装

効率的なスケジューリングは多くの場合、大幅なリソース消費を伴い、高精度スケジューリングはさらに要求が厳しくなります。従来のアプローチでは、Javaの組み込みスケジューリング機能(定期的にスレッドにアクセスするスケジュールタスク)を使用するか、さまざまなスケジューリングユーティリティライブラリを使用することが含まれます。しかし、これらの方法には精度とメモリ使用量の面で重大な問題があります。パフォーマンスをより適切に保証しながらリソース消費を削減するため、TimingWheelアルゴリズムとDisruptorを組み合わせて秒レベルのタスクスケジューリングを実現することを選択しました。

具体的には、NettyのHashedWheelTimerを使用してTiming Wheelアルゴリズムを実装します。Job Managerは定期的に(デフォルトで10分ごと)、将来のイベントをタイミングホイールに配置してスケジューリングします。効率的なタスクトリガーを確保し、過度なリソース使用を避けるため、Disruptorを使用して単一プロデューサー、複数コンシューマーモデルを構築します。タイミングホイールはイベントをトリガーするのみで、タスクを直接実行しません。期限切れ時にトリガーする必要があるタスクの場合、Dispatchスレッドに配置され、適切な実行スレッドプールにタスクを配布する責任を負います。すぐに実行する必要があるタスクの場合、対応するタスク実行スレッドプールに直接配信されます。

1回限りのイベントの場合、タスクがスケジュールされた後にイベント定義が削除されます。定期的なイベントの場合、タイミングホイール内のシステムイベントが定期的に次の実行サイクルのタスクを取得します。これにより、大量のタスクが単一のバケットに集中することを回避し、不要な走査を削減してプロセッシング効率を向上させます。

トランザクショナルタスクについては、Job Schedulerはトランザクションとの強い関連付けとトランザクションコールバックメカニズムにより、トランザクショナルタスクの実行結果が期待と一致することを保証でき、データの整合性と一貫性を確保します。

将来の計画

Doris Job Schedulerは、データ処理において不可欠な機能である、強力で柔軟なタスクスケジューリングツールです。データレイク分析や内部ETLなどの一般的な使用例に加えて、Job Schedulerは非同期マテリアライズドビューの実装においても重要な役割を果たします。非同期マテリアライズドビューは事前に計算され保存された結果セットで、データ更新の頻度はソーステーブルの変更と密接に関連しています。ソーステーブルデータが頻繁に更新される場合、マテリアライズドビューのデータを最新に保つために定期的な更新が必要です。バージョン2.1では、JOBスケジューリング機能を巧妙に活用してマテリアライズドビューとソーステーブルデータ間の一貫性を保証し、手動介入のコストを大幅に削減しました。

将来的に、Doris Job Schedulerは以下の機能もサポート予定です:

  • UIを通じて異なる時間帯に実行されたタスクの配布を表示するサポート
  • JOBワークフローオーケストレーション、つまりDAG JOBのサポート。これにより、内部データウェアハウスタスクオーケストレーションを実装でき、Catalog機能と合わせて、データ処理と分析タスクをより効率的に完了させることができます
  • インポートタスク、UPDATE、DELETE操作のスケジューリングサポート