跳到主要内容

Beats Doris output plugin

Beats 是一个数据采集 Agent,它支持自定义输出插件将数据写入存储系统,Beats Doris output plugin 是输出到 Doris 的插件。

Beats Doris output plugin 支持 Filebeat, Metricbeat, Packetbeat, Winlogbeat, Auditbeat, Heartbeat

Beats Doris output plugin 调用 Doris Stream Load HTTP 接口将数据实时写入 Doris,提供多线程并发,失败重试,自定义 Stream Load 格式和参数,输出写入速度等能力。

使用 Beats Doris output plugin 主要有三个步骤:

  1. 下载或编译包含 Doris output plugin 的 Beats二进制程序
  2. 配置 Beats 输出地址和其他参数
  3. 启动 Beats 将数据实时写入 Doris

安装

从官网下载

https://apache-doris-releases.oss-accelerate.aliyuncs.com/filebeat-doris-2.0.0

从源码编译

在 extension/beats/ 目录下执行

cd doris/extension/beats

go build -o filebeat-doris filebeat/filebeat.go
go build -o metricbeat-doris metricbeat/metricbeat.go
go build -o winlogbeat-doris winlogbeat/winlogbeat.go
go build -o packetbeat-doris packetbeat/packetbeat.go
go build -o auditbeat-doris auditbeat/auditbeat.go
go build -o heartbeat-doris heartbeat/heartbeat.go

参数配置

Beats Doris output plugin 的配置如下:

配置说明
http_hostsStream Load HTTP 地址,格式是字符串数组,可以有一个或者多个元素,每个元素是 host:port。 例如:["http://fe1:8030", "http://fe2:8030"]
userDoris 用户名,该用户需要有doris对应库表的导入权限
passwordDoris 用户的密码
database要写入的 Doris 库名
table要写入的 Doris 表名
label_prefixDoris Stream Load Label 前缀,最终生成的 Label 为 {label_prefix}{db}{table}{yyyymmdd_hhmmss}{uuid} ,默认值是 beats
headersDoris Stream Load 的 headers 参数,语法格式为 YAML map
codec_format_string输出到 Doris Stream Load 的format string,%{[a][b]} 代表输入中的 a.b 字段,参考后续章节的使用示例
bulk_max_sizeDoris Stream Load 的 batch size,默认为100000
max_retriesDoris Stream Load 请求失败重试次数,默认为 -1 无限重试保证数据可靠性
log_request日志中是否输出 Doris Stream Load 请求和响应元数据,用于排查问题,默认为 true
log_progress_interval日志中输出速度的时间间隔,单位是秒,默认为 10,设置为 0 可以关闭这种日志

使用示例

TEXT 日志采集示例

该示例以 Doris FE 的日志为例展示 TEXT 日志采集。

1. 数据

FE 日志文件一般位于 Doris 安装目录下的 fe/log/fe.log 文件,是典型的 Java 程序日志,包括时间戳,日志级别,线程名,代码位置,日志内容等字段。不仅有正常的日志,还有带 stacktrace 的异常日志,stacktrace 是跨行的,日志采集存储需要把主日志和 stacktrace 组合成一条日志。

2024-07-08 21:18:01,432 INFO (Statistics Job Appender|61) [StatisticsJobAppender.runAfterCatalogReady():70] Stats table not available, skip
2024-07-08 21:18:53,710 WARN (STATS_FETCH-0|208) [StmtExecutor.executeInternalQuery():3332] Failed to run internal SQL: OriginStatement{originStmt='SELECT * FROM __internal_schema.column_statistics WHERE part_id is NULL ORDER BY update_time DESC LIMIT 500000', idx=0}
org.apache.doris.common.UserException: errCode = 2, detailMessage = tablet 10031 has no queryable replicas. err: replica 10032's backend 10008 does not exist or not alive
at org.apache.doris.planner.OlapScanNode.addScanRangeLocations(OlapScanNode.java:931) ~[doris-fe.jar:1.2-SNAPSHOT]
at org.apache.doris.planner.OlapScanNode.computeTabletInfo(OlapScanNode.java:1197) ~[doris-fe.jar:1.2-SNAPSHOT]

2. 建表

表结构包括日志的产生时间,采集时间,主机名,日志文件路径,日志类型,日志级别,线程名,代码位置,日志内容等字段。

