跳到主要内容

Kafka Catalog

概述

Kafka Catalog 通过 Trino Connector 兼容框架,使用 Trino Kafka Connector 来访问 Kafka Topic 数据。

备注
  • 该功能为实验功能,自 3.0.1 版本开始支持。
  • 该功能不依赖 Trino 集群环境,仅使用 Trino 兼容插件。

适用场景

场景支持情况
数据集成读取 Kafka Topic 数据并写入到 Doris 内表
数据写回不支持

版本兼容性

  • Doris 版本:3.0.1 及以上
  • Trino Connector 版本:435
  • Kafka 版本:具体支持的版本请参考 Trino 文档

快速开始

步骤 1:准备 Connector 插件

你可以选择以下两种方式之一来获取 Kafka Connector 插件:

方式一:使用预编译包(推荐)

直接在 这里 找到对应的预编译的插件包并下载解压。

方式二:手动编译

如果需要自定义编译,按照以下步骤操作(需要 JDK 17):

git clone https://github.com/apache/doris-thirdparty.git
cd doris-thirdparty
git checkout trino-435
cd plugin/trino-kafka
mvn clean package -Dmaven.test.skip=true

完成编译后,会在 trino/plugin/trino-kafka/target/ 下得到 trino-kafka-435/ 目录。

步骤 2:部署插件

  1. trino-kafka-435/ 目录放到所有 FE 和 BE 部署路径的 connectors/ 目录下(如果没有该目录,请手动创建):

    ├── bin
    ├── conf
    ├── plugins
    │ ├── connectors
    │ ├── trino-kafka-435
    ...

    也可以通过修改 fe.conftrino_connector_plugin_dir 配置自定义插件路径。如:trino_connector_plugin_dir=/path/to/connectors/

  2. 重启所有 FE 和 BE 节点,以确保 Connector 被正确加载。

步骤 3:创建 Catalog

基础配置

CREATE CATALOG kafka PROPERTIES (
'type' = 'trino-connector',
'trino.connector.name' = 'kafka',
'trino.kafka.nodes' = '<broekr1>:<port1>,<broekr2>:<port2>',
'trino.kafka.table-names' = 'test_db.topic_name',
'trino.kafka.hide-internal-columns' = 'false'
);

使用配置文件

CREATE CATALOG kafka PROPERTIES (
'type' = 'trino-connector',
'trino.connector.name' = 'kafka',
'trino.kafka.nodes' = '<broekr1>:<port1>,<broekr2>:<port2>',
'trino.kafka.config.resources' = '/path/to/kafka-client.properties',
'trino.kafka.hide-internal-columns' = 'false'
);

配置默认 Schema

CREATE CATALOG kafka PROPERTIES (
'type' = 'trino-connector',
'trino.connector.name' = 'kafka',
'trino.kafka.nodes' = '<broekr1>:<port1>,<broekr2>:<port2>',
'trino.kafka.default-schema' = 'default_db',
'trino.kafka.hide-internal-columns' = 'false'
);

步骤 4:查询数据

创建 Catalog 后,可以通过以下三种方式查询 Kafka Topic 数据:

-- 方式 1:切换到 Catalog 后查询
SWITCH kafka;
USE kafka_schema;
SELECT * FROM topic_name LIMIT 10;

-- 方式 2:使用两级路径
USE kafka.kafka_schema;
SELECT * FROM topic_name LIMIT 10;

-- 方式 3:使用全限定名
SELECT * FROM kafka.kafka_schema.topic_name LIMIT 10;

Schema Registry 集成

Kafka Catalog 支持通过 Confluent Schema Registry 来自动获取 Topic 的 Schema 信息,避免手动定义表结构。

配置 Schema Registry

基础认证

CREATE CATALOG kafka PROPERTIES (
'type' = 'trino-connector',
'trino.connector.name' = 'kafka',
'trino.kafka.nodes' = '<broekr1>:<port1>',
'trino.kafka.table-description-supplier' = 'CONFLUENT',
'trino.kafka.confluent-schema-registry-url' = 'http://<schema-registry-host>:<schema-registry-port>',
'trino.kafka.confluent-schema-registry-auth-type' = 'BASIC_AUTH',
'trino.kafka.confluent-schema-registry.basic-auth.username' = 'admin',
'trino.kafka.confluent-schema-registry.basic-auth.password' = 'admin123',
'trino.kafka.hide-internal-columns' = 'false'
);

