跳到主要内容

Vector

Vector 是一个采用 Rust 开发的高性能可观测性数据管道,专门用于收集、转换和路由日志、指标以及链路追踪(Trace)数据。为了更好地支持 Doris 生态,Doris 社区为 Vector 开发了 Doris Sink 组件,可将各类数据源的数据通过 Stream Load 高效写入 Apache Doris 进行分析处理。

适用场景

Vector + Doris Sink 主要适用于以下数据接入场景:

  • 应用日志采集:将应用、Web 服务器、数据库等产生的多行/单行文本日志实时写入 Doris。
  • JSON 结构化日志:采集 NDJSON 格式的事件流(如 GitHub Events、Nginx access log 的 JSON 格式等)。
  • 指标与 Trace 数据:将 Prometheus、OpenTelemetry 等来源的可观测性数据落库分析。
  • 多源数据汇聚:通过 Vector 的 Source/Transform 能力,将异构数据源转换后统一写入 Doris。

数据链路

典型的数据流如下:

数据源 (Source) → Vector Transform → Doris Sink (Stream Load) → Apache Doris

安装部署

方式一:下载预编译安装包

适用于 x86_64 Linux 环境,下载后解压即可使用:

wget https://apache-doris-releases.oss-cn-beijing.aliyuncs.com/extension/vector-x86_64-unknown-linux-gnu.tar.gz

方式二:从源码编译

适用于需要自定义构建或非 x86_64 平台的场景:

cd ${VECTOR_HOME}

# 可根据部署环境自行选择,Makefile 中提供了多种目标平台选项
make package-x86_64-unknown-linux-gnu

快速开始

下面的最小示例演示如何使用 Vector 将一个文件中的内容通过 Doris Sink 写入 Doris:

[sources.demo]
type = "file"
include = ["/path/to/input.json"]

[sinks.doris]
type = "doris"
inputs = ["demo"]
endpoints = ["http://fe_ip:8030"]
database = "log_db"
table = "demo_table"

[sinks.doris.auth]
strategy = "basic"
user = "root"
password = ""

[sinks.doris.encoding]
codec = "json"

启动 Vector:

${VECTOR_HOME}/bin/vector --config vector_config.toml

使用示例

示例一:TEXT 多行日志采集(Doris FE 日志)

该示例演示如何采集 Doris FE 的 Java 程序日志。FE 日志包含时间戳、日志级别、线程名、代码位置、日志内容等字段,且常见带有跨行的 stacktrace 异常信息,需要将主日志与 stacktrace 合并为同一条记录。

1. 数据样例

FE 日志一般位于 Doris 安装目录下的 fe/log/fe.log

2024-07-08 21:18:01,432 INFO (Statistics Job Appender|61) [StatisticsJobAppender.runAfterCatalogReady():70] Stats table not available, skip
2024-07-08 21:18:53,710 WARN (STATS_FETCH-0|208) [StmtExecutor.executeInternalQuery():3332] Failed to run internal SQL: OriginStatement{originStmt='SELECT * FROM __internal_schema.column_statistics WHERE part_id is NULL ORDER BY update_time DESC LIMIT 500000', idx=0}
org.apache.doris.common.UserException: errCode = 2, detailMessage = tablet 10031 has no queryable replicas. err: replica 10032's backend 10008 does not exist or not alive
at org.apache.doris.planner.OlapScanNode.addScanRangeLocations(OlapScanNode.java:931) ~[doris-fe.jar:1.2-SNAPSHOT]
at org.apache.doris.planner.OlapScanNode.computeTabletInfo(OlapScanNode.java:1197) ~[doris-fe.jar:1.2-SNAPSHOT]

2. Doris 建表

表结构包含日志的产生时间、采集时间、主机名、日志文件路径、日志类型、日志级别、线程名、代码位置和日志内容等字段,并对常用查询字段建立倒排索引:

