メインコンテンツまでスキップ

Flink Doris コネクタ

Flink Doris Connectorは、Flinkを通じてDorisクラスターからデータを読み取り、データを書き込むために使用されます。また、FlinkCDCを統合しており、MySQLなどの上流データベースとのより便利なフルデータベース同期を可能にします。

Flink Connectorを使用すると、以下の操作を実行できます:

  • Dorisからデータを読み取り: Flink ConnectorはBEからの並列読み取りをサポートし、データ取得効率を向上させます。

  • Dorisにデータを書き込み: Flinkでバッチ処理した後、Stream Loadを使用してデータをDorisに一括でインポートします。

  • Lookup Joinを使用したディメンションテーブル結合: バッチ処理と非同期クエリによりディメンションテーブル結合を高速化します。

  • フルデータベース同期: Flink CDCを使用して、MySQL、Oracle、PostgreSQLなどのデータベース全体を同期できます。これには自動テーブル作成とDDL操作が含まれます。

バージョン説明

Connector VersionFlink VersionDoris VersionJava VersionScala Version
1.0.31.11,1.12,1.13,1.140.15+82.11,2.12
1.1.11.141.0+82.11,2.12
1.2.11.151.0+8-
1.3.01.161.0+8-
1.4.01.15 - 1.171.0+8-
1.5.21.15 - 1.181.0+8-
1.6.11.15 - 1.191.0+8-
24.0.11.15 - 1.201.0+8-
24.1.01.15 - 1.201.0+8-
25.0.01.15 - 1.201.0+8-
25.1.01.15 - 1.201.0+8-
26.0.01.15 - 1.20,2.0 - 2.21.0+8(1.x),17(2.x)-

使用方法

Flink Doris Connectorは2つの方法で使用できます:JarまたはMaven経由です。

Jar

対応するバージョンのFlink Doris Connector Jarファイルをこちらからダウンロードし、このファイルをFlinkセットアップのclasspathにコピーしてFlink-Doris-Connectorを使用できます。StandaloneモードのFlinkデプロイメントの場合、このファイルをlib/フォルダ下に配置してください。Yarnモードで実行されるFlinkクラスターの場合、ファイルを事前デプロイメントパッケージに配置してください。

Maven

Mavenで使用するには、Pomファイルに以下の依存関係を追加するだけです:

<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-${flink.version}</artifactId>
<version>${connector.version}</version>
</dependency>

例えば:

<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.16</artifactId>
<version>25.1.0</version>
</dependency>

動作原理

Dorisからのデータ読み取り

Flink Connector Principles JDBC Doris

データ読み取り時、Flink Doris ConnectorはFlink JDBC Connectorと比較してより高いパフォーマンスを提供し、使用が推奨されます:

  • Flink JDBC Connector: DorisはMySQLプロトコルと互換性がありますが、Dorisクラスターへの読み書きにFlink JDBC Connectorを使用することは推奨されません。このアプローチでは、単一のFEノードでシリアルな読み書き操作が発生し、ボトルネックとなってパフォーマンスに影響を与えます。

  • Flink Doris Connector: Doris 2.1以降、ADBCがFlink Doris Connectorのデフォルトプロトコルです。読み取りプロセスは以下の手順に従います:

    a. Flink Doris Connectorは最初にクエリプランに基づいてFEからTablet ID情報を取得します。

    b. クエリステートメントを生成します:SELECT * FROM tbs TABLET(id1, id2, id3)

    c. クエリはFEのADBCポートを通じて実行されます。

    d. データはBEから直接返され、FEをバイパスして単一点ボトルネックを解消します。

Dorisへのデータ書き込み

データ書き込みにFlink Doris Connectorを使用する場合、Stream Load経由でバルクインポートする前にFlinkのメモリ内でバッチ処理が実行されます。Doris Flink Connectorは2つのバッチモードを提供し、Flink Checkpointベースのストリーミング書き込みがデフォルトです:

ストリーミング書き込みバッチ書き込み
トリガー条件Flink Checkpointに依存し、Flinkのチェックポイントサイクルに従ってDorisに書き込みコネクター定義の時間またはデータ量しきい値に基づく定期送信
一貫性Exactly-OnceAt-Least-Once;プライマリキーモデルでExactly-Onceを保証可能
レイテンシーFlinkチェックポイント間隔により制限され、一般的により高い柔軟な調整が可能な独立バッチメカニズム
耐障害性と復旧Flink状態復旧と完全に一致外部重複排除ロジック(例:Dorisプライマリキー重複排除)に依存

クイックスタート

準備

Flinkクラスターデプロイメント

Standaloneクラスターを例として:

  1. Flinkインストールパッケージをダウンロードします。例:Flink 1.18.1
  2. 展開後、Flink Doris Connectorパッケージを<FLINK_HOME>/libに配置します;
  3. <FLINK_HOME>ディレクトリに移動し、bin/start-cluster.shを実行してFlinkクラスターを開始します;
  4. jpsコマンドを使用してFlinkクラスターが正常に開始されたかを確認できます。

Dorisテーブルの初期化

以下のステートメントを実行してDorisテーブルを作成します:

CREATE DATABASE test;

CREATE TABLE test.student (
`id` INT,
`name` VARCHAR(256),
`age` INT
)
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);

INSERT INTO test.student values(1,"James",18);
INSERT INTO test.student values(2,"Emily",28);

CREATE TABLE test.student_trans (
`id` INT,
`name` VARCHAR(256),
`age` INT
)
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);

FlinkSQLタスクの実行

FlinkSQLクライアントの開始

bin/sql-client.sh

FlinkSQLの実行

