メインコンテンツまでスキップ

Insert Into Select

INSERT INTO文は、Dorisクエリの結果を別のテーブルにインポートすることをサポートしています。INSERT INTOは同期インポート方式であり、インポートの実行後にインポート結果が返されます。インポートが成功したかどうかは、返された結果に基づいて判断できます。INSERT INTOはインポートタスクの原子性を保証します。つまり、すべてのデータが正常にインポートされるか、まったくインポートされないかのいずれかです。

適用シナリオ

  1. ユーザーがDorisテーブルの既存データに対してETLを実行し、新しいDorisテーブルにインポートしたい場合、INSERT INTO SELECT構文が適用できます。
  2. Multi-カタログ外部テーブルメカニズムと組み合わせて、MySQLやHiveシステムのテーブルをMulti-カタログ経由でマッピングできます。その後、INSERT INTO SELECT構文を使用して外部テーブルからDorisテーブルにデータをインポートできます。
  3. table Value Functions(TVFs)を利用して、ユーザーはオブジェクトストレージやHDFS上のファイルに保存されたデータを、自動列型推論機能付きのテーブルとして直接クエリできます。その後、INSERT INTO SELECT構文を使用して外部テーブルからDorisテーブルにデータをインポートできます。

実装

INSERT INTOを使用する際、インポートジョブはMySQLプロトコルを使用してFEノードに開始・送信される必要があります。FEは実行プランを生成し、これにはクエリ関連のオペレーターが含まれ、最後のオペレーターはOlapTableSinkです。OlapTableSinkオペレーターは、クエリ結果をターゲットテーブルに書き込む責任があります。実行プランはその後、実行のためにBEノードに送信されます。DorisはCoordinatorとして1つのBEノードを指定し、このノードがデータを受信して他のBEノードに配布します。

開始方法

INSERT INTOジョブはMySQLプロトコルを使用して送信・転送されます。以下の例では、MySQLコマンドラインインターフェースを通じてINSERT INTOを使用してインポートジョブを送信する方法を示します。

詳細な構文はINSERT INTOドキュメントで確認できます。

準備

INSERT INTOは、ターゲットテーブルに対するINSERT権限が必要です。GRANTコマンドを使用してユーザーアカウントに権限を付与できます。

INSERT INTOジョブの作成

  1. ソーステーブルを作成する
