メインコンテンツまでスキップ

Postgres/MySQL継続的負荷

概要

Job を使用して MySQL、Postgres などの複数テーブルから Doris への全量データおよび増分データの継続的な同期を Streaming Job 経由でサポートします。Doris へのリアルタイム複数テーブルデータ同期が必要なシナリオに適しています。

サポートされているデータソース

  • MySQL
  • Postgres

基本原理

Flink CDC を統合することで、Doris は MySQL、Postgres などから変更ログを読み取ることをサポートし、全量および増分複数テーブルデータ同期を可能にします。初回同期時、Doris は自動的にダウンストリームテーブル(プライマリキーテーブル)を作成し、プライマリキーをアップストリームと一致させ続けます。

注意事項:

  1. 現在、at-least-once セマンティクスのみが保証されています。
  2. プライマリキーテーブルのみが同期でサポートされています。
  3. LOAD 権限が必要です。ダウンストリームテーブルが存在しない場合、CREATE 権限も必要です。

クイックスタート

前提条件

MySQL

my.cnf に以下を追加して MySQL で Binlog を有効にします:

log-bin=mysql-bin
binlog_format=ROW
server-id=1

Postgres

以下をpostgresql.confに追加することで、Postgresで論理レプリケーションを有効にします:

wal_level=logical

インポートジョブの作成

MySQL

CREATE JOB multi_table_sync
ON STREAMING
FROM MYSQL (
"jdbc_url" = "jdbc:mysql://127.0.0.1:3306",
"driver_url" = "mysql-connector-j-8.0.31.jar",
"driver_class" = "com.mysql.cj.jdbc.Driver",
"user" = "root",
"password" = "123456",
"database" = "test",
"include_tables" = "user_info,order_info",
"offset" = "initial"
)
TO DATABASE target_test_db (
"table.create.properties.replication_num" = "1"
)

Postgres

CREATE JOB test_postgres_job
ON STREAMING
FROM POSTGRES (
"jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/postgres",
"driver_url" = "postgresql-42.5.0.jar",
"driver_class" = "org.postgresql.Driver",
"user" = "postgres",
"password" = "postgres",
"database" = "postgres",
"schema" = "public",
"include_tables" = "test_tbls",
"offset" = "latest"
)
TO DATABASE target_test_db (
"table.create.properties.replication_num" = "1"
)

Import ステータスの確認

select * from jobs(type=insert) where ExecuteType = "STREAMING"
Id: 1765332859199
Name: mysql_db_sync
Definer: root
ExecuteType: STREAMING
RecurringStrategy: \N
Status: RUNNING
ExecuteSql: FROM MYSQL('include_tables'='user_info','database'='test','driver_class'='com.mysql.cj.jdbc.Driver','driver_url'='mysql-connector-j-8.0.31.jar','offset'='initial','jdbc_url'='jdbc:mysql://127.0.0.1:3306','user'='root' ) TO DATABASE target_test_db ('table.create.properties.replication_num'='1')
CreateTime: 2025-12-10 10:19:35
SucceedTaskCount: 1
FailedTaskCount: 0
CanceledTaskCount: 0
Comment:
Properties: \N
CurrentOffset: {"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","splitId":"binlog-split","row":"1","event":"2","server_id":"1"}
EndOffset: \N
LoadStatistic: {"scannedRows":24,"loadBytes":1146,"fileNumber":0,"fileSize":0}
ErrorMsg: \N

Import ジョブの一時停止

PAUSE JOB WHERE jobname = <job_name> ;

Resume Import Job

RESUME JOB where jobName = <job_name> ;

Import Jobの変更

ALTER JOB <job_name>
FROM MYSQL (
"user" = "root",
"password" = "123456"
)
TO DATABASE target_test_db

Import Job の削除

DROP JOB where jobName = <job_name> ;

リファレンスマニュアル

Import コマンド

マルチテーブル同期ジョブを作成するための構文:

CREATE JOB <job_name>
ON STREAMING
[job_properties]
[ COMMENT <comment> ]
FROM <MYSQL|POSTGRES> (
[source_properties]
)
TO DATABASE <target_db> (
[target_properties]
)
ModuleDescription
job_nameジョブ名
job_properties一般的なインポートパラメータ
commentジョブコメント
source_propertiesソース(MySQL/PG)パラメータ
target_propertiesDoris ターゲットDBパラメータ

インポートパラメータ

FE設定パラメータ

ParameterDefaultDescription
max_streaming_job_num1024Streamingジョブの最大数
job_streaming_task_exec_thread_num10StreamingTaskのスレッド数
max_streaming_task_show_count100メモリ内のStreamingTaskレコードの最大数

一般的なジョブインポートパラメータ

ParameterDefaultDescription
max_interval10s新しいデータがない場合のアイドルスケジューリング間隔

ソース設定パラメータ

