Export
本文档将介绍如何使用EXPORT
命令导出 Doris 中存储的数据。
有关EXPORT
命令的详细介绍,请参考:EXPORT
概述
Export
是 Doris 提供的一种将数据异步导出的功能。该功能可以将用户指定的表或分区的数据,以指定的文件格式,导出到目标存储系统中,包括对象存储、HDFS 或本地文件系统。
Export
是一个异步执行的命令,命令执行成功后,立即返回结果,用户可以通过Show Export
命令查看该 Export 任务的详细信息。
关于如何选择 SELECT INTO OUTFILE
和 EXPORT
,请参阅 导出综述。
EXPORT
当前支持导出以下类型的表或视图
- Doris 内表
- Doris 逻辑视图
- Doris Catalog 表
EXPORT
目前支持以下导出格式
- Parquet
- ORC
- csv
- csv_with_names
- csv_with_names_and_types
不支持压缩格式的导出。
示例:
mysql> EXPORT TABLE tpch1.lineitem TO "s3://my_bucket/path/to/exp_"
-> PROPERTIES(
-> "format" = "csv",
-> "max_file_size" = "2048MB"
-> )
-> WITH s3 (
-> "s3.endpoint" = "${endpoint}",
-> "s3.region" = "${region}",
-> "s3.secret_key"="${sk}",
-> "s3.access_key" = "${ak}"
-> );
提交作业后,可以通过 SHOW EXPORT 命令查询导出作业状态,结果举例如下:
mysql> show export\G
*************************** 1. row ***************************
JobId: 143265
Label: export_0aa6c944-5a09-4d0b-80e1-cb09ea223f65
State: FINISHED
Progress: 100%
TaskInfo: {"partitions":[],"parallelism":5,"data_consistency":"partition","format":"csv","broker":"S3","column_separator":"\t","line_delimiter":"\n","max_file_size":"2048MB","delete_existing_files":"","with_bom":"false","db":"tpch1","tbl":"lineitem"}
Path: s3://ftw-datalake-test-1308700295/test_ycs_activeDefense_v10/test_csv/exp_
CreateTime: 2024-06-11 18:01:18
StartTime: 2024-06-11 18:01:18
FinishTime: 2024-06-11 18:01:31
Timeout: 7200
ErrorMsg: NULL
OutfileInfo: [
[
{
"fileNumber": "1",
"totalRows": "6001215",
"fileSize": "747503989bytes",
"url": "s3://my_bucket/path/to/exp_6555cd33e7447c1-baa9568b5c4eb0ac_*"
}
]
]
1 row in set (0.00 sec)
show export
命令返回的结果各个列的含义如下:
- JobId:作业的唯一 ID
- Label:该导出作业的标签,如果 Export 没有指定,则系统会默认生成一个。
- State:作业状态:
- PENDING:作业待调度
- EXPORTING:数据导出中
- FINISHED:作业成功
- CANCELLED:作业失败
- Progress:作业进度。该进度以查询计划为单位。假设一共 10 个线程,当前已完成 3 个,则进度为 30%。
- TaskInfo:以 Json 格式展示的作业信息:
- db:数据库名
- tbl:表名
- partitions:指定导出的分区。
空
列表 表示所有分区。 - column_separator:导出文件的列分隔符。
- line_delimiter:导出文件的行分隔符。
- tablet num:涉及的总 Tablet 数量。
- broker:使用的 broker 的名称。
- coord num:查询计划的个数。
- max_file_size:一个导出文件的最大大小。
- delete_existing_files:是否删除导出目录下已存在的文件及目录。
- columns:指定需要导出的列名,空值代表导出所有列。
- format:导出的文件格式
- Path:远端存储上的导出路径。
- CreateTime/StartTime/FinishTime:作业的创建时间、开始调度时间和结束时间。
- Timeout:作业超时时间。单位是秒。该时间从 CreateTime 开始计算。
- ErrorMsg:如果作业出现错误,这里会显示错误原因。
- OutfileInfo:如果作业导出成功,这里会显示具体的
SELECT INTO OUTFILE
结果信息。
提交 Export 作业后,在 Export 任务成功或失败之前可以通过 CANCEL EXPORT 命令取消导出作业。取消命令举例如下:
CANCEL EXPORT FROM tpch1 WHERE LABEL like "%export_%";
导出文件列类型映射
Export
支持导出数据为 Parquet、ORC 文件格式。Parquet、ORC 文件格式拥有自己的数据类型,Doris 的导出功能能够自动将 Doris 的数据类型导出为 Parquet、ORC 文件格式的对应数据类型,具体映射关系请参阅导出综述文档的 "导出文件列类型映射" 部分。
示例
导出到 HDFS
将 db1.tbl1 表的 p1 和 p2 分区中的col1
列和col2
列数据导出到 HDFS 上,设置导出作业的 label 为 mylabel
。导出文件格式为 csv(默认格式),列分割符为,
,导出作业单个文件大小限制为 512MB。
EXPORT TABLE db1.tbl1
PARTITION (p1,p2)
TO "hdfs://host/path/to/export/"
PROPERTIES
(
"label" = "mylabel",
"column_separator"=",",
"max_file_size" = "512MB",
"columns" = "col1,col2"
)
with HDFS (
"fs.defaultFS"="hdfs://hdfs_host:port",
"hadoop.username" = "hadoop"
);
如果 HDFS 开启了高可用,则需要提供 HA 信息,如:
EXPORT TABLE db1.tbl1
PARTITION (p1,p2)
TO "hdfs://HDFS8000871/path/to/export/"
PROPERTIES
(
"label" = "mylabel",
"column_separator"=",",
"max_file_size" = "512MB",
"columns" = "col1,col2"
)
with HDFS (
"fs.defaultFS" = "hdfs://HDFS8000871",
"hadoop.username" = "hadoop",
"dfs.nameservices" = "your-nameservices",
"dfs.ha.namenodes.your-nameservices" = "nn1,nn2",
"dfs.namenode.rpc-address.HDFS8000871.nn1" = "ip:port",
"dfs.namenode.rpc-address.HDFS8000871.nn2" = "ip:port",
"dfs.client.failover.proxy.provider.HDFS8000871" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);
如果 Hadoop 集群开启了高可用并且启用了 Kerberos 认证,可以参考如下 SQL 语句:
EXPORT TABLE db1.tbl1
PARTITION (p1,p2)
TO "hdfs://HDFS8000871/path/to/export/"
PROPERTIES
(
"label" = "mylabel",
"column_separator"=",",
"max_file_size" = "512MB",
"columns" = "col1,col2"
)
with HDFS (
"fs.defaultFS"="hdfs://hacluster/",
"hadoop.username" = "hadoop",
"dfs.nameservices"="hacluster",
"dfs.ha.namenodes.hacluster"="n1,n2",
"dfs.namenode.rpc-address.hacluster.n1"="192.168.0.1:8020",
"dfs.namenode.rpc-address.hacluster.n2"="192.168.0.2:8020",
"dfs.client.failover.proxy.provider.hacluster"="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"dfs.namenode.kerberos.principal"="hadoop/_HOST@REALM.COM"
"hadoop.security.authentication"="kerberos",
"hadoop.kerberos.principal"="doris_test@REALM.COM",
"hadoop.kerberos.keytab"="/path/to/doris_test.keytab"
);
导出到 S3
将 s3_test 表中的所有数据导出到 s3 上,导出格式为 csv,以不可见字符 \x07
作为行分隔符。
EXPORT TABLE s3_test TO "s3://bucket/a/b/c"
PROPERTIES (
"line_delimiter" = "\\x07"
) WITH s3 (
"s3.endpoint" = "xxxxx",
"s3.region" = "xxxxx",
"s3.secret_key"="xxxx",
"s3.access_key" = "xxxxx"
)
导出到本地文件系统
export 数据导出到本地文件系统,需要在 fe.conf 中添加
enable_outfile_to_local=true
并且重启 FE。
将 test 表中的所有数据导出到本地存储:
-- parquet 格式
EXPORT TABLE test TO "file:///home/user/tmp/"
PROPERTIES (
"columns" = "k1,k2",
"format" = "parquet"
);
-- orc 格式
EXPORT TABLE test TO "file:///home/user/tmp/"
PROPERTIES (
"columns" = "k1,k2",
"format" = "orc"
);
-- csv_with_names 格式,以‘AA’为列分割符,‘zz’为行分割符
EXPORT TABLE test TO "file:///home/user/tmp/"
PROPERTIES (
"format" = "csv_with_names",
"column_separator"="AA",
"line_delimiter" = "zz"
);
-- csv_with_names_and_types 格式
EXPORT TABLE test TO "file:///home/user/tmp/"
PROPERTIES (
"format" = "csv_with_names_and_types"
);
注意: 导出到本地文件系统的功能不适用于公有云用户,仅适用于私有化部署的用户。并且默认用户对集群节点有完全的控制权限。Doris 对于用户填写的导出路径不会做合法性检查。如果 Doris 的进程用户对该路径无写权限,或路径不存在,则会报错。同时处于安全性考虑,如果该路径已存在同名的文件,则也会导出失败。 Doris 不会管理导出到本地的文件,也不会检查磁盘空间等。这些文件需要用户自行管理,如清理等。
指定分区导出
导出作业支持仅导出 Doris 内表的部分分区,如仅导出 test 表的 p1 和 p2 分区
EXPORT TABLE test
PARTITION (p1,p2)
TO "file:///home/user/tmp/"
PROPERTIES (
"columns" = "k1,k2"
);
导出时过滤数据
导出作业支持导出时根据谓词条件过滤数据,仅导出符合条件的数据,如仅导出满足 k1 < 50
条件的数据
EXPORT TABLE test
WHERE k1 < 50
TO "file:///home/user/tmp/"
PROPERTIES (
"columns" = "k1,k2",
"column_separator"=","
);
导出外表数据
导出作业支持 Doris Catalog 外表数据:
-- 创建一个 catalog
CREATE CATALOG `tpch` PROPERTIES (
"type" = "trino-connector",
"trino.connector.name" = "tpch",
"trino.tpch.column-naming" = "STANDARD",
"trino.tpch.splits-per-node" = "32"
);
-- 导出 Catalog 外表数据
EXPORT TABLE tpch.sf1.lineitem TO "file:///path/to/exp_"
PROPERTIES(
"parallelism" = "5",
"format" = "csv",
"max_file_size" = "1024MB"
);
当前 Export 导出 Catalog 外表数据不支持并发导出,即使指定 parallelism 大于 1,仍然是单线程导出。
最佳实践
导出一致性
Export
导出支持 partition / tablets 两种粒度。data_consistency
参数用来指定以何种粒度切分希望导出的表,none
代表 Tablets 级别,partition
代表 Partition 级别。
EXPORT TABLE test TO "file:///home/user/tmp"
PROPERTIES (
"format" = "parquet",
"data_consistency" = "partition",
"max_file_size" = "512MB"
);
若设置"data_consistency" = "partition"
,Export 任务底层构造的多个SELECT INTO OUTFILE
语句都会导出不同的 partition。
若设置"data_consistency" = "none"
,Export 任务底层构造的多个SELECT INTO OUTFILE
语句都会导出不同的 tablets,但是这些不同的 tablets 有可能属于相同的 partition。
关于 Export 底层构造 SELECT INTO OUTFILE
的逻辑,可参阅附录部分。
导出作业并发度
Export 可以设置不同的并发度来并发导出数据。指定并发度为 5:
EXPORT TABLE test TO "file:///home/user/tmp/"
PROPERTIES (
"format" = "parquet",
"max_file_size" = "512MB",
"parallelism" = "5"
);
关于 Export 并发导出的原理,可参阅附录部分。
导出前清空导出目录
EXPORT TABLE test TO "file:///home/user/tmp"
PROPERTIES (
"format" = "parquet",
"max_file_size" = "512MB",
"delete_existing_files" = "true"
);
如果设置了 "delete_existing_files" = "true"
,导出作业会先将/home/user/
目录下所有文件及目录删除,然后导出数据到该目录下。
注意: 若要使用 delete_existing_files 参数,还需要在 fe.conf 中添加配置
enable_delete_existing_files = true
并重启 fe,此时 delete_existing_files 才会生效。delete_existing_files = true 是一个危险的操作,建议只在测试环境中使用。
设置导出文件的大小
导出作业支持设置导出文件的大小,如果单个文件大小超过设定值,则会按照指定大小分成多个文件导出。
EXPORT TABLE test TO "file:///home/user/tmp/"
PROPERTIES (
"format" = "parquet",
"max_file_size" = "512MB"
);
通过设置 "max_file_size" = "512MB"
,则单个导出文件的最大大小为 512MB。
注意事项
内存限制
通常一个 Export 作业的查询计划只有
扫描-导出
两部分,不涉及需要太多内存的计算逻辑。所以通常 2GB 的默认内存限制可以满足需求。但在某些场景下,比如一个查询计划,在同一个 BE 上需要扫描的 Tablet 过多,或者 Tablet 的数据版本过多时,可能会导致内存不足。可以调整 session 变量
exec_mem_limit
来调大内存使用限制。导出数据量
不建议一次性导出大量数据。一个 Export 作业建议的导出数据量最大在几十 GB。过大的导出会导致更多的垃圾文件和更高的重试成本。如果表数据量过大,建议按照分区导出。
另外,Export 作业会扫描数据,占用 IO 资源,可能会影响系统的查询延迟。
导出文件的管理
如果 Export 作业运行失败,已经生成的文件不会被删除,需要用户手动删除。
数据一致性
目前在 export 时只是简单检查 tablets 版本是否一致,建议在执行 export 过程中不要对该表进行导入数据操作。
导出超时
若导出的数据量很大,超过导出的超时时间,则 Export 任务会失败。此时可以在 Export 命令中指定
timeout
参数来增加超时时间并重试 Export 命令。导出失败
在 Export 作业运行过程中,如果 FE 发生重启或切主,则 Export 作业会失败,需要用户重新提交。可以通过
show export
命令查看 Export 任务状态。导出分区数量
一个 Export Job 允许导出的分区数量最大为 2000,可以在 fe.conf 中添加参数
maximum_number_of_export_partitions
并重启 FE 来修改该设置。并发导出
在并发导出时,请注意合理地配置线程数量和并行度,以充分利用系统资源并避免性能瓶颈。在导出过程中,可以实时监控进度和性能指标,以便及时发现问题并进行优化调整。
数据完整性
导出操作完成后,建议验证导出的数据是否完整和正确,以确保数据的质量和完整性。
附录
并发导出原理
Export 任务的底层是执行SELECT INTO OUTFILE
SQL 语句。用户发起一个 Export 任务后,Doris 会根据 Export 要导出的表构造出一个或多个 SELECT INTO OUTFILE
执行计划,随后将这些SELECT INTO OUTFILE
执行计划提交给 Doris 的 Job Schedule 任务调度器,Job Schedule 任务调度器会自动调度这些任务并执行。
默认情况下,Export 任务是单线程执行的。为了提高导出的效率,Export 命令可以设置一个 parallelism
参数来并发导出数据。设置parallelism
大于 1 后,Export 任务会使用多个线程并发的去执行 SELECT INTO OUTFILE
查询计划。parallelism
参数实际就是指定执行 EXPORT 作业的线程数量。
一个 Export 任务构造一个或多个 SELECT INTO OUTFILE
执行计划的具体逻辑是:
选择导出的数据的一致性模型
根据
data_consistency
参数来决定导出的一致性,这个只和语义有关,和并发度无关,用户要先根据自己的需求,选择一致性模型。确定并发度
根据
parallelism
参数确定由多少个线程来运行这些SELECT INTO OUTFILE
执行计划。parallelism 决定了最大可能的线程数。注意:即使 Export 命令设置了
parallelism
参数,该 Export 任务的实际并发线程数量还与 Job Schedule 有关。Export 任务设置多并发后,每一个并发线程都是 Job Schedule 提供的,所以如果此时 Doris 系统任务较繁忙,Job Schedule 的线程资源较紧张,那么有可能分给 Export 任务的实际线程数量达不到parallelism
个数,影响 Export 的并发导出。此时可以通过减轻系统负载或调整 FE 配置async_task_consumer_thread_num
增加 Job Schedule 的总线程数量来缓解这个问题。确定每一个 outfile 语句的任务量
每一个线程会根据
maximum_tablets_of_outfile_in_export
以及数据实际的分区数 / buckets 数来决定要拆分成多少个 outfile。maximum_tablets_of_outfile_in_export
是 FE 的配置,默认值为 10。该参数用于指定 Export 任务切分出来的单个 OutFile 语句中允许的最大 partitions / buckets 数量。修改该配置需要重启 FE。举例:假设一张表共有 20 个 partition,每个 partition 都有 5 个 buckets,那么该表一共有 100 个 buckets。设置
data_consistency = none
以及maximum_tablets_of_outfile_in_export = 10
。parallelism = 5
情况下Export 任务将把该表的 100 个 buckets 分成 5 份,每个线程负责 20 个 buckets。每个线程负责的 20 个 buckets 又将以 10 个为单位分成 2 组,每组 buckets 各由一个 outfile 查询计划负责。所以最终该 Export 任务有 5 个线程并发执行,每个线程负责 2 个 outfile 语句,每个线程负责的 outfile 语句串行的被执行。
parallelism = 3
情况下Export 任务将把该表的 100 个 buckets 分成 3 份,3 个线程分别负责 34、33、33 个 buckets。每个线程负责的 buckets 又将以 10 个为单位分成 4 组(最后一组不足 10 个 buckets),每组 buckets 各由一个 outfile 查询计划负责。所以该 Export 任务最终有 3 个线程并发执行,每个线程负责 4 个 outfile 语句,每个线程负责的 outfile 语句串行的被执行。
parallelism = 120
情况下由于该表 buckets 只有 100 个,所以系统会将
parallelism
强制设为 100,并以parallelism = 100
去执行。Export 任务将把该表的 100 个 buckets 分成 100 份,每个线程负责 1 个 buckets。每个线程负责的 1 个 buckets 又将以 10 个为单位分成 1 组(该组实际就只有 1 个 buckets),每组 buckets 由一个 outfile 查询计划负责。所以最终该 Export 任务有 100 个线程并发执行,每个线程负责 1 个 outfile 语句,每个 outfile 语句实际只导出 1 个 buckets。
当前版本若希望 Export 有一个较好的性能,建议设置以下参数:
- 打开 session 变量
enable_parallel_outfile
。 - 设置 Export 的
parallelism
参数为较大值,使得每一个线程只负责一个SELECT INTO OUTFILE
查询计划。 - 设置 FE 配置
maximum_tablets_of_outfile_in_export
为较小值,使得每一个SELECT INTO OUTFILE
查询计划导出的数据量较小。