端采集Agent主要负责采集各类数据,并在对数据进行一些预处理后发送至中心化存储单元做进一步处理。目前,主流端采集Agent都支持采集多种数据源,并提供多种输出渠道选项。除此之外,对于非主打轻量的Agent,其还会提供大量的数据处理模块以提升数据表达能力。现在市场上的端采集系统基本采用如下流式架构,其中处理模块根据不同Agent的特性可进一步拆分为多个子模块:
端采集的各个功能模块通常以插件系统的形式呈现,从而呈现较强的可扩展性,使得开发者无需过多关注Agent的整体上下文即可快速开发插件。在此背景下,如何方便开发者快速验证所开发插件的功能便显得至关重要。对于中间的处理模块,一般通过单元测试即可完成功能验证;而对于输入及输出模块,由于存在外部依赖,单纯的单元测试难以真正验证模块的功能,因此往往需要借助集成测试构造环境才能完成功能验证。下面将介绍几个主流端采集Agent的测试体系,其中既有以单元测试作主体,也有以集成测试为主体。
Fluentd
Fluentd 是一个针对日志的收集、处理及转发系统,其主要架构如下图所示,其中每一个模块都是一个可扩展的插件系统:
Fluentd利用test-unit为开发者提供了一个较为简易轻便的testing driver,用于模拟Fluentd的整个运行过程,其测试体系接近于集成测试的范畴。具体来说,这个driver主要向用户暴露了如下两个功能:
- 模拟输入:用户可以通过driver.feed函数向Fluentd模拟发送数据;
- 模拟路由:为用户提供一个默认的输出,用户可以通过driver.events来获取输出。
用户可以在测试文件中生成这个driver的实例,从而利用其完成测试,推荐的插件文件组织如下:
.
├── Gemfile
├── LICENSE
├── README.md
├── Rakefile
├── fluent-plugin-<your_fluentd_plugin_name>.gemspec
├── lib
│ └── fluent
│ └── plugin
│ └── <plugin_type>_<your_fluentd_plugin_name>.rb
└── test
├── helper.rb
└── plugin
└── test_<plugin_type>_<your_fluentd_plugin_name>.rb
输入模块
输入模块的测试需要外部输入的依赖,Fluentd并没有提供额外的机制来帮助用户建立这种依赖,用户需要在测试时自行搭建输入环境,并利用Fluentd采集数据或向其发送数据。在测试文件中,用户可以通过模拟路由获得输入模块接收到的数据,并进行验证,典型的测试文件如下所示:
require'fluent/test'require'fluent/test/driver/input'require'fluent/test/helpers'require'fluent/plugin/input_my'classMyInputTest<Test::Unit::TestCaseincludeFluent::Test::HelperssetupdoFluent::Test.setupenddefcreate_driver(conf= {}) Fluent::Test::Driver::Input.new(Fluent::Plugin::MyInput).configure(conf) endtest'emit'dod=create_driver(config) d.run(timeout:0.5) d.events.eachdo |tag, time, record| assert_equal('input.test', tag) assert_equal({ 'foo'=>'bar' }, record) assert(time.is_a?(Fluent::EventTime)) endendend
处理模块
处理模块的测试相对比较简单,既可以借助testing driver以集成测试的方式来进行验证(如Filter),也可通过单纯的单元测试来进行验证(如Parser)。对于集成测试,用户只需通过模拟输入向Fluentd输入数据,然后借助模拟路由来获得输出并验证,典型的测试文件如下所示:
require'fluent/test'require'fluent/test/driver/filter'require'fluent/test/helpers'require'fluent/plugin/filter_my'classMyInputTest<Test::Unit::TestCaseincludeFluent::Test::HelperssetupdoFluent::Test.setupenddefcreate_driver(conf= {}) Fluent::Test::Driver::Filter.new(Fluent::Plugin::MyFilter).configure(conf) endtest'filter'dod=create_driver(config) time=event_timed.rundod.feed('filter.test', time, { 'foo'=>'bar', 'message'=>msg }) endassert_equal(1, d.filtered_records.size) endend
输出模块
理论上,输出模块的测试也需要外部输入的依赖,但是fluentd建议对输出模块只需进行format功能的验证即可。具体来说,用户可以通过模拟输入向Fluentd发送数据,然后借助模拟路由来验证输出模块format函数是否运行正常,典型的测试文件如下所示:
require'fluent/test'require'fluent/test/driver/output'require'fluent/test/helpers'require'fluent/plugin/output_my'classMyInputTest<Test::Unit::TestCaseincludeFluent::Test::HelperssetupdoFluent::Test.setupenddefcreate_driver(conf= {}) Fluent::Test::Driver::Output.new(Fluent::Plugin::MyOutput).configure(conf) endtest'emit'dod=create_driver(config) time=event_timed.rundod.feed('output.test', time, { 'foo'=>'bar', 'message'=>msg }) endassert_equal(1, d.events.size) endend
如果要通过集成测试对输出模块进行验证,用户就必须在测试文件中自行编写从输出模块中拉取数据的功能,Kafka提供了一个样例。
Beats
Beats作为 Elastic技术栈中的采集端,是一个免费且开放的数据采集器,涵盖了包括日志采集(Filebeat)、指标采集(Metricbeat)、审计日志采集(Auditbeat)等在内的多类型数据采集。
Beats并没有提供类似于Fluentd一样的testing driver,对于输入和输出模块,它虽然支持集成测试,但还是以go单元测试的形式展现的。具体来说,对于输出插件,Beats建议在普通单元测试的基础上,额外提供以_integration_test.go结尾的测试文件,用于集成测试。但与Fluentd不同的是,Beats支持将外部依赖通过docker-compose来进行本地部署,从而方便了测试环境的搭建。
输入模块
输入模块的测试需要外部输入的依赖,用户可以借助docker- compose运行一个数据源容器和Beats容器,并指定二者之间的交互行为。在测试文件中,用户只需要只需以单元测试的形式编写测试用例,直接从函数返回值中获得模块的输出,从而进行验证。
输出模块
同输入模块一样,输出模块的测试也需要依赖外部环境,用户依然可以借助docker- compose运行一个输出目标容器和Beats容器,然后从Beats容器向目标容器发送数据。为了验证输出模块的正确性,还需要拉取已写入目标容器的数据,通过与写入前数据的对比来验证。以下是Kafka输出插件集成测试的主要框架,原始代码详见链接。
funcTestKafkaPublish(t*testing.T) { // 初始化工作...fori, test :=rangetests { // 测试用例初始化...t.Run(name, func(t*testing.T) { grp, err :=makeKafka(nil, beat.Info{Beat: "libbeat", IndexPrefix: "testbeat"}, outputs.NewNilObserver(), cfg) output :=grp.Clients[0].(*client) deferoutput.Close() // publish test eventsvarwgsync.WaitGroupfori :=rangetest.events { batch :=outest.NewBatch(test.events[i].events...) batch.OnSignal=func(_outest.BatchSignal) { wg.Done() } wg.Add(1) output.Publish(batch) } // wait for all published batches to be ACKedwg.Wait() expected :=flatten(test.events) // check we can find all event in topictimeout :=20*time.Secondstored :=testReadFromKafkaTopic(t, test.topic, len(expected), timeout) // validate messagesiflen(expected) !=len(stored) { assert.Equal(t, len(stored), len(expected)) return } validate :=validateJSONiffmt, exists :=test.config["codec.format.string"]; exists { validate=makeValidateFmtStr(fmt.(string)) } seenMsgs :=map[string]struct{}{} for_, s :=rangestored { msg :=validate(t, s.Value, expected) seenMsgs[msg] =struct{}{} } assert.Equal(t, len(expected), len(seenMsgs)) }) } }
Telegraf
Telegraf的测试体系与Beats大体类似,虽然也支持集成测试,但还是以go单元测试的形式展现的。它借助于testcontainer这个包来提供外部环境的本地化部署,从而方便开发者进行本地测试。然而,与Beats不同的是,它并没有规定额外的集成测试文件,而是选择将集成测试和单元测试放置在一个测试文件中。对于输出模块,用户可以选择是否进行集成测试,如果仅需测试输出是否成功,则不用额外编写拉取数据的函数,则形式基本同单元测试;否则的话,用户需要自己编写拉取输出目标数据的功能,从而真正实现集成测试。有关Kafka输出插件的测试文件如链接所示。
Skywalking
Skywalking并不是严格意义上的端采集Agent,它是一个现代化的应用程序性能监控系统,尤其专为云原生、基于容器的分布式系统设计。尽管不是单纯的端采集Agent,Skywalking却提供了一个相对完整方便的端到端的infra测试体系,因此值得在这里进行探讨。
总体来说,Skywalking的测试引擎由一个控制器(controller)来负责调控,整个测试流程分为以下几个阶段:
- 初始化(Setup):构建E2E测试所需的环境依赖,如数据库、后端服务、API等。目前测试引擎提供两种初始化方式,即通过docker-compose或kinD来构建测试环境;
- 触发(Triger):为测试环境提供数据,包括向指定API发送请求和执行本地命令等;
- 验证(Verify):验证处理后的数据是否符合预期;
- 清理(Clenup):结束测试并清理环境。
测试引擎在启动时会读取yaml配置文件,从而决定后续相应行为,配置文件框架如下:
setup# set up the environmentcleanup# clean up the environmenttrigger# generate trafficverify# test cases
初始化
这里以Docker compose为例介绍初始化的相关配置,典型示例如下:
setup env compose file path/to/compose.yaml # Specified docker-compose file path timeout 20m # Timeout duration init-system-environment path/to/env # Import environment file steps# Customize steps for prepare the environmentname customize setups # Step name command command lines # Use command line to setup
利用Docker compose进行环境初始化遵循以下步骤:
- 导入init-system-environment文件以帮助构建服务并执行步骤。文件内容的每一行都是一个环境变量,键值用“=”隔开;
- 启动docker-compose服务;
- 检查服务的健康状况;
- 等待一定时间直到所有的服务都已准备好;
- 执行命令设置测试环境。
触发
触发主要用于产生测试所需的数据,这里展示以http方式产生数据的配置示例:
trigger action http # The action of the trigger. support HTTP invoke. interval 3s # Trigger the action every 3 seconds. times 5 # The retry count before the request success. url http //apache.skywalking.com/ # Http trigger url link. method GET # Http trigger method. headers "Content-Type""application/json" "Authorization""Basic whatever" body'{"k1":"v1", "k2":"v2"}'
验证
验证阶段对测试数据根据预先设定的规则进行验证,其典型配置示例如下:
verify retry# verify with retry strategy count 10 # max retry count interval 10s # the interval between two attempts, e.g. 10s, 1m. cases# verify test casesactual path/to/actual.yaml # verify by actual file path expected path/to/expected.yaml # excepted content file pathquery echo 'foo'# verify by command execute output expected path/to/expected.yaml # excepted content file pathincludes# including cases# cases file path path/to/cases.yaml
其中,retry配置项规定了验证失败后的行为,例如,在上文中,如果验证失败,则会间隔10s进行第二次验证,如果第十次尝试依然失败,则认为验证失败。
cases配置项提供了两种验证方式:
- 待验证数据写入文件,然后与预期数据文件进行匹配验证;
- 通过命令来获取待验证的数据,然后与预期数据文件进行匹配验证。
其中,预期数据文件通过Go template来编写预期输出模版。
清理
清理阶段的典型配置如下:
cleanup on always # Clean up strategy
清理选项包括:
- always:无论执行结果是成功还是失败,都会进行清理;
- success: 仅当执行成功时进行清理;
- failure: 仅当执行失败时进行清理;
- never: 永远不要清理环境。
小结
本文总体偏向于对单元、集成等基础性测试的讨论,对于系统性测试(代码安全扫描、性能内存测试等),由于主流端采集Agent对于这方面的支持也基本缺失,因此不在本文的讨论范围之列。
总体来说,现有主流端采集Agent并没有提供太多辅助测试的工具,更多停留在单元测试或集成测试阶段,对输出插件的测试能力提供不足。相比之下,Skywalking提供了一个较为通用完整的测试体系,能够满足绝大多数的infra测试场景,但是它没有考虑到端采集Agent这种infra中存在的输出模块测试场景,因此直接使用现成的测试引擎依然无法快速测试。