ParameterDefaultDescription
jdbc_url-JDBC接続文字列(MySQL/PG)
driver_url-JDBCドライバーjarパス
driver_class-JDBCドライバークラス名
user-データベースユーザー名
password-データベースパスワード
database-データベース名
schema-スキーマ名
include_tables-同期するテーブル、カンマ区切り
offsetinitialinitial: フル + 増分、latest: 増分のみ
snapshot_split_size8096各スプリットのサイズ(行数)。フル同期中、テーブルは同期のために複数のスプリットに分割されます。
snapshot_parallelism1フル同期フェーズでの並列度レベル、つまり単一タスクが一度にスケジュールできるスプリットの最大数。

Doris ターゲットDBパラメータ

ParameterDefaultDescription
table.create.properties.*-作成時のテーブルプロパティ、例:replication_num
load.strict_mode-厳密モードを有効にするかどうか。デフォルトで無効。
load.max_filter_ratio-サンプリングウィンドウ内で許可される最大フィルタリング比率。0から1(包括的)の間である必要があります。デフォルト値は0で、ゼロ許容を示します。サンプリングウィンドウはmax_interval * 10に等しいです。このウィンドウ内で、エラー行と総行数の比率がmax_filter_ratioを超えた場合、スケジュールされたジョブは一時停止され、データ品質の問題に対処するための手動介入が必要になります。

インポートステータス

Job

ジョブを送信した後、以下のSQLを実行してジョブステータスを確認できます:

select * from jobs(type=insert) where ExecuteType = "STREAMING"
*************************** 1. row ***************************
Id: 1765332859199
Name: mysql_db_sync
Definer: root
ExecuteType: STREAMING
RecurringStrategy: \N
Status: RUNNING
ExecuteSql: FROM MYSQL('include_tables'='user_info','database'='test','driver_class'='com.mysql.cj.jdbc.Driver','driver_url'='mysql-connector-j-8.0.31.jar','offset'='initial','jdbc_url'='jdbc:mysql://127.0.0.1:3306','user'='root' ) TO DATABASE target_test_db ('table.create.properties.replication_num'='1')
CreateTime: 2025-12-10 10:19:35
SucceedTaskCount: 2
FailedTaskCount: 0
CanceledTaskCount: 0
Comment:
Properties: \N
CurrentOffset: {"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","splitId":"binlog-split","row":"1","event":"2","server_id":"1"}
EndOffset: {"ts_sec":"0","file":"binlog.000003","pos":"157","kind":"SPECIFIC","gtids":"","row":"0","event":"0"}
LoadStatistic: {"scannedRows":3,"loadBytes":232,"fileNumber":0,"fileSize":0}
ErrorMsg: \N
結果列説明
IDジョブID
NAMEジョブ名
Definerジョブ定義者
ExecuteTypeジョブタイプ: ONE_TIME/RECURRING/STREAMING/MANUAL
RecurringStrategy繰り返し戦略、Streamingの場合は空
Statusジョブステータス
ExecuteSqlジョブのInsert SQL文
CreateTimeジョブ作成時間
SucceedTaskCount成功したタスク数
FailedTaskCount失敗したタスク数
CanceledTaskCountキャンセルされたタスク数
Commentジョブコメント
Propertiesジョブプロパティ
CurrentOffset現在のオフセット、Streamingジョブのみ
EndOffsetソースからの最大終了オフセット、Streamingのみ
LoadStatisticジョブ統計情報
ErrorMsgジョブエラーメッセージ
JobRuntimeMsgジョブランタイム情報

Task

各Taskのステータスを確認するには、以下のSQLを実行できます:

select * from tasks(type='insert') where jobId='1765336137066'
*************************** 1. row ***************************
TaskId: 1765336137066
JobId: 1765332859199
JobName: mysql_db_sync
Label: 1765332859199_1765336137066
Status: SUCCESS
ErrorMsg: \N
CreateTime: 2025-12-10 11:09:06
StartTime: 2025-12-10 11:09:16
FinishTime: 2025-12-10 11:09:18
TrackingUrl: \N
LoadStatistic: {"scannedRows":1,"loadBytes":333}
User: root
FirstErrorMsg:
RunningOffset: {"endOffset":{"ts_sec":"1765284495","file":"binlog.000002","pos":"9521","kind":"SPECIFIC","row":"1","event":"2","server_id":"1"},"startOffset":{"ts_sec":"1765284495","file":"binlog.000002","pos":"9350","kind":"SPECIFIC","row":"1","splitId":"binlog-split","event":"2","server_id":"1"},"splitId":"binlog-split"}
Result ColumnDescription
TaskIdタスクID
JobIDジョブID
JobNameジョブ名
Labelタスクラベル
Statusタスクステータス
ErrorMsgタスクエラーメッセージ
CreateTimeタスク作成時間
StartTimeタスク開始時間
FinishTimeタスク終了時間
LoadStatisticタスク統計
Userタスク実行者
RunningOffset現在のオフセット、Streamingジョブのみ