メインコンテンツまでスキップ

Flink Doris コネクター

Flink Doris Connector は、Flinkを通じてDorisクラスターからデータを読み取り、データを書き込むために使用されます。また、FlinkCDCも統合されており、MySQLなどの上流データベースとのより便利な完全データベース同期を可能にします。

Flink Connectorを使用すると、以下の操作を実行できます:

  • Dorisからデータを読み取り: Flink ConnectorはBEからの並列読み取りをサポートし、データ取得効率を向上させます。

  • Dorisにデータを書き込み: Flinkでバッチ処理した後、Stream Loadを使用してデータをDorisに一括インポートします。

  • Lookup Joinを使用したディメンションテーブル結合を実行: バッチ処理と非同期クエリにより、ディメンションテーブル結合を高速化します。

  • 完全データベース同期: Flink CDCを使用して、MySQL、Oracle、PostgreSQLなどのデータベース全体を同期でき、自動テーブル作成とDDL操作も含まれます。

バージョン説明

Connector VersionFlink VersionDoris VersionJava VersionScala Version
1.0.31.11,1.12,1.13,1.140.15+82.11,2.12
1.1.11.141.0+82.11,2.12
1.2.11.151.0+8-
1.3.01.161.0+8-
1.4.01.15 - 1.171.0+8-
1.5.21.15 - 1.181.0+8-
1.6.11.15 - 1.191.0+8-
24.0.11.15 - 1.201.0+8-
24.1.01.15 - 1.201.0+8-
25.0.01.15 - 1.201.0+8-
25.1.01.15 - 1.201.0+8-
26.0.01.15 - 1.20,2.0 - 2.21.0+8(1.x),17(2.x)-

使用方法

Flink Doris Connectorは、JarまたはMavenの2つの方法で使用できます。

Jar

対応するバージョンのFlink Doris Connector Jarファイルをこちらからダウンロードし、このファイルをFlinkセットアップのclasspathにコピーしてFlink-Doris-Connectorを使用できます。StandaloneモードのFlinkデプロイメントの場合、このファイルをlib/フォルダー下に配置してください。Yarnモードで実行されるFlinkクラスターの場合、ファイルを事前デプロイメントパッケージに配置してください。

Maven

Mavenで使用するには、Pomファイルに以下の依存関係を追加するだけです:

<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-${flink.version}</artifactId>
<version>${connector.version}</version>
</dependency>

例えば:

<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.16</artifactId>
<version>25.1.0</version>
</dependency>

動作原理

Dorisからのデータ読み取り

FlinkConnectorPrinciples-JDBC-Doris

データ読み取り時、Flink Doris ConnectorはFlink JDBC Connectorと比較してより高いパフォーマンスを提供し、使用が推奨されます:

  • Flink JDBC Connector: DorisはMySQLプロトコルと互換性がありますが、Dorisクラスターへの読み書きにFlink JDBC Connectorを使用することは推奨されません。このアプローチは単一のFEノードでの直列読み書き操作を引き起こし、ボトルネックを作成してパフォーマンスに影響を与えます。

  • Flink Doris Connector: Doris 2.1以降、ADBCがFlink Doris Connectorのデフォルトプロトコルです。読み取りプロセスは以下の手順に従います:

    a. Flink Doris Connectorはまず、クエリプランに基づいてFEからTablet ID情報を取得します。

    b. クエリ文を生成します:SELECT * FROM tbs TABLET(id1, id2, id3)

    c. その後、クエリはFEのADBCポートを通じて実行されます。

    d. データはBEから直接返され、FEを迂回して単一点ボトルネックを排除します。

Dorisへのデータ書き込み

データ書き込みにFlink Doris Connectorを使用する場合、Stream Load経由での一括インポート前にFlinkのメモリ内でバッチ処理が実行されます。Doris Flink Connectorは2つのバッチモードを提供し、Flink Checkpointベースのストリーミング書き込みがデフォルトです:

Streaming WriteBatch Write
トリガー条件Flink Checkpointsに依存し、FlinkのCheckpointサイクルに従ってDorisに書き込みコネクター定義の時間またはデータ量閾値に基づく定期的な送信
一貫性Exactly-OnceAt-Least-Once; 主キーモデルでExactly-Onceを保証可能
レイテンシFlink checkpointインターバルにより制限、一般的により高い柔軟な調整が可能な独立したバッチメカニズム
耐障害性と復旧Flink状態復旧と完全に一致外部重複排除ロジックに依存(例:Doris主キー重複排除)

クイックスタート

準備

Flinkクラスターのデプロイ

Standaloneクラスターを例にします:

  1. Flinkインストールパッケージをダウンロードします(例:Flink 1.18.1);
  2. 展開後、Flink Doris Connectorパッケージを<FLINK_HOME>/libに配置します;
  3. <FLINK_HOME>ディレクトリに移動し、bin/start-cluster.shを実行してFlinkクラスターを開始します;
  4. jpsコマンドを使用してFlinkクラスターが正常に開始されたかを確認できます。

Dorisテーブルの初期化

以下のステートメントを実行してDorisテーブルを作成します:

CREATE DATABASE test;

CREATE TABLE test.student (
`id` INT,
`name` VARCHAR(256),
`age` INT
)
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);

INSERT INTO test.student values(1,"James",18);
INSERT INTO test.student values(2,"Emily",28);

CREATE TABLE test.student_trans (
`id` INT,
`name` VARCHAR(256),
`age` INT
)
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);

FlinkSQLタスクの実行

FlinkSQLクライアントの開始

bin/sql-client.sh

FlinkSQLを実行する