CREATE TABLE testdb.test_table(
user_id BIGINT NOT NULL COMMENT "User ID",
name VARCHAR(20) COMMENT "User name",
age INT COMMENT "User age"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;
  1. 任意のロード方法を使用してソーステーブルにデータをインポートします。(ここでは例としてINSERT INTO VALUESを使用します)。
INSERT INTO testdb.test_table (user_id, name, age)
VALUES (1, "Emily", 25),
(2, "Benjamin", 35),
(3, "Olivia", 28),
(4, "Alexander", 60),
(5, "Ava", 17);
  1. 上記の操作に基づいて、ターゲットテーブルとして新しいテーブルを作成します(ソーステーブルと同じスキーマで)。
CREATE TABLE testdb.test_table2 LIKE testdb.test_table;
  1. INSERT INTO SELECTを使用して新しいテーブルにデータを取り込む。
INSERT INTO testdb.test_table2
SELECT * FROM testdb.test_table WHERE age < 30;
Query OK, 3 rows affected (0.544 sec)
{'label':'label_9c2bae970023407d_b2c5b78b368e78a7', 'status':'VISIBLE', 'txnId':'9084'}
  1. インポートされたデータを表示します。
MySQL> SELECT * FROM testdb.test_table2 ORDER BY age;
+---------+--------+------+
| user_id | name | age |
+---------+--------+------+
| 5 | Ava | 17 |
| 1 | Emily | 25 |
| 3 | Olivia | 28 |
+---------+--------+------+
3 rows in set (0.02 sec)
  1. JOBを使用してINSERT操作を非同期で実行できます。

  2. ソースはtvfまたはcatalog内のテーブルにできます。

INSERT INTOジョブの表示

完了したINSERT INTOタスクを表示するには、SHOW LOADコマンドを使用できます。

MySQL> SHOW LOAD FROM testdb;
+--------+-----------------------------------------+----------+--------------------+--------+---------+----------------------------------------------------------------------+----------+---------------------+---------------------+---------------------+---------------------+---------------------+------+-----------------------------------------------------------------------------------------------------------------------+---------------+--------------+------+---------+
| JobId | Label | State | Progress | Type | EtlInfo | TaskInfo | ErrorMsg | CreateTime | EtlStartTime | EtlFinishTime | LoadStartTime | LoadFinishTime | URL | JobDetails | TransactionId | ErrorTablets | User | Comment |
+--------+-----------------------------------------+----------+--------------------+--------+---------+----------------------------------------------------------------------+----------+---------------------+---------------------+---------------------+---------------------+---------------------+------+-----------------------------------------------------------------------------------------------------------------------+---------------+--------------+------+---------+
| 376416 | label_3e52da787aab4222_9126d2fce8f6d1e5 | FINISHED | Unknown id: 376416 | INSERT | NULL | cluster:N/A; timeout(s):26200; max_filter_ratio:0.0; priority:NORMAL | NULL | 2024-02-27 01:22:17 | 2024-02-27 01:22:17 | 2024-02-27 01:22:17 | 2024-02-27 01:22:17 | 2024-02-27 01:22:18 | | {"Unfinished backends":{},"ScannedRows":0,"TaskNumber":0,"LoadBytes":0,"All backends":{},"FileNumber":0,"FileSize":0} | 9081 | {} | root | |
| 376664 | label_9c2bae970023407d_b2c5b78b368e78a7 | FINISHED | Unknown id: 376664 | INSERT | NULL | cluster:N/A; timeout(s):26200; max_filter_ratio:0.0; priority:NORMAL | NULL | 2024-02-27 01:39:37 | 2024-02-27 01:39:37 | 2024-02-27 01:39:37 | 2024-02-27 01:39:37 | 2024-02-27 01:39:38 | | {"Unfinished backends":{},"ScannedRows":0,"TaskNumber":0,"LoadBytes":0,"All backends":{},"FileNumber":0,"FileSize":0} | 9084 | {} | root | |
+--------+-----------------------------------------+----------+--------------------+--------+---------+----------------------------------------------------------------------+----------+---------------------+---------------------+---------------------+---------------------+---------------------+------+-----------------------------------------------------------------------------------------------------------------------+---------------+--------------+------+---------+

INSERT INTO jobs のキャンセル

現在実行中のINSERT INTO jobは、Ctrl-Cでキャンセルできます。

マニュアル

構文

INSERT INTOの構文は以下の通りです:

  1. INSERT INTO SELECT

INSERT INTO SELECTは、クエリ結果をターゲットテーブルに書き込むために使用されます。

INSERT INTO target_table SELECT ... FROM source_table;

上記のSELECT文は通常のSELECTクエリと似ており、WHEREやJOINなどの操作が可能です。

パラメータ設定

FE Config

名前デフォルト値説明
insert_load_default_timeout_second14400s (4時間)インポートタスクのタイムアウト(秒単位)。インポートタスクがこのタイムアウト期間内に完了しない場合、システムによってキャンセルされ、CANCELLEDとしてマークされます。

Session Variable

名前デフォルト値説明
insert_timeout14400s (4時間)SQL文としてのINSERT INTOのタイムアウト(秒単位)。
enable_insert_stricttrueこれがtrueに設定されている場合、タスクに無効なデータが含まれているとINSERT INTOは失敗します。falseに設定されている場合、INSERT INTOは無効な行を無視し、少なくとも1行が正常にインポートされれば、インポートは成功したと見なされます。バージョン2.1.4まで。INSERT INTOではエラー率を制御できないため、このパラメータはデータ品質を厳密にチェックするか、無効なデータを完全に無視するかのいずれかに使用されます。データが無効になる一般的な理由には、ソースデータの列長が宛先列長を超えている、列タイプの不一致、パーティションの不一致、列順序の不一致などがあります。
insert_max_filter_ratio1.0バージョン2.1.5以降。enable_insert_strictがfalseの場合のみ有効。INSERT INTO FROM S3/HDFS/LOCAL()を使用する際のエラー許容度を制御するために使用されます。デフォルト値は1.0で、すべてのエラーが許容されることを意味します。0から1の間の小数値にすることができます。エラー行数がこの比率を超えると、INSERTタスクが失敗することを意味します。

戻り値

INSERT INTOはSQL文であり、異なるクエリ結果に基づいて異なる結果を返します:

空の結果セット

INSERT INTOのSELECT文のクエリ結果セットが空の場合、戻り値は次のようになります:

mysql> INSERT INTO tbl1 SELECT * FROM empty_tbl;
Query OK, 0 rows affected (0.02 sec)

Query OKは実行が成功したことを示します。0 rows affectedはデータがインポートされなかったことを意味します。

空でない結果セットと成功したINSERT

mysql> INSERT INTO tbl1 SELECT * FROM tbl2;
Query OK, 4 rows affected (0.38 sec)
{'label':'INSERT_8510c568-9eda-4173-9e36-6adc7d35291c', 'status':'visible', 'txnId':'4005'}

mysql> INSERT INTO tbl1 WITH LABEL my_label1 SELECT * FROM tbl2;
Query OK, 4 rows affected (0.38 sec)
{'label':'my_label1', 'status':'visible', 'txnId':'4005'}

mysql> INSERT INTO tbl1 SELECT * FROM tbl2;
Query OK, 2 rows affected, 2 warnings (0.31 sec)
{'label':'INSERT_f0747f0e-7a35-46e2-affa-13a235f4020d', 'status':'visible', 'txnId':'4005'}

mysql> INSERT INTO tbl1 SELECT * FROM tbl2;
Query OK, 2 rows affected, 2 warnings (0.31 sec)
{'label':'INSERT_f0747f0e-7a35-46e2-affa-13a235f4020d', 'status':'committed', 'txnId':'4005'}

Query OKは実行が成功したことを示します。4 rows affectedは合計4行のデータがインポートされたことを示します。2 warningsはフィルタリングされた行数を示します。

さらに、JSON文字列が返されます:

{'label':'my_label1', 'status':'visible', 'txnId':'4005'}
{'label':'INSERT_f0747f0e-7a35-46e2-affa-13a235f4020d', 'status':'committed', 'txnId':'4005'}
{'label':'my_label1', 'status':'visible', 'txnId':'4005', 'err':'some other error'}

パラメータの説明:

ParameterDescription
TxnIdインポートトランザクションのID
Labelインポートジョブのラベル:「INSERT INTO tbl WITH LABEL label...」を使用して指定できます
Statusインポートされたデータの可視性:可視である場合は「visible」と表示されます。そうでない場合は「committed」と表示されます。「committed」状態では、インポートは完了していますが、データの可視化に遅延が生じる可能性があります。この場合、再試行する必要はありません。visible:インポートが成功し、データが可視です。committed:インポートは完了していますが、データの可視化に遅延が生じる可能性があります。この場合、再試行する必要はありません。Label Already Exists:指定されたラベルがすでに存在するため、別のラベルに変更する必要があります。Fail:インポートが失敗しました。
Errエラーメッセージ

フィルタリングされた行を表示するには、SHOW LOAD文を使用できます。

SHOW LOAD WHERE label="xxx";

このステートメントの結果には、エラーデータをクエリするために使用できるURLが含まれます。詳細については、下記の「エラー行を表示する」セクションを参照してください。

データの非表示状態は一時的であり、データは最終的に表示されるようになります。

SHOW TRANSACTIONステートメントを使用して、データのバッチの可視性ステータスを確認できます。

SHOW TRANSACTION WHERE id=4005;

結果のTransactionStatus列がvisibleの場合、そのデータが表示可能であることを示しています。

{'label':'my_label1', 'status':'visible', 'txnId':'4005'}
{'label':'INSERT_f0747f0e-7a35-46e2-affa-13a235f4020d', 'status':'committed', 'txnId':'4005'}
{'label':'my_label1', 'status':'visible', 'txnId':'4005', 'err':'some other error'}

空でない結果セットだがINSERTに失敗

実行の失敗は、データが正常にインポートされなかったことを意味します。エラーメッセージが返されます:

mysql> INSERT INTO tbl1 SELECT * FROM tbl2 WHERE k1 = "a";
ERROR 1064 (HY000): all partitions have no load data. url: http://10.74.167.16:8042/api/_load_error_log?file=_shard_2/error_loginsert_stmt_ba8bb9e158e4879-ae8de8507c0bf8a2_ba8bb9e158e4879_ae8de8507c0bf8a2

ERROR 1064 (HY000): all partitions have no load dataは失敗の根本原因を示しています。エラーメッセージで提供されるURLを使用してエラーデータの場所を特定できます。詳細については、以下の「エラー行の表示」セクションを参照してください。

ベストプラクティス

データサイズ

INSERT INTOはデータ量に制限を課さず、大規模なデータインポートをサポートできます。ただし、大量のデータをインポートする場合は、システムのINSERT INTOタイムアウト設定を調整してインポートタイムアウト >= データ量``/``推定インポート速度となるようにすることが推奨されます。

  1. FE設定パラメータinsert_load_default_timeout_second
  2. 環境パラメータinsert_timeout

エラー行の表示

INSERT INTOの結果にURLフィールドが含まれている場合、以下のコマンドを使用してエラー行を表示できます:

SHOW LOAD WARNINGS ON "url";

例:

SHOW LOAD WARNINGS ON "http://ip:port/api/_load_error_log?file=_shard_13/error_loginsert_stmt_d2cac0a0a16d482d-9041c949a4b71605_d2cac0a0a16d482d_9041c949a4b71605";

エラーの一般的な原因には、ソースデータの列長が宛先列長を超過する、列タイプの不一致、パーティションの不一致、列順序の不一致などがあります。

環境変数enable_insert_strictを設定することで、INSERT INTOがエラー行を無視するかどうかを制御できます。

Multi-Catalogによる外部データの取り込み

Dorisは外部テーブルの作成をサポートしています。作成後、外部テーブルのデータはINSERT INTO SELECTを使用してDoris内部テーブルにインポートするか、SELECT文を使用して直接クエリできます。

Multi-Catalog機能により、DorisはApache Hive、Apache Iceberg、Apache Hudi、Apache Paimon (Incubating)、Elasticsearch、MySQL、Oracle、SQL Serverなど、様々な主要なデータレイクやデータベースへの接続をサポートしています。

Multi-Catalogの詳細については、Lakehouse overviewを参照してください。

以下では、Hive外部テーブルからDoris内部テーブルへのデータインポートについて説明します。

Hive Catalogの作成

CREATE CATALOG hive PROPERTIES (
'type'='hms',
'hive.metastore.uris' = 'thrift://172.0.0.1:9083',
'hadoop.username' = 'hive',
'dfs.nameservices'='your-nameservice',
'dfs.ha.namenodes.your-nameservice'='nn1,nn2',
'dfs.namenode.rpc-address.your-nameservice.nn1'='172.21.0.2:8088',
'dfs.namenode.rpc-address.your-nameservice.nn2'='172.21.0.3:8088',
'dfs.client.failover.proxy.provider.your-nameservice'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
);

データの取り込み

  1. Dorisでデータインポート用のターゲットテーブルを作成します。
CREATE TABLE `target_tbl` (
`k1` decimal(9, 3) NOT NULL COMMENT "",
`k2` char(10) NOT NULL COMMENT "",
`k3` datetime NOT NULL COMMENT "",
`k5` varchar(20) NOT NULL COMMENT "",
`k6` double NOT NULL COMMENT ""
)
COMMENT "Doris Table"
DISTRIBUTED BY HASH(k1) BUCKETS 2
PROPERTIES (
"replication_num" = "1"
);
  1. Dorisテーブルの作成に関する詳細な手順については、CREATE TABLEを参照してください。

  2. データのインポート(hive.db1.source_tblテーブルからtarget_tblテーブルへ)。

INSERT INTO target_tbl SELECT k1,k2,k3 FROM  hive.db1.source_tbl limit 100;

INSERT コマンドは同期コマンドです。結果を返した場合、インポートが成功したことを示します。

注意事項

  • 外部データソースとDorisクラスタが通信できることを確認してください。これには、BEノードと外部データソース間の相互ネットワークアクセシビリティが含まれます。

TVF によるデータ取り込み

DorisはTable Value Functions(TVFs)を通じて、オブジェクトストレージやHDFSに保存されたファイルを直接テーブルとしてクエリおよび分析でき、自動カラムタイプ推論をサポートします。詳細情報については、Lakehouse/TVF ドキュメントを参照してください。

自動カラムタイプ推論

DESC FUNCTION s3 (
"URI" = "http://127.0.0.1:9312/test2/test.snappy.parquet",
"s3.access_key"= "ak",
"s3.secret_key" = "sk",
"format" = "parquet",
"use_path_style"="true"
);
+---------------+--------------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+---------------+--------------+------+-------+---------+-------+
| p_partkey | INT | Yes | false | NULL | NONE |
| p_name | TEXT | Yes | false | NULL | NONE |
| p_mfgr | TEXT | Yes | false | NULL | NONE |
| p_brand | TEXT | Yes | false | NULL | NONE |
| p_type | TEXT | Yes | false | NULL | NONE |
| p_size | INT | Yes | false | NULL | NONE |
| p_container | TEXT | Yes | false | NULL | NONE |
| p_retailprice | DECIMAL(9,0) | Yes | false | NULL | NONE |
| p_comment | TEXT | Yes | false | NULL | NONE |
+---------------+--------------+------+-------+---------+-------+

この S3 TVF の例では、ファイルパス、接続情報、および認証情報が指定されています。

DESC FUNCTION 構文を使用して、このファイルのスキーマを表示できます。

Parquet ファイルについては、Doris がファイル内のメタデータに基づいて列の型を自動的に推論することがわかります。

現在、Doris は Parquet、ORC、CSV、および JSON 形式の解析と列の型推論をサポートしています。

INSERT INTO SELECT 構文と組み合わせて使用することで、ファイルを Doris テーブルに迅速にインポートして、より高速な分析を行うことができます。

// 1. Create Doris internal table
CREATE TABLE IF NOT EXISTS test_table
(
id int,
name varchar(50),
age int
)
DISTRIBUTED BY HASH(id) BUCKETS 4
PROPERTIES("replication_num" = "1");

// 2. Insert data by S3 Table Value Function
INSERT INTO test_table (id,name,age)
SELECT cast(id as INT) as id, name, cast (age as INT) as age
FROM s3(
"uri" = "http://127.0.0.1:9312/test2/test.snappy.parquet",
"s3.access_key"= "ak",
"s3.secret_key" = "sk",
"format" = "parquet",
"use_path_style" = "true");

注意事項

  • S3 / hdfs TVFで指定されたURIがどのファイルにも一致しない場合、または一致したすべてのファイルが空の場合、S3 / hdfs TVFは空の結果セットを返します。このような場合、DESC FUNCTIONを使用してファイルのスキーマを表示すると、無視できるダミーカラム__dummy_colが表示されます。
  • TVFに指定された形式がCSVで、読み取られるファイルが空ではないがファイルの最初の行が空の場合、エラーが表示されます:The first line is empty, can not parse column numbers。これは、ファイルの最初の行からスキーマを解析できないためです。

より詳細なヘルプ

INSERT INTOのより詳細な構文については、INSERT INTOコマンドマニュアルを参照してください。また、MySQLクライアントのコマンドラインでHELP INSERTと入力することで、さらなる情報を得ることもできます。