メインコンテンツまでスキップ

Spark Doris Connector

Spark Doris Connectorは、Sparkを通じてDorisに保存されたデータの読み込みと、Sparkを通じてDorisへのデータ書き込みをサポートします。

コードリポジトリ:https://github.com/apache/doris-spark-connector

  • RDDDataFrame、およびSpark SQLメソッドを通じたDorisからのバッチデータ読み込みをサポートします。DataFrameまたはSpark SQLの使用を推奨します。
  • DataFrameSpark SQLを使用したDorisへのバッチまたはストリーミングデータ書き込みをサポートします。
  • データ転送量を削減するため、Doris側でのデータフィルタリングをサポートします。

バージョン互換性

ConnectorSparkDorisJavaScala
25.2.03.5 - 3.1, 2.41.0 +82.12, 2.11
25.1.03.5 - 3.1, 2.41.0 +82.12, 2.11
25.0.13.5 - 3.1, 2.41.0 +82.12, 2.11
25.0.03.5 - 3.1, 2.41.0 +82.12, 2.11
1.3.23.4 - 3.1, 2.4, 2.31.0 - 2.1.682.12, 2.11
1.3.13.4 - 3.1, 2.4, 2.31.0 - 2.1.082.12, 2.11
1.3.03.4 - 3.1, 2.4, 2.31.0 - 2.1.082.12, 2.11
1.2.03.2, 3.1, 2.31.0 - 2.0.282.12, 2.11
1.1.03.2, 3.1, 2.31.0 - 1.2.882.12, 2.11
1.0.13.1, 2.30.12 - 0.1582.12, 2.11

使用方法

Maven

<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>spark-doris-connector-spark-3.5</artifactId>
<version>25.2.0</version>
</dependency>
ヒント

バージョン 24.0.0 以降、Doris Connector パッケージの命名規則が調整されました:

  1. Scala バージョン情報は含まれなくなりました。
  2. Spark 2.x バージョンについては、spark-doris-connector-spark-2 という名前のパッケージを統一的に使用します。これはデフォルトで Scala 2.11 に基づいてコンパイルされています。Scala 2.12 バージョンが必要な場合は、ご自身でコンパイルしてください。
  3. Spark 3.x バージョンについては、特定の Spark バージョンに応じて spark-doris-connector-spark-3.x という名前のパッケージを使用します。Spark 3.0 の場合、spark-doris-connector-spark-3.1 パッケージを使用できます。

注意事項

  1. 異なる Spark および Scala バージョンに応じて、対応する Connector バージョンに置き換えてください。
  2. 関連するバージョンの jar パッケージはこちらからもダウンロードできます。

コンパイル

ソースコードディレクトリで sh build.sh を実行し、プロンプトに従ってコンパイルに必要な Scala および Spark バージョンを入力します。

コンパイルが成功すると、dist ディレクトリにターゲット jar パッケージが生成されます(例:spark-doris-connector-spark-3.5-25.2.0.jar)。このファイルを SparkClassPath にコピーして Spark-Doris-Connector を使用します。

例えば、Local モードで実行される Spark の場合、このファイルを jars/ フォルダに置きます。Yarn クラスターモードで実行される Spark の場合、このファイルを事前デプロイパッケージに置きます。

例えば、spark-doris-connector-spark-3.5-25.2.0.jar を HDFS にアップロードし、HDFS 上の jar パッケージパスを spark.yarn.jars パラメータに追加します:

# 1. Upload spark-doris-connector-spark-3.5-25.2.0.jar to HDFS
hdfs dfs -mkdir /spark-jars/
hdfs dfs -put /your_local_path/spark-doris-connector-spark-3.5-25.2.0.jar /spark-jars/

# 2. Add spark-doris-connector-spark-3.5-25.2.0.jar dependency in the cluster
spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-spark-3.5-25.2.0.jar

使用例

バッチ読み取り

RDD

import org.apache.doris.spark._

val dorisSparkRDD = sc.dorisRDD(
tableIdentifier = Some("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME"),
cfg = Some(Map(
"doris.fenodes" -> "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
"doris.request.auth.user" -> "$YOUR_DORIS_USERNAME",
"doris.request.auth.password" -> "$YOUR_DORIS_PASSWORD"
))
)

dorisSparkRDD.collect()

DataFrame

val dorisSparkDF = spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.load()

dorisSparkDF.show(5)

Spark SQL

CREATE TEMPORARY VIEW spark_doris
USING doris
OPTIONS(
"table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
"fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
"user"="$YOUR_DORIS_USERNAME",
"password"="$YOUR_DORIS_PASSWORD"
);

