跳到主要内容

MaxCompute Catalog

MaxCompute 是阿里云上的企业级 SaaS(Software as a Service)模式云数据仓库。通过 MaxCompute 提供的开放存储 SDK,Doris 可以获取 MaxCompute 的表信息并进行查询和写入操作。

适用场景

场景说明
数据集成读取 MaxCompute 数据并写入到 Doris 内表。
数据写回通过 INSERT 命令将数据写入 MaxCompute 表。(4.1.0 版本支持)

使用须知

  1. 自 2.1.7 版本开始,MaxCompute Catalog 基于 开放存储 SDK 开发,在这之前,基于 Tunnel API 进行开发。

  2. 开放存储 SDK 的使用有一定的限制,请参照该 文档使用限制 的章节。

  3. 在 Doris 3.1.3 版本之前,MaxCompute 中的 Project 相当于 Doris 中的 Database。3.1.3 版本中,可以通过 mc.enable.namespace.schema 参数引入 MaxCompute 的 schema 层级。

配置 Catalog

语法

CREATE CATALOG [IF NOT EXISTS] catalog_name PROPERTIES (
'type' = 'max_compute',
{McRequiredProperties},
{McOptionalProperties},
{CommonProperties}
);
  • {McRequiredProperties}

    属性名说明支持的 Doris 版本
    mc.default.project想要访问的 MaxCompute 项目名称。可以在 MaxCompute 项目列表 中创建和管理。
    mc.access_keyAccessKey。可以在 阿里云控制台 中创建和管理。
    mc.secret_keySecretKey。可以在 阿里云控制台 中创建和管理。
    mc.regionMaxCompute 开通的地域。可以从 Endpoint 中找到对应的 Region2.1.7(不含)之前
    mc.endpointMaxCompute 开通的地域。请参照下文的如何获取 Endpoint 和 Quota 来配置。2.1.7(含)之后
  • {McOptionalProperties}

    属性名默认值说明支持的 Doris 版本
    mc.tunnel_endpoint参考附录中的「自定义服务地址」。2.1.7(不含)之前
    mc.odps_endpoint参考附录中的「自定义服务地址」。2.1.7(不含)之前
    mc.quotapay-as-you-goQuota 名称。请参照下文的「如何获取 Endpoint 和 Quota」来配置。2.1.7(含)之后
    mc.split_strategybyte_size设置 split 的划分方式,可设置为按照字节大小划分 byte_size 和按照数据行数划分 row_count2.1.7(含)之后
    mc.split_byte_size268435456每个 split 读取的文件大小,单位为字节,默认为 256MB,当且仅当 "mc.split_strategy" = "byte_size" 时生效2.1.7(含)之后
    mc.split_row_count1048576每个 split 读多少行,当且仅当 "mc.split_strategy" = "row_count" 时生效2.1.7(含)之后
    mc.split_cross_partitionfalse生成的 split 是否跨分区。2.1.8(含)之后
    mc.connect_timeout10s连接 MaxCompute 的超时时间。2.1.8(含)之后
    mc.read_timeout120s读取 MaxCompute 的超时时间。2.1.8(含)之后
    mc.retry_count4超时后的重试次数。2.1.8(含)之后
    mc.datetime_predicate_push_downtrue是否允许下推 timestamp/timestamp_ntz 类型的谓词条件。Doris 对这两个类型的同步会丢失精度(9 -> 6)。因此如果原数据精度高于 6 位,则条件下推可能导致结果不准确。2.1.9/3.0.5(含)之后
    mc.account_formatname阿里云国际站和中国站的账号系统不一致,对于国际站用户,如出现如 user 'RAM$xxxxxx:xxxxx' is not a valid aliyun account 的错误,可指定该参数为 id3.0.9/3.1.1(含)之后
    mc.enable.namespace.schemafalse是否支持 MaxCompute 的 schema 层级。详见:https://help.aliyun.com/zh/maxcompute/user-guide/schema-related-operations3.1.3(含)之后
  • {CommonProperties}

    CommonProperties 部分用于填写通用属性。请参阅数据目录概述中「通用属性」部分。

支持的 MaxCompute 版本

仅支持公有云版本的 MaxCompute。私有云版本支持请联系 Doris 社区支持。

支持的 MaxCompute 表

  • 支持读取分区表、聚簇表、物化视图。

  • 不支持读取 MaxCompute 的外部表、逻辑视图、Delta Table。

层级映射

  • mc.enable.namespace.schema 为 false

    DorisMaxCompute
    CatalogN/A
    DatabaseProject
    TableTable
  • mc.enable.namespace.schema 为 true

    DorisMaxCompute
    CatalogProject
    DatabaseSchema
    TableTable

列类型映射