CREATE TABLE Student (
id STRING,
name STRING,
age INT
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'test.student',
'username' = 'root',
'password' = ''
);

CREATE TABLE StudentTrans (
id STRING,
name STRING,
age INT
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'test.student_trans',
'username' = 'root',
'password' = '',
'sink.label-prefix' = 'doris_label'
);

INSERT INTO StudentTrans SELECT id, concat('prefix_',name), age+1 FROM Student;

クエリデータ

mysql> select * from test.student_trans;
+------+--------------+------+
| id | name | age |
+------+--------------+------+
| 1 | prefix_James | 19 |
| 2 | prefix_Emily | 29 |
+------+--------------+------+
2 rows in set (0.02 sec)

シナリオと操作

Dorisからのデータ読み取り

FlinkがDorisからデータを読み取る際、Doris Sourceは現在有界ストリームであり、CDC方式での継続的な読み取りはサポートしていません。DorisからのデータはThriftまたはArrowFlightSQL(バージョン24.0.0以降でサポート)を使用して読み取ることができます。バージョン2.1以降では、ArrowFlightSQLが推奨されるアプローチです。

  • Thrift: BEのThriftインターフェースを呼び出すことでデータを読み取ります。詳細な手順については、Reading Data via Thrift Interfaceを参照してください。
  • ArrowFlightSQL: Doris 2.1をベースとしたこの方法では、Arrow Flight SQLプロトコルを使用して大量のデータを高速で読み取ることができます。詳細については、High-speed Data Transfer via Arrow Flight SQLを参照してください。

FlinkSQLを使用したデータ読み取り

Thrift方式
CREATE TABLE student (
id INT,
name STRING,
age INT
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030', -- Fe的host:HttpPort
'table.identifier' = 'test.student',
'username' = 'root',
'password' = ''
);

SELECT * FROM student;
ArrowFlightSQL
CREATE TABLE student (
id INT,
name STRING,
age INT
)
WITH (
'connector' = 'doris',
'fenodes' = '{fe.conf:http_port}',
'table.identifier' = 'test.student',
'source.use-flight-sql' = 'true',
'source.flight-sql-port' = '{fe.conf:arrow_flight_sql_port}',
'username' = 'root',
'password' = ''
);

SELECT * FROM student;

DataStream APIを使用したデータの読み取り

DataStream APIを使用してデータを読み取る場合、「使用方法」セクションで説明されているように、事前にプログラムのPOMファイルに依存関係を含める必要があります。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DorisOptions option = DorisOptions.builder()
.setFenodes("127.0.0.1:8030")
.setTableIdentifier("test.student")
.setUsername("root")
.setPassword("")
.build();

DorisReadOptions readOptions = DorisReadOptions.builder().build();
DorisSource<List<?>> dorisSource = DorisSource.<List<?>>builder()
.setDorisOptions(option)
.setDorisReadOptions(readOptions)
.setDeserializer(new SimpleListDeserializationSchema())
.build();

env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();
env.execute("Doris Source Test");

完全なコードについては、以下を参照してください:DorisSourceDataStream.java

DorisへのデータWriting

FlinkはStream Load方法を使用してDorisにデータを書き込み、streamingとbatch-insertionの両方のモードをサポートしています。

StreamingとBatch-insertionの違い

Connector 1.5.0以降、batch-insertionがサポートされています。Batch-insertionはCheckpointsに依存しません。メモリ内でデータをバッファリングし、バッチパラメータに基づいて書き込みタイミングを制御します。Streaming insertionはCheckpointsの有効化が必要で、Checkpoint期間全体を通じて上流データを継続的にDorisに書き込み、データをメモリ内に継続的に保持しません。

FlinkSQLを使用したデータWriting

テスト用に、FlinkのDatagenを使用して継続的に生成される上流データをシミュレートします。

-- enable checkpoint
SET 'execution.checkpointing.interval' = '30s';

CREATE TABLE student_source (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.name.length' = '20',
'fields.id.min' = '1',
'fields.id.max' = '100000',
'fields.age.min' = '3',
'fields.age.max' = '30'
);

-- doris sink
CREATE TABLE student_sink (
id INT,
name STRING,
age INT
)
WITH (
'connector' = 'doris',
'fenodes' = '10.16.10.6:28737',
'table.identifier' = 'test.student',
'username' = 'root',
'password' = 'password',
'sink.label-prefix' = 'doris_label'
--'sink.enable.batch-mode' = 'true' Adding this configuration enables batch writing
);

INSERT INTO student_sink SELECT * FROM student_source;

DataStream APIを使用したデータの書き込み

DataStream APIを使用してデータを書き込む場合、異なるシリアライゼーション方法を使用して、Dorisテーブルに書き込む前に上流データをシリアライズできます。

備考

ConnectorにはすでにHttpClient4.5.13バージョンが含まれています。プロジェクトでHttpClientを別途参照する場合は、バージョンの一致を確認する必要があります。

標準文字列形式

上流データがCSVまたはJSON形式の場合、SimpleStringSerializerを直接使用してデータをシリアライズできます。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30000);
DorisSink.Builder<String> builder = DorisSink.builder();

DorisOptions dorisOptions = DorisOptions.builder()
.setFenodes("10.16.10.6:28737")
.setTableIdentifier("test.student")
.setUsername("root")
.setPassword("")
.build();

Properties properties = new Properties();
// When the upstream data is in json format, the following configuration needs to be enabled
properties.setProperty("read_json_by_line", "true");
properties.setProperty("format", "json");

// When writing csv data from the upstream, the following configurations need to be enabled
//properties.setProperty("format", "csv");
//properties.setProperty("column_separator", ",");

DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
.setLabelPrefix("label-doris")
.setDeletable(false)
//.setBatchMode(true) Enable batch writing
.setStreamLoadProp(properties)
.build();

builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionOptions)
.setSerializer(new SimpleStringSerializer())
.setDorisOptions(dorisOptions);

List<String> data = new ArrayList<>();
data.add("{\"id\":3,\"name\":\"Michael\",\"age\":28}");
data.add("{\"id\":4,\"name\":\"David\",\"age\":38}");

env.fromCollection(data).sinkTo(builder.build());
env.execute("doris test");

完全なコードについては、DorisSinkExample.javaを参照してください。

RowData Format

RowDataはFlinkの内部フォーマットです。上流のデータがRowData形式の場合、RowDataSerializerを使用してデータをシリアライズする必要があります。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000);
env.setParallelism(1);

DorisSink.Builder<RowData> builder = DorisSink.builder();

Properties properties = new Properties();
properties.setProperty("column_separator", ",");
properties.setProperty("line_delimiter", "\n");
properties.setProperty("format", "csv");
// When writing json data from the upstream, the following configuration needs to be enabled
// properties.setProperty("read_json_by_line", "true");
// properties.setProperty("format", "json");
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder
.setFenodes("10.16.10.6:28737")
.setTableIdentifier("test.student")
.setUsername("root")
.setPassword("");
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix(UUID.randomUUID().toString()).setDeletable(false).setStreamLoadProp(properties);

// flink rowdata‘s schema
String[] fields = {"id","name", "age"};
DataType[] types = {DataTypes.INT(), DataTypes.VARCHAR(256), DataTypes.INT()};

builder.setDorisExecutionOptions(executionBuilder.build())
.setSerializer(
RowDataSerializer.builder() // serialize according to rowdata
.setType(LoadConstants.CSV)
.setFieldDelimiter(",")
.setFieldNames(fields)
.setFieldType(types)
.build())
.setDorisOptions(dorisBuilder.build());

// mock rowdata source
DataStream<RowData> source =
env.fromElements("")
.flatMap(
new FlatMapFunction<String, RowData>() {
@Override
public void flatMap(String s, Collector<RowData> out)
throws Exception {
GenericRowData genericRowData = new GenericRowData(3);
genericRowData.setField(0, 1);
genericRowData.setField(1, StringData.fromString("Michael"));
genericRowData.setField(2, 18);
out.collect(genericRowData);

GenericRowData genericRowData2 = new GenericRowData(3);
genericRowData2.setField(0, 2);
genericRowData2.setField(1, StringData.fromString("David"));
genericRowData2.setField(2, 38);
out.collect(genericRowData2);
}
});

source.sinkTo(builder.build());
env.execute("doris test");

完全なコードについては、DorisSinkExampleRowData.java を参照してください。

Debezium Format

FlinkCDCからのデータやKafkaのDebezium formatなど、Debezium形式の上流データについては、JsonDebeziumSchemaSerializerを使用してデータをシリアライズできます。

// enable checkpoint
env.enableCheckpointing(10000);

Properties props = new Properties();
props.setProperty("format", "json");
props.setProperty("read_json_by_line", "true");
DorisOptions dorisOptions = DorisOptions.builder()
.setFenodes("127.0.0.1:8030")
.setTableIdentifier("test.student")
.setUsername("root")
.setPassword("").build();

DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-prefix")
.setStreamLoadProp(props)
.setDeletable(true);

DorisSink.Builder<String> builder = DorisSink.builder();
builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionBuilder.build())
.setDorisOptions(dorisOptions)
.setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());

env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
.sinkTo(builder.build());

完全なコードについては、CDCSchemaChangeExample.javaを参照してください。

マルチテーブル書き込みフォーマット

現在、DorisSinkは単一のSinkで複数のテーブルの同期をサポートしています。データとデータベース/テーブル情報の両方をSinkに渡し、RecordWithMetaSerializerを使用してシリアル化する必要があります。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DorisSink.Builder<RecordWithMeta> builder = DorisSink.builder();
Properties properties = new Properties();
properties.setProperty("column_separator", ",");
properties.setProperty("line_delimiter", "\n");
properties.setProperty("format", "csv");
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder
.setFenodes("10.16.10.6:28737")
.setTableIdentifier("")
.setUsername("root")
.setPassword("");

DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();

executionBuilder
.setLabelPrefix("label-doris")
.setStreamLoadProp(properties)
.setDeletable(false)
.setBatchMode(true);

builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionBuilder.build())
.setDorisOptions(dorisBuilder.build())
.setSerializer(new RecordWithMetaSerializer());

RecordWithMeta record = new RecordWithMeta("test", "student_1", "1,David,18");
RecordWithMeta record1 = new RecordWithMeta("test", "student_2", "1,Jack,28");
env.fromCollection(Arrays.asList(record, record1)).sinkTo(builder.build());

完全なコードについては、DorisSinkMultiTableExample.javaを参照してください。

Lookup Join

Lookup Joinを使用すると、FlinkでのディメンションテーブルのJoinを最適化できます。ディメンションテーブルのJoinにFlink JDBC Connectorを使用する場合、以下の問題が発生する可能性があります:

  • Flink JDBC Connectorは同期クエリモードを使用します。これは、上流データ(例:Kafkaからのデータ)がレコードを送信した後、即座にDorisディメンションテーブルにクエリを実行することを意味します。これにより、高並行性シナリオではクエリレイテンシが高くなります。

  • JDBCを介して実行されるクエリは、通常レコードごとのポイントルックアップですが、Dorisでは効率性を高めるためにバッチクエリを推奨しています。