CREATE TABLE Student (
id STRING,
name STRING,
age INT
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'test.student',
'username' = 'root',
'password' = ''
);

CREATE TABLE StudentTrans (
id STRING,
name STRING,
age INT
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'test.student_trans',
'username' = 'root',
'password' = '',
'sink.label-prefix' = 'doris_label'
);

INSERT INTO StudentTrans SELECT id, concat('prefix_',name), age+1 FROM Student;

クエリデータ

mysql> select * from test.student_trans;
+------+--------------+------+
| id | name | age |
+------+--------------+------+
| 1 | prefix_James | 19 |
| 2 | prefix_Emily | 29 |
+------+--------------+------+
2 rows in set (0.02 sec)

シナリオと操作

Dorisからのデータ読み取り

FlinkがDorisからデータを読み取る際、Doris Sourceは現在有界ストリームであり、CDC方式での継続的な読み取りをサポートしていません。DorisからのデータはThriftまたはArrowFlightSQL(バージョン24.0.0以降でサポート)を使用して読み取ることができます。バージョン2.1以降では、ArrowFlightSQLが推奨されるアプローチです。

  • Thrift: BEのThriftインターフェースを呼び出してデータを読み取ります。詳細な手順については、Reading Data via Thrift Interfaceを参照してください。
  • ArrowFlightSQL: Doris 2.1に基づき、この方法ではArrow Flight SQLプロトコルを使用して大量のデータを高速で読み取ることができます。詳細については、High-speed Data Transfer via Arrow Flight SQLを参照してください。

FlinkSQLを使用したデータ読み取り

Thrift方式
CREATE TABLE student (
id INT,
name STRING,
age INT
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030', -- Fe的host:HttpPort
'table.identifier' = 'test.student',
'username' = 'root',
'password' = ''
);

SELECT * FROM student;
ArrowFlightSQL
CREATE TABLE student (
id INT,
name STRING,
age INT
)
WITH (
'connector' = 'doris',
'fenodes' = '{fe.conf:http_port}',
'table.identifier' = 'test.student',
'source.use-flight-sql' = 'true',
'source.flight-sql-port' = '{fe.conf:arrow_flight_sql_port}',
'username' = 'root',
'password' = ''
);

SELECT * FROM student;

DataStream APIを使用したデータの読み取り

DataStream APIを使用してデータを読み取る場合は、「使用方法」セクションで説明されているように、事前にプログラムのPOMファイルに依存関係を含める必要があります。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DorisOptions option = DorisOptions.builder()
.setFenodes("127.0.0.1:8030")
.setTableIdentifier("test.student")
.setUsername("root")
.setPassword("")
.build();

DorisReadOptions readOptions = DorisReadOptions.builder().build();
DorisSource<List<?>> dorisSource = DorisSource.<List<?>>builder()
.setDorisOptions(option)
.setDorisReadOptions(readOptions)
.setDeserializer(new SimpleListDeserializationSchema())
.build();

env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();
env.execute("Doris Source Test");

完全なコードについては、DorisSourceDataStream.javaを参照してください。

DorisへのデータWrite

FlinkはStream Load方式を使用してDorisにデータを書き込み、ストリーミングとバッチ挿入の両方のモードをサポートしています。

ストリーミングとバッチ挿入の違い

Connector 1.5.0以降、バッチ挿入がサポートされています。バッチ挿入はCheckpointに依存せず、データをメモリにバッファリングし、バッチパラメータに基づいて書き込みタイミングを制御します。ストリーミング挿入ではCheckpointを有効にする必要があり、Checkpoint期間全体を通じて上流データを継続的にDorisに書き込み、データをメモリに継続的に保持することはありません。

FlinkSQLを使用したデータWrite

テスト用に、FlinkのDatagenを使用して、継続的に生成される上流データをシミュレートします。

-- enable checkpoint
SET 'execution.checkpointing.interval' = '30s';

CREATE TABLE student_source (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.name.length' = '20',
'fields.id.min' = '1',
'fields.id.max' = '100000',
'fields.age.min' = '3',
'fields.age.max' = '30'
);

-- doris sink
CREATE TABLE student_sink (
id INT,
name STRING,
age INT
)
WITH (
'connector' = 'doris',
'fenodes' = '10.16.10.6:28737',
'table.identifier' = 'test.student',
'username' = 'root',
'password' = 'password',
'sink.label-prefix' = 'doris_label'
--'sink.enable.batch-mode' = 'true' Adding this configuration enables batch writing
);

INSERT INTO student_sink SELECT * FROM student_source;

DataStream APIを使用したデータ書き込み

DataStream APIを使用してデータを書き込む際、異なるシリアライゼーション手法を使用して、Dorisテーブルに書き込む前にアップストリームデータをシリアライズできます。

備考

ConnectorにはすでにHttpClient4.5.13バージョンが含まれています。プロジェクトでHttpClientを個別に参照する場合は、バージョンの整合性を確保する必要があります。

標準String形式

アップストリームデータがCSVまたはJSON形式の場合、SimpleStringSerializerを直接使用してデータをシリアライズできます。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30000);
DorisSink.Builder<String> builder = DorisSink.builder();

DorisOptions dorisOptions = DorisOptions.builder()
.setFenodes("10.16.10.6:28737")
.setTableIdentifier("test.student")
.setUsername("root")
.setPassword("")
.build();

Properties properties = new Properties();
// When the upstream data is in json format, the following configuration needs to be enabled
properties.setProperty("read_json_by_line", "true");
properties.setProperty("format", "json");

// When writing csv data from the upstream, the following configurations need to be enabled
//properties.setProperty("format", "csv");
//properties.setProperty("column_separator", ",");

DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
.setLabelPrefix("label-doris")
.setDeletable(false)
//.setBatchMode(true) Enable batch writing
.setStreamLoadProp(properties)
.build();

builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionOptions)
.setSerializer(new SimpleStringSerializer())
.setDorisOptions(dorisOptions);

List<String> data = new ArrayList<>();
data.add("{\"id\":3,\"name\":\"Michael\",\"age\":28}");
data.add("{\"id\":4,\"name\":\"David\",\"age\":38}");

env.fromCollection(data).sinkTo(builder.build());
env.execute("doris test");

完全なコードについては、DorisSinkExample.javaを参照してください。

RowData Format

RowDataはFlinkの内部フォーマットです。上流データがRowData形式の場合、RowDataSerializerを使用してデータをシリアライズする必要があります。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000);
env.setParallelism(1);

DorisSink.Builder<RowData> builder = DorisSink.builder();

Properties properties = new Properties();
properties.setProperty("column_separator", ",");
properties.setProperty("line_delimiter", "\n");
properties.setProperty("format", "csv");
// When writing json data from the upstream, the following configuration needs to be enabled
// properties.setProperty("read_json_by_line", "true");
// properties.setProperty("format", "json");
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder
.setFenodes("10.16.10.6:28737")
.setTableIdentifier("test.student")
.setUsername("root")
.setPassword("");
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix(UUID.randomUUID().toString()).setDeletable(false).setStreamLoadProp(properties);

// flink rowdata‘s schema
String[] fields = {"id","name", "age"};
DataType[] types = {DataTypes.INT(), DataTypes.VARCHAR(256), DataTypes.INT()};

builder.setDorisExecutionOptions(executionBuilder.build())
.setSerializer(
RowDataSerializer.builder() // serialize according to rowdata
.setType(LoadConstants.CSV)
.setFieldDelimiter(",")
.setFieldNames(fields)
.setFieldType(types)
.build())
.setDorisOptions(dorisBuilder.build());

// mock rowdata source
DataStream<RowData> source =
env.fromElements("")
.flatMap(
new FlatMapFunction<String, RowData>() {
@Override
public void flatMap(String s, Collector<RowData> out)
throws Exception {
GenericRowData genericRowData = new GenericRowData(3);
genericRowData.setField(0, 1);
genericRowData.setField(1, StringData.fromString("Michael"));
genericRowData.setField(2, 18);
out.collect(genericRowData);

GenericRowData genericRowData2 = new GenericRowData(3);
genericRowData2.setField(0, 2);
genericRowData2.setField(1, StringData.fromString("David"));
genericRowData2.setField(2, 38);
out.collect(genericRowData2);
}
});

source.sinkTo(builder.build());
env.execute("doris test");

完全なコードについては、DorisSinkExampleRowData.javaを参照してください。

Debezium Format

FlinkCDCからのデータやKafkaのDebezium formatなど、Debezium形式の上流データについては、JsonDebeziumSchemaSerializerを使用してデータをシリアライズできます。

// enable checkpoint
env.enableCheckpointing(10000);

Properties props = new Properties();
props.setProperty("format", "json");
props.setProperty("read_json_by_line", "true");
DorisOptions dorisOptions = DorisOptions.builder()
.setFenodes("127.0.0.1:8030")
.setTableIdentifier("test.student")
.setUsername("root")
.setPassword("").build();

DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-prefix")
.setStreamLoadProp(props)
.setDeletable(true);

DorisSink.Builder<String> builder = DorisSink.builder();
builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionBuilder.build())
.setDorisOptions(dorisOptions)
.setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());

env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
.sinkTo(builder.build());

完全なコードについては、CDCSchemaChangeExample.javaを参照してください。

マルチテーブル書き込みフォーマット

現在、DorisSinkは単一のSinkで複数のテーブルの同期をサポートしています。データとデータベース/テーブル情報の両方をSinkに渡し、RecordWithMetaSerializerを使用してシリアライズする必要があります。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DorisSink.Builder<RecordWithMeta> builder = DorisSink.builder();
Properties properties = new Properties();
properties.setProperty("column_separator", ",");
properties.setProperty("line_delimiter", "\n");
properties.setProperty("format", "csv");
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder
.setFenodes("10.16.10.6:28737")
.setTableIdentifier("")
.setUsername("root")
.setPassword("");

DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();

executionBuilder
.setLabelPrefix("label-doris")
.setStreamLoadProp(properties)
.setDeletable(false)
.setBatchMode(true);

builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionBuilder.build())
.setDorisOptions(dorisBuilder.build())
.setSerializer(new RecordWithMetaSerializer());

RecordWithMeta record = new RecordWithMeta("test", "student_1", "1,David,18");
RecordWithMeta record1 = new RecordWithMeta("test", "student_2", "1,Jack,28");
env.fromCollection(Arrays.asList(record, record1)).sinkTo(builder.build());

完全なコードについては、DorisSinkMultiTableExample.javaを参照してください。

Lookup Join

Lookup Joinを使用することで、FlinkでのディメンションテーブルのJOINを最適化できます。ディメンションテーブルのJOINにFlink JDBC Connectorを使用する場合、以下の問題が発生する可能性があります:

  • Flink JDBC Connectorは同期クエリモードを使用するため、上流データ(Kafkaなど)がレコードを送信した後、即座にDorisディメンションテーブルをクエリします。これにより、高同時実行シナリオでクエリレイテンシが高くなります。

  • JDBCを介して実行されるクエリは通常レコード毎のポイントルックアップですが、Dorisでは効率向上のためバッチクエリを推奨しています。

