跳到主要内容

订阅Kafka日志

订阅Kafka日志

用户可以通过提交例行导入作业,直接订阅Kafka中的消息数据,以近实时的方式进行数据同步。

Doris 自身能够保证不丢不重的订阅 Kafka 中的消息,即 Exactly-Once 消费语义。

订阅 Kafka 消息

订阅 Kafka 消息使用了 Doris 中的例行导入(Routine Load)功能。

用户首先需要创建一个例行导入作业。作业会通过例行调度,不断地发送一系列的任务,每个任务会消费一定数量 Kafka 中的消息。

请注意以下使用限制:

  1. 支持无认证的 Kafka 访问,以及通过 SSL 方式认证的 Kafka 集群。
  2. 支持的消息格式如下:
    • csv 文本格式。每一个 message 为一行,且行尾不包含换行符。
    • Json 格式,详见 导入 Json 格式数据
  3. 仅支持 Kafka 0.10.0.0(含) 以上版本。

访问 SSL 认证的 Kafka 集群

例行导入功能支持无认证的 Kafka 集群,以及通过 SSL 认证的 Kafka 集群。

访问 SSL 认证的 Kafka 集群需要用户提供用于认证 Kafka Broker 公钥的证书文件(ca.pem)。如果 Kafka 集群同时开启了客户端认证,则还需提供客户端的公钥(client.pem)、密钥文件(client.key),以及密钥密码。这里所需的文件需要先通过 CREAE FILE 命令上传到 Plao 中,并且 catalog 名称为 kafkaCREATE FILE 命令的具体帮助可以参见 CREATE FILE 命令手册。这里给出示例:

  • 上传文件

    CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka");
    CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka");
    CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");

上传完成后,可以通过 SHOW FILES 命令查看已上传的文件。

创建例行导入作业

创建例行导入任务的具体命令,请参阅 ROUTINE LOAD 命令手册。这里给出示例:

  1. 访问无认证的 Kafka 集群

    CREATE ROUTINE LOAD demo.my_first_routine_load_job ON test_1
    COLUMNS TERMINATED BY ","
    PROPERTIES
    (
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    )
    FROM KAFKA
    (
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "property.group.id" = "xxx",
    "property.client.id" = "xxx",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
    );
    • max_batch_interval/max_batch_rows/max_batch_size 用于控制一个子任务的运行周期。一个子任务的运行周期由最长运行时间、最多消费行数和最大消费数据量共同决定。
  2. 访问 SSL 认证的 Kafka 集群

    CREATE ROUTINE LOAD demo.my_first_routine_load_job ON test_1
    COLUMNS TERMINATED BY ",",
    PROPERTIES
    (
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    )
    FROM KAFKA
    (
    "kafka_broker_list"= "broker1:9091,broker2:9091",
    "kafka_topic" = "my_topic",
    "property.security.protocol" = "ssl",
    "property.ssl.ca.location" = "FILE:ca.pem",
    "property.ssl.certificate.location" = "FILE:client.pem",
    "property.ssl.key.location" = "FILE:client.key",
    "property.ssl.key.password" = "abcdefg"
    );

查看导入作业状态

查看作业状态的具体命令和示例请参阅 SHOW ROUTINE LOAD 命令文档。

查看某个作业的任务运行状态的具体命令和示例请参阅 SHOW ROUTINE LOAD TASK 命令文档。

只能查看当前正在运行中的任务,已结束和未开始的任务无法查看。

修改作业属性

用户可以修改已经创建的作业的部分属性。具体说明请参阅 ALTER ROUTINE LOAD 命令手册。

作业控制

用户可以通过 STOP/PAUSE/RESUME 三个命令来控制作业的停止,暂停和重启。

具体命令请参阅 STOP ROUTINE LOADPAUSE ROUTINE LOADRESUME ROUTINE LOAD 命令文档。

更多帮助

关于 ROUTINE LOAD 的更多详细语法和最佳实践,请参阅 ROUTINE LOAD 命令手册。