BROKER LOAD
Description
Broker Load is a data import method in Doris, primarily used to import large-scale data from remote storage systems such as HDFS or S3. It is initiated through the MySQL API and is an asynchronous import method. The import progress and results can be queried using the `SHOW LOAD` statement.
In earlier versions, S3 and HDFS Load relied on the Broker process. Now, data is read directly from the data source without relying on an additional Broker process. Nevertheless, due to the similar syntax, S3 Load, HDFS Load, and Broker Load are collectively referred to as Broker Load.
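For example, assuming a job was submitted in database `example_db` with the label `example_label` (both names are placeholders), its progress could be checked as follows; the `State` column moves from PENDING through LOADING to FINISHED or CANCELLED.

-- check the job status by label (database and label names are placeholders)
SHOW LOAD FROM example_db WHERE LABEL = "example_label";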
Syntax
LOAD LABEL [<db_name>.]<load_label>
(
[ { MERGE | APPEND | DELETE } ]
DATA INFILE
(
"<file_path>"[, ...]
)
[ NEGATIVE ]
INTO TABLE `<table_name>`
[ PARTITION ( <partition_name> [ , ... ] ) ]
[ COLUMNS TERMINATED BY "<column_separator>" ]
[ LINES TERMINATED BY "<line_delimiter>" ]
[ FORMAT AS "<file_type>" ]
[ COMPRESS_TYPE AS "<compress_type>" ]
[ (<column_list>) ]
[ COLUMNS FROM PATH AS (<column_name> [ , ... ] ) ]
[ SET (<column_mapping>) ]
[ PRECEDING FILTER <predicate> ]
[ WHERE <predicate> ]
[ DELETE ON <expr> ]
[ ORDER BY <source_sequence> ]
[ PROPERTIES ("<key>"="<value>" [, ...] ) ]
)
WITH BROKER "<broker_name>"
( <broker_properties>
[ , ... ])
[ PROPERTIES (
<load_properties>
[ , ... ]) ]
[COMMENT "<comment>" ];
Required Parameters
1. <db_name>
Specifies the name of the database for import.
2. <load_label>
Each import task needs to specify a unique Label. The job progress can be queried later using this Label.
3. <table_name>
Specifies the table corresponding to the import task.
4. <file_path>
Specifies the file path to be imported. Multiple paths can be specified, and wildcards can be used. The path must ultimately match a file; if it only matches a directory, the import will fail.
5. <broker_name>
Specifies the name of the Broker service to be used. For example, in public cloud Doris, the Broker service name is `bos`.
6. <broker_properties>
Specifies the information required by the broker. This information is typically used to enable the Broker to access the remote storage system, such as BOS or HDFS.
(
"username" = "user",
"password" = "pass",
...
)
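When the data source is object storage accessed through S3 Load (see the Description), the statement is typically written with `WITH S3` instead of `WITH BROKER`, and the properties carry the endpoint and credentials. A minimal sketch, in which the endpoint, region, and key values are placeholders:

-- placeholders: replace endpoint, region, and keys with your own values
WITH S3
(
    "AWS_ENDPOINT" = "s3.example-endpoint.com",
    "AWS_ACCESS_KEY" = "your_access_key",
    "AWS_SECRET_KEY" = "your_secret_key",
    "AWS_REGION" = "your_region"
)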
Optional Parameters
1. merge | append | delete
Data merge type. The default is `append`, which means this import is a normal append-write operation. The `merge` and `delete` types are only applicable to tables using the Unique Key model. The `merge` type must be used together with the `DELETE ON` clause to mark the delete flag column, while the `delete` type indicates that all data in this import is deletion data.
2. negative
Indicates "negative" import. This method is only applicable to aggregate data tables with an integer sum aggregation type. It negates the integer values corresponding to the sum aggregation columns in the imported data, which is used to offset incorrect data.
3. <partition_name>
Specifies that only certain partitions of the table are imported, for example `PARTITION (p1, p2, ...)`. Data that falls outside the specified partitions will be ignored.
4. <column_separator>
Specifies the column separator. It is only valid for the CSV format and can only be a single-byte character.
5. <line_delimiter>
Specifies the line delimiter. It is only valid for the CSV format and can only be a single-byte character.
6. <file_type>
Specifies the file format. Supported formats are `csv` (default), `parquet`, and `orc`.
7. <compress_type>
Specifies the file compression type. Supported types are `gz`, `bz2`, and `lz4frame`.
8. <column_list>
Specifies the column order in the original file.
9. columns from path as (<c1>, <c2>,...)
Specifies the columns to be extracted from the import file path.
10. <column_mapping>
Specifies the column conversion function.
11. preceding filter <predicate>
The data is first assembled into raw rows according to `column list` and `COLUMNS FROM PATH AS`, and is then filtered according to the preceding filter condition.
12. where <predicate>
Filters the imported data according to the condition.
13. delete on <expr>
Used together with the `MERGE` import mode, and only applicable to tables using the Unique Key model. It specifies the column in the imported data that represents the delete flag, and the expression that computes it.
14. <source_sequence>
Only applicable to tables with the unique key model. It specifies the column representing the sequence column in the imported data, mainly to ensure the data order during import.
15. properties ("<key>"="<value>",...)
Specifies parameters for the import file format, applicable to formats such as CSV and JSON. For example, parameters such as `json_root`, `jsonpaths`, and `fuzzy_parse` can be specified.
- `enclose`: enclosure character. When a CSV data field contains a line delimiter or column separator, a single-byte character can be specified as the enclosure character to prevent accidental truncation. For example, if the column separator is `,`, the enclosure character is `'`, and the data is `a,'b,c'`, then `b,c` is parsed as one field. Note: when `enclose` is set to `"`, `trim_double_quotes` must be set to `true`.
- `escape`: escape character, used to escape characters in a field that are identical to the enclosure character. For example, if the data is `a,'b,'c'`, the enclosure character is `'`, and you want `b,'c` to be parsed as one field, you need to specify a single-byte escape character such as `\`, and then modify the data to `a,'b,\'c'`. A combined usage sketch is shown after this parameter list.
16. <load_properties>
The optional parameters are as follows and can be added based on the actual environment.
Parameter | Description |
---|---|
timeout | Import timeout, in seconds. Default: 4 hours. |
max_filter_ratio | The maximum tolerable ratio of data that may be filtered out (for reasons such as data irregularities). Default: 0 (zero tolerance). Value range: 0 to 1. |
exec_mem_limit | Import memory limit, in bytes. Default: 2 GB. |
strict_mode | Whether to apply strict mode to the data. Default: `false`. |
partial_columns | Boolean. When set to `true`, partial-column updates are used. Default: `false`. Can only be set when the table uses the Unique model with Merge-on-Write. |
timezone | Specifies the time zone, which affects time-zone-dependent functions such as `strftime`, `alignment_timestamp`, and `from_unixtime`. For details, please refer to the Time Zone documentation. If not specified, "Asia/Shanghai" is used. |
load_parallelism | Import concurrency. Default: 1. Increasing it starts multiple execution plans to run the import task simultaneously, speeding up the import. |
send_batch_parallelism | Parallelism for sending batch data. If this value exceeds `max_send_batch_parallelism_per_job` in the BE configuration, `max_send_batch_parallelism_per_job` is used instead. |
load_to_single_tablet | Boolean. When set to `true`, data is imported into a single tablet of the corresponding partition. Default: `false`. The number of tasks in the job depends on the overall concurrency. This can only be set when importing an OLAP table with random bucketing. |
priority | Priority of the import task: `HIGH`/`NORMAL`/`LOW`. Default: `NORMAL`. Among import tasks in the PENDING state, those with higher priority enter the LOADING state first. |
comment | Remarks for the import task. |
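As referenced in the description of `enclose` and `escape` above, the following is a minimal sketch of a CSV import that sets both format properties; the label, file path, table, and credentials are placeholders:

-- enclose=' protects embedded commas; escape=\ escapes a literal ' inside a field
LOAD LABEL example_db.label_enclose_demo
(
    DATA INFILE("hdfs://hdfs_host:hdfs_port/input/quoted.csv")
    INTO TABLE `my_table`
    COLUMNS TERMINATED BY ","
    (k1, k2, k3)
    PROPERTIES
    (
        "enclose" = "'",
        "escape" = "\\"
    )
)
WITH BROKER "broker_name"
(
    "username" = "hdfs_user",
    "password" = "hdfs_password"
);

With this configuration, a field written as 'b,c' in the file is parsed as the single value b,c.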
Access Control Requirements
Users executing this SQL command must have at least the following permissions:
Privilege | Object | Notes |
---|---|---|
LOAD_PRIV | Table | Import permissions for the specified database table. |
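For example, the required privilege could be granted with a standard GRANT statement; the user name `load_user` and the database scope below are illustrative:

-- grant LOAD privilege on all tables in example_db to an illustrative user
GRANT LOAD_PRIV ON example_db.* TO 'load_user'@'%';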
Examples
- Import a batch of data from HDFS. The imported file is `file.txt`, with columns separated by commas, imported into the table `my_table`.

LOAD LABEL example_db.label1
(
DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file.txt")
INTO TABLE `my_table`
COLUMNS TERMINATED BY ","
)
WITH BROKER hdfs
(
"username"="hdfs_user",
"password"="hdfs_password"
);
- Import data from HDFS, using wildcards to match two batches of files, `file-10*` and `file-20*`, and import them into the tables `my_table1` and `my_table2` respectively. For `my_table1`, import only into partition `p1`, and add 1 to the values of the second and third columns of the source file before importing.

LOAD LABEL example_db.label2
(
DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file-10*")
INTO TABLE `my_table1`
PARTITION (p1)
COLUMNS TERMINATED BY ","
(k1, tmp_k2, tmp_k3)
SET (
k2 = tmp_k2 + 1,
k3 = tmp_k3 + 1
),
DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file-20*")
INTO TABLE `my_table2`
COLUMNS TERMINATED BY ","
(k1, k2, k3)
)
WITH BROKER hdfs
(
"username"="hdfs_user",
"password"="hdfs_password"
);
- Import a batch of data from HDFS. Specify the separator as the default Hive separator `\\x01`, and use the wildcard `*` to match all files in all subdirectories under the `data` directory. Use simple authentication and also configure namenode HA.

LOAD LABEL example_db.label3
(
DATA INFILE("hdfs://hdfs_host:hdfs_port/user/doris/data/*/*")
INTO TABLE `my_table`
COLUMNS TERMINATED BY "\\x01"
)
WITH BROKER my_hdfs_broker
(
"username" = "",
"password" = "",
"fs.defaultFS" = "hdfs://my_ha",
"dfs.nameservices" = "my_ha",
"dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
"dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
"dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
"dfs.client.failover.proxy.provider.my_ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);
- Import data in Parquet format and specify `FORMAT` as `parquet`. By default, the format is determined by the file extension.

LOAD LABEL example_db.label4
(
DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file")
INTO TABLE `my_table`
FORMAT AS "parquet"
(k1, k2, k3)
)
WITH BROKER hdfs
(
"username"="hdfs_user",
"password"="hdfs_password"
);
- Import data and extract partition fields from the file path. The columns in `my_table` are `k1, k2, k3, city, utc_date`. The directory `hdfs://hdfs_host:hdfs_port/input/` contains the following files:

hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-01/0000.csv
hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-02/0000.csv
hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-03/0000.csv
hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-04/0000.csv

The files contain only three columns of data, `k1, k2, k3`; the two columns `city` and `utc_date` are extracted from the file path.

LOAD LABEL example_db.label10
(
DATA INFILE("hdfs://hdfs_host:hdfs_port/input/city=beijing/*/*")
INTO TABLE `my_table`
FORMAT AS "csv"
(k1, k2, k3)
COLUMNS FROM PATH AS (city, utc_date)
)
WITH BROKER hdfs
(
"username"="hdfs_user",
"password"="hdfs_password"
);
- Filter the data to be imported. Only rows where `k1 = 1` in the raw data and where `k1 > k2` after transformation will be imported.

LOAD LABEL example_db.label6
(
DATA INFILE("hdfs://host:port/input/file")
INTO TABLE `my_table`
(k1, k2, k3)
SET (
k2 = k2 + 1
)
PRECEDING FILTER k1 = 1
WHERE k1 > k2
)
WITH BROKER hdfs
(
"username"="user",
"password"="pass"
);
- Import data and extract the time partition field from the file path. The time values contain `%3A` (in HDFS paths, `:` is not allowed, so every `:` is replaced with `%3A`).

LOAD LABEL example_db.label7
(
DATA INFILE("hdfs://host:port/user/data/*/test.txt")
INTO TABLE `tbl12`
COLUMNS TERMINATED BY ","
(k2,k3)
COLUMNS FROM PATH AS (data_time)
SET (
data_time=str_to_date(data_time, '%Y-%m-%d %H%%3A%i%%3A%s')
)
)
WITH BROKER hdfs
(
"username"="user",
"password"="pass"
);

The directory contains the following files:

/user/data/data_time=2020-02-17 00%3A00%3A00/test.txt
/user/data/data_time=2020-02-18 00%3A00%3A00/test.txt

The table structure is:

data_time DATETIME,
k2 INT,
k3 INT
- Import a batch of data from HDFS, specifying the timeout and the maximum filter ratio, and using the broker `my_hdfs_broker` with plain-text authentication. Rows in the imported data where `v2 > 100` delete the matching rows in the existing data; the remaining rows are imported normally.

LOAD LABEL example_db.label8
(
MERGE DATA INFILE("HDFS://test:802/input/file")
INTO TABLE `my_table`
(k1, k2, k3, v2, v1)
DELETE ON v2 > 100
)
WITH HDFS
(
"hadoop.username"="user",
"password"="pass"
)
PROPERTIES
(
"timeout" = "3600",
"max_filter_ratio" = "0.1"
);

The import uses the `MERGE` method. `my_table` must be a Unique Key model table. When the value of the `v2` column in the imported data is greater than 100, that row is treated as a deletion row. The timeout for the import task is 3600 seconds, and an error rate of up to 10% is allowed.
- Specify the `source_sequence` column during import to ensure the replacement order in a `UNIQUE_KEYS` table:

LOAD LABEL example_db.label9
(
DATA INFILE("HDFS://test:802/input/file")
INTO TABLE `my_table`
COLUMNS TERMINATED BY ","
(k1,k2,source_sequence,v1,v2)
ORDER BY source_sequence
)
WITH HDFS
(
"hadoop.username"="user",
"password"="pass"
);

`my_table` must be a Unique Key model table with a sequence column (Sequence Col) specified. The data will be ordered according to the values in the `source_sequence` column of the source data.
- Import a batch of data from HDFS, specifying the file format as `json` and setting `json_root` and `jsonpaths`:

LOAD LABEL example_db.label10
(
DATA INFILE("HDFS://test:port/input/file.json")
INTO TABLE `my_table`
FORMAT AS "json"
PROPERTIES(
"json_root" = "$.item",
"jsonpaths" = "[$.id, $.city, $.code]"
)
)
WITH BROKER HDFS (
"hadoop.username" = "user",
"password" = ""
)
PROPERTIES
(
"timeout"="1200",
"max_filter_ratio"="0.1"
);

`jsonpaths` can be used in conjunction with `column list` and `SET (column_mapping)`:

LOAD LABEL example_db.label10
(
DATA INFILE("HDFS://test:port/input/file.json")
INTO TABLE `my_table`
FORMAT AS "json"
(id, code, city)
SET (id = id * 10)
PROPERTIES(
"json_root" = "$.item",
"jsonpaths" = "[$.id, $.code, $.city]"
)
)
WITH BROKER HDFS (
"hadoop.username" = "user",
"password" = ""
)
PROPERTIES
(
"timeout"="1200",
"max_filter_ratio"="0.1"
);
- Import data in CSV format from Tencent Cloud COS.

LOAD LABEL example_db.label10
(
DATA INFILE("cosn://my_bucket/input/file.csv")
INTO TABLE `my_table`
(k1, k2, k3)
)
WITH BROKER "broker_name"
(
"fs.cosn.userinfo.secretId" = "xxx",
"fs.cosn.userinfo.secretKey" = "xxxx",
"fs.cosn.bucket.endpoint_suffix" = "cos.xxxxxxxxx.myqcloud.com"
);
- Remove double quotes and skip the first 5 rows when importing CSV data.

LOAD LABEL example_db.label12
(
DATA INFILE("cosn://my_bucket/input/file.csv")
INTO TABLE `my_table`
(k1, k2, k3)
PROPERTIES("trim_double_quotes" = "true", "skip_lines" = "5")
)
WITH BROKER "broker_name"
(
"fs.cosn.userinfo.secretId" = "xxx",
"fs.cosn.userinfo.secretKey" = "xxxx",
"fs.cosn.bucket.endpoint_suffix" = "cos.xxxxxxxxx.myqcloud.com"
);