CREATE TABLE `doris_log` (
`log_time` datetime NULL COMMENT 'log content time',
`collect_time` datetime NULL COMMENT 'log agent collect time',
`host` text NULL COMMENT 'hostname or ip',
`path` text NULL COMMENT 'log file path',
`type` text NULL COMMENT 'log type',
`level` text NULL COMMENT 'log level',
`thread` text NULL COMMENT 'log thread',
`position` text NULL COMMENT 'log code position',
`message` text NULL COMMENT 'log message',
INDEX idx_host (`host`) USING INVERTED COMMENT '',
INDEX idx_path (`path`) USING INVERTED COMMENT '',
INDEX idx_type (`type`) USING INVERTED COMMENT '',
INDEX idx_level (`level`) USING INVERTED COMMENT '',
INDEX idx_thread (`thread`) USING INVERTED COMMENT '',
INDEX idx_position (`position`) USING INVERTED COMMENT '',
INDEX idx_message (`message`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true") COMMENT ''
) ENGINE=OLAP
DUPLICATE KEY(`log_time`)
COMMENT 'OLAP'
PARTITION BY RANGE(`log_time`) ()
DISTRIBUTED BY RANDOM BUCKETS 10
PROPERTIES (
"replication_num" = "1",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-7",
"dynamic_partition.end" = "1",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "10",
"dynamic_partition.create_history_partition" = "true",
"compaction_policy" = "time_series"
);

3. 配置

filebeat 日志采集的配置文件如 filebeat_doris_log.yml 是 YAML 格式,主要由 4 部分组成,分别对应 ETL 的各个部分:

  1. input 负责读取原始数据
  2. processor 负责做数据转换
  3. queue.mem 配置 filebeat 内部的缓冲队列
  4. output 负责将数据输出
# 1. input 负责读取原始数据
# type: log 是一个 log input plugin,可以配置读取的日志文件路径,通过 multiline 功能将非时间开头的行拼接到上一行后面,实现 stacktrace 和主日志合并的效果。log input 会将日志内容保存在 message 字段中,另外还有一些元数据字段比如 agent.host,log.file.path。
filebeat.inputs:
- type: log
enabled: true
paths:
- /path/to/your/log
# multiline 可以将跨行的日志(比如Java stacktrace)拼接起来
multiline:
type: pattern
# 效果:以 yyyy-mm-dd HH:MM:SS 开头的行认为是一条新的日志,其他都拼接到上一条日志
pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}'
negate: true
match: after
skip_newline: true

# 2. processors 部分负责数据转换
processors:
# 用 js script 插件将日志中的 \t 替换成空格,避免JSON解析报错
- script:
lang: javascript
source: >
function process(event) {
var msg = event.Get("message");
msg = msg.replace(/\t/g, " ");
event.Put("message", msg);
}
# 用 dissect 插件做简单的日志解析
- dissect:
# 2024-06-08 18:26:25,481 INFO (report-thread|199) [ReportHandler.cpuReport():617] begin to handle
tokenizer: "%{day} %{time} %{log_level} (%{thread}) [%{position}] %{content}"
target_prefix: ""
ignore_failure: true
overwrite_keys: true

# 3. 内部的缓冲队列总条数,flush batch 条数,flush 时间间隔
queue.mem:
events: 1000000
flush.min_events: 100000
flush.timeout: 10s

# 4. output 部分负责数据输出
# doris output 将数据输出到 Doris,使用的是 Stream Load HTTP 接口。通过 headers 参数指定了 Stream Load 的数据格式为 JSON,通过 codec_format_string 参数用类似 printf 的方式格式化输出到 Doris 的数据。比如下面的例子基于 filebeat 内部的字段 format 出一个 JSON,这些字段可以是 filebeat 内置字段如 agent.hostname,也可以是 processor 比如 dissect 生产的字段如 day,通过 %{[a][b]} 的方式引用,,Stream Load 会自动将 JSON 字段写入对应的 Doris 表的字段。
output.doris:
fenodes: [ "http://fehost1:http_port", "http://fehost2:http_port", "http://fehost3:http_port" ]
user: "your_username"
password: "your_password"
database: "your_db"
table: "your_table"
# output string format
## %{[agent][hostname]} %{[log][file][path]} 是filebeat自带的metadata
## 常用的 filebeat metadata 还是有采集时间戳 %{[@timestamp]}
## %{[day]} %{[time]} 是上面 dissect 解析得到字段
codec_format_string: '{"ts": "%{[day]} %{[time]}", "host": "%{[agent][hostname]}", "path": "%{[log][file][path]}", "message": "%{[message]}"}'
headers:
format: "json"
read_json_by_line: "true"
load_to_single_tablet: "true"

4. 运行 Filebeat