MaxCompute TypeDoris TypeComment
boleanboolean
tinytinyint
tinyinttinyint
smallintsmallint
intint
bigintbigint
floatfloat
doubledouble
decimal(P, S)decimal(P, S)1 <= P <= 38, 0 <= scale <= 18
char(N)char(N)
varchar(N)varchar(N)
stringstring
datedate
datetimedatetime(3)固定映射到精度 3。可以通过 SET [GLOBAL] time_zone = 'Asia/Shanghai' 来指定时区。
timestamp_ntzdatetime(6)MaxCompute 的 timestamp_ntz 精度为 9,Doris 的 DATETIME 最大精度只有 6,故读取数据时会将多的部分直接截断。
timestampdatetime(6)自 2.1.9/3.0.5 支持。MaxCompute 的 timestamp 精度为 9,Doris 的 DATETIME 最大精度只有 6,故读取数据时会将多的部分直接截断。
arrayarray
mapmap
structstruct
otherUNSUPPORTED

基础示例

CREATE CATALOG mc_catalog PROPERTIES (
'type' = 'max_compute',
'mc.default.project' = 'project',
'mc.access_key' = 'sk',
'mc.secret_key' = 'ak',
'mc.endpoint' = 'http://service.cn-beijing-vpc.MaxCompute.aliyun-inc.com/api'
);

如使用 2.1.7(不含)之前的版本,请使用如下语句。(建议升级到 2.1.8 后使用)

CREATE CATALOG mc_catalog PROPERTIES (
'type' = 'max_compute',
'mc.region' = 'cn-beijing',
'mc.default.project' = 'project',
'mc.access_key' = 'ak',
'mc.secret_key' = 'sk',
'mc.odps_endpoint' = 'http://service.cn-beijing.maxcompute.aliyun-inc.com/api',
'mc.tunnel_endpoint' = 'http://dt.cn-beijing.maxcompute.aliyun-inc.com'
);

支持 Schema:

CREATE CATALOG mc_catalog PROPERTIES (
'type' = 'max_compute',
'mc.region' = 'cn-beijing',
'mc.default.project' = 'project',
'mc.access_key' = 'ak',
'mc.secret_key' = 'sk',
'mc.odps_endpoint' = 'http://service.cn-beijing.maxcompute.aliyun-inc.com/api',
'mc.tunnel_endpoint' = 'http://dt.cn-beijing.maxcompute.aliyun-inc.com',
'mc.enable.namespace.schema' = 'true'
);

查询操作

基础查询

-- 1. switch to catalog, use database and query
SWITCH mc_ctl;
USE mc_ctl;
SELECT * FROM mc_tbl LIMIT 10;

-- 2. use mc database directly
USE mc_ctl.mc_db;
SELECT * FROM mc_tbl LIMIT 10;

-- 3. use full qualified name to query
SELECT * FROM mc_ctl.mc_db.mc_tbl LIMIT 10;

写入操作

自 4.1.0 版本开始,Doris 支持对 MaxCompute 表的写入操作。您可以通过标准的 INSERT 语句,将其他数据源的数据通过 Doris 直接写入 MaxCompute 表。

备注
  • 该功能为实验功能,自 4.1.0 版本开始支持。
  • 支持写入分区表和非分区表。
  • 不支持聚簇表、事务表、Delta Table 和外部表的写入。

INSERT INTO

INSERT 操作会将数据以追加的方式写入到目标表中。

例如:

INSERT INTO mc_tbl values (val1, val2, val3, val4);
INSERT INTO mc_tbl SELECT col1, col2 FROM internal.db1.tbl1;

INSERT INTO mc_tbl(col1, col2) values (val1, val2);
INSERT INTO mc_tbl(col1, col2, partition_col1, partition_col2) values (1, 2, "beijing", "2023-12-12");

-- 指定分区写入 (可以仅指定部分分区列,其余分区动态写入)
INSERT INTO mc_tbl PARTITION(ds='20250201') SELECT id, name FROM source_tbl;
INSERT INTO mc_tbl PARTITION(ds='20250101', region='bj') VALUES (1, 'v1'), (2, 'v2');

INSERT OVERWRITE

INSERT OVERWRITE 会使用新的数据完全覆盖原有表中的数据。

INSERT OVERWRITE TABLE mc_tbl VALUES(val1, val2, val3, val4);
INSERT OVERWRITE TABLE mc_tbl(col1, col2) SELECT col1, col2 FROM internal.db1.tbl1;

-- 指定分区写入
INSERT OVERWRITE TABLE mc_tbl PARTITION(ds='20250101') VALUES (10, 'new1');

CTAS

可以通过 CTAS 语句创建 MaxCompute 表并写入数据:

CREATE TABLE mc_tbl AS SELECT * FROM other_table;

库表管理

自 4.1.0 版本,Doris 支持创建和删除 MaxCompute 的库表。

备注
  • 该功能为实验功能,自 4.1.0 版本开始支持。
  • 支持创建和删除分区表和非分区表。
  • 不支持创建聚簇表、事务表、Delta Table 和外部表。

