别再把 Collector 当黑箱:OpenTelemetry Collector 拓展与自定义处理器实战指南
作者|Echo_Wish(运维老司机)
先打个比方:OpenTelemetry Collector 就像数据管道中的“门卫”和“搬运工”。接收(receiver)是门口把人接进来,处理器(processor)是门卫做身份证核验和安检,导出器(exporter)才是把人送到目的地。而扩展(extension)则像是门卫的通讯设备、健康检查系统或认证系统。想要把这套流程精细化、跟公司业务规则对接,最直接的办法就是自己写处理器或扩展,把业务逻辑放到 Collector 里去。本文手把手讲思路、给代码样例,并分享运维实战的坑与心得。
为什么要自定义处理器/扩展?别只会套样板
企业常见场景包括:接入特殊日志格式需要预处理、对某些 span/metric 做采样或脱敏、基于组织规则过滤噪音、注入自定义的业务标签、或实现内部鉴权/接入管理。把这些逻辑写成独立的 processor/extension,有这些好处:
- 统一管控:逻辑靠代码、靠 CI,而不是散落在应用里。
- 减少后端负担:早在 Collector 层做过滤与采样,能显著降低后端存储与处理成本。
- 可观测性更强:处理器可以上报自身指标,方便排障。
官方文档也明确:Collector 支持自定义组件,并推荐通过代码方式扩展与构建自定义发行版。
开发前的准备:知识清单(别着急就开工)
- 熟悉 Collector 的组件模型:receiver → processors → exporters;extensions 是运行时的辅助。
- 选好语言:大部分官方组件用 Go 实现,生态与示例也多,推荐用 Go。
- 了解
opentelemetry-collector-contrib仓库和已有 processor 模板,先读一读别人的实现再动手。[4]) - 学会用 OpenTelemetry Collector Builder(ocb)构建自定义发行版,方便调试与发布。
实战:一个“简单脱敏处理器”的最小实现(Go)
下面是一个精简版的 processor 骨架,目标:对 traces 中的某些敏感 attribute 做脱敏(示例仅示意,非生产级完整实现)。
// file: processor.go
package maskprocessor
import (
"context"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor/processorhelper"
)
type maskProcessor struct{
}
func newMaskProcessor(_ component.Config, _ component.ProcessorCreateSettings) (component.TracesProcessor, error) {
return &maskProcessor{
}, nil
}
func (m *maskProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
rs := td.ResourceSpans()
for i := 0; i < rs.Len(); i++ {
il := rs.At(i).ScopeSpans()
for j := 0; j < il.Len(); j++ {
spans := il.At(j).Spans()
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
attrs := span.Attributes()
if val, ok := attrs.Get("user.email"); ok {
attrs.InsertString("user.email", maskEmail(val.StringVal()))
}
}
}
}
return nil
}
func (m *maskProcessor) Shutdown(context.Context) error {
return nil }
// helper omitted: maskEmail implementation
说明(要点):
- 主体实现
ConsumeTraces,遍历 Resource/Scope/Spans,修改属性。 - 生产实现需注意并发、安全、性能(不要做阻塞型 I/O),并添加配置解析、metrics 与日志。
- 推荐使用
processorhelper提供的工厂方法来简化重试、pipeline 兼容等细节(示例为了清晰省略部分样板)。
配置示例(collector.yaml)
processors:
maskprocessor:
# 这里可以定义需要脱敏的 key 列表、黑名单、白名单等配置
service:
pipelines:
traces:
receivers: [otlp]
processors: [maskprocessor, batch]
exporters: [otlp/mybackend]
说明:把 processor 插入 pipeline,注意顺序(先脱敏再 batch/导出)。
常见坑与运维建议(来自实战)
- 不要在处理器内做长时间阻塞(HTTP 调用、DB 查询等)——会拖慢整个 pipeline,最好把远程调用放异步或通过 sidecar。
- 加上处理器自身的指标与日志,出问题能第一时间发现(比如处理耗时、脱敏命中率)。
- 考虑采样与流控:如果你在处理器里做复杂计算,最好先做抽样或限制并发。
- 使用自定义发行版(ocb)来集成并测试,不要把测试逻辑混进主分支的生产镜像。官方有构建与分发的建议和工具。
- 复用社区实现:很多常见需求(attributes processor、transform processor)社区已有成熟实现,先看是否能复用再造轮子。
小结 — 开发策略与心态
写一个自定义处理器,不是“把业务逻辑塞进去就完事”。好的做法是把它当成运维工程的一部分:要可观测、要可配置、要轻量、要测试。先从简单的“规则+模板”开始,逐步抽象成平台化组件;再把这些组件用 ocb 打包成适合你公司部署模型的 Collector 发行版。别贪快,一开始把痛点最重的几类处理器先做起来:脱敏、过滤、注入业务标签、采样。这样既能快速看到价值,又能把复杂度逐步推向工程化管理。