./filebeat-doris -f config/filebeat_doris_log.yml

# log_request 为 true 时日志会输出每次 Stream Load 的请求参数和响应结果

doris stream load response:
{
"TxnId": 45464,
"Label": "logstash_log_db_doris_log_20240708_223532_539_6c20a0d1-dcab-4b8e-9bc0-76b46a929bd1",
"Comment": "",
"TwoPhaseCommit": "false",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 452,
"NumberLoadedRows": 452,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 277230,
"LoadTimeMs": 1797,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 18,
"ReadDataTimeMs": 9,
"WriteDataTimeMs": 1758,
"CommitAndPublishTimeMs": 18
}

# 默认每隔 10s 会日志输出速度信息,包括自启动以来的数据量(MB 和 ROWS),总速度(MB/s 和 R/S),最近 10s 速度
total 11 MB 18978 ROWS, total speed 0 MB/s 632 R/s, last 10 seconds speed 1 MB/s 1897 R/s

JSON 日志采集示例

该样例以 github events archive 的数据为例展示 JSON 日志采集。

1. 数据

github events archive 是 github 用户操作事件的归档数据,格式是 JSON,可以从 https://www.gharchive.org/ 下载,比如下载 2024年1月1日15点的数据。

wget https://data.gharchive.org/2024-01-01-15.json.gz

下面是一条数据样例,实际一条数据一行,这里为了方便展示进行了格式化。

{
"id": "37066529221",
"type": "PushEvent",
"actor": {
"id": 46139131,
"login": "Bard89",
"display_login": "Bard89",
"gravatar_id": "",
"url": "https://api.github.com/users/Bard89",
"avatar_url": "https://avatars.githubusercontent.com/u/46139131?"
},
"repo": {
"id": 780125623,
"name": "Bard89/talk-to-me",
"url": "https://api.github.com/repos/Bard89/talk-to-me"
},
"payload": {
"repository_id": 780125623,
"push_id": 17799451992,
"size": 1,
"distinct_size": 1,
"ref": "refs/heads/add_mvcs",
"head": "f03baa2de66f88f5f1754ce3fa30972667f87e81",
"before": "85e6544ede4ae3f132fe2f5f1ce0ce35a3169d21"
},
"public": true,
"created_at": "2024-04-01T23:00:00Z"
}

2. Doris 建表

CREATE DATABASE log_db;
USE log_db;


CREATE TABLE github_events
(
`created_at` DATETIME,
`id` BIGINT,
`type` TEXT,
`public` BOOLEAN,
`actor` VARIANT,
`repo` VARIANT,
`payload` TEXT,
INDEX `idx_id` (`id`) USING INVERTED,
INDEX `idx_type` (`type`) USING INVERTED,
INDEX `idx_actor` (`actor`) USING INVERTED,
INDEX `idx_host` (`repo`) USING INVERTED,
INDEX `idx_payload` (`payload`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true")
)
ENGINE = OLAP
DUPLICATE KEY(`created_at`)
PARTITION BY RANGE(`created_at`) ()
DISTRIBUTED BY RANDOM BUCKETS 10
PROPERTIES (
"replication_num" = "1",
"compaction_policy" = "time_series",
"enable_single_replica_compaction" = "true",
"dynamic_partition.enable" = "true",
"dynamic_partition.create_history_partition" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-30",
"dynamic_partition.end" = "1",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "10",
"dynamic_partition.replication_num" = "1"
);

3. Filebeat 配置

这个配置文件和之前 TEXT 日志采集不同的有下面几点:

  1. 没有用 processors,因为不需要额外的处理转换
  2. output 中的 codec_format_string 很简单,直接输出整个 message,也就是原始内容
# input
filebeat.inputs:
- type: log
enabled: true
paths:
- /path/to/your/log

# queue and batch
queue.mem:
events: 1000000
flush.min_events: 100000
flush.timeout: 10s

# output
output.doris:
fenodes: [ "http://fehost1:http_port", "http://fehost2:http_port", "http://fehost3:http_port" ]
user: "your_username"
password: "your_password"
database: "your_db"
table: "your_table"
# output string format
## 直接把原始文件每一行的 message 原样输出,由于 headers 指定了 format: "json",Stream Load 会自动解析 JSON 字段写入对应的 Doris 表的字段。
codec_format_string: '%{[message]}'
headers:
format: "json"
read_json_by_line: "true"
load_to_single_tablet: "true"

4. 运行 Filebeat

./filebeat-doris -f config/filebeat_github_events.yml