Flink Doris ConnectorでディメンションテーブルのJoinにLookup Joinを使用することで、以下の利点が得られます:

  • 上流データのバッチキャッシュにより、レコードごとのクエリによる高レイテンシとデータベース負荷を回避します。

  • Joinクエリの非同期実行により、データスループットを改善し、Dorisへのクエリ負荷を軽減します。

CREATE TABLE fact_table (
`id` BIGINT,
`name` STRING,
`city` STRING,
`process_time` as proctime()
) WITH (
'connector' = 'kafka',
...
);

create table dim_city(
`city` STRING,
`level` INT ,
`province` STRING,
`country` STRING
) WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
'table.identifier' = 'dim.dim_city',
'username' = 'root',
'password' = ''
);

SELECT a.id, a.name, a.city, c.province, c.country,c.level
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city

フルデータベース同期

Flink Doris ConnectorはFlink CDCFlink CDC Documentation)を統合しており、MySQLなどのリレーショナルデータベースをDorisに同期することを容易にします。この統合には、自動テーブル作成、スキーマ変更なども含まれます。同期をサポートするデータベースには次が含まれます:MySQL、Oracle、PostgreSQL、SQLServer、MongoDB、DB2。

Note
  1. フルデータベース同期を使用する場合、$FLINK_HOME/libディレクトリに対応するFlink CDCの依存関係(Fat Jar)を追加する必要があります。例:flink-sql-connector-mysql-cdc-version.jarflinksqlconnectororaclecdc{version}.jar**、**flink-sql-connector-oracle-cdc-{version}.jar。FlinkCDCバージョン3.1以降は以前のバージョンと互換性がありません。依存関係は以下のリンクからダウンロードできます:FlinkCDC 3.xFlinkCDC 2.x
  2. Connector 24.0.0以降のバージョンでは、必要なFlink CDCバージョンは3.1以上である必要があります。ここからダウンロードできます。Flink CDCを使用してMySQLやOracleを同期する場合、$FLINK_HOME/libの下に関連するJDBCドライバーも追加する必要があります。

MySQL全データベース同期

Flinkクラスターを開始した後、以下のコマンドを直接実行できます:

<FLINK_HOME>bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-24.0.1.jar \
mysql-sync-database \
--database test_db \
--mysql-conf hostname=127.0.0.1 \
--mysql-conf port=3306 \
--mysql-conf username=root \
--mysql-conf password=123456 \
--mysql-conf database-name=mysql_db \
--including-tables "tbl1|test.*" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1

Oracle データベース全体同期

<FLINK_HOME>bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
./lib/flink-doris-connector-1.16-24.0.1.jar \
oracle-sync-database \
--database test_db \
--oracle-conf hostname=127.0.0.1 \
--oracle-conf port=1521 \
--oracle-conf username=admin \
--oracle-conf password="password" \
--oracle-conf database-name=XE \
--oracle-conf schema-name=ADMIN \
--including-tables "tbl1|tbl2" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password=\
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1

PostgreSQL データベース全体同期

<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1\
-c org.apache.doris.flink.tools.cdc.CdcTools \
./lib/flink-doris-connector-1.16-24.0.1.jar \
postgres-sync-database \
--database db1\
--postgres-conf hostname=127.0.0.1 \
--postgres-conf port=5432 \
--postgres-conf username=postgres \
--postgres-conf password="123456" \
--postgres-conf database-name=postgres \
--postgres-conf schema-name=public \
--postgres-conf slot.name=test \
--postgres-conf decoding.plugin.name=pgoutput \
--including-tables "tbl1|tbl2" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password=\
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1

SQLServer データベース全体同期

<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
./lib/flink-doris-connector-1.16-24.0.1.jar \
sqlserver-sync-database \
--database db1\
--sqlserver-conf hostname=127.0.0.1 \
--sqlserver-conf port=1433 \
--sqlserver-conf username=sa \
--sqlserver-conf password="123456" \
--sqlserver-conf database-name=CDC_DB \
--sqlserver-conf schema-name=dbo \
--including-tables "tbl1|tbl2" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password=\
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1

DB2 データベース全体同期

<FLINK_HOME>bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-24.0.1.jar \
db2-sync-database \
--database db2_test \
--db2-conf hostname=127.0.0.1 \
--db2-conf port=50000 \
--db2-conf username=db2inst1 \
--db2-conf password=doris123456 \
--db2-conf database-name=testdb \
--db2-conf schema-name=DB2INST1 \
--including-tables "FULL_TYPES|CUSTOMERS" \
--single-sink true \
--use-new-schema-change true \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1

MongoDB 全データベース同期

<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
./lib/flink-doris-connector-1.18-24.0.1.jar \
mongodb-sync-database \
--database doris_db \
--schema-change-mode debezium_structure \
--mongodb-conf hosts=127.0.0.1:27017 \
--mongodb-conf username=flinkuser \
--mongodb-conf password=flinkpwd \
--mongodb-conf database=test \
--mongodb-conf scan.startup.mode=initial \
--mongodb-conf schema.sample-percent=0.2 \
--including-tables "tbl1|tbl2" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password= \
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--sink-conf sink.enable-2pc=false \
--table-conf replication_num=1

AWS Aurora MySQL データベース全体同期

<FLINK_HOME>bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.18-25.0.0.jar \
mysql-sync-database \
--database testwd \
--mysql-conf hostname=xxx.us-east-1.rds.amazonaws.com \
--mysql-conf port=3306 \
--mysql-conf username=admin \
--mysql-conf password=123456 \
--mysql-conf database-name=test \
--mysql-conf server-time-zone=UTC \
--including-tables "student" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password= \
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1