完整配置示例

CREATE CATALOG kafka PROPERTIES (
'type' = 'trino-connector',
'trino.connector.name' = 'kafka',
'trino.kafka.nodes' = '<broekr1>:<port1>',
'trino.kafka.default-schema' = 'nrdp',
'trino.kafka.table-description-supplier' = 'CONFLUENT',
'trino.kafka.confluent-schema-registry-url' = 'http://<schema-registry-host>:<schema-registry-port>',
'trino.kafka.confluent-schema-registry-auth-type' = 'BASIC_AUTH',
'trino.kafka.confluent-schema-registry.basic-auth.username' = 'admin',
'trino.kafka.confluent-schema-registry.basic-auth.password' = 'admin123',
'trino.kafka.config.resources' = '/path/to/kafka-client.properties',
'trino.kafka.confluent-schema-registry-subject-mapping' = 'nrdp.topic1:NRDP.topic1',
'trino.kafka.hide-internal-columns' = 'false'
);

Schema Registry 参数说明

参数名称必填默认值说明
trino.kafka.table-description-supplier-设置为 CONFLUENT 启用 Schema Registry 支持
trino.kafka.confluent-schema-registry-url是*-Schema Registry 服务地址
trino.kafka.confluent-schema-registry-auth-typeNONE认证类型:NONE、BASIC_AUTH、BEARER
trino.kafka.confluent-schema-registry.basic-auth.username-Basic Auth 用户名
trino.kafka.confluent-schema-registry.basic-auth.password-Basic Auth 密码
trino.kafka.confluent-schema-registry-subject-mapping-Subject 名称映射,格式:<db1>.<tbl1>:<topic_name1>,<db2>.<tbl2>:<topic_name2>
提示

使用 Schema Registry 时,Doris 会自动从 Schema Registry 获取 Topic 的 Schema 信息,无需手动创建表结构。

Subject 映射说明

某些情况下,Schema Registry 中注册的 Subject 名称可能和 Kafka 中的 Topic 名称不匹配,导致导致无法查询数据。此时需要通过 confluent-schema-registry-subject-mapping 手动指定映射关系。

-- 将 schema.topic 映射到 Schema Registry 中的 SCHEMA.topic Subject
'trino.kafka.confluent-schema-registry-subject-mapping' = '<db1>.<tbl1>:<topic_name1>'

其中 db1tbl1 为在 Doris 中实际看到的 Database 和 Table 名称。topic_name1 为 Kafka 中实际的 Topic 名称(大小写敏感)。

多个映射可以用逗号分隔:

'trino.kafka.confluent-schema-registry-subject-mapping' = '<db1>.<tbl1>:<topic_name1>,<db2>.<tbl2>:<topic_name2>'

配置说明

Catalog 配置参数

创建 Kafka Catalog 的基本语法如下:

CREATE CATALOG [IF NOT EXISTS] catalog_name PROPERTIES (
'type' = 'trino-connector', -- 必填,固定值
'trino.connector.name' = 'kafka', -- 必填,固定值
{TrinoProperties}, -- Trino Connector 相关属性
{CommonProperties} -- 通用属性
);

TrinoProperties 参数

TrinoProperties 用于配置 Trino Kafka Connector 的专有属性,这些属性以 trino. 为前缀。常用参数包括:

参数名称必填默认值说明
trino.kafka.nodes-Kafka Broker 节点地址列表,格式:host1:port1,host2:port2
trino.kafka.table-names-要映射的 Topic 列表,格式:schema.topic1,schema.topic2
trino.kafka.default-schemadefault默认 Schema 名称
trino.kafka.hide-internal-columnstrue是否隐藏 Kafka 内部列(如 _partition_id_partition_offset 等)
trino.kafka.config.resources-Kafka 客户端配置文件路径
trino.kafka.table-description-supplier-表结构提供方式,设置为 CONFLUENT 使用 Schema Registry
trino.kafka.confluent-schema-registry-url-Schema Registry 服务地址