Flink Doris ConnectorでディメンションテーブルのJOINにLookup Joinを使用することで、以下の利点が得られます:

  • 上流データのバッチキャッシュにより、レコード毎のクエリによる高レイテンシとデータベース負荷を回避します。

  • JOINクエリの非同期実行により、データスループットを向上させ、Dorisへのクエリ負荷を軽減します。

CREATE TABLE fact_table (
`id` BIGINT,
`name` STRING,
`city` STRING,
`process_time` as proctime()
) WITH (
'connector' = 'kafka',
...
);

create table dim_city(
`city` STRING,
`level` INT ,
`province` STRING,
`country` STRING
) WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
'table.identifier' = 'dim.dim_city',
'username' = 'root',
'password' = ''
);

SELECT a.id, a.name, a.city, c.province, c.country,c.level
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city

全データベース同期

Flink Doris ConnectorはFlink CDC (Flink CDC Documentation)を統合し、MySQLなどのリレーショナルデータベースをDorisに同期することを容易にします。この統合には、自動テーブル作成、スキーマ変更なども含まれます。同期がサポートされているデータベースには、MySQL、Oracle、PostgreSQL、SQLServer、MongoDB、およびDB2があります。

Note
  1. 全データベース同期を使用する場合、$FLINK_HOME/libディレクトリに対応するFlink CDC依存関係(Fat Jar)を追加する必要があります。例:flink-sql-connector-mysql-cdc-version.jarflinksqlconnectororaclecdc{version}.jar**、**flink-sql-connector-oracle-cdc-{version}.jar。FlinkCDCバージョン3.1以降は以前のバージョンと互換性がありません。依存関係は以下のリンクからダウンロードできます:FlinkCDC 3.xFlinkCDC 2.x
  2. Connector 24.0.0以降のバージョンでは、必要なFlink CDCバージョンは3.1以上である必要があります。ここからダウンロードできます。Flink CDCを使用してMySQLおよびOracleを同期する場合、$FLINK_HOME/libの下に関連するJDBCドライバも追加する必要があります。

MySQL全データベース同期

Flinkクラスターを開始した後、以下のコマンドを直接実行できます:

<FLINK_HOME>bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-24.0.1.jar \
mysql-sync-database \
--database test_db \
--mysql-conf hostname=127.0.0.1 \
--mysql-conf port=3306 \
--mysql-conf username=root \
--mysql-conf password=123456 \
--mysql-conf database-name=mysql_db \
--including-tables "tbl1|test.*" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1

Oracle データベース全体同期

<FLINK_HOME>bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
./lib/flink-doris-connector-1.16-24.0.1.jar \
oracle-sync-database \
--database test_db \
--oracle-conf hostname=127.0.0.1 \
--oracle-conf port=1521 \
--oracle-conf username=admin \
--oracle-conf password="password" \
--oracle-conf database-name=XE \
--oracle-conf schema-name=ADMIN \
--including-tables "tbl1|tbl2" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password=\
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1

PostgreSQL 全データベース同期

<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1\
-c org.apache.doris.flink.tools.cdc.CdcTools \
./lib/flink-doris-connector-1.16-24.0.1.jar \
postgres-sync-database \
--database db1\
--postgres-conf hostname=127.0.0.1 \
--postgres-conf port=5432 \
--postgres-conf username=postgres \
--postgres-conf password="123456" \
--postgres-conf database-name=postgres \
--postgres-conf schema-name=public \
--postgres-conf slot.name=test \
--postgres-conf decoding.plugin.name=pgoutput \
--including-tables "tbl1|tbl2" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password=\
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1

SQLServer データベース全体の同期

<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
./lib/flink-doris-connector-1.16-24.0.1.jar \
sqlserver-sync-database \
--database db1\
--sqlserver-conf hostname=127.0.0.1 \
--sqlserver-conf port=1433 \
--sqlserver-conf username=sa \
--sqlserver-conf password="123456" \
--sqlserver-conf database-name=CDC_DB \
--sqlserver-conf schema-name=dbo \
--including-tables "tbl1|tbl2" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password=\
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1

DB2 データベース全体同期

<FLINK_HOME>bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-24.0.1.jar \
db2-sync-database \
--database db2_test \
--db2-conf hostname=127.0.0.1 \
--db2-conf port=50000 \
--db2-conf username=db2inst1 \
--db2-conf password=doris123456 \
--db2-conf database-name=testdb \
--db2-conf schema-name=DB2INST1 \
--including-tables "FULL_TYPES|CUSTOMERS" \
--single-sink true \
--use-new-schema-change true \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1

MongoDB 全データベース同期

<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
./lib/flink-doris-connector-1.18-24.0.1.jar \
mongodb-sync-database \
--database doris_db \
--schema-change-mode debezium_structure \
--mongodb-conf hosts=127.0.0.1:27017 \
--mongodb-conf username=flinkuser \
--mongodb-conf password=flinkpwd \
--mongodb-conf database=test \
--mongodb-conf scan.startup.mode=initial \
--mongodb-conf schema.sample-percent=0.2 \
--including-tables "tbl1|tbl2" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password= \
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--sink-conf sink.enable-2pc=false \
--table-conf replication_num=1

AWS Aurora MySQL データベース全体同期

<FLINK_HOME>bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.18-25.0.0.jar \
mysql-sync-database \
--database testwd \
--mysql-conf hostname=xxx.us-east-1.rds.amazonaws.com \
--mysql-conf port=3306 \
--mysql-conf username=admin \
--mysql-conf password=123456 \
--mysql-conf database-name=test \
--mysql-conf server-time-zone=UTC \
--including-tables "student" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password= \
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1

