負荷内部構造とパフォーマンス最適化
概要
Apache Dorisは、MPP(Massively Parallel Processing)アーキテクチャを採用した高性能分散分析データベースであり、リアルタイムデータ分析、データウェアハウス、ストリーム計算のシナリオで広く使用されています。データロードはDorisの中核機能であり、データ分析のリアルタイム性と正確性に直接影響します。効率的なロード機構により、大規模データが迅速かつ確実にシステムに投入され、後続のクエリをサポートできます。本記事では、Dorisデータロードの内部構造を分析し、主要なプロセス、コンポーネント、トランザクション管理などをカバーし、ロードパフォーマンスに影響する要因を探求し、実用的な最適化方法とベストプラクティスを提供して、ユーザーが適切なロード戦略を選択し、ロードパフォーマンスを最適化できるよう支援します。
データロードの内部構造
ロード内部構造の概要
Dorisのデータロード内部構造は、その分散アーキテクチャ上に構築されており、主にFrontendノード(FE)とBackendノード(BE)が関与します。FEはメタデータ管理、クエリ解析、タスクスケジューリング、トランザクション調整を担当し、BEは実際のデータストレージ、計算、書き込み操作を処理します。Dorisのデータロード設計は、リアルタイム書き込み、ストリーミング同期、バッチロード、外部データソース統合などの多様なビジネスニーズを満たすことを目的としています。その核心概念には以下が含まれます:
- 一貫性と原子性:各ロードタスクは一つのトランザクションとして機能し、データ書き込みの原子性を保証し、部分書き込みを回避します。Label機構により、ロードされたデータの消失や重複を防止します。
- 柔軟性:複数のデータソース(ローカルファイル、HDFS、S3、Kafkaなど)と形式(CSV、JSON、Parquet、ORCなど)をサポートし、異なるシナリオに対応します。
- 効率性:分散アーキテクチャを活用して並列データ処理を行い、複数のBEノードがデータを並列処理してスループットを向上させます。
- 簡潔性:軽量ETL機能を提供し、ユーザーがロード中に直接データクリーニングと変換を実行でき、外部ツールへの依存を削減します。
- 柔軟なモデリング:詳細モデル(Duplicate Key)、主キーモデル(Unique Key)、集約モデル(Aggregate Key)をサポートし、ロード中にデータ集約や重複除去を可能にします。
一般的なロードプロセス
Dorisのデータロードプロセスは、いくつかの直感的なステップに分けることができます。使用するロード方法(Stream Load、Broker Load、Routine Loadなど)に関係なく、核心プロセスは基本的に一致しています。
-
ロードタスクの提出
- ユーザーはクライアント(HTTP、JDBC、MySQLクライアントなど)を通じてロード要求を提出し、データソース(ローカルファイル、Kafka Topics、HDFSファイルパスなど)、ターゲットテーブル、ファイル形式、ロードパラメータ(区切り文字、エラー許容度など)を指定します。
- 各タスクは、タスク識別と冪等性サポート(重複ロード防止)のために、一意のLabelを指定できます。例えば、ユーザーはStream LoadでHTTPヘッダーを通じてLabelを指定します。
- DorisのFrontendノード(FE)は要求を受信し、権限を検証し、ターゲットテーブルの存在を確認し、ロードパラメータを解析します。
-
タスクの割り当てと調整
- FEはデータ分散(テーブルのパーティション分割とバケット化ルールに基づく)を分析し、ロード計画を生成し、Backendノード(BE)をCoordinatorとして選択してタスク全体を調整します。
- ユーザーがBEに直接提出する場合(Stream Loadなど)、BEは直接Coordinatorとして機能できますが、それでもFEからメタデータ(テーブルSchemaなど)を取得する必要があります。
- ロード計画はデータを複数のBEノードに分散し、並列処理を確保して効率を向上させます。
-
データの読み取りと分散
- Coordinator BEはデータソースからデータを読み取ります(例:Kafkaからのメッセージ取得、S3からのファイル読み取り、HTTPデータストリームの直接受信)。
- Dorisはデータ形式を解析し(CSV分割、JSON解析など)、ユーザー定義の軽量ETL操作をサポートします。これには以下が含まれます:
- 事前フィルタリング:生データをフィルタリングして処理オーバーヘッドを削減します。
- 列マッピング:データ列とターゲットテーブル列の対応を調整します。
- データ変換:式を通じてデータを処理します。
- 事後フィルタリング:変換されたデータをフィルタリングします。
- データを解析後、Coordinator BEは、パーティション分割とバケット化ルールに従って、複数の下流Executor BEに分散します。
-
データの書き込み
- データは複数のBEノードに分散され、メモリテーブル(MemTable)に書き込まれ、Key列でソートされます。AggregateまたはUnique Keyモデルでは、DorisはKey(SUM、REPLACEなど)に従って集約または重複除去を実行します。
- MemTableが満杯になると(デフォルト200MB)またはタスクが終了すると、データは非同期でディスクに書き込まれ、列指向ストレージSegmentファイルを形成し、Rowsetを構成します。
- 各BEは割り当てられたデータを独立して処理し、書き込み完了後にCoordinatorにステータスを報告します。
-
トランザクションのコミットとパブリッシング
- CoordinatorはFEに対してトランザクションコミット(Commit)を開始します。FEは大部分のレプリカが正常に書き込まれたことを確認後、BEにデータバージョンのパブリッシュ(Publish Version)を通知します。BE Publishが成功すると、FEはトランザクションをVISIBLEとしてマークし、この時点でデータがクエリ可能になります。
- 失敗した場合、FEはロールバック(Rollback)をトリガーし、一時的なデータを削除して、データの一貫性を確保します。
-
結果の返却
- 同期方式(Stream Load、Insert Intoなど)は直接ロード結果を返し、成功/失敗ステータスとエラー詳細(ErrorURLなど)を含みます。
- 非同期方式(Broker Loadなど)はタスクIDとLabelを提供します。ユーザーはSHOW LOADを通じて進捗、エラー行数、詳細情報を確認できます。
- 操作は監査ログに記録され、後続のトレースに使用されます。
Memtable Forwarding
Memtable forwardingは、Apache Doris 2.1.0で導入された最適化機構で、INSERT INTO…SELECTロード方式のパフォーマンスを大幅に向上させます。公式テストでは、単一レプリカシナリオでロード時間が36%に、3レプリカシナリオで54%に短縮され、全体的なパフォーマンス向上が100%を超えています。従来のプロセスでは、SinkノードはデータをBlock形式にエンコードし、Ping-pong RPCを通じて下流ノードに送信する必要があり、複数のエンコード・デコード操作がオーバーヘッドを増加させていました。Memtable forwardingはこのプロセスを最適化します:SinkノードはMemTableを直接処理し、Segmentデータを生成し、Streaming RPCを通じて送信することで、エンコード/デコードと送信待機を削減し、より正確なメモリバックプレッシャーを提供します。現在、この機能はストレージ・コンピュート統合展開モードのみをサポートしています。
ストレージ・コンピュート分離ロード
ストレージ・コンピュート分離アーキテクチャでは、ロード最適化はデータストレージとトランザクション管理の分離に焦点を当てています:
- データストレージ:BEはデータを永続化しません。MemTable flush後、Segmentファイルは直接共有ストレージ(S3、HDFSなど)にアップロードされ、オブジェクトストレージの高可用性と低コストを活用して弾力的スケーリングをサポートします。BEローカルFile Cacheは非同期でホットデータをキャッシュし、TTLとWarmup戦略を通じてクエリヒット率を向上させます。メタデータ(Tablet、Rowsetメタデータなど)は、BEローカルRocksDBではなく、Meta ServiceがFoundationDBに格納します。
- トランザクション処理:トランザクション管理はFEからMeta Serviceに移行され、FE Edit Logの書き込みボトルネックを排除します。Meta Serviceは標準インターフェース(beginTransaction、commitTransaction)を通じてトランザクションを管理し、FoundationDBのグローバルトランザクション機能に依存して一貫性を確保します。BE coordinatorはMeta Serviceと直接やり取りし、トランザクション状態を記録し、アトミック操作を通じて競合とタイムアウト回復を処理し、同期ロジックを簡素化して高並行小バッチロードスループットを向上させます。
ロード方法
Dorisは複数のロード方法を提供しており、上記の原理を共有していますが、異なるシナリオ向けに最適化されています。ユーザーはデータソースとビジネスニーズに基づいて選択できます:
- Stream Load:HTTPを通じてローカルファイルやデータストリームをロードし、結果を同期で返し、リアルタイム書き込み(アプリケーションデータプッシュなど)に適しています。
- Broker Load:SQLを通じてHDFS、S3などの外部ストレージをロードし、非同期で実行し、大規模バッチロードに適しています。
- Routine Load:Kafkaからデータを継続的に消費し、Exactly-Onceサポート付きの非同期ストリーミングロードで、メッセージキューデータのリアルタイム同期に適しています。
- Insert Into/Select:SQLを通じてDorisテーブルまたは外部ソース(Hive、MySQL、S3 TVFなど)からロードし、ETLジョブと外部データ統合に適しています。
- MySQL Load:MySQL LOAD DATA構文と互換性があり、ローカルCSVファイルをロードし、FEを通じてStream Loadとしてデータを転送し、小規模テストやMySQLユーザー移行に適しています。
Dorisロードパフォーマンスの向上方法
Dorisのロードパフォーマンスは、その分散アーキテクチャとストレージ機構に影響され、核心的な側面はFEメタデータ管理、BE並列処理、MemTableキャッシュフラッシング、トランザクション管理が含まれます。以下の最適化戦略とその効果を、テーブル構造設計、バッチング戦略、バケット設定、メモリ管理、並行制御の観点から、ロード原理と組み合わせて説明します。
テーブル構造設計最適化:分散オーバーヘッドとメモリ圧迫の削減
Dorisのロードプロセスでは、データはFEによって解析され、その後テーブルパーティション分割とバケット化ルールに従ってBEノード上のTablet(データシャード)に分散され、MemTableを通じてBEメモリ内でキャッシュ・ソートされ、その後ディスクにフラッシュしてSegmentファイルを生成します。テーブル構造(パーティション分割、モデル、インデックス)は、データ分散効率、計算負荷、ストレージ断片化に直接影響します。
- パーティション設計:データ範囲の分離、分散とメモリ圧迫の削減
ビジネスクエリパターン(時間、地域など)に従ってパーティション分割することで、ロード中にデータはターゲットパーティションにのみ分散され、無関係なパーティションからのメタデータとファイル処理を回避します。複数のパーティションに同時に書き込むと、多くのTabletがアクティブになり、各Tabletが独立したMemTableを占有し、BEメモリ圧迫を大幅に増加させ、早期Flushをトリガーして多数の小さなSegmentファイルを生成する可能性があります。これはディスクやオブジェクトストレージI/Oオーバーヘッドを増加させるだけでなく、小さなファイルによる頻繁なCompactionと書き込み増幅を引き起こし、パフォーマンスを低下させます。アクティブパーティション数を制限することで(日次ロードなど)、同時にアクティブなTablet数を削減し、メモリ圧迫を緩和し、より大きなSegmentファイルを生成し、Compaction負荷を削減して、並列書き込み効率と後続のクエリパフォーマンスを向上させることができます。
- モデル選択:計算負荷の削減、書き込みの高速化
詳細モデル(Duplicate Key)は生データのみを格納し、集約や重複除去計算は行いません。一方、Aggregateモデルは主キー列による集約が必要で、Unique Keyモデルは重複除去が必要であり、どちらもCPUとメモリ消費を増加させます。重複除去や集約が不要なシナリオでは、詳細モデルを優先することで、BEノード上のMemTableステージでの追加計算(ソート、重複除去など)を回避し、メモリ使用量とCPU圧迫を削減し、データ書き込みプロセスを高速化できます。
- インデックス制御:クエリと書き込みオーバーヘッドのバランス
インデックス(ビットマップインデックス、転置インデックスなど)は、ロード中に同期更新が必要で、書き込み時のメンテナンスコストを増加させます。高頻度クエリフィールドのみにインデックスを作成し、冗長なインデックスを回避することで、BE書き込み中のインデックス更新操作(インデックス構築、検証など)を削減し、CPUとメモリ使用量を削減してロードスループットを向上させることができます。
バッチング最適化:トランザクションとストレージ断片化の削減
Dorisの各ロードタスクは独立したトランザクションであり、FE Edit Log書き込み(メタデータ変更の記録)とBE MemTableフラッシング(Segmentファイル生成)が関与します。高頻度小バッチロード(KB レベルなど)は、頻繁なEdit Log書き込み(FEディスクI/O増加)と頻繁なMemTableフラッシング(多数の小さなSegmentファイル生成、Compaction書き込み増幅トリガー)を引き起こし、パフォーマンスを大幅に低下させます。
- クライアント側バッチング:トランザクション数の削減、メタデータオーバーヘッドの低減
クライアントは数百MBから数GBまでデータを蓄積してから一度にロードし、トランザクション数を削減します。単一の大きなトランザクションが複数の小さなトランザクションを置き換えることで、FE Edit Log書き込み頻度(メタデータ操作の削減)とBE MemTableフラッシュ頻度(小ファイル生成の削減)を削減し、ストレージ断片化と後続のCompactionリソース消費を回避できます。
- サーバー側バッチング(Group Commit):小トランザクションの統合、ストレージ効率の最適化
Group Commitを有効にすると、サーバーは短時間内の複数の小バッチロードを単一のトランザクションに統合し、Edit Log書き込み回数とMemTableフラッシュ頻度を削減します。統合された大きなトランザクションは、より大きなSegmentファイル(小ファイルの削減)を生成し、バックグラウンドCompaction圧迫を緩和し、特に高頻度小バッチシナリオ(ログ記録、IoTデータ書き込みなど)に適しています。
バケット数最適化:負荷と分散効率のバランス
バケット数はTablet数を決定し(各バケットが1つのTabletに対応)、BEノード上のデータ分散に直接影響します。バケット数が少なすぎるとデータスキュー(単一BEの過負荷)を引き起こしやすく、多すぎるとメタデータ管理と分散オーバーヘッド(BEがより多くのTabletのMemTableとSegmentファイルを処理する必要)を増加させます。
- 適切なバケット数設定:バランスの取れたTabletサイズの確保
バケット数は、BEノード数とデータ量に応じて設定すべきで、推奨される単一Tablet圧縮データサイズは1-10GB(計算式:バケット数 = 総データ量 / (1-10GB))です。同時に、バケットキー(ランダム数列など)を調整してデータスキューを回避します。適切なバケット化により、BEノード負荷をバランスさせ、単一ノードの過負荷や複数ノードのリソース浪費を回避し、並列書き込み効率を向上させることができます。
- ランダムバケット化最適化:RPCオーバーヘッドとCompaction圧迫の削減
ランダムバケット化シナリオでは、load_to_single_tablet=trueを有効にすることで、データを直接単一のTabletに書き込み、複数のTabletへの分散をバイパスできます。これにより、Tablet分散を計算するCPUオーバーヘッドとBE間のRPC送信オーバーヘッドを排除し、書き込み速度を大幅に向上させます。同時に、単一のTabletへの集中書き込みは小さなSegmentファイルの生成を削減し、頻繁なCompactionによる書き込み増幅を回避し、BEリソース消費とストレージ断片化を削減して、ロードとクエリ効率を向上させます。
メモリ最適化:フラッシングとリソース影響の削減
データロード中、BEは最初にデータをメモリMemTable(デフォルト200MB)に書き込み、その後満杯時に非同期でディスクにフラッシュしてSegmentファイル(ディスクI/Oトリガー)を生成します。高頻度フラッシングはディスクやオブジェクトストレージ(ストレージ・コンピュート分離シナリオ)のI/O圧迫を増加させ、メモリ不足はMemTable分散(複数パーティション/バケットシナリオで)を引き起こし、頻繁なフラッシングやOOMを容易にトリガーします。
- パーティション順次ロード:メモリ使用の集中
パーティション順序(日次など)でロードし、データ書き込みを単一パーティションに集中させることで、MemTable分散(複数パーティションで各パーティションにMemTable割り当てが必要)とフラッシュ頻度を削減し、メモリ断片化とI/O圧迫を削減します。
- 大規模データバッチロード:リソース影響の削減
大きなファイルや複数ファイルロード(Broker Loadなど)では、バッチング(≤100GB/バッチ)を推奨し、ロードエラー後の高い再試行コストを回避すると同時に、BEメモリとディスクの集中占有を削減します。ローカル大ファイルはstreamloaderツールを使用して自動バッチロードを行うことができます。
並行最適化:スループットとリソース競合のバランス
Dorisの分散アーキテクチャは、複数のBE並列書き込みをサポートします。並行性を高めることでスループットを向上させることができますが、過度な並行性はCPU、メモリ、またはオブジェクトストレージQPS競合(ストレージ・コンピュート分離シナリオでは、S3などのAPIのQPS制限を考慮する必要)を引き起こし、トランザクション競合とレイテンシを増加させます。
- 適切な並行制御:ハードウェアリソースとのマッチング
BEノード数とハードウェアリソース(CPU、メモリ、ディスクI/O)に基づいて並行スレッドを設定します。適度な並行性はBE並列処理能力を十分に活用してスループットを向上させ、過度な並行性はリソース競合によって効率を低下させます。
- 低レイテンシシナリオ:並行性の削減と非同期提出
低レイテンシ要件シナリオ(リアルタイム監視など)では、並行数を削減し(リソース競合の回避)、Group Commitの非同期モード(async_mode)を組み合わせて小トランザクションを統合し、トランザクションコミットレイテンシを削減します。
Dorisデータロードのレイテンシとスループットのトレードオフ
Apache Dorisを使用する際、実際のビジネスシナリオでは、データロードのレイテンシとスループットのバランスを取る必要があることが多くあります:
- 低レイテンシ:ユーザーが最新データをより早く確認できることを意味しますが、小さな書き込みバッチとより高い書き込み頻度により、より頻繁なバックグラウンドCompactionが発生し、より多くのCPU、IO、メモリリソースを消費し、メタデータ管理圧迫を増加させます。
- 高スループット:単一ロードデータ量を増加させることでロード回数を削減し、メタデータ圧迫とバックグラウンドCompactionオーバーヘッドを大幅に削減して、システム全体のパフォーマンスを向上させることができます。ただし、データ書き込みから可視化までのレイテンシが増加します。
そのため、ユーザーはビジネスレイテンシ要件を満たしつつ単一ロードデータ量を最大化して、スループットを向上させ、システムオーバーヘッドを削減することが推奨されます。
テストデータ
Flink End-to-Endレイテンシ
Flink Connectorをバッチングモードで書き込みに使用し、主にend-to-endレイテンシとロードスループットに焦点を当てています。バッチング時間は、Flink Connectorのsink.buffer-flush.intervalパラメータによって制御されます。Flink Connectorの詳細な使用方法については、Flink-Doris-Connectorを参照してください。
マシン構成:
- 1 FE:8コアCPU、16GBメモリ
- 3 BE:16コアCPU、64GBメモリ
データセット:
- TPCH lineitemデータ
異なるバッチング時間と並行レベルでのロードパフォーマンス、テスト結果:
| バッチ時間(秒) | ロード並行性 | バケット数 | スループット(行/秒) | End-to-End平均レイテンシ(秒) | End-to-End P99レイテンシ(秒) |
|---|---|---|---|---|---|
| 0.2 | 1 | 32 | 6073 | 0.211 | 0.517 |
| 1 | 1 | 32 | 31586 | 0.71 | 1.39 |
| 10 | 1 | 32 | 67437 | 5.65 | 10.90 |
| 20 | 1 | 32 | 93769 | 10.962 | 20.682 |
| 60 | 1 | 32 | 125000 | 32.46 | 62.17 |
| 0.2 | 10 | 32 | 9300 | 0.38 | 0.704 |
| 1 | 10 | 32 | 34633 | 0.75 | 1.47 |
| 10 | 10 | 32 | 82023 | 5.44 | 10.43 |
| 20 | 10 | 32 | 139731 | 11.12 | 22.68 |
| 60 | 10 | 32 | 171642 | 32.37 | 61.93 |
異なるバケット数がロードパフォーマンスに与える影響、テスト結果:
| バッチ時間(秒) | ロード並行性 | バケット数 | スループット(行/秒) | End-to-End平均レイテンシ(秒) | End-to-End P99レイテンシ(秒) |
|---|---|---|---|---|---|
| 1 | 10 | 4 | 34722 | 0.86 | 2.28 |
| 1 | 10 | 16 |