AWS RDS MySQL データベース全体同期

<FLINK_HOME>bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.18-25.0.0.jar \
mysql-sync-database \
--database testwd \
--mysql-conf hostname=xxx.ap-southeast-1.rds.amazonaws.com \
--mysql-conf port=3306 \
--mysql-conf username=admin \
--mysql-conf password=123456 \
--mysql-conf database-name=test \
--mysql-conf server-time-zone=UTC \
--including-tables "student" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password= \
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1

使用方法

パラメータ設定

一般設定項目

Keyデフォルト値必須コメント
fenodes--YDoris FE httpアドレス。複数のアドレスをサポートし、カンマで区切る必要があります。
benodes--NDoris BE httpアドレス。複数のアドレスをサポートし、カンマで区切る必要があります。
jdbc-url--NJDBC接続情報、例:jdbc:mysql://127.0.0.1:9030。
table.identifier--YDorisテーブル名、例:db.tbl。
username--YDorisにアクセスするためのユーザー名。
password--YDorisにアクセスするためのパスワード。
auto-redirectTRUENStreamLoadリクエストをリダイレクトするかどうか。有効にすると、StreamLoadはFE経由で書き込みを行い、明示的にBE情報を取得しなくなります。
doris.request.retries3NDorisへのリクエスト送信の再試行回数。
doris.request.connect.timeout30sNDorisへのリクエスト送信の接続タイムアウト。
doris.request.read.timeout30sNDorisへのリクエスト送信の読み取りタイムアウト。

ソース設定

Keyデフォルト値必須コメント
doris.request.query.timeout21600sNDorisクエリのタイムアウト。デフォルト値は6時間です。
doris.request.tablet.size1N1つのPartitionに対応するDoris Tabletsの数。この値を小さく設定するほど、より多くのPartitionが生成され、Flink側の並列性を向上させることができます。ただし、Dorisにより多くの負荷をかけることにもなります。
doris.batch.size4064NBEから一度に読み取る行の最大数。この値を増やすことで、FlinkとDoris間で確立される接続数を減らし、ネットワーク遅延による追加の時間オーバーヘッドを削減できます。
doris.exec.mem.limit8192mbN単一クエリのメモリ制限。デフォルトは8GB、バイト単位。
source.use-flight-sqlFALSEN読み取りにArrow Flight SQLを使用するかどうか。
source.flight-sql-port-NArrow Flight SQLを読み取りに使用する場合のFEのarrow_flight_sql_port。

DataStream固有の設定

Keyデフォルト値必須コメント
doris.read.field--NDorisテーブルを読み取るためのカラム名リスト。複数のカラムはカンマで区切る必要があります。
doris.filter.query--N読み取りデータをフィルタリングするための式。この式はDorisに渡されます。Dorisはこの式を使用してソースデータのフィルタリングを完了します。例:age=18。

Sink設定

Keyデフォルト値必須コメント
sink.label-prefix--YStream loadインポートに使用されるラベルプレフィックス。2pcシナリオでは、FlinkのEOSセマンティクスを保証するためにグローバルにユニークである必要があります。
sink.properties.*--NStream Loadのインポートパラメータ。例:'sink.properties.column_separator' = ', 'はカラム区切り文字を定義し、'sink.properties.escape_delimiters' = 'true'は\x01などの区切り文字としての特殊文字がバイナリ0x01に変換されることを意味します。JSON形式のインポートの場合、'sink.properties.format' = 'json'、'sink.properties.read_json_by_line' = 'true'。詳細なパラメータについては、こちらを参照してください。Group Commitモードの場合、例:'sink.properties.group_commit' = 'sync_mode'はgroup commitを同期モードに設定します。Flinkコネクタはバージョン1.6.2からインポート設定group commitをサポートしています。詳細な使用方法と制限については、group commitを参照してください。
sink.enable-deleteTRUEN削除を有効にするかどうか。このオプションではDorisテーブルでバッチ削除機能が有効になっている必要があり(Doris 0.15+バージョンではデフォルトで有効)、Uniqueモデルのみをサポートします。
sink.enable-2pcTRUEN2フェーズコミット(2pc)を有効にするかどうか。デフォルトはtrueで、Exactly-Onceセマンティクスを保証します。2フェーズコミットの詳細については、こちらを参照してください。
sink.buffer-size1MBN書き込みデータキャッシュバッファのサイズ、バイト単位。変更は推奨されず、デフォルト設定を使用できます。
sink.buffer-count3N書き込みデータキャッシュバッファの数。変更は推奨されず、デフォルト設定を使用できます。
sink.max-retries3NCommit失敗後の最大再試行回数。デフォルトは3回です。
sink.enable.batch-modeFALSENDorisへの書き込みにバッチモードを使用するかどうか。有効にすると、書き込みタイミングはCheckpointに依存せず、sink.buffer-flush.max-rows、sink.buffer-flush.max-bytes、sink.buffer-flush.intervalなどのパラメータによって制御されます。同時に、有効にするとExactly-onceセマンティクスは保証されませんが、UniqモデルのサポートによりIdempotencyを実現できます。
sink.flush.queue-size2Nバッチモードでのキャッシュキューのサイズ。
sink.buffer-flush.max-rows500000Nバッチモードで単一バッチで書き込まれる最大行数。
sink.buffer-flush.max-bytes100MBNバッチモードで単一バッチで書き込まれる最大バイト数。
sink.buffer-flush.interval10sNバッチモードでキャッシュを非同期にフラッシュする間隔。
sink.ignore.update-beforeTRUENupdate-beforeイベントを無視するかどうか。デフォルトは無視します。