AWS RDS MySQL データベース全体同期

<FLINK_HOME>bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.18-25.0.0.jar \
mysql-sync-database \
--database testwd \
--mysql-conf hostname=xxx.ap-southeast-1.rds.amazonaws.com \
--mysql-conf port=3306 \
--mysql-conf username=admin \
--mysql-conf password=123456 \
--mysql-conf database-name=test \
--mysql-conf server-time-zone=UTC \
--including-tables "student" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password= \
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1

使用手順

パラメータ設定

一般設定項目

Keyデフォルト値必須コメント
fenodes--YDoris FE httpアドレス。複数のアドレスがサポートされており、カンマで区切る必要があります。
benodes--NDoris BE httpアドレス。複数のアドレスがサポートされており、カンマで区切る必要があります。
jdbc-url--NJDBC接続情報(例:jdbc:mysql://127.0.0.1:9030)。
table.identifier--YDorisテーブル名(例:db.tbl)。
username--YDorisにアクセスするためのユーザー名。
password--YDorisにアクセスするためのパスワード。
auto-redirectTRUENStreamLoadリクエストをリダイレクトするかどうか。有効にすると、StreamLoadはFE経由で書き込みを行い、明示的にBE情報を取得しなくなります。
doris.request.retries3NDorisへのリクエスト送信のリトライ回数。
doris.request.connect.timeout30sNDorisへのリクエスト送信時の接続タイムアウト。
doris.request.read.timeout30sNDorisへのリクエスト送信時の読み取りタイムアウト。

Source設定

Keyデフォルト値必須コメント
doris.request.query.timeout21600sNDorisクエリのタイムアウト。デフォルト値は6時間です。
doris.request.tablet.size1N1つのPartitionに対応するDoris Tabletsの数。この値を小さく設定するほど、より多くのPartitionが生成され、Flink側での並列性を高めることができます。ただし、Dorisにより多くの負荷がかかります。
doris.batch.size4064NBEから一度に読み取る行の最大数。この値を増やすことで、FlinkとDoris間で確立される接続数を減らし、ネットワークレイテンシによって発生する追加の時間オーバーヘッドを削減できます。
doris.exec.mem.limit8192mbN単一クエリのメモリ制限。デフォルトは8GB、バイト単位。
source.use-flight-sqlFALSEN読み取りにArrow Flight SQLを使用するかどうか。
source.flight-sql-port-NArrow Flight SQLを使用して読み取る際のFEのarrow_flight_sql_port。

DataStream固有の設定

Keyデフォルト値必須コメント
doris.read.field--NDorisテーブルを読み取るためのカラム名のリスト。複数のカラムはカンマで区切る必要があります。
doris.filter.query--N読み取りデータをフィルタリングするための式。この式はDorisに渡されます。Dorisはこの式を使用してソースデータのフィルタリングを完了します。例:age=18。

Sink設定

Keyデフォルト値必須コメント
sink.label-prefix--YStream loadインポートに使用されるラベルプレフィックス。2pcシナリオでは、FlinkのEOSセマンティクスを保証するためにグローバルに一意である必要があります。
sink.properties.*--NStream Loadのインポートパラメータ。例:'sink.properties.column_separator' = ', 'はカラム区切り文字を定義し、'sink.properties.escape_delimiters' = 'true'は\x01のような区切り文字としての特殊文字がバイナリ0x01に変換されることを意味します。JSON形式のインポートの場合、'sink.properties.format' = 'json'、'sink.properties.read_json_by_line' = 'true'。詳細なパラメータについてはこちらを参照してください。Group Commitモードの場合、例:'sink.properties.group_commit' = 'sync_mode'はグループコミットを同期モードに設定します。Flinkコネクタはバージョン1.6.2からインポート設定グループコミットをサポートしています。詳細な使用方法と制限については、group commitを参照してください。
sink.enable-deleteTRUEN削除を有効にするかどうか。このオプションはDorisテーブルでバッチ削除機能が有効になっている必要があり(Doris 0.15+バージョンではデフォルトで有効)、Uniqueモデルのみサポートします。
sink.enable-2pcTRUEN2段階コミット(2pc)を有効にするかどうか。デフォルトはtrueで、Exactly-Onceセマンティクスを保証します。2段階コミットの詳細については、こちらを参照してください。
sink.buffer-size1MBN書き込みデータキャッシュバッファのサイズ(バイト単位)。変更は推奨されず、デフォルト設定を使用できます。
sink.buffer-count3N書き込みデータキャッシュバッファの数。変更は推奨されず、デフォルト設定を使用できます。
sink.max-retries3NCommit失敗後の最大リトライ回数。デフォルトは3回。
sink.enable.batch-modeFALSENDorisへの書き込みにバッチモードを使用するかどうか。有効にすると、書き込みタイミングはCheckpointに依存せず、sink.buffer-flush.max-rows、sink.buffer-flush.max-bytes、sink.buffer-flush.intervalなどのパラメータによって制御されます。同時に、有効にするとExactly-onceセマンティクスは保証されませんが、UniqモデルのサポートによりIdempotencyを実現できます。
sink.flush.queue-size2Nバッチモードでのキャッシュキューのサイズ。
sink.buffer-flush.max-rows500000Nバッチモードで単一バッチで書き込む行の最大数。
sink.buffer-flush.max-bytes100MBNバッチモードで単一バッチで書き込むバイトの最大数。
sink.buffer-flush.interval10sNバッチモードでキャッシュを非同期でフラッシュする間隔。
sink.ignore.update-beforeTRUENupdate-beforeイベントを無視するかどうか。デフォルトでは無視されます。

Lookup Join設定

Keyデフォルト値必須コメント
lookup.cache.max-rows-1Nlookupキャッシュの最大行数。デフォルト値は-1で、キャッシュが有効でないことを意味します。
lookup.cache.ttl10sNlookupキャッシュの最大時間。デフォルトは10秒。
lookup.max-retries1Nlookupクエリ失敗後のリトライ回数。
lookup.jdbc.asyncFALSEN非同期lookupを有効にするかどうか。デフォルトはfalse。
lookup.jdbc.read.batch.size128N非同期lookupでの各クエリの最大バッチサイズ。
lookup.jdbc.read.batch.queue-size256N非同期lookup中の中間バッファキューのサイズ。
lookup.jdbc.read.thread-size3N各タスクでのlookup用jdbcスレッドの数。

データベース全体同期設定

構文

<FLINK_HOME>bin/flink run \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.6.1.jar \
<mysql-sync-database|oracle-sync-database|postgres-sync-database|sqlserver-sync-database|mongodb-sync-database> \
--database <doris-database-name> \
[--job-name <flink-job-name>] \
[--table-prefix <doris-table-prefix>] \
[--table-suffix <doris-table-suffix>] \
[--including-tables <mysql-table-name|name-regular-expr>] \
[--excluding-tables <mysql-table-name|name-regular-expr>] \
--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...] \
--oracle-conf <oracle-cdc-source-conf> [--oracle-conf <oracle-cdc-source-conf> ...] \
--postgres-conf <postgres-cdc-source-conf> [--postgres-conf <postgres-cdc-source-conf> ...] \
--sqlserver-conf <sqlserver-cdc-source-conf> [--sqlserver-conf <sqlserver-cdc-source-conf> ...] \
--sink-conf <doris-sink-conf> [--table-conf <doris-sink-conf> ...] \
[--table-conf <doris-table-conf> [--table-conf <doris-table-conf> ...]]

