MaxCompute Catalog
MaxCompute 是阿里云上的企业级 SaaS(Software as a Service)模式云数据仓库。通过 MaxCompute 提供的开放存储 SDK,Doris 可以获取 MaxCompute 的表信息并进行查询和写入操作。
适用场景
| 场景 | 说明 |
|---|---|
| 数据集成 | 读取 MaxCompute 数据并写入到 Doris 内表。 |
| 数据写回 | 通过 INSERT 命令将数据写入 MaxCompute 表。(4.1.0 版本支持) |
使用须知
-
自 2.1.7 版本开始,MaxCompute Catalog 基于 开放存储 SDK 开发,在这之前,基于 Tunnel API 进行开发。
-
开放存储 SDK 的使用有一定的限制,请参照该 文档 中
使用限制的章节。 -
在 Doris 3.1.3 版本之前,MaxCompute 中的 Project 相当于 Doris 中的 Database。3.1.3 版本中,可以通过
mc.enable.namespace.schema参数引入 MaxCompute 的 schema 层级。
配置 Catalog
语法
CREATE CATALOG [IF NOT EXISTS] catalog_name PROPERTIES (
'type' = 'max_compute',
{McRequiredProperties},
{McOptionalProperties},
{CommonProperties}
);
-
{McRequiredProperties}属性名 说明 支持的 Doris 版本 mc.default.project想要访问的 MaxCompute 项目名称。可以在 MaxCompute 项目列表 中创建和管理。 mc.access_keyAccessKey。可以在 阿里云控制台 中创建和管理。 mc.secret_keySecretKey。可以在 阿里云控制台 中创建和管理。 mc.regionMaxCompute 开通的地域。可以从 Endpoint 中找到对应的 Region 2.1.7(不含)之前 mc.endpointMaxCompute 开通的地域。请参照下文的如何获取 Endpoint 和 Quota 来配置。 2.1.7(含)之后 -
{McOptionalProperties}属性名 默认值 说明 支持的 Doris 版本 mc.tunnel_endpoint无 参考附录中的「自定义服务地址」。 2.1.7(不含)之前 mc.odps_endpoint无 参考附录中的「自定义服务地址」。 2.1.7(不含)之前 mc.quotapay-as-you-goQuota 名称。请参照下文的「如何获取 Endpoint 和 Quota」来配置。 2.1.7(含)之后 mc.split_strategybyte_size设置 split 的划分方式,可设置为按照字节大小划分 byte_size和按照数据行数划分row_count2.1.7(含)之后 mc.split_byte_size268435456每个 split 读取的文件大小,单位为字节,默认为 256MB,当且仅当 "mc.split_strategy" = "byte_size"时生效2.1.7(含)之后 mc.split_row_count1048576每个 split 读多少行,当且仅当 "mc.split_strategy" = "row_count"时生效2.1.7(含)之后 mc.split_cross_partitionfalse生成的 split 是否跨分区。 2.1.8(含)之后 mc.connect_timeout10s连接 MaxCompute 的超时时间。 2.1.8(含)之后 mc.read_timeout120s读取 MaxCompute 的超时时间。 2.1.8(含)之后 mc.retry_count4超时后的重试次数。 2.1.8(含)之后 mc.datetime_predicate_push_downtrue是否允许下推 timestamp/timestamp_ntz类型的谓词条件。Doris 对这两个类型的同步会丢失精度(9 -> 6)。因此如果原数据精度高于 6 位,则条件下推可能导致结果不准确。2.1.9/3.0.5(含)之后 mc.account_formatname阿里云国际站和中国站的账号系统不一致,对于国际站用户,如出现如 user 'RAM$xxxxxx:xxxxx' is not a valid aliyun account的错误,可指定该参数为id。3.0.9/3.1.1(含)之后 mc.enable.namespace.schemafalse是否支持 MaxCompute 的 schema 层级。详见:https://help.aliyun.com/zh/maxcompute/user-guide/schema-related-operations 3.1.3(含)之后 -
{CommonProperties}CommonProperties 部分用于填写通用属性。请参阅数据目录概述中「通用属性」部分。
支持的 MaxCompute 版本
仅支持公有云版本的 MaxCompute。私有云版本支持请联系 Doris 社区支持。
支持的 MaxCompute 表
-
支持读取分区表、聚簇表、物化视图。
-
不支持读取 MaxCompute 的外部表、逻辑视图、Delta Table。
层级映射
-
mc.enable.namespace.schema为 falseDoris MaxCompute Catalog N/A Database Project Table Table -
mc.enable.namespace.schema为 trueDoris MaxCompute Catalog Project Database Schema Table Table
列类型映射
| MaxCompute Type | Doris Type | Comment |
|---|---|---|
| bolean | boolean | |
| tiny | tinyint | |
| tinyint | tinyint | |
| smallint | smallint | |
| int | int | |
| bigint | bigint | |
| float | float | |
| double | double | |
| decimal(P, S) | decimal(P, S) | 1 <= P <= 38, 0 <= scale <= 18 |
| char(N) | char(N) | |
| varchar(N) | varchar(N) | |
| string | string | |
| date | date | |
| datetime | datetime(3) | 固定映射到精度 3。可以通过 SET [GLOBAL] time_zone = 'Asia/Shanghai' 来指定时区。 |
| timestamp_ntz | datetime(6) | MaxCompute 的 timestamp_ntz 精度为 9,Doris 的 DATETIME 最大精度只有 6,故读取数据时会将多的部分直接截断。 |
| timestamp | datetime(6) | 自 2.1.9/3.0.5 支持。MaxCompute 的 timestamp 精度为 9,Doris 的 DATETIME 最大精度只有 6,故读取数据时会将多的部分直接截断。 |
| array | array | |
| map | map | |
| struct | struct | |
| other | UNSUPPORTED |
基础示例
CREATE CATALOG mc_catalog PROPERTIES (
'type' = 'max_compute',
'mc.default.project' = 'project',
'mc.access_key' = 'sk',
'mc.secret_key' = 'ak',
'mc.endpoint' = 'http://service.cn-beijing-vpc.MaxCompute.aliyun-inc.com/api'
);
如使用 2.1.7(不含)之前的版本,请使用如下语句。(建议升级到 2.1.8 后使用)
CREATE CATALOG mc_catalog PROPERTIES (
'type' = 'max_compute',
'mc.region' = 'cn-beijing',
'mc.default.project' = 'project',
'mc.access_key' = 'ak',
'mc.secret_key' = 'sk',
'mc.odps_endpoint' = 'http://service.cn-beijing.maxcompute.aliyun-inc.com/api',
'mc.tunnel_endpoint' = 'http://dt.cn-beijing.maxcompute.aliyun-inc.com'
);
支持 Schema:
CREATE CATALOG mc_catalog PROPERTIES (
'type' = 'max_compute',
'mc.region' = 'cn-beijing',
'mc.default.project' = 'project',
'mc.access_key' = 'ak',
'mc.secret_key' = 'sk',
'mc.odps_endpoint' = 'http://service.cn-beijing.maxcompute.aliyun-inc.com/api',
'mc.tunnel_endpoint' = 'http://dt.cn-beijing.maxcompute.aliyun-inc.com',
'mc.enable.namespace.schema' = 'true'
);
查询操作
基础查询
-- 1. switch to catalog, use database and query
SWITCH mc_ctl;
USE mc_ctl;
SELECT * FROM mc_tbl LIMIT 10;
-- 2. use mc database directly
USE mc_ctl.mc_db;
SELECT * FROM mc_tbl LIMIT 10;
-- 3. use full qualified name to query
SELECT * FROM mc_ctl.mc_db.mc_tbl LIMIT 10;
写入操作
自 4.1.0 版本开始,Doris 支持对 MaxCompute 表的写入操作。您可以通过标准的 INSERT 语句,将其他数据源的数据通过 Doris 直接写入 MaxCompute 表。
- 该功能为实验功能,自 4.1.0 版本开始支持。
- 支持写入分区表和非分区表。
- 不支持聚簇表、事务表、Delta Table 和外部表的写入。
INSERT INTO
INSERT 操作会将数据以追加的方式写入到目标表中。
例如:
INSERT INTO mc_tbl values (val1, val2, val3, val4);
INSERT INTO mc_tbl SELECT col1, col2 FROM internal.db1.tbl1;
INSERT INTO mc_tbl(col1, col2) values (val1, val2);
INSERT INTO mc_tbl(col1, col2, partition_col1, partition_col2) values (1, 2, "beijing", "2023-12-12");
-- 指定分区写入 (可以仅指定部分分区列,其余分区动态写入)
INSERT INTO mc_tbl PARTITION(ds='20250201') SELECT id, name FROM source_tbl;
INSERT INTO mc_tbl PARTITION(ds='20250101', region='bj') VALUES (1, 'v1'), (2, 'v2');
INSERT OVERWRITE
INSERT OVERWRITE 会使用新的数据完全覆盖原有表中的数据。
INSERT OVERWRITE TABLE mc_tbl VALUES(val1, val2, val3, val4);
INSERT OVERWRITE TABLE mc_tbl(col1, col2) SELECT col1, col2 FROM internal.db1.tbl1;
-- 指定分区写入
INSERT OVERWRITE TABLE mc_tbl PARTITION(ds='20250101') VALUES (10, 'new1');
CTAS
可以通过 CTAS 语句创建 MaxCompute 表并写入数据:
CREATE TABLE mc_tbl AS SELECT * FROM other_table;
库表管理
自 4.1.0 版本,Doris 支持创建和删除 MaxCompute 的库表。
- 该功能为实验功能,自 4.1.0 版本开始支持。
- 支持创建和删除分区表和非分区表。
- 不支持创建聚簇表、事务表、Delta Table 和外部表。
该功能仅在
mc.enable.namespace.schema属性为true时可用。
创建和删除库
可以通过 SWITCH 语句切换到对应的 Catalog 下,执行 CREATE DATABASE 语句:
SWITCH mc;
CREATE DATABASE [IF NOT EXISTS] mc_schema;
也可以使用全限定名创建:
CREATE DATABASE [IF NOT EXISTS] mc.mc_schema;
删除库:
DROP DATABASE [IF EXISTS] mc.mc_schema;
对于 MaxCompute Database,删除后,会同时删除其下的所有表。
创建和删除表
-
创建
Doris 支持在 MaxCompute 中创建分区或非分区表。
例如:
CREATE TABLE mc_schema.mc_tbl1 (
bool_col BOOLEAN,
int_col INT,
bigint_col BIGINT,
float_col FLOAT,
double_col DOUBLE,
decimal_col DECIMAL(18,6),
string_col STRING,
varchar_col VARCHAR(200),
char_col CHAR(50),
date_col DATE,
datetime_col DATETIME,
arr_col ARRAY<STRING>,
map_col MAP<STRING, STRING>,
struct_col STRUCT<f1:STRING, f2:INT>
);
CREATE TABLE mc_schema.mc_tbl2 (
id INT,
val STRING,
ds STRING,
region STRING
)
PARTITION BY (ds, region)(); -
删除
可以通过
DROP TABLE语句删除一个 MaxCompute 表。当前删除表后,会同时删除数据,包括分区数据。例如:
DROP TABLE [IF EXISTS] mc_tbl;
附录
如何获取 Endpoint 和 Quota(适用于 Doris 2.1.7 之后)
-
如果使用数据传输服务独享资源组
请参照该文档中「使用独享数据服务资源组」章节中的「2. 授权」来开启相应的权限,并在「配额(Quota)管理」列表中,查看并复制对应的
QuotaName,指定"mc.quota" = "QuotaName"。此时您可以选择 VPC 或公网来访问 MaxCompute,但是走 VPC 的带宽有保障,公网带宽资源小。 -
如果使用按量付费
请参照该文档中「使用开放存储(按量付费)」的章节,来开启开放存储(Storage API)开关,并给 AK、SK 对应的用户赋予权限。此时
mc.quota为默认值pay-as-you-go,不需要额外指定该值。按量付费情况下,只能使用 VPC 来访问 MaxCompute,无法通过公网访问。只有预付费用户才能通过公网访问 MaxCompute。 -
根据阿里云 Endpoints 文档中的「地域 Endpoint 对照表」来配置
mc.endpoint使用 VPC 访问的用户,需要根据「各地域 Endpoint 对照表(阿里云 VPC 网络连接方式)」表中的「VPC 网络 Endpoint」列来配置
mc.endpoint。使用公网访问的用户,可以选择「各地域 Endpoint 对照表(阿里云经典网络连接方式)」表中的「经典网络 Endpoint」列、或者选择「各地域 Endpoint 对照表(外网连接方式)」表中的「外网 Endpoint」列来配置mc.endpoint。
自定义服务地址 (适用于 Doris 2.1.7 之前)
在 Doris 2.1.7 之前的版本中,使用 Tunnel SDK 与 MaxCompute 交互,因此需要使用以下两个 endpoint 属性:
-
mc.odps_endpoint:MaxCompute Endpoint,用于获取 MaxCompute 元数据(库表信息)。 -
mc.tunnel_endpoint:Tunnel Endpoint,用于读取 MaxCompute 数据。
默认情况下,MaxCompute Catalog 根据 mc.region 和 mc.public_access 去生成 endpoint。
生成后的格式如下:
mc.public_access | mc.odps_endpoint | mc.tunnel_endpoint |
|---|---|---|
| false | http://service.{mc.region}.maxcompute.aliyun-inc.com/api | http://dt.{mc.region}.maxcompute.aliyun-inc.com |
| true | http://service.{mc.region}.maxcompute.aliyun.com/api | http://dt.{mc.region}.maxcompute.aliyun.com |
用户也可以单独指定 mc.odps_endpoint 和 mc.tunnel_endpoint 来自定义服务地址,适用于一些私有部署的 MaxCompute 环境。
MaxCompute Endpoint 和 Tunnel Endpoint 的配置请参见各地域及不同网络连接方式下的 Endpoint。
资源使用控制
用户可以通过调整 parallel_pipeline_task_num、num_scanner_threads 这两个 Session Variable 来调整表级别请求并发数量,以控制数据传输服务中的资源消耗。其对应的并发数量等于 max(parallel_pipeline_task_num * be num * num_scanner_threads)。
需要注意:
-
该方法只能控制单个 Query 中单张表的并发请求数量,无法控制多个 SQL 的资源使用量。
-
降低并发数量意味着会提高 Query 的查询时间。