Lookup Join設定

Keyデフォルト値必須コメント
lookup.cache.max-rows-1Nlookupキャッシュの最大行数。デフォルト値は-1で、キャッシュが有効でないことを意味します。
lookup.cache.ttl10sNlookupキャッシュの最大時間。デフォルトは10秒です。
lookup.max-retries1Nlookupクエリ失敗後の再試行回数。
lookup.jdbc.asyncFALSEN非同期lookupを有効にするかどうか。デフォルトはfalseです。
lookup.jdbc.read.batch.size128N非同期lookupでの各クエリの最大バッチサイズ。
lookup.jdbc.read.batch.queue-size256N非同期lookup中の中間バッファキューのサイズ。
lookup.jdbc.read.thread-size3N各タスクでlookup用のjdbcスレッド数。

全データベース同期設定

構文

<FLINK_HOME>bin/flink run \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.6.1.jar \
<mysql-sync-database|oracle-sync-database|postgres-sync-database|sqlserver-sync-database|mongodb-sync-database> \
--database <doris-database-name> \
[--job-name <flink-job-name>] \
[--table-prefix <doris-table-prefix>] \
[--table-suffix <doris-table-suffix>] \
[--including-tables <mysql-table-name|name-regular-expr>] \
[--excluding-tables <mysql-table-name|name-regular-expr>] \
--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...] \
--oracle-conf <oracle-cdc-source-conf> [--oracle-conf <oracle-cdc-source-conf> ...] \
--postgres-conf <postgres-cdc-source-conf> [--postgres-conf <postgres-cdc-source-conf> ...] \
--sqlserver-conf <sqlserver-cdc-source-conf> [--sqlserver-conf <sqlserver-cdc-source-conf> ...] \
--sink-conf <doris-sink-conf> [--table-conf <doris-sink-conf> ...] \
[--table-conf <doris-table-conf> [--table-conf <doris-table-conf> ...]]

設定

KeyComment
--job-nameFlinkタスクの名前で、オプションです。
--databaseDorisに同期するデータベースの名前。
--table-prefixDorisテーブルのプレフィックス名。例:--table-prefix ods_。
--table-suffixDorisテーブルのサフィックス名。プレフィックスと同様です。
--including-tables同期が必要なMySQLテーブル。複数のテーブルは|で区切ることができ、正規表現がサポートされています。例:--including-tables table1。
--excluding-tables同期が不要なテーブル。使用方法は--including-tablesと同じです。
--mysql-confMySQL CDCSourceの設定。例:--mysql-conf hostname=127.0.0.1。MySQL-CDCのすべての設定はこちらで確認できます。その中で、hostname、username、password、database-nameは必須です。同期するデータベースとテーブルに主キーなしのテーブルが含まれている場合、scan.incremental.snapshot.chunk.key-columnを設定する必要があり、非null型フィールドを1つだけ選択できます。例:scan.incremental.snapshot.chunk.key-column=database.table:column,database.table1:column...。異なるデータベースとテーブルの列はコンマで区切られます。
--oracle-confOracle CDCSourceの設定。例:--oracle-conf hostname=127.0.0.1。Oracle-CDCのすべての設定はこちらで確認できます。その中で、hostname、username、password、database-name、schema-nameは必須です。
--postgres-confPostgres CDCSourceの設定。例:--postgres-conf hostname=127.0.0.1。Postgres-CDCのすべての設定はこちらで確認できます。その中で、hostname、username、password、database-name、schema-name、slot.nameは必須です。
--sqlserver-confSQLServer CDCSourceの設定。例:--sqlserver-conf hostname=127.0.0.1。SQLServer-CDCのすべての設定はこちらで確認できます。その中で、hostname、username、password、database-name、schema-nameは必須です。
--db2-confSQLServer CDCSourceの設定。例:--db2-conf hostname=127.0.0.1。DB2-CDCのすべての設定はこちらで確認できます。その中で、hostname、username、password、database-name、schema-nameは必須です。
--mongodb-confMongoDB CDCSourceの設定。例:--mongodb-conf hosts=127.0.0.1:27017。Mongo-CDCのすべての設定はこちらで確認できます。その中で、hosts、username、password、databaseは必須です。--mongodb-conf schema.sample-percentは、Dorisでテーブルを作成するためにMongoDBデータを自動的にサンプリングする設定で、デフォルト値は0.2です。
--sink-confDoris Sinkのすべての設定はこちらで確認できます。
--table-confDorisテーブルの設定項目、つまりpropertiesに含まれる内容(table-bucketsはpropertiesの属性ではありません)。例:--table-conf replication_num=1、--table-conf table-buckets="tbl1:10,tbl2:20,a.:30,b.:40,.*:50"は正規表現の順序で異なるテーブルのバケット数を指定することを意味します。一致しない場合、BUCKETS AUTOメソッドを使用してテーブルが作成されます。
--schema-change-modeスキーマ変更を解析するモード。debezium_structureとsql_parserが含まれます。デフォルトではdebezium_structureモードが使用されます。debezium_structureモードは、上流CDCがデータを同期する際に使用されるデータ構造を解析し、この構造を解析してDDL変更操作を判断します。sql_parserモードは、上流CDCがデータを同期する際のDDL文を解析してDDL変更操作を判断するため、この解析モードはより正確です。使用例:--schema-change-mode debezium_structure。この機能は24.0.0以降のバージョンで利用可能になります。
--single-sinkすべてのテーブルを同期するために単一のSinkを使用するかどうか。有効にすると、上流で新しく作成されたテーブルも自動的に識別し、テーブルを自動作成できます。
--multi-to-one-origin複数の上流テーブルが同じテーブルに書き込まれる際のソーステーブルの設定。例:--multi-to-one-origin "a_.*|b_.*"。#208を参照してください。
--multi-to-one-targetmulti-to-one-originと組み合わせて使用し、ターゲットテーブルの設定。例:--multi-to-one-target "a|b"
--create-table-onlyテーブル構造のみを同期するかどうか。