SELECT * FROM spark_doris;

pySpark

dorisSparkDF = spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.load()
# show 5 lines data
dorisSparkDF.show(5)

Arrow Flight SQLを使用した読み取り

バージョン24.0.0から、Arrow Flight SQLを使用したデータ読み取りがサポートされています(Dorisバージョン >= 2.1.0が必要)。

doris.read.modearrowに設定し、doris.read.arrow-flight-sql.portをFEで設定されたArrow Flight SQLポートに設定してください。サーバー設定については、Arrow Flight SQLベースの高速データ転送を参照してください。

val df = spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("doris.user", "$YOUR_DORIS_USERNAME")
.option("doris.password", "$YOUR_DORIS_PASSWORD")
.option("doris.read.mode", "arrow")
.option("doris.read.arrow-flight-sql.port", "12345")
.load()

df.show()

バッチ書き込み

DataFrame

val mockDataDF = List(
(3, "440403001005", "21.cn"),
(1, "4404030013005", "22.cn"),
(33, null, "23.cn")
).toDF("id", "mi_code", "mi_name")
mockDataDF.show(5)

mockDataDF.write.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
// Other options
// Specify columns to write
.option("doris.write.fields", "$YOUR_FIELDS_TO_WRITE")
// Starting from version 1.3.0, overwrite write is supported
// .mode(SaveMode.Overwrite)
.save()

Spark SQL

CREATE TEMPORARY VIEW spark_doris
USING doris
OPTIONS(
"table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
"fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
"user"="$YOUR_DORIS_USERNAME",
"password"="$YOUR_DORIS_PASSWORD"
);

INSERT INTO spark_doris VALUES ("VALUE1", "VALUE2", ...);
-- insert into select
INSERT INTO spark_doris SELECT * FROM YOUR_TABLE;
-- insert overwrite
INSERT OVERWRITE SELECT * FROM YOUR_TABLE;

ストリーミング書き込み

DataFrame

構造化データ書き込み
val df = spark.readStream.format("your_own_stream_source").load()

df.writeStream
.format("doris")
.option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.start()
.awaitTermination()
Direct Write

データストリームの最初の列がDorisテーブル構造に準拠している場合、同じ列順序のCSVデータや一貫したフィールド名を持つJSONデータなど、doris.sink.streaming.passthroughオプションをtrueに設定することで、この列のデータをDataFrameに変換することなく直接書き込むことができます。

Kafkaソースを例にとると:

書き込み対象のテーブル構造は以下のとおりです:

CREATE TABLE `t2` (
`c0` int NULL,
`c1` varchar(10) NULL,
`c2` date NULL
) ENGINE=OLAP
DUPLICATE KEY(`c0`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`c0`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

メッセージの値はJSON形式です:{"c0":1,"c1":"a","dt":"2024-01-01"}

val kafkaSource = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "$YOUR_KAFKA_SERVERS")
.option("startingOffsets", "latest")
.option("subscribe", "$YOUR_KAFKA_TOPICS")
.load()

// Select value as the first column of DataFrame
kafkaSource.selectExpr("CAST(value as STRING)")
.writeStream
.format("doris")
.option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
// Setting this option to true will directly write the first column of DataFrame
.option("doris.sink.streaming.passthrough", "true")
.option("doris.sink.properties.format", "json")
.start()
.awaitTermination()

JSON形式での書き込み

doris.sink.properties.formatjsonに設定します。

val df = spark.readStream.format("your_own_stream_source").load()

df.write.format("doris")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.option("doris.sink.properties.format", "json")
.save()

Spark Doris Catalog

バージョン24.0.0以降、Spark Catalogを通じたDorisへのアクセスがサポートされています。

Catalog Config

Option NameRequiredComment
spark.sql.catalog.your_catalog_nameYesCatalogプロバイダーのクラス名を設定します。Dorisの場合、有効な値はorg.apache.doris.spark.catalog.DorisTableCatalogのみです。
spark.sql.catalog.your_catalog_name.doris.fenodesYesDoris FEノードをfe_ip:fe_http_portの形式で設定します。
spark.sql.catalog.your_catalog_name.doris.query.portNoDoris FEクエリポートを設定します。このオプションはspark.sql.catalog.your_catalog_name.doris.fe.auto.fetchがtrueの場合は省略できます。
spark.sql.catalog.your_catalog_name.doris.userYesDorisユーザーを設定します。
spark.sql.catalog.your_catalog_name.doris.passwordYesDorisパスワードを設定します。
spark.sql.defaultCatalogNoSpark SQLデフォルトカタログを設定します。
ヒント