更多 Kafka Connector 配置参数请参考 Trino 官方文档

CommonProperties 参数

CommonProperties 用于配置 Catalog 的通用属性,例如元数据刷新策略、权限控制等。详细说明请参阅数据目录概述中「通用属性」部分。

Kafka 客户端配置

当需要配置 Kafka 客户端的高级参数(如安全认证、SSL 等)时,可以通过配置文件方式指定。创建一个配置文件(例如 kafka-client.properties):

# ============================================
# Kerberos/SASL 认证配置
# ============================================
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka

# JAAS 配置 - 使用 keytab 方式
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
storeKey=true \
useTicketCache=false \
serviceName="kafka" \
keyTab="/opt/trino/security/keytabs/kafka.keytab" \
principal="kafka@EXAMPLE.COM";

# ============================================
# Avro 反序列化器配置
# ============================================
key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer

然后在创建 Catalog 时指定该配置文件:

CREATE CATALOG kafka PROPERTIES (
'type' = 'trino-connector',
'trino.connector.name' = 'kafka',
'trino.kafka.nodes' = '<broekr1>:<port1>',
'trino.kafka.config.resources' = '/path/to/kafka-client.properties'
);

数据类型映射

在使用 Kafka Catalog 时,数据类型会按照以下规则进行映射:

Kafka/Avro TypeTrino TypeDoris Type说明
booleanbooleanboolean
intintegerint
longbigintbigint
floatrealfloat
doubledoubledouble
bytesvarbinarystring需要使用 HEX(col) 函数查询
stringvarcharstring
arrayarrayarray
mapmapmap
recordrowstruct复杂嵌套结构
enumvarcharstring
fixedvarbinarystring
null--
提示
  • 对于 bytes 类型,如果需要以十六进制格式显示,请使用 HEX() 函数包裹列名。
  • Kafka Catalog 支持的数据类型取决于所使用的序列化格式(JSON、Avro、Protobuf 等)和 Schema Registry 配置。

Kafka 内部列

Kafka Connector 提供了一些内部列,用于访问 Kafka 消息的元数据信息:

列名类型说明
_partition_idbigint消息所在的分区 ID
_partition_offsetbigint消息在分区中的偏移量
_message_timestamptimestamp消息时间戳
_keyvarchar消息的 Key
_key_corruptbooleanKey 是否损坏
_key_lengthbigintKey 的字节长度
_messagevarchar消息的原始内容
_message_corruptboolean消息是否损坏
_message_lengthbigint消息的字节长度
_headersmap消息头信息

默认情况下,这些内部列是隐藏的。如果需要查询这些列,需要在创建 Catalog 时设置:

'trino.kafka.hide-internal-columns' = 'false'

查询示例:

SELECT 
_partition_id,
_partition_offset,
_message_timestamp,
*
FROM kafka.schema.topic_name
LIMIT 10;

使用限制

  1. 只读访问:Kafka Catalog 仅支持读取数据,不支持写入(INSERT、UPDATE、DELETE)操作。

  2. Table Names 配置:当不使用 Schema Registry 时,需要通过 trino.kafka.table-names 参数显式指定要访问的 Topic 列表。

  3. Schema 定义

    • 使用 Schema Registry 时,Schema 信息自动从 Schema Registry 获取。
    • 不使用 Schema Registry 时,需要手动创建表定义或使用 Trino 的 Topic 描述文件。
  4. 数据格式:支持的数据格式取决于 Topic 使用的序列化方式(JSON、Avro、Protobuf 等),具体请参考 Trino 官方文档

  5. 性能考虑

    • Kafka Catalog 会实时读取 Kafka 数据,大量数据查询可能影响性能。
    • 建议结合 LIMIT 子句或时间过滤条件限制扫描的数据量。

功能调试

可以参考 这里 快速构建 Kafka 环境机型功能验证。

参考资料