DorisからFlinkのデータ型マッピング

Doris TypeFlink Type
NULL_TYPENULL
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
FLOATFLOAT
DOUBLEDOUBLE
DATEDATE
DATETIMETIMESTAMP
DECIMALDECIMAL
CHARSTRING
LARGEINTSTRING
VARCHARSTRING
STRINGSTRING
DECIMALV2DECIMAL
ARRAYARRAY
MAPSTRING
JSONSTRING
VARIANTSTRING
IPV4STRING
IPV6STRING

FlinkからDorisのデータ型マッピング

Flink TypeDoris Type
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTEGERINTEGER
BIGINTBIGINT
FLOATFLOAT
DOUBLEDOUBLE
DECIMALDECIMAL
CHARCHAR
VARCHARVARCHAR/STRING
STRINGSTRING
DATEDATE
TIMESTAMPDATETIME
TIMESTAMP_LTZDATETIME
ARRAYARRAY
MAPMAP/JSON
ROWSTRUCT/JSON

監視メトリクス

Flinkは、Flinkクラスターの指標を監視するための複数のMetricsを提供しています。以下は、Flink Doris Connectorで新しく追加された監視メトリクスです。

NameMetric TypeDescription
totalFlushLoadBytesCounterフラッシュされインポートされた総バイト数。
flushTotalNumberRowsCounterインポートされ処理された総行数。
totalFlushLoadedRowsCounter正常にインポートされた総行数。
totalFlushTimeMsCounter正常なインポートが完了するまでの総時間。
totalFlushSucceededNumberCounterインポートが正常に完了した回数。
totalFlushFailedNumberCounterインポートが失敗した回数。
totalFlushFilteredRowsCounterデータ品質が不適格な総行数。
totalFlushUnselectedRowsCounterwhere条件でフィルタリングされた総行数。
beginTxnTimeMsHistogramFeにトランザクション開始をリクエストするのにかかる時間(ミリ秒)。
putDataTimeMsHistogramFeにインポートデータ実行プランの取得をリクエストするのにかかる時間。
readDataTimeMsHistogramデータを読み込むのにかかる時間。
writeDataTimeMsHistogramデータ書き込み操作を実行するのにかかる時間。
commitAndPublishTimeMsHistogramFeにトランザクションのコミットと公開をリクエストするのにかかる時間。
loadTimeMsHistogramインポートが完了するまでの時間。

ベストプラクティス

FlinkSQLでCDC経由でMySQLデータに迅速接続

-- enable checkpoint
SET 'execution.checkpointing.interval' = '10s';

CREATE TABLE cdc_mysql_source (
id int
,name VARCHAR
,PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = 'password',
'database-name' = 'database',
'table-name' = 'table'
);

-- Supports synchronizing insert/update/delete events
CREATE TABLE doris_sink (
id INT,
name STRING
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'database.table',
'username' = 'root',
'password' = '',
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true',
'sink.enable-delete' = 'true', -- Synchronize delete events
'sink.label-prefix' = 'doris_label'
);

insert into doris_sink select id,name from cdc_mysql_source;

Flinkが部分的なカラム更新を実行する

CREATE TABLE doris_sink (
id INT,
name STRING,
bank STRING,
age int
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'database.table',
'username' = 'root',
'password' = '',
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true',
'sink.properties.columns' = 'id,name,bank,age', -- Columns that need to be updated
'sink.properties.partial_columns' = 'true' -- Enable partial column updates
);
CREATE TABLE bitmap_sink (
dt int,
page string,
user_id int
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'test.bitmap_test',
'username' = 'root',
'password' = '',
'sink.label-prefix' = 'doris_label',
'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
)

FlinkCDCによる主キーカラムの更新

一般的に、業務データベースでは、テーブルの主キーとして数値がよく使用されます。例えば、Studentテーブルでは、番号(id)が主キーとして使用されます。しかし、業務の発展に伴い、データに対応する番号が変更される場合があります。このシナリオでは、Flink CDC + Doris Connectorを使用してデータを同期する際、Dorisの主キーカラムのデータは自動的に更新することができます。

原理

Flink CDCの基盤となる収集ツールはDebeziumです。Debezium内部では、opフィールドを使用して対応する操作を識別します。opフィールドの値はc、u、d、rで、それぞれcreate、update、delete、readに対応します。主キーカラムの更新については、Flink CDCはDELETEとINSERTイベントを下流に送信し、データがDorisに同期された後、Dorisの主キーカラムのデータが自動的に更新されます。

使用方法

FlinkプログラムはこGCDC同期の例を参照できます。タスクの送信が成功した後、MySQL側で主キーカラムを更新するステートメントを実行し(例:update student set id = '1002' where id = '1001')、その後Dorisのデータを変更することができます。

指定されたカラムに基づくFlinkデータ削除

一般的に、Kafkaのメッセージは特定のフィールドを使用して操作タイプをマークします。例:{"op_type":"delete",data:{...}}。この種のデータに対して、op_type=deleteのデータを削除することが希望されます。

DorisSinkは、デフォルトでRowKindに従ってイベントのタイプを区別します。通常、CDCの場合、イベントタイプを直接取得でき、隠しカラム __DORIS_DELETE_SIGN__ に値を割り当てて削除の目的を達成できます。しかし、Kafkaの場合、業務ロジックに従って判断し、隠しカラムの値を明示的に渡す必要があります。