CREATE TABLE `doris_log` (
`log_time` datetime NULL COMMENT 'log content time',
`collect_time` datetime NULL COMMENT 'log agent collect time',
`host` text NULL COMMENT 'hostname or ip',
`path` text NULL COMMENT 'log file path',
`type` text NULL COMMENT 'log type',
`level` text NULL COMMENT 'log level',
`thread` text NULL COMMENT 'log thread',
`position` text NULL COMMENT 'log code position',
`message` text NULL COMMENT 'log message',
INDEX idx_host (`host`) USING INVERTED,
INDEX idx_path (`path`) USING INVERTED,
INDEX idx_type (`type`) USING INVERTED,
INDEX idx_level (`level`) USING INVERTED,
INDEX idx_thread (`thread`) USING INVERTED,
INDEX idx_position (`position`) USING INVERTED,
INDEX idx_message (`message`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true")
) ENGINE=OLAP
DUPLICATE KEY(`log_time`)
PARTITION BY RANGE(`log_time`) ()
DISTRIBUTED BY RANDOM BUCKETS 10
PROPERTIES (
"replication_num" = "1",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-7",
"dynamic_partition.end" = "1",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "10",
"dynamic_partition.create_history_partition" = "true",
"compaction_policy" = "time_series"
);

3. Vector 配置

# ==================== Sources ====================
[sources.fe_log_input]
type = "file"
include = ["/path/fe/log/fe.log"]
start_at_beginning = true
max_line_bytes = 102400
ignore_older_secs = 0
fingerprint.strategy = "device_and_inode"

# 多行日志处理:以时间戳开头的行视为新日志,其他行合并到前一行(处理 stacktrace)
multiline.start_pattern = "^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}"
multiline.mode = "halt_before"
multiline.condition_pattern = "^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}"
multiline.timeout_ms = 10000

# ==================== Transforms ====================
# 使用 VRL 解析日志内容
[transforms.parse_log]
inputs = ["fe_log_input"]
type = "remap"
source = '''
# 添加 type 字段
.type = "fe.log"

# 添加 collect_time(使用 Asia/Shanghai 时区,与 log_time 保持一致)
.collect_time = format_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S", timezone: "Asia/Shanghai")

# 解析日志格式:2024-01-01 12:00:00,123 INFO (thread-name) [position] message
# 使用 (?s) 启用 DOTALL 模式,让 .* 可以匹配换行符(处理多行日志)
parsed, err = parse_regex(.message, r'(?s)^(?P<log_time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) (?P<level>[A-Z]+) \((?P<thread>[^\)]+)\) \[(?P<position>[^\]]+)\] (?P<content>.*)')

if err == null {
.log_time = parsed.log_time
.level = parsed.level
.thread = parsed.thread
.position = parsed.position
# 保留完整的原始 message(包括多行堆栈信息)
} else {
# 解析失败时设置默认值,避免 NULL 导致分区错误
.log_time = .collect_time
.level = "UNKNOWN"
.thread = ""
.position = ""
}

# 提取 host 和 path(Vector 自动添加这些元数据)
.host = .host
.path = .file
'''

# ==================== Sinks ====================
[sinks.doris]
inputs = ["parse_log"]
type = "doris"
endpoints = ["http://fe_ip:http_port"]
database = "log_db"
table = "doris_log"
label_prefix = "vector_fe_log"
log_request = true

[sinks.doris.auth]
strategy = "basic"
user = "root"
password = ""

[sinks.doris.encoding]
codec = "json"

[sinks.doris.framing]
method = "newline_delimited"

[sinks.doris.request]
concurrency = 10

[sinks.doris.headers]
format = "json"
read_json_by_line = "true"
load_to_single_tablet = "true"

[sinks.doris.batch]
max_events = 10000
timeout_secs = 3
max_bytes = 100000000

4. 启动 Vector

${VECTOR_HOME}/bin/vector --config vector_fe_log.toml

log_request = true 时,Vector 会输出每次 Stream Load 请求的参数与响应结果,便于调试:

2025-11-19T10:14:40.822071Z  INFO sink{component_kind="sink" component_id=doris component_type=doris}:request{request_id=82}: vector::sinks::doris::service: Doris stream load response received. status_code=200 OK stream_load_status=Successful response={
"TxnId": 169721,
"Label": "vector_fe_log_log_db_doris_log_1763547280791_e2e619ee-4067-4fe8-974e-9f35f0d4e48e",
"Comment": "",
"TwoPhaseCommit": "false",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 10,
"NumberLoadedRows": 10,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 7301,
"LoadTimeMs": 30,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 1,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 8,
"ReceiveDataTimeMs": 2,
"CommitAndPublishTimeMs": 18
} internal_log_rate_limit=true

示例二:JSON 日志采集(GitHub Events)

该示例演示如何采集 NDJSON 格式的结构化日志,以 GitHub Events Archive 数据为例。

1. 数据样例

GitHub Events Archive 是 GitHub 用户操作事件的归档数据,格式为 JSON。例如下载 2024 年 1 月 1 日 15 点的数据:

wget https://data.gharchive.org/2024-01-01-15.json.gz

实际数据每行是一个完整的 JSON 对象,下面为了便于阅读进行了格式化展示:

{
"id": "37066529221",
"type": "PushEvent",
"actor": {
"id": 46139131,
"login": "Bard89",
"display_login": "Bard89",
"gravatar_id": "",
"url": "https://api.github.com/users/Bard89",
"avatar_url": "https://avatars.githubusercontent.com/u/46139131?"
},
"repo": {
"id": 780125623,
"name": "Bard89/talk-to-me",
"url": "https://api.github.com/repos/Bard89/talk-to-me"
},
"payload": {
"repository_id": 780125623,
"push_id": 17799451992,
"size": 1,
"distinct_size": 1,
"ref": "refs/heads/add_mvcs",
"head": "f03baa2de66f88f5f1754ce3fa30972667f87e81",
"before": "85e6544ede4ae3f132fe2f5f1ce0ce35a3169d21"
},
"public": true,
"created_at": "2024-04-01T23:00:00Z"
}

2. Doris 建表

利用 VARIANT 类型直接存储嵌套 JSON 对象,并对查询字段建立倒排索引:

CREATE DATABASE log_db;
USE log_db;

CREATE TABLE github_events
(
`created_at` DATETIME,
`id` BIGINT,
`type` TEXT,
`public` BOOLEAN,
`actor` VARIANT,
`repo` VARIANT,
`payload` TEXT,
INDEX `idx_id` (`id`) USING INVERTED,
INDEX `idx_type` (`type`) USING INVERTED,
INDEX `idx_actor` (`actor`) USING INVERTED,
INDEX `idx_host` (`repo`) USING INVERTED,
INDEX `idx_payload` (`payload`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true")
)
ENGINE = OLAP
DUPLICATE KEY(`created_at`)
PARTITION BY RANGE(`created_at`) ()
DISTRIBUTED BY RANDOM BUCKETS 10
PROPERTIES (
"replication_num" = "1",
"inverted_index_storage_format" = "v2",
"compaction_policy" = "time_series",
"enable_single_replica_compaction" = "true",
"dynamic_partition.enable" = "true",
"dynamic_partition.create_history_partition" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-30",
"dynamic_partition.end" = "1",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "10",
"dynamic_partition.replication_num" = "1"
);

3. Vector 配置

# ==================== Sources ====================
[sources.github_events_reload]
type = "file"
include = ["/path/2024-01-01-15.json"]
read_from = "beginning"
ignore_checkpoints = true
max_line_bytes = 10485760
ignore_older_secs = 0
line_delimiter = "\n"
fingerprint.strategy = "device_and_inode"

# ==================== Transforms ====================
# 解析 JSON 格式的 GitHub Events 数据,VARIANT 类型可直接存储嵌套对象
[transforms.parse_json]
inputs = ["github_events_reload"]
type = "remap"
source = '''
# 解析 JSON 数据(每行是一个完整的 JSON 对象)
. = parse_json!(.message)

# payload 字段转为 JSON 字符串(TEXT 类型)
.payload = encode_json(.payload)

# 只保留表中需要的字段
. = {
"created_at": .created_at,
"id": .id,
"type": .type,
"public": .public,
"actor": .actor,
"repo": .repo,
"payload": .payload
}
'''

# ==================== Sinks ====================
[sinks.doris]
inputs = ["parse_json"]
type = "doris"
endpoints = ["http://fe_ip:http_port"]
database = "log_db"
table = "github_events"
label_prefix = "vector_github_events"
log_request = true

[sinks.doris.auth]
strategy = "basic"
user = "root"
password = ""

[sinks.doris.encoding]
codec = "json"

[sinks.doris.framing]
method = "newline_delimited"

[sinks.doris.request]
concurrency = 10

[sinks.doris.headers]
format = "json"
read_json_by_line = "true"
load_to_single_tablet = "true"

[sinks.doris.batch]
max_events = 10000
timeout_secs = 3
max_bytes = 100000000

4. 启动 Vector

vector --config vector_config.toml

配置参数参考

Doris Sink 支持丰富的配置选项,可满足不同场景下的数据写入需求。以下按功能分组列出全部可配置项。

基础配置

参数名称类型默认值说明
typestring-固定为 doris
inputsarray-上游数据源名称列表
endpointsarray<string>-Doris FE HTTP/HTTPS 地址,必须包含协议与端口,如 ["https://fe1:8030"]
databasestring / 模板-目标数据库名称,支持 Template
tablestring / 模板-目标表名称,支持模板
label_prefixstring"vector"Stream Load 标签前缀,最终标签形式为 {label_prefix}_{database}_{table}_{timestamp}_{uuid}

认证配置

参数名称类型默认值说明
auth.strategystring"basic"认证策略,目前 Doris 仅支持 Basic Auth
auth.userstring-Doris 用户名
auth.passwordstring-Doris 密码,可配合环境变量或密钥管理系统

请求与并发配置

参数名称类型默认值说明
request.concurrencystring / integer"adaptive"并发策略,支持 "adaptive""none"(串行)或正整数并发上限
request.timeout_secsinteger60单次 Stream Load 请求的超时(秒)
request.rate_limit_duration_secsinteger1速率限制时间窗(秒)
request.rate_limit_numintegeri64::MAX每个时间窗内允许的请求数,默认近似无限制
request.retry_attemptsintegerusize::MAXTower 中间件的最大重试次数,缺省表示无限重试
request.retry_initial_backoff_secsinteger1第一次重试前的等待时间(秒),后续按 Fibonacci 退避
request.retry_max_duration_secsinteger30单次重试退避的最大等待时长(秒)
request.retry_jitter_modestring"full"重试抖动模式,支持 fullnone

自适应并发

仅在 request.concurrency = "adaptive" 时生效。

参数名称类型默认值说明
request.adaptive_concurrency.initial_concurrencyinteger1自适应并发的起始值
request.adaptive_concurrency.max_concurrency_limitinteger200自适应并发的上限,防止过载
request.adaptive_concurrency.decrease_ratiofloat0.9触发降速时使用的缩减比例
request.adaptive_concurrency.ewma_alphafloat0.4RTT 指标的指数移动平均权重
request.adaptive_concurrency.rtt_deviation_scalefloat2.5RTT 偏差放大系数,用于忽略正常波动

编码与数据格式

Doris Sink 使用 encoding 区块控制事件序列化行为,默认输出 NDJSON(换行分隔的 JSON):

参数名称类型默认值说明
encoding.codecstring"json"序列化编码,可选 jsontextcsv
encoding.timestamp_formatstring-时间戳输出格式,支持 rfc3339unix
encoding.only_fields / encoding.except_fieldsarray<string>-控制字段白名单或黑名单
encoding.framing.methodstring自动推断自定义帧格式,如 newline_delimitedcharacter_delimited

Stream Load 头部(headers

headers 是一个键值对映射,会直接透传为 Doris Stream Load 的 HTTP 头,可使用 Stream Load 支持的全部 header 参数(所有值均需为字符串)。常见设置如下:

参数名称类型默认值说明
headers.formatstring"json"数据格式,支持 jsoncsvparquet
headers.read_json_by_linestring"true"是否按行读取 JSON(NDJSON)
headers.strip_outer_arraystring"false"是否移除最外层数组
headers.column_separatorstring-CSV 列分隔符(format = csv 时生效)
headers.columnsstring-CSV/JSON 映射的列顺序,如 timestamp,client_ip,status_code
headers.wherestring-Stream Load where 过滤条件

批处理配置

参数名称类型默认值说明
batch.max_bytesinteger10485760单批最大字节数(10 MB)
batch.max_eventsinteger / nullnull单批最大事件数,默认不限制,以字节数为主
batch.timeout_secsfloat1批次最长等待时间(秒)

可靠性与安全配置

参数名称类型默认值说明
max_retriesinteger-1Sink 级别的最大重试次数,-1 表示无限制
log_requestbooleanfalse是否打印每次 Stream Load 请求与响应(生产环境建议按需开启)
compression-未支持-
distribution.retry_initial_backoff_secsinteger1端点健康检查恢复的初始回退时间(秒)
distribution.retry_max_duration_secsinteger3600健康检查最大回退时长(秒)
tls.verify_certificatebooleantrue启用/禁用上游证书校验
tls.verify_hostnamebooleantrue启用/禁用主机名校验
tls.ca_file / tls.crt_file / tls.key_file / tls.key_pass / tls.alpn_protocols / tls.server_name各类-标准 Vector TLS 客户端配置项,用于自定义 CA、双向认证或 SNI
acknowledgements.enabledbooleanfalse启用端到端确认,用于与支持 acknowledgements 的 Source 组合

最佳实践

  • 批次大小:日志类高吞吐场景建议将 batch.max_bytes 调至 100 MB 左右,并适当增大 batch.timeout_secs,以减少 Stream Load 请求次数。
  • 并发策略:稳定上游建议显式设置 request.concurrency 为固定整数;流量波动较大时使用 "adaptive" 让 Vector 自动调节。
  • 单 Tablet 写入:在 headers 中配置 load_to_single_tablet = "true",可在大批量小文件写入时降低 BE 端的资源开销。
  • 动态分区 + 时间序列 Compaction:对日志类表启用 dynamic_partitioncompaction_policy = "time_series",可显著降低存储与合并开销。
  • 生产环境调试:默认关闭 log_request,仅在排查问题时开启,避免日志膨胀。
  • TLS 与认证:通过 tls.* 配置启用 HTTPS 校验;auth.password 建议结合环境变量或密钥管理系统注入,避免明文落盘。