DataFrameおよびSpark SQLに適用可能なすべてのコネクターパラメータをカタログに設定できます。
たとえば、json形式でデータを書き込みたい場合は、オプションspark.sql.catalog.your_catalog_name.doris.sink.properties.formatjsonに設定できます。

DataFrame

val conf = new SparkConf()
conf.set("spark.sql.catalog.your_catalog_name", "org.apache.doris.spark.catalog.DorisTableCatalog")
conf.set("spark.sql.catalog.your_catalog_name.doris.fenodes", "192.168.0.1:8030")
conf.set("spark.sql.catalog.your_catalog_name.doris.query.port", "9030")
conf.set("spark.sql.catalog.your_catalog_name.doris.user", "root")
conf.set("spark.sql.catalog.your_catalog_name.doris.password", "")
val spark = builder.config(conf).getOrCreate()
spark.sessionState.catalogManager.setCurrentCatalog("your_catalog_name")

// show all databases
spark.sql("show databases")

// use databases
spark.sql("use your_doris_db")

// show tables in test
spark.sql("show tables")

// query table
spark.sql("select * from your_doris_table")

// write data
spark.sql("insert into your_doris_table values(xxx)")

Spark SQL

必要なパラメータを設定してSpark SQL CLIを開始します:

spark-sql \
--conf "spark.sql.catalog.your_catalog_name=org.apache.doris.spark.catalog.DorisTableCatalog" \
--conf "spark.sql.catalog.your_catalog_name.doris.fenodes=192.168.0.1:8030" \
--conf "spark.sql.catalog.your_catalog_name.doris.query.port=9030" \
--conf "spark.sql.catalog.your_catalog_name.doris.user=root" \
--conf "spark.sql.catalog.your_catalog_name.doris.password=" \
--conf "spark.sql.defaultCatalog=your_catalog_name"

Spark SQL CLIでクエリを実行する:

-- show all databases
show databases;

-- use databases
use your_doris_db;

-- show tables in test
show tables;

-- query table
select * from your_doris_table;

-- write data
insert into your_doris_table values(xxx);
insert into your_doris_table select * from your_source_table;

-- access table with full name
select * from your_catalog_name.your_doris_db.your_doris_table;
insert into your_catalog_name.your_doris_db.your_doris_table values(xxx);
insert into your_catalog_name.your_doris_db.your_doris_table select * from your_source_table;

Java Example

Javaバージョンの例は参考用としてsamples/doris-demo/spark-demo/の下に提供されています。こちらをご覧ください。

Configuration

General Configuration

