Vector
Vector 是一个采用 Rust 开发的高性能可观测性数据管道,专门用于收集、转换和路由日志、指标以及链路追踪(Trace)数据。为了更好地支持 Doris 生态,Doris 社区为 Vector 开发了 Doris Sink 组件,可将各类数据源的数据通过 Stream Load 高效写入 Apache Doris 进行分析处理。
适用场景
Vector + Doris Sink 主要适用于以下数据接入场景:
- 应用日志采集:将应用、Web 服务器、数据库等产生的多行/单行文本日志实时写入 Doris。
- JSON 结构化日志:采集 NDJSON 格式的事件流(如 GitHub Events、Nginx access log 的 JSON 格式等)。
- 指标与 Trace 数据:将 Prometheus、OpenTelemetry 等来源的可观测性数据落库分析。
- 多源数据汇聚:通过 Vector 的 Source/Transform 能力,将异构数据源转换后统一写入 Doris。
数据链路
典型的数据流如下:
数据源 (Source) → Vector Transform → Doris Sink (Stream Load) → Apache Doris
安装部署
方式一:下载预编译安装包
适用于 x86_64 Linux 环境,下载后解压即可使用:
wget https://apache-doris-releases.oss-cn-beijing.aliyuncs.com/extension/vector-x86_64-unknown-linux-gnu.tar.gz
方式二:从源码编译
适用于需要自定义构建或非 x86_64 平台的场景:
cd ${VECTOR_HOME}
# 可根据部署环境自行选择,Makefile 中提供了多种目标平台选项
make package-x86_64-unknown-linux-gnu
快速开始
下面的最小示例演示如何使用 Vector 将一个文件中的内容通过 Doris Sink 写入 Doris:
[sources.demo]
type = "file"
include = ["/path/to/input.json"]
[sinks.doris]
type = "doris"
inputs = ["demo"]
endpoints = ["http://fe_ip:8030"]
database = "log_db"
table = "demo_table"
[sinks.doris.auth]
strategy = "basic"
user = "root"
password = ""
[sinks.doris.encoding]
codec = "json"
启动 Vector:
${VECTOR_HOME}/bin/vector --config vector_config.toml
使用示例
示例一:TEXT 多行日志采集(Doris FE 日志)
该示例演示如何采集 Doris FE 的 Java 程序日志。FE 日志包含时间戳、日志级别、线程名、代码位置、日志内容等字段,且常见带有跨行的 stacktrace 异常信息,需要将主日志与 stacktrace 合并为同一条记录。
1. 数据样例
FE 日志一般位于 Doris 安装目录下的 fe/log/fe.log:
2024-07-08 21:18:01,432 INFO (Statistics Job Appender|61) [StatisticsJobAppender.runAfterCatalogReady():70] Stats table not available, skip
2024-07-08 21:18:53,710 WARN (STATS_FETCH-0|208) [StmtExecutor.executeInternalQuery():3332] Failed to run internal SQL: OriginStatement{originStmt='SELECT * FROM __internal_schema.column_statistics WHERE part_id is NULL ORDER BY update_time DESC LIMIT 500000', idx=0}
org.apache.doris.common.UserException: errCode = 2, detailMessage = tablet 10031 has no queryable replicas. err: replica 10032's backend 10008 does not exist or not alive
at org.apache.doris.planner.OlapScanNode.addScanRangeLocations(OlapScanNode.java:931) ~[doris-fe.jar:1.2-SNAPSHOT]
at org.apache.doris.planner.OlapScanNode.computeTabletInfo(OlapScanNode.java:1197) ~[doris-fe.jar:1.2-SNAPSHOT]
2. Doris 建表
表结构包含日志的产生时间、采集时间、主机名、日志文件路径、日志类型、日志级别、线程名、代码位置和日志内容等字段,并对常用查询字段建立倒排索引:
CREATE TABLE `doris_log` (
`log_time` datetime NULL COMMENT 'log content time',
`collect_time` datetime NULL COMMENT 'log agent collect time',
`host` text NULL COMMENT 'hostname or ip',
`path` text NULL COMMENT 'log file path',
`type` text NULL COMMENT 'log type',
`level` text NULL COMMENT 'log level',
`thread` text NULL COMMENT 'log thread',
`position` text NULL COMMENT 'log code position',
`message` text NULL COMMENT 'log message',
INDEX idx_host (`host`) USING INVERTED,
INDEX idx_path (`path`) USING INVERTED,
INDEX idx_type (`type`) USING INVERTED,
INDEX idx_level (`level`) USING INVERTED,
INDEX idx_thread (`thread`) USING INVERTED,
INDEX idx_position (`position`) USING INVERTED,
INDEX idx_message (`message`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true")
) ENGINE=OLAP
DUPLICATE KEY(`log_time`)
PARTITION BY RANGE(`log_time`) ()
DISTRIBUTED BY RANDOM BUCKETS 10
PROPERTIES (
"replication_num" = "1",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-7",
"dynamic_partition.end" = "1",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "10",
"dynamic_partition.create_history_partition" = "true",
"compaction_policy" = "time_series"
);
3. Vector 配置
# ==================== Sources ====================
[sources.fe_log_input]
type = "file"
include = ["/path/fe/log/fe.log"]
start_at_beginning = true
max_line_bytes = 102400
ignore_older_secs = 0
fingerprint.strategy = "device_and_inode"
# 多行日志处理:以时间戳开头的行视为新日志,其他行合并到前一行(处理 stacktrace)
multiline.start_pattern = "^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}"
multiline.mode = "halt_before"
multiline.condition_pattern = "^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}"
multiline.timeout_ms = 10000
# ==================== Transforms ====================
# 使用 VRL 解析日志内容
[transforms.parse_log]
inputs = ["fe_log_input"]
type = "remap"
source = '''
# 添加 type 字段
.type = "fe.log"
# 添加 collect_time(使用 Asia/Shanghai 时区,与 log_time 保持一致)
.collect_time = format_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S", timezone: "Asia/Shanghai")
# 解析日志格式:2024-01-01 12:00:00,123 INFO (thread-name) [position] message
# 使用 (?s) 启用 DOTALL 模式,让 .* 可以匹配换行符(处理多行日志)
parsed, err = parse_regex(.message, r'(?s)^(?P<log_time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) (?P<level>[A-Z]+) \((?P<thread>[^\)]+)\) \[(?P<position>[^\]]+)\] (?P<content>.*)')
if err == null {
.log_time = parsed.log_time
.level = parsed.level
.thread = parsed.thread
.position = parsed.position
# 保留完整的原始 message(包括多行堆栈信息)
} else {
# 解析失败时设置默认值,避免 NULL 导致分区错误
.log_time = .collect_time
.level = "UNKNOWN"
.thread = ""
.position = ""
}
# 提取 host 和 path(Vector 自动添加这些元数据)
.host = .host
.path = .file
'''
# ==================== Sinks ====================
[sinks.doris]
inputs = ["parse_log"]
type = "doris"
endpoints = ["http://fe_ip:http_port"]
database = "log_db"
table = "doris_log"
label_prefix = "vector_fe_log"
log_request = true
[sinks.doris.auth]
strategy = "basic"
user = "root"
password = ""
[sinks.doris.encoding]
codec = "json"
[sinks.doris.framing]
method = "newline_delimited"
[sinks.doris.request]
concurrency = 10
[sinks.doris.headers]
format = "json"
read_json_by_line = "true"
load_to_single_tablet = "true"
[sinks.doris.batch]
max_events = 10000
timeout_secs = 3
max_bytes = 100000000
4. 启动 Vector
${VECTOR_HOME}/bin/vector --config vector_fe_log.toml
当 log_request = true 时,Vector 会输出每次 Stream Load 请求的参数与响应结果,便于调试:
2025-11-19T10:14:40.822071Z INFO sink{component_kind="sink" component_id=doris component_type=doris}:request{request_id=82}: vector::sinks::doris::service: Doris stream load response received. status_code=200 OK stream_load_status=Successful response={
"TxnId": 169721,
"Label": "vector_fe_log_log_db_doris_log_1763547280791_e2e619ee-4067-4fe8-974e-9f35f0d4e48e",
"Comment": "",
"TwoPhaseCommit": "false",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 10,
"NumberLoadedRows": 10,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 7301,
"LoadTimeMs": 30,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 1,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 8,
"ReceiveDataTimeMs": 2,
"CommitAndPublishTimeMs": 18
} internal_log_rate_limit=true
示例二:JSON 日志采集(GitHub Events)
该示例演示如何采集 NDJSON 格式的结构化日志,以 GitHub Events Archive 数据为例。
1. 数据样例
GitHub Events Archive 是 GitHub 用户操作事件的归档数据,格式为 JSON。例如下载 2024 年 1 月 1 日 15 点的数据:
wget https://data.gharchive.org/2024-01-01-15.json.gz
实际数据每行是一个完整的 JSON 对象,下面为了便于阅读进行了格式化展示:
{
"id": "37066529221",
"type": "PushEvent",
"actor": {
"id": 46139131,
"login": "Bard89",
"display_login": "Bard89",
"gravatar_id": "",
"url": "https://api.github.com/users/Bard89",
"avatar_url": "https://avatars.githubusercontent.com/u/46139131?"
},
"repo": {
"id": 780125623,
"name": "Bard89/talk-to-me",
"url": "https://api.github.com/repos/Bard89/talk-to-me"
},
"payload": {
"repository_id": 780125623,
"push_id": 17799451992,
"size": 1,
"distinct_size": 1,
"ref": "refs/heads/add_mvcs",
"head": "f03baa2de66f88f5f1754ce3fa30972667f87e81",
"before": "85e6544ede4ae3f132fe2f5f1ce0ce35a3169d21"
},
"public": true,
"created_at": "2024-04-01T23:00:00Z"
}
2. Doris 建表
利用 VARIANT 类型直接存储嵌套 JSON 对象,并对查询字段建立倒排索引:
CREATE DATABASE log_db;
USE log_db;
CREATE TABLE github_events
(
`created_at` DATETIME,
`id` BIGINT,
`type` TEXT,
`public` BOOLEAN,
`actor` VARIANT,
`repo` VARIANT,
`payload` TEXT,
INDEX `idx_id` (`id`) USING INVERTED,
INDEX `idx_type` (`type`) USING INVERTED,
INDEX `idx_actor` (`actor`) USING INVERTED,
INDEX `idx_host` (`repo`) USING INVERTED,
INDEX `idx_payload` (`payload`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true")
)
ENGINE = OLAP
DUPLICATE KEY(`created_at`)
PARTITION BY RANGE(`created_at`) ()
DISTRIBUTED BY RANDOM BUCKETS 10
PROPERTIES (
"replication_num" = "1",
"inverted_index_storage_format" = "v2",
"compaction_policy" = "time_series",
"enable_single_replica_compaction" = "true",
"dynamic_partition.enable" = "true",
"dynamic_partition.create_history_partition" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-30",
"dynamic_partition.end" = "1",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "10",
"dynamic_partition.replication_num" = "1"
);
3. Vector 配置
# ==================== Sources ====================
[sources.github_events_reload]
type = "file"
include = ["/path/2024-01-01-15.json"]
read_from = "beginning"
ignore_checkpoints = true
max_line_bytes = 10485760
ignore_older_secs = 0
line_delimiter = "\n"
fingerprint.strategy = "device_and_inode"
# ==================== Transforms ====================
# 解析 JSON 格式的 GitHub Events 数据,VARIANT 类型可直接存储嵌套对象
[transforms.parse_json]
inputs = ["github_events_reload"]
type = "remap"
source = '''
# 解析 JSON 数据(每行是一个完整的 JSON 对象)
. = parse_json!(.message)
# payload 字段转为 JSON 字符串(TEXT 类型)
.payload = encode_json(.payload)
# 只保留表中需要的字段
. = {
"created_at": .created_at,
"id": .id,
"type": .type,
"public": .public,
"actor": .actor,
"repo": .repo,
"payload": .payload
}
'''
# ==================== Sinks ====================
[sinks.doris]
inputs = ["parse_json"]
type = "doris"
endpoints = ["http://fe_ip:http_port"]
database = "log_db"
table = "github_events"
label_prefix = "vector_github_events"
log_request = true
[sinks.doris.auth]
strategy = "basic"
user = "root"
password = ""
[sinks.doris.encoding]
codec = "json"
[sinks.doris.framing]
method = "newline_delimited"
[sinks.doris.request]
concurrency = 10
[sinks.doris.headers]
format = "json"
read_json_by_line = "true"
load_to_single_tablet = "true"
[sinks.doris.batch]
max_events = 10000
timeout_secs = 3
max_bytes = 100000000
4. 启动 Vector
vector --config vector_config.toml
配置参数参考
Doris Sink 支持丰富的配置选项,可满足不同场景下的数据写入需求。以下按功能分组列出全部可配置项。
基础配置
| 参数名称 | 类型 | 默认值 | 说明 |
|---|---|---|---|
type | string | - | 固定为 doris |
inputs | array | - | 上游数据源名称列表 |
endpoints | array<string> | - | Doris FE HTTP/HTTPS 地址,必须包含协议与端口,如 ["https://fe1:8030"] |
database | string / 模板 | - | 目标数据库名称,支持 Template |
table | string / 模板 | - | 目标表名称,支持模板 |
label_prefix | string | "vector" | Stream Load 标签前缀,最终标签形式为 {label_prefix}_{database}_{table}_{timestamp}_{uuid} |
认证配置
| 参数名称 | 类型 | 默认值 | 说明 |
|---|---|---|---|
auth.strategy | string | "basic" | 认证策略,目前 Doris 仅支持 Basic Auth |
auth.user | string | - | Doris 用户名 |
auth.password | string | - | Doris 密码,可配合环境变量或密钥管理系统 |
请求与并发配置
| 参数名称 | 类型 | 默认值 | 说明 |
|---|---|---|---|
request.concurrency | string / integer | "adaptive" | 并发策略,支持 "adaptive"、"none"(串行)或正整数并发上限 |
request.timeout_secs | integer | 60 | 单次 Stream Load 请求的超时(秒) |
request.rate_limit_duration_secs | integer | 1 | 速率限制时间窗(秒) |
request.rate_limit_num | integer | i64::MAX | 每个时间窗内允许的请求数,默认近似无限制 |
request.retry_attempts | integer | usize::MAX | Tower 中间件的最大重试次数,缺省表示无限重试 |
request.retry_initial_backoff_secs | integer | 1 | 第一次重试前的等待时间(秒),后续按 Fibonacci 退避 |
request.retry_max_duration_secs | integer | 30 | 单次重试退避的最大等待时长(秒) |
request.retry_jitter_mode | string | "full" | 重试抖动模式,支持 full 或 none |
自适应并发
仅在 request.concurrency = "adaptive" 时生效。
| 参数名称 | 类型 | 默认值 | 说明 |
|---|---|---|---|
request.adaptive_concurrency.initial_concurrency | integer | 1 | 自适应并发的起始值 |
request.adaptive_concurrency.max_concurrency_limit | integer | 200 | 自适应并发的上限,防止过载 |
request.adaptive_concurrency.decrease_ratio | float | 0.9 | 触发降速时使用的缩减比例 |
request.adaptive_concurrency.ewma_alpha | float | 0.4 | RTT 指标的指数移动平均权重 |
request.adaptive_concurrency.rtt_deviation_scale | float | 2.5 | RTT 偏差放大系数,用于忽略正常波动 |
编码与数据格式
Doris Sink 使用 encoding 区块控制事件序列化行为,默认输出 NDJSON(换行分隔的 JSON):
| 参数名称 | 类型 | 默认值 | 说明 |
|---|---|---|---|
encoding.codec | string | "json" | 序列化编码,可选 json、text、csv 等 |
encoding.timestamp_format | string | - | 时间戳输出格式,支持 rfc3339、unix 等 |
encoding.only_fields / encoding.except_fields | array<string> | - | 控制字段白名单或黑名单 |
encoding.framing.method | string | 自动推断 | 自定义帧格式,如 newline_delimited、character_delimited |
Stream Load 头部(headers)
headers 是一个键值对映射,会直接透传为 Doris Stream Load 的 HTTP 头,可使用 Stream Load 支持的全部 header 参数(所有值均需为字符串)。常见设置如下:
| 参数名称 | 类型 | 默认值 | 说明 |
|---|---|---|---|
headers.format | string | "json" | 数据格式,支持 json、csv、parquet 等 |
headers.read_json_by_line | string | "true" | 是否按行读取 JSON(NDJSON) |
headers.strip_outer_array | string | "false" | 是否移除最外层数组 |
headers.column_separator | string | - | CSV 列分隔符(format = csv 时生效) |
headers.columns | string | - | CSV/JSON 映射的列顺序,如 timestamp,client_ip,status_code |
headers.where | string | - | Stream Load where 过滤条件 |
批处理配置
| 参数名称 | 类型 | 默认值 | 说明 |
|---|---|---|---|
batch.max_bytes | integer | 10485760 | 单批最大字节数(10 MB) |
batch.max_events | integer / null | null | 单批最大事件数,默认不限制,以字节数为主 |
batch.timeout_secs | float | 1 | 批次最长等待时间(秒) |
可靠性与安全配置
| 参数名称 | 类型 | 默认值 | 说明 |
|---|---|---|---|
max_retries | integer | -1 | Sink 级别的最大重试次数,-1 表示无限制 |
log_request | boolean | false | 是否打印每次 Stream Load 请求与响应(生产环境建议按需开启) |
compression | - | 未支持 | - |
distribution.retry_initial_backoff_secs | integer | 1 | 端点健康检查恢复的初始回退时间(秒) |
distribution.retry_max_duration_secs | integer | 3600 | 健康检查最大回退时长(秒) |
tls.verify_certificate | boolean | true | 启用/禁用上游证书校验 |
tls.verify_hostname | boolean | true | 启用/禁用主机名校验 |
tls.ca_file / tls.crt_file / tls.key_file / tls.key_pass / tls.alpn_protocols / tls.server_name | 各类 | - | 标准 Vector TLS 客户端配置项,用于自定义 CA、双向认证或 SNI |
acknowledgements.enabled | boolean | false | 启用端到端确认,用于与支持 acknowledgements 的 Source 组合 |
最佳实践
- 批次大小:日志类高吞吐场景建议将
batch.max_bytes调至 100 MB 左右,并适当增大batch.timeout_secs,以减少 Stream Load 请求次数。 - 并发策略:稳定上游建议显式设置
request.concurrency为固定整数;流量波动较大时使用"adaptive"让 Vector 自动调节。 - 单 Tablet 写入:在
headers中配置load_to_single_tablet = "true",可在大批量小文件写入时降低 BE 端的资源开销。 - 动态分区 + 时间序列 Compaction:对日志类表启用
dynamic_partition与compaction_policy = "time_series",可显著降低存储与合并开销。 - 生产环境调试:默认关闭
log_request,仅在排查问题时开启,避免日志膨胀。 - TLS 与认证:通过
tls.*配置启用 HTTPS 校验;auth.password建议结合环境变量或密钥管理系统注入,避免明文落盘。