設定

KeyComment
--job-nameFlinkタスクの名前。オプションです。
--databaseDorisに同期するデータベースの名前。
--table-prefixDorisテーブルのプレフィックス名。例:--table-prefix ods_。
--table-suffixDorisテーブルのサフィックス名。プレフィックスと同様です。
--including-tables同期する必要があるMySQLテーブル。複数のテーブルは|で区切ることができ、正規表現がサポートされています。例:--including-tables table1。
--excluding-tables同期する必要がないテーブル。使用方法は--including-tablesと同じです。
--mysql-confMySQL CDCSourceの設定。例:--mysql-conf hostname=127.0.0.1。MySQL-CDCのすべての設定はこちらで確認できます。その中で、hostname、username、password、database-nameは必須です。同期するデータベースとテーブルに主キーのないテーブルが含まれている場合、scan.incremental.snapshot.chunk.key-columnを設定する必要があり、非null型のフィールドを1つだけ選択できます。例:scan.incremental.snapshot.chunk.key-column=database.table:column,database.table1:column...、異なるデータベースとテーブルの列はカンマで区切ります。
--oracle-confOracle CDCSourceの設定。例:--oracle-conf hostname=127.0.0.1。Oracle-CDCのすべての設定はこちらで確認できます。その中で、hostname、username、password、database-name、schema-nameは必須です。
--postgres-confPostgres CDCSourceの設定。例:--postgres-conf hostname=127.0.0.1。Postgres-CDCのすべての設定はこちらで確認できます。その中で、hostname、username、password、database-name、schema-name、slot.nameは必須です。
--sqlserver-confSQLServer CDCSourceの設定。例:--sqlserver-conf hostname=127.0.0.1。SQLServer-CDCのすべての設定はこちらで確認できます。その中で、hostname、username、password、database-name、schema-nameは必須です。
--db2-confSQLServer CDCSourceの設定。例:--db2-conf hostname=127.0.0.1。DB2-CDCのすべての設定はこちらで確認できます。その中で、hostname、username、password、database-name、schema-nameは必須です。
--sink-confDoris Sinkのすべての設定はこちらで確認できます。
--mongodb-confMongoDB CDCSourceの設定。例:--mongodb-conf hosts=127.0.0.1:27017。Mongo-CDCのすべての設定はこちらで確認できます。その中で、hosts、username、password、databaseは必須です。--mongodb-conf schema.sample-percentは、MongoDBデータを自動的にサンプリングしてDorisにテーブルを作成するための設定で、デフォルト値は0.2です。
--table-confDorisテーブルの設定項目、つまりpropertiesに含まれる内容(table-bucketsはpropertiesの属性ではありません)。例:--table-conf replication_num=1、--table-conf table-buckets="tbl1:10,tbl2:20,a.:30,b.:40,.*:50"は、正規表現の順序で異なるテーブルのバケット数を指定することを意味します。マッチしない場合は、BUCKETS AUTOメソッドを使用してテーブルが作成されます。
--schema-change-modeスキーマ変更を解析するモード。debezium_structureとsql_parserがあります。デフォルトではdebezium_structureモードが使用されます。debezium_structureモードは、上流CDCがデータを同期する際に使用されるデータ構造を解析し、この構造を解析してDDL変更操作を判定します。sql_parserモードは、上流CDCがデータを同期する際のDDL文を解析してDDL変更操作を判定するため、この解析モードはより正確です。使用例:--schema-change-mode debezium_structure。この機能は24.0.0以降のバージョンで利用可能になります。
--single-sinkすべてのテーブルを同期するために単一のSinkを使用するかどうか。有効にすると、上流で新しく作成されたテーブルを自動的に識別し、テーブルを自動作成することもできます。
--multi-to-one-origin複数の上流テーブルを同じテーブルに書き込む場合のソーステーブルの設定。例:--multi-to-one-origin "a_.*|b_.*"、#208を参照してください
--multi-to-one-targetmulti-to-one-originと組み合わせて使用する、ターゲットテーブルの設定。例:--multi-to-one-target "a|b"
--create-table-onlyテーブルの構造のみを同期するかどうか。