KeyDefault ValueComment
doris.fenodes--Doris FE httpアドレス、カンマで区切られた複数のアドレスをサポート
doris.table.identifier--Dorisテーブル名、例:db1.tbl1
doris.user--Dorisへのアクセス用ユーザー名
doris.passwordEmpty stringDorisへのアクセス用パスワード
doris.request.retries3Dorisへ送信されるリクエストのリトライ回数
doris.request.connect.timeout.ms30000Dorisへ送信されるリクエストの接続タイムアウト
doris.request.read.timeout.ms30000Dorisへ送信されるリクエストの読み取りタイムアウト
doris.request.query.timeout.s21600Dorisのクエリタイムアウト、デフォルト値は6時間、-1はタイムアウト制限なしを意味します。
doris.request.tablet.size11つのRDD Partitionに対応するDoris Tabletの数。
この値が小さいほど、より多くのPartitionが生成され、Sparkの並列性が向上しますが、Dorisへの負荷も大きくなります。
doris.read.field--Dorisテーブルから読み取る列名のリスト、カンマ区切り
doris.batch.size4064BEから一度に読み取る最大行数。この値を増加させると、SparkとDoris間の接続数を減らすことができます。
これによりネットワークレイテンシによる追加の時間オーバーヘッドを削減します。
doris.exec.mem.limit8589934592単一クエリのメモリ制限。デフォルトは8GB、バイト単位
doris.write.fields--Dorisテーブルに書き込むフィールドまたはフィールド順序を指定、カンマ区切り。
デフォルトでは、すべてのフィールドがDorisテーブルフィールドの順序で書き込まれます。
doris.sink.batch.size500000BEに一度に書き込む最大行数
doris.sink.max-retries0BEへの書き込み失敗後のリトライ回数。バージョン1.3.0以降、デフォルト値は0で、デフォルトでリトライしないことを意味します。このパラメータが0より大きく設定されている場合、バッチレベルの失敗リトライが実行され、doris.sink.batch.sizeで設定されたサイズのデータがSpark Executorメモリにキャッシュされるため、メモリ割り当てを適切に増加させる必要がある場合があります。
doris.sink.retry.interval.ms10000リトライ回数設定後の各リトライ間の間隔、ms単位。
doris.sink.properties.formatcsvStream Loadのデータ形式。
3つの形式をサポート:csv、json、arrow
詳細パラメータ
doris.sink.properties.*--Stream Loadのインポートパラメータ。
例:
列区切り文字の指定:'doris.sink.properties.column_separator' = ','など。
詳細パラメータ
doris.sink.task.partition.size--Doris書き込みタスクに対応するPartition数。Spark RDDがフィルタリングやその他の操作を経た後、最終的に書き込まれるPartition数は比較的多くなる可能性がありますが、各Partitionに対応するレコード数は比較的少なく、書き込み頻度の増加と計算リソースの無駄を引き起こします。
この値を小さく設定するほど、Doris書き込み頻度を低くでき、Dorisマージ圧力を軽減できます。このパラメータはdoris.sink.task.use.repartitionと組み合わせて使用されます。
doris.sink.task.use.repartitionfalserepartitionメソッドを使用してDoris書き込みPartition数を制御するかどうか。デフォルト値はfalseで、coalesceメソッドを使用して制御します(注意:書き込み前にSpark actionオペレータがない場合、全体の計算並列性が低下する可能性があります)。
trueに設定すると、repartitionメソッドが使用されます(注意:最終的なPartition数を設定できますが、追加でshuffleオーバーヘッドが増加します)。
doris.sink.batch.interval.ms0各バッチSinkの間隔時間、ms単位。
doris.sink.enable-2pcfalse2段階コミットを有効にするかどうか。有効にすると、ジョブ終了時にトランザクションがコミットされ、一部のタスクが失敗した場合、事前コミット状態のすべてのトランザクションがロールバックされます。
doris.sink.auto-redirecttrueStreamLoadリクエストをリダイレクトするかどうか。有効にすると、StreamLoadはBE情報を明示的に取得せずにFEを通じて書き込みます。
doris.enable.httpsfalseFE Httpsリクエストを有効にするかどうか。
doris.https.key-store-path-Httpsキーストアパス。
doris.https.key-store-typeJKSHttpsキーストアタイプ。
doris.https.key-store-password-Httpsキーストアパスワード。
doris.read.modethriftDoris読み取りモード、オプションはthriftarrowです。
doris.read.arrow-flight-sql.port-Doris FEのArrow Flight SQLポート。doris.read.modearrowの場合、Arrow Flight SQLを通じてデータを読み取るために使用されます。サーバー設定については、Arrow Flight SQLベースの高速データ転送を参照してください
doris.sink.label.prefixspark-dorisStream Loadモードで書き込み時のインポートラベルプレフィックス。
doris.thrift.max.message.size2147483647Thriftを通じてデータを読み取る際の最大メッセージサイズ。
doris.fe.auto.fetchfalseFE情報を自動取得するかどうか。trueに設定すると、doris.fenodesで設定されたノードに基づいてすべてのFEノード情報が要求され、追加で複数のノードを設定したり、doris.read.arrow-flight-sql.portdoris.query.portを個別に設定する必要がありません。
doris.read.bitmap-to-stringfalse読み取り時にBitmapタイプを配列インデックスで構成された文字列に変換するかどうか。具体的な結果形式については、関数定義BITMAP_TO_STRINGを参照してください。
doris.read.bitmap-to-base64false読み取り時にBitmapタイプをBase64エンコード文字列に変換するかどうか。具体的な結果形式については、関数定義BITMAP_TO_BASE64を参照してください。
doris.query.port-Doris FEクエリポート、上書き書き込みとCatalogメタデータ取得に使用されます。

SQL and Dataframe Specific Configuration

KeyDefault ValueComment
doris.filter.query.in.max.count10000述語プッシュダウンでのin式の値リストの最大要素数。この数を超えた場合、in式条件フィルタリングはSpark側で処理されます。

Structured Streaming Specific Configuration

KeyDefault ValueComment
doris.sink.streaming.passthroughfalse処理せずに最初の列の値を直接書き込みます。

RDD Specific Configuration

KeyDefault ValueComment
doris.request.auth.user--Dorisへのアクセス用ユーザー名
doris.request.auth.password--Dorisへのアクセス用パスワード
doris.filter.query--読み取りデータをフィルタリングする式。この式はDorisに透過的に送信されます。Dorisはこの式を使用してソースデータフィルタリングを完了します。