该功能仅在 mc.enable.namespace.schema 属性为 true 时可用。

创建和删除库

可以通过 SWITCH 语句切换到对应的 Catalog 下,执行 CREATE DATABASE 语句:

SWITCH mc;
CREATE DATABASE [IF NOT EXISTS] mc_schema;

也可以使用全限定名创建:

CREATE DATABASE [IF NOT EXISTS] mc.mc_schema;

删除库:

DROP DATABASE [IF EXISTS] mc.mc_schema;
警告

对于 MaxCompute Database,删除后,会同时删除其下的所有表。

创建和删除表

  • 创建

    Doris 支持在 MaxCompute 中创建分区或非分区表。

    例如:

    CREATE TABLE mc_schema.mc_tbl1 (
    bool_col BOOLEAN,
    int_col INT,
    bigint_col BIGINT,
    float_col FLOAT,
    double_col DOUBLE,
    decimal_col DECIMAL(18,6),
    string_col STRING,
    varchar_col VARCHAR(200),
    char_col CHAR(50),
    date_col DATE,
    datetime_col DATETIME,
    arr_col ARRAY<STRING>,
    map_col MAP<STRING, STRING>,
    struct_col STRUCT<f1:STRING, f2:INT>
    );

    CREATE TABLE mc_schema.mc_tbl2 (
    id INT,
    val STRING,
    ds STRING,
    region STRING
    )
    PARTITION BY (ds, region)();
  • 删除

    可以通过 DROP TABLE 语句删除一个 MaxCompute 表。当前删除表后,会同时删除数据,包括分区数据。

    例如:

    DROP TABLE [IF EXISTS] mc_tbl;

附录

如何获取 Endpoint 和 Quota(适用于 Doris 2.1.7 之后)

  1. 如果使用数据传输服务独享资源组

    请参照该文档中「使用独享数据服务资源组」章节中的「2. 授权」来开启相应的权限,并在「配额(Quota)管理」列表中,查看并复制对应的 QuotaName,指定 "mc.quota" = "QuotaName"。此时您可以选择 VPC 或公网来访问 MaxCompute,但是走 VPC 的带宽有保障,公网带宽资源小。

  2. 如果使用按量付费

    请参照该文档中「使用开放存储(按量付费)」的章节,来开启开放存储(Storage API)开关,并给 AK、SK 对应的用户赋予权限。此时 mc.quota 为默认值 pay-as-you-go,不需要额外指定该值。按量付费情况下,只能使用 VPC 来访问 MaxCompute,无法通过公网访问。只有预付费用户才能通过公网访问 MaxCompute。

  3. 根据阿里云 Endpoints 文档中的「地域 Endpoint 对照表」来配置 mc.endpoint

    使用 VPC 访问的用户,需要根据「各地域 Endpoint 对照表(阿里云 VPC 网络连接方式)」表中的「VPC 网络 Endpoint」列来配置 mc.endpoint。使用公网访问的用户,可以选择「各地域 Endpoint 对照表(阿里云经典网络连接方式)」表中的「经典网络 Endpoint」列、或者选择「各地域 Endpoint 对照表(外网连接方式)」表中的「外网 Endpoint」列来配置 mc.endpoint

自定义服务地址 (适用于 Doris 2.1.7 之前)

在 Doris 2.1.7 之前的版本中,使用 Tunnel SDK 与 MaxCompute 交互,因此需要使用以下两个 endpoint 属性:

  • mc.odps_endpoint:MaxCompute Endpoint,用于获取 MaxCompute 元数据(库表信息)。

  • mc.tunnel_endpoint:Tunnel Endpoint,用于读取 MaxCompute 数据。

默认情况下,MaxCompute Catalog 根据 mc.regionmc.public_access 去生成 endpoint。

生成后的格式如下:

mc.public_accessmc.odps_endpointmc.tunnel_endpoint
falsehttp://service.{mc.region}.maxcompute.aliyun-inc.com/apihttp://dt.{mc.region}.maxcompute.aliyun-inc.com
truehttp://service.{mc.region}.maxcompute.aliyun.com/apihttp://dt.{mc.region}.maxcompute.aliyun.com

用户也可以单独指定 mc.odps_endpointmc.tunnel_endpoint 来自定义服务地址,适用于一些私有部署的 MaxCompute 环境。

MaxCompute Endpoint 和 Tunnel Endpoint 的配置请参见各地域及不同网络连接方式下的 Endpoint

资源使用控制

用户可以通过调整 parallel_pipeline_task_numnum_scanner_threads 这两个 Session Variable 来调整表级别请求并发数量,以控制数据传输服务中的资源消耗。其对应的并发数量等于 max(parallel_pipeline_task_num * be num * num_scanner_threads)

需要注意:

  1. 该方法只能控制单个 Query 中单张表的并发请求数量,无法控制多个 SQL 的资源使用量。

  2. 降低并发数量意味着会提高 Query 的查询时间。