DorisからFlinkへのデータ型マッピング

Doris TypeFlink Type
NULL_TYPENULL
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
FLOATFLOAT
DOUBLEDOUBLE
DATEDATE
DATETIMETIMESTAMP
DECIMALDECIMAL
CHARSTRING
LARGEINTSTRING
VARCHARSTRING
STRINGSTRING
DECIMALV2DECIMAL
ARRAYARRAY
MAPSTRING
JSONSTRING
VARIANTSTRING
IPV4STRING
IPV6STRING

FlinkからDorisへのデータ型マッピング

Flink TypeDoris Type
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTEGERINTEGER
BIGINTBIGINT
FLOATFLOAT
DOUBLEDOUBLE
DECIMALDECIMAL
CHARCHAR
VARCHARVARCHAR/STRING
STRINGSTRING
DATEDATE
TIMESTAMPDATETIME
TIMESTAMP_LTZDATETIME
ARRAYARRAY
MAPMAP/JSON
ROWSTRUCT/JSON

監視メトリクス

Flinkは、Flinkクラスターの指標を監視するための複数のMetricsを提供しています。以下は、Flink Doris Connectorに新しく追加された監視メトリクスです。

NameMetric TypeDescription
totalFlushLoadBytesCounterフラッシュおよびインポートされた総バイト数。
flushTotalNumberRowsCounterインポートおよび処理された総行数。
totalFlushLoadedRowsCounter正常にインポートされた総行数。
totalFlushTimeMsCounter正常なインポートが完了するまでにかかった総時間。
totalFlushSucceededNumberCounterインポートが正常に完了した回数。
totalFlushFailedNumberCounterインポートが失敗した回数。
totalFlushFilteredRowsCounterデータ品質が不適格な行の総数。
totalFlushUnselectedRowsCounterwhere条件によってフィルタリングされた行の総数。
beginTxnTimeMsHistogramFeにトランザクションの開始をリクエストするのにかかった時間(ミリ秒)。
putDataTimeMsHistogramFeにインポートデータ実行計画の取得をリクエストするのにかかった時間。
readDataTimeMsHistogramデータを読み取るのにかかった時間。
writeDataTimeMsHistogramデータ書き込み操作を実行するのにかかった時間。
commitAndPublishTimeMsHistogramFeにトランザクションのコミットと公開をリクエストするのにかかった時間。
loadTimeMsHistogramインポートが完了するまでにかかった時間。

ベストプラクティス

FlinkSQLによるCDC経由でのMySQLデータへの高速接続

-- enable checkpoint
SET 'execution.checkpointing.interval' = '10s';

CREATE TABLE cdc_mysql_source (
id int
,name VARCHAR
,PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = 'password',
'database-name' = 'database',
'table-name' = 'table'
);

-- Supports synchronizing insert/update/delete events
CREATE TABLE doris_sink (
id INT,
name STRING
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'database.table',
'username' = 'root',
'password' = '',
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true',
'sink.enable-delete' = 'true', -- Synchronize delete events
'sink.label-prefix' = 'doris_label'
);

insert into doris_sink select id,name from cdc_mysql_source;

Flinkは部分的な列更新を実行する

CREATE TABLE doris_sink (
id INT,
name STRING,
bank STRING,
age int
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'database.table',
'username' = 'root',
'password' = '',
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true',
'sink.properties.columns' = 'id,name,bank,age', -- Columns that need to be updated
'sink.properties.partial_columns' = 'true' -- Enable partial column updates
);
CREATE TABLE bitmap_sink (
dt int,
page string,
user_id int
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'test.bitmap_test',
'username' = 'root',
'password' = '',
'sink.label-prefix' = 'doris_label',
'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
)

FlinkCDC キーカラムの更新

一般的に、ビジネスデータベースでは、数値がテーブルの主キーとしてよく使用されます。例えば、Studentテーブルでは、番号(id)が主キーとして使用されます。しかし、ビジネスの発展に伴い、データに対応する番号が変更される場合があります。このシナリオでは、Flink CDC + Doris Connectorを使用してデータを同期する際、Doris内の主キーカラムのデータを自動的に更新することができます。

原理

Flink CDCの基盤となる収集ツールはDebeziumです。Debeziumは内部的にopフィールドを使用して対応する操作を識別します。opフィールドの値は c、u、d、r であり、それぞれ create、update、delete、read に対応します。主キーカラムの更新については、Flink CDCは下流にDELETEおよびINSERTイベントを送信し、データがDorisに同期された後、Doris内の主キーカラムのデータが自動的に更新されます。

使用方法

Flinkプログラムは上記のCDC同期例を参照できます。タスクの送信が成功した後、MySQL側で主キーカラムを更新するステートメントを実行し(例:update student set id = '1002' where id = '1001')、その後Doris内のデータを変更できます。

Flinkによる指定カラムに基づくデータ削除

一般的に、Kafka内のメッセージは特定のフィールドを使用して操作タイプをマークします。例えば、{"op_type":"delete",data:{...}}のようなデータです。このような種類のデータについては、op_type=deleteのデータを削除したいと考えられます。

DorisSinkは、デフォルトでRowKindに従ってイベントのタイプを区別します。通常、CDCの場合、イベントタイプを直接取得でき、隠しカラム __DORIS_DELETE_SIGN__ に値を割り当てて削除の目的を達成できます。しかし、Kafkaの場合、ビジネスロジックに従って判断し、隠しカラムの値を明示的に渡す必要があります。