Doris to Spark Column Type Mapping

Doris TypeSpark Type
NULL_TYPEDataTypes.NullType
BOOLEANDataTypes.BooleanType
TINYINTDataTypes.ByteType
SMALLINTDataTypes.ShortType
INTDataTypes.IntegerType
BIGINTDataTypes.LongType
FLOATDataTypes.FloatType
DOUBLEDataTypes.DoubleType
DATEDataTypes.DateType
DATETIMEDataTypes.TimestampType
DECIMALDecimalType
CHARDataTypes.StringType
LARGEINTDecimalType
VARCHARDataTypes.StringType
STRINGDataTypes.StringType
JSONDataTypes.StringType
VARIANTDataTypes.StringType
TIMEDataTypes.DoubleType
HLLDataTypes.StringType
BitmapDataTypes.StringType

Spark to Doris Data Type Mapping

Spark TypeDoris Type
BooleanTypeBOOLEAN
ShortTypeSMALLINT
IntegerTypeINT
LongTypeBIGINT
FloatTypeFLOAT
DoubleTypeDOUBLE
DecimalTypeDECIMAL
StringTypeVARCHAR/STRING
DateTypeDATE
TimestampTypeDATETIME
ArrayTypeARRAY
MapTypeMAP/JSON
StructTypeSTRUCT/JSON
ヒント

バージョン24.0.0以降、Bitmapタイプの読み取り戻り値の型はstringで、デフォルトで文字列値"Read unsupported"を返します。

FAQ

  1. Bitmapタイプの書き込み方法は?

    Spark SQLで、INSERT INTOメソッドを通じてデータを書き込む際、DorisのターゲットテーブルにBITMAPまたはHLLタイプのデータが含まれている場合、パラメータdoris.ignore-typeを対応するタイプに設定し、doris.write.fieldsを通じて列をマップして変換する必要があります。使用方法は以下のとおりです:

    BITMAP

    CREATE TEMPORARY VIEW spark_doris
    USING doris
    OPTIONS(
    "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
    "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
    "user"="$YOUR_DORIS_USERNAME",
    "password"="$YOUR_DORIS_PASSWORD",
    "doris.ignore-type"="bitmap",
    "doris.write.fields"="col1,col2,col3,bitmap_col2=to_bitmap(col2),bitmap_col3=bitmap_hash(col3)"
    );

HLL

```sparksql
CREATE TEMPORARY VIEW spark_doris
USING doris
OPTIONS(
"table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
"fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
"user"="$YOUR_DORIS_USERNAME",
"password"="$YOUR_DORIS_PASSWORD",
"doris.ignore-type"="hll",
"doris.write.fields"="col1,hll_col1=hll_hash(col1)"
);
```
ヒント

バージョン 24.0.0 以降、doris.ignore-type は非推奨となり、書き込み時に追加する必要がありません。

  1. Overwrite 書き込みの使用方法

    バージョン 1.3.0 以降、Overwrite モードでの書き込みがサポートされています(テーブル全体レベルのデータ上書きのみサポート)。具体的な使用方法は以下の通りです:

    DataFrame

    resultDf.format("doris")
    .option("doris.fenodes","$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
    // your own options
    .mode(SaveMode.Overwrite)
    .save()

SQL

```sparksql
INSERT OVERWRITE your_target_table SELECT * FROM your_source_table;
```

3. Bitmap型の読み取り方法

バージョン24.0.0以降、Arrow Flight SQLを通じて変換されたBitmapデータの読み取りがサポートされています(Dorisバージョン >= 2.1.0が必要)。

**BitmapからString**

`DataFrame`メソッドを例に、`doris.read.bitmap-to-string`を`true`に設定します。具体的な結果フォーマットについては、オプション定義を参照してください。

```scala
spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.option("doris.read.bitmap-to-string", "true")
.load()
```

Bitmap to Base64

`DataFrame`メソッドを例にして、`doris.read.bitmap-to-base64`を`true`に設定します。具体的な結果形式については、オプション定義を参照してください。

```scala
spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.option("doris.read.bitmap-to-base64", "true")
.load()
```

4. DataFrameを介して書き込む際のエラー: org.apache.spark.sql.AnalysisException: TableProvider implementation doris cannot be written with ErrorIfExists mode, please use Append or Overwrite modes instead.

saveモードをAppendとして追加する必要があります:

```scala
resultDf.format("doris")
.option("doris.fenodes","$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
// your own options
.mode(SaveMode.Append)
.save()
```