Routine Load内部構造とベストプラクティス
1. 概要
Routine Loadは、Kafkaデータを継続的に取得してApache Dorisに書き込むために設計されています。ユーザーはRoutine Load Jobを作成することで、指定されたKafka Topicを自動的に購読できます。その主要機能には以下があります:
-
高可用性: 24時間365日中断のないKafkaデータ取得をサポートし、障害後の自動復旧に対応。
-
低遅延: Kafkaメッセージは秒レベルの可視性を実現可能。
-
Exactly-Onceセマンティクス: Kafkaデータの取得において損失や重複を防ぎ、exactly-once処理を実現。
本ドキュメントでは、実装原理の詳細分析、典型的なシナリオでのベストプラクティス、よくある問題のトラブルシューティングアプローチを提供し、ユーザーが迅速に開始し、効率的に運用できるよう支援します。
2. 実装原理
Kafkaデータはストリーミング形式で存在し、DorisはKafkaストリーミングデータを「マイクロバッチ」方式で取得します。routine load jobの作成後、システムは設定された同時実行レベルに基づいてジョブを複数のタスクに分割し、並行実行を行います。各タスクは、Kafkaトピック内の特定のパーティションからのデータ取得を担当します。各タスクは1つのトランザクションに対応し、完了後、新しいタスクが生成されて次のバッチのデータ取得を継続します。以下のセクションでは、job/taskスケジューリング、Exactly-Onceセマンティクスの実装、単一ストリームからの複数テーブル書き込みの3つの観点から説明します。
2.1 JobとTaskのスケジューリング
Routine Loadは2レベルのスケジューリングアプローチを採用しています:
-
Jobスケジューリング: タスク分割、障害復旧、ライフサイクル管理を担当。
-
Taskスケジューリング: 特定のデータ取得、変換、書き込み操作をBEノードに配布して実行することを担当。
2.1.1 Jobスケジューリング
Jobのステートマシン:
| State | 詳細 |
|---|---|
| NEED_SCHEDULE | 初回スケジューリング待機またはリスケジュール要求 |
| RUNNING | 通常の取得処理中 |
| PAUSED | 能動的または異常により一時停止、自動復旧可能 |
| CANCELLED | データベース/テーブル削除などの回復不能エラーにより終了 |
| STOPPED | 手動停止され回復不能 |
異なるjob状態に基づいて、スケジューリングスレッドは各サイクル(10秒)ごとに以下のアクションを実行します:
- NEED_SCHEDULE: トピックメタデータ(パーティション数、開始オフセット)を取得し、以下に従ってタスクを分割:
taskNum = min(topic_partition_num,
desired_concurrent_number,
max_routine_load_task_concurrent_num)
タスクをneedScheduleTasksQueueに配置し、タスクスケジューリングスレッドがスケジューリングを開始するのを待機します。
-
RUNNING: 定期的にトピックメタデータを取得し、パーティション数が変更された場合は即座に再スケジューリングを行います。
-
PAUSED: ジョブの高可用性を確保するため、自動復旧メカニズムが導入されています。予期しない一時停止の場合、Routine Load Schedulerスレッドは自動的にジョブの復旧を試行します。予期しないKafka側の障害やその他の非機能的な状況に対し、自動復旧メカニズムにより、Kafkaの復旧後、手動介入なしでインポートジョブが正常動作を継続できることを保証します。以下の3つの状況では自動復旧は行われないことに注意してください:
- ユーザーが手動でPAUSE ROUTINE LOADコマンドを実行した場合。
- データ品質の問題が存在する場合。
- データベース/テーブルの削除など、復旧不可能な状況。
上記の3つの状況を除き、その他の一時停止されたジョブはすべて自動復旧を試行します。
-
CANCELLED / STOPPED: 遅延リソースクリーンアップ。
2.1.2 タスクスケジューリング
スケジューリング条件
-
タスクがパーティションの終端に到達していない、つまりまだ消費すべきデータが存在し、無効なリソース占有を避けるため。
-
前回の実行でEOFに到達した場合、前回の実行開始から
max_batch_interval以上経過した場合のみ新しいラウンドのスケジューリングが開始されます。これは、消費速度が生産速度を上回る場合にデータを適切にバッチ化し、過度に小さなトランザクションの生成を防ぐことを目的としています。
負荷分散戦略
-
現在実行中のTaskが最も少ないBEノードを優先的に選択します。
-
複数のBEが同じTask数を持つ場合、初期化オーバーヘッドを削減するため、Kafka Consumerをキャッシュしているノードの再利用を優先します。
バッチ境界
現在のタスクは、以下のいずれかの条件が満たされた時に終了します:
-
max_batch_intervalで定義された時間制限に到達。 -
max_batch_rowsで定義された行数に到達。 -
max_batch_sizeで定義されたバイトサイズに到達。 -
Kafka EOFを読み取り、つまりストリームの終端まで消費。
タスク完了後、トランザクションがコミットされ、新しいタスクが即座に生成されて次のスケジューリングサイクル用のキューに配置され、継続的な消費を可能にします。
2.2 Exactly-Once セマンティクス
Routine Loadは「永続的な消費進捗」+「コミット検証」のデュアルメカニズムにより、Kafkaデータの紛失や重複を防止します。
2.2.1 永続的な消費進捗
各タスクはトランザクションコミット時に消費進捗とトランザクション情報をFEのedit logに書き込み、Berkeley DB JEを利用してすべてのFE Followerに同期します。進捗情報はMaster切り替え/再起動後も正確性を保持します。
2.2.2 コミット検証
手動一時停止、master切り替え、またはトピックメタデータ変更によりJobが再スケジューリングされた場合、2つのタスクが同じパーティションを同時に消費する短期間のシナリオが発生する可能性があります。重複書き込みを防ぐため:
-
各Jobはメモリ内で
routineLoadTaskInfoListを維持します。 -
コミット前に、タスクは自身がまだ
routineLoadTaskInfoList内に存在するかを検証し、そうでなければコミットが拒否されます。
2.3 単一ストリームからのマルチテーブル書き込み
マルチテーブル書き込みにより、単一のRoutine Load Jobが複数のターゲットテーブルに同時書き込みを行うことができます。コアプロセスは以下の通りです:
-
計画フェーズ:Job作成時にターゲットテーブルを完全に決定できないため、実行計画は実行時まで遅延され、BEがFE Masterから動的に取得します。
-
データキャッシュ:BEは最初にデータをローカルのマルチテーブルパイプにキャッシュします。200レコードがキャッシュされるか、まだ実行計画を要求していない5つの新しいテーブルがある場合、データのバックログを防ぐため実行計画要求が開始され実行されます。
-
実行計画の再利用:同一トランザクション内では、キャッシュされた実行計画が再利用されます;トランザクション間では、メタデータの適時性を確保するため新しい要求が行われます。
3. ベストプラクティス
Routine Loadのデフォルトパラメータはほとんどのシナリオを満たします。以下の3つの状況では手動チューニングが必要です:
| シナリオ | 推奨パラメータ変更 |
|---|---|
| 低レイテンシ要件 | デフォルト60sからmax_batch_intervalを削減 |
| 小データ量、リソース重視 | desired_concurrent_numberを削減 |
| 高スループット | デフォルト60sからmax_batch_intervalを120-180sに増加 |
4. 一般的な問題のトラブルシューティング
4.1 データのバックログ
SHOW ROUTINE LOAD\Gを通じてタスクステータスを確認:
-
Stateが
RUNNINGかどうか;他のステータスの場合は、ReasonOfStateChangedフィールドで理由を確認。 -
OtherMsgにエラー情報が含まれているかどうか。
-
BEログを使用してスループット制限に到達したかどうかを判断
consumer group doneログを検索し、left_time / left_rows / left_bytesが最初にトリガーされた閾値を示すため、max_batch_sizeまたはmax_batch_rowsを対象を絞って増加させることができます:consumer group done: 894fc32d5b9d3e93-7387a02da6dafd88. consume time(ms)=34004, received rows=2679540, received bytes=2147484043, eos: 0, left_time: 25996, left_rows: 17320460, left_bytes: -395, blocking get time(us): 949236, blocking put time(us): 28730419, id=69616a41fc064f1e-a93ff0ddd217f0a0, job_id=48121487, txn_id=61763720, label=ods_hq_market_unique_jobs_0-48121487-69616a41fc064f1e-a93ff0ddd217f0a0-61763720, elapse(s)=34
上記の例では、left_bytes: -395 は、34秒以内に max_batch_size 制限に達したためバッチが終了したことを示しています。この場合、max_batch_size を適切に増加させて、単一バッチが max_batch_interval 内でフル容量に達できるようにし、スループットを向上させることができます。
- 並行性とスループットの向上
-
desired_concurrent_numberをTopic Partitionの数に合わせて増加させます。 -
max_batch_interval(例:120s〜180s)/max_batch_size/max_batch_rowsを適度に増加させて、単一トランザクションのデータ量を向上させ、単一バッチのデータ量を増加させ、トランザクションオーバーヘッドを削減します。
4.2 タスクの異常停止
Routine Loadには自動復旧メカニズムが組み込まれており、ほとんどの予期しない停止は再試行されます。タスクがPAUSED状態のままで自動復旧できない場合は、SHOW ROUTINE LOAD を実行してトラブルシューティングを行います:
-
PAUSE ROUTINE LOADが手動で実行されたかどうか。 -
データ品質の問題(フォーマットエラー、フィールドの欠落など)が存在するかどうか。
-
Kafkaデータが
out of rangeエラーで期限切れになっているかどうか。