-- For example, the upstream data:{"op_type":"delete",data:{"id":1,"name":"zhangsan"}}
CREATE TABLE KAFKA_SOURCE(
data STRING,
op_type STRING
) WITH (
'connector' = 'kafka',
...
);

CREATE TABLE DORIS_SINK(
id INT,
name STRING,
__DORIS_DELETE_SIGN__ INT
) WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'db.table',
'username' = 'root',
'password' = '',
'sink.enable-delete' = 'false', -- false means not to obtain the event type from RowKind
'sink.properties.columns' = 'id, name, __DORIS_DELETE_SIGN__' -- Explicitly specify the import columns of streamload
);

INSERT INTO DORIS_SINK
SELECT json_value(data,'$.id') as id,
json_value(data,'$.name') as name,
if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__
from KAFKA_SOURCE;

一般的に、MySQLなどの上流データソースを同期する際、上流でフィールドの追加や削除を行った場合、DorisでSchema Change操作を同期する必要があります。

このシナリオでは、通常DataStream APIのプログラムを作成し、DorisSinkが提供するJsonDebeziumSchemaSerializerシリアライザーを使用してSchemaChangeを自動的に実行する必要があります。詳細については、CDCSchemaChangeExample.javaを参照してください。

Connectorが提供する全データベース同期ツールでは、追加設定は不要で、上流のDDLが自動的に同期され、DorisでSchemaChange操作が実行されます。

よくある質問 (FAQ)

  1. errCode = 2, detailMessage = Label [label_0_1] has already been used, relate to txn [19650]

    Exactly-Onceシナリオでは、Flink Jobは最新のCheckpoint/Savepointから再起動する必要があります。そうしないと上記のエラーが報告されます。Exactly-Onceが不要な場合、この問題は2PC送信を無効にする(sink.enable-2pc=false)か、異なるsink.label-prefixに変更することでも解決できます。

  2. errCode = 2, detailMessage = transaction [19650] not found

    これはCommitステージで発生します。checkpointに記録されたトランザクションIDがFE側で期限切れになっています。この時点で再度コミットしようとすると、上記のエラーが発生します。この時点では、checkpointから開始することはできません。その後、fe.confstreaming_label_keep_max_second設定を変更して期限切れ時間を延長できます。デフォルトの期限切れ時間は12時間です。dorisバージョン2.0以降では、fe.conflabel_num_threshold設定(デフォルト2000)によっても制限され、これを増加させるか-1に変更できます(-1は時間によってのみ制限されることを意味します)。

  3. errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100

    これは同一データベースへの並行インポートが100を超えているためです。fe.confmax_running_txn_num_per_dbパラメータを調整することで解決できます。詳細については、max_running_txn_num_per_dbを参照してください。

    同時に、labelの頻繁な変更とタスクの再起動もこのエラーを引き起こす可能性があります。2pcシナリオ(Duplicate/Aggregateモデル用)では、各タスクのlabelは一意である必要があります。そして、checkpointから再起動する際、Flinkタスクは事前コミットが成功したがまだコミットされていないトランザクションを積極的に中止します。labelの頻繁な変更と再起動により、中止できない事前コミット成功トランザクションが大量に発生し、トランザクションを占有します。Uniqueモデルでは、2pcを無効にしてべき等書き込みを実現することもできます。

  4. tablet writer write failed, tablet_id=190958, txn_id=3505530, err=-235

    これは通常Connectorバージョン1.1.0以前で発生し、書き込み頻度が高すぎてバージョン数が過多になることが原因です。sink.batch.sizesink.batch.intervalパラメータを設定してStreamloadの頻度を下げることができます。Connectorバージョン1.1.0以降では、デフォルトの書き込みタイミングはCheckpointによって制御され、Checkpoint間隔を増加させることで書き込み頻度を下げることができます。

  5. Flinkインポート時にダーティデータをスキップするにはどうすればよいですか?

    Flinkがデータをインポートする際、フィールド形式や長さの問題などダーティデータがあると、StreamLoadがエラーを報告します。この時、Flinkは再試行を続けます。このようなデータをスキップする必要がある場合、StreamLoadのstrict modeを無効にする(strict_mode=falsemax_filter_ratio=1を設定)か、Sink演算子の前でデータをフィルタリングできます。

  6. FlinkマシンとBEマシン間のネットワークが接続されていない場合、どのように設定すればよいですか?

    FlinkがDorisへの書き込みを開始すると、DorisはBEに書き込み操作をリダイレクトします。この時、返されるアドレスはBEの内部ネットワークIPで、show backendsコマンドで確認できるIPです。この時FlinkとDorisにネットワーク接続がない場合、エラーが報告されます。この場合、benodesでBEの外部ネットワークIPを設定できます。

  7. stream load error: HTTP/1.1 307 Temporary Redirect

    Flinkは最初にFEにリクエストし、307を受信後にリダイレクト後にBEにリクエストします。FEがFullGC/高負荷/ネットワーク遅延状態にある場合、HttpClientはデフォルトで一定期間(3秒)内にレスポンスを待たずにデータを送信します。リクエストボディがデフォルトでInputStreamであるため、307レスポンスを受信した際、データを再生できずに直接エラーが報告されます。この問題を解決する方法は3つあります:1. Connector25.1.0以上にアップグレードしてデフォルト時間を増加;2. auto-redirect=falseに変更してBEに直接リクエストを開始(一部のクラウドシナリオには適用不可);3. unique keyモデルでbatch modeを有効化。