-- For example, the upstream data:{"op_type":"delete",data:{"id":1,"name":"zhangsan"}}
CREATE TABLE KAFKA_SOURCE(
data STRING,
op_type STRING
) WITH (
'connector' = 'kafka',
...
);

CREATE TABLE DORIS_SINK(
id INT,
name STRING,
__DORIS_DELETE_SIGN__ INT
) WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'db.table',
'username' = 'root',
'password' = '',
'sink.enable-delete' = 'false', -- false means not to obtain the event type from RowKind
'sink.properties.columns' = 'id, name, __DORIS_DELETE_SIGN__' -- Explicitly specify the import columns of streamload
);

INSERT INTO DORIS_SINK
SELECT json_value(data,'$.id') as id,
json_value(data,'$.name') as name,
if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__
from KAFKA_SOURCE;

一般的に、MySQLなどの上流データソースを同期する際、上流でフィールドを追加または削除した場合、DorisでSchema Change操作を同期する必要があります。

このシナリオでは、通常、DataStream APIのプログラムを作成し、DorisSinkが提供するJsonDebeziumSchemaSerializerシリアライザーを使用して、自動的にSchemaChangeを実行する必要があります。詳細については、CDCSchemaChangeExample.javaを参照してください。

Connectorが提供するデータベース全体同期ツールでは、追加設定は不要で、上流のDDLが自動的に同期され、DorisでSchemaChange操作が実行されます。

よくある質問(FAQ)

  1. errCode = 2, detailMessage = Label [label_0_1] has already been used, relate to txn [19650]

    Exactly-Onceシナリオでは、Flink Jobは最新のCheckpoint/Savepointから再起動する必要があります。そうしないと上記のエラーが報告されます。Exactly-Onceが不要な場合、2PC送信を無効にする(sink.enable-2pc=false)か、異なるsink.label-prefixに変更することでもこの問題を解決できます。

  2. errCode = 2, detailMessage = transaction [19650] not found

    これはCommit段階で発生します。checkpointに記録されたトランザクションIDがFE側で期限切れになっています。この時点で再度コミットすると、上記のエラーが発生します。この時点では、checkpointから開始することは不可能です。その後、fe.confstreaming_label_keep_max_second設定を変更して有効期限を延長できます。デフォルトの有効期限は12時間です。dorisバージョン2.0以降では、fe.conflabel_num_threshold設定(デフォルト2000)によっても制限され、これを増加させるか-1に変更できます(-1は時間によってのみ制限されることを意味します)。

  3. errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100

    これは同一データベースへの並行インポートが100を超えたためです。fe.confのパラメーターmax_running_txn_num_per_dbを調整することで解決できます。具体的な詳細については、max_running_txn_num_per_dbを参照してください。

    同時に、labelの頻繁な変更とタスクの再起動もこのエラーを引き起こす可能性があります。2pcシナリオ(Duplicate/Aggregateモデルの場合)では、各タスクのlabelは一意である必要があります。checkpointから再起動する際、Flinkタスクは正常にpre-commitされたがまだコミットされていないトランザクションを積極的に中止します。頻繁なlabel変更と再起動により、中止できない大量のpre-commit成功トランザクションが発生し、トランザクションを占有します。Uniqueモデルでは、2pcを無効にして冪等書き込みを実現することも可能です。

  4. tablet writer write failed, tablet_id=190958, txn_id=3505530, err=-235

    これは通常、Connectorバージョン1.1.0以前で発生し、書き込み頻度が高すぎることでバージョン数が過剰になることが原因です。sink.batch.sizesink.batch.intervalパラメーターを設定してStreamloadの頻度を下げることができます。Connectorバージョン1.1.0以降では、デフォルトの書き込みタイミングはCheckpointによって制御され、Checkpoint間隔を増加させることで書き込み頻度を下げることができます。

  5. Flinkインポート時にダーティーデータをスキップする方法は?

    Flinkがデータをインポートする際、フィールド形式や長さの問題などのダーティーデータがあると、StreamLoadがエラーを報告します。この時、Flinkは継続的に再試行します。このようなデータをスキップする必要がある場合、StreamLoadのstrictモードを無効にする(strict_mode=falsemax_filter_ratio=1を設定)か、Sinkオペレーター前でデータをフィルタリングできます。

  6. FlinkマシンとBEマシン間のネットワークが接続されていない場合の設定方法は?

    FlinkがDorisへの書き込みを開始すると、DorisはBEへ書き込み操作をリダイレクトします。この時、返されるアドレスはBEの内部ネットワークIP、つまりshow backendsコマンドで確認できるIPです。この時点でFlinkとDorisの間にネットワーク接続がない場合、エラーが報告されます。この場合、benodesでBEの外部ネットワークIPを設定できます。

  7. stream load error: HTTP/1.1 307 Temporary Redirect

    Flinkは最初にFEにリクエストし、307を受信した後、リダイレクト後にBEにリクエストします。FEがFullGC/高負荷/ネットワーク遅延状態にある場合、HttpClientはデフォルトで一定期間(3秒)内に応答を待たずにデータを送信します。リクエストボディはデフォルトでInputStreamのため、307レスポンスを受信した際、データを再生できず直接エラーが報告されます。この問題を解決する方法は3つあります:1. Connector25.1.0以上にアップグレードしてデフォルト時間を増加させる、2. auto-redirect=falseに変更してBEに直接リクエストを開始する(一部のクラウドシナリオには適用不可)、3. unique keyモデルでbatchモードを有効にする。