主流端采集Agent的测试体系概览

简介: 本文主要介绍几个主流端采集Agent的集成测试方法和体系,并总结其优劣点.

端采集Agent主要负责采集各类数据,并在对数据进行一些预处理后发送至中心化存储单元做进一步处理。目前,主流端采集Agent都支持采集多种数据源,并提供多种输出渠道选项。除此之外,对于非主打轻量的Agent,其还会提供大量的数据处理模块以提升数据表达能力。现在市场上的端采集系统基本采用如下流式架构,其中处理模块根据不同Agent的特性可进一步拆分为多个子模块:

端采集的各个功能模块通常以插件系统的形式呈现,从而呈现较强的可扩展性,使得开发者无需过多关注Agent的整体上下文即可快速开发插件。在此背景下,如何方便开发者快速验证所开发插件的功能便显得至关重要。对于中间的处理模块,一般通过单元测试即可完成功能验证;而对于输入及输出模块,由于存在外部依赖,单纯的单元测试难以真正验证模块的功能,因此往往需要借助集成测试构造环境才能完成功能验证。下面将介绍几个主流端采集Agent的测试体系,其中既有以单元测试作主体,也有以集成测试为主体。

Fluentd

Fluentd 是一个针对日志的收集、处理及转发系统,其主要架构如下图所示,其中每一个模块都是一个可扩展的插件系统:

Fluentd利用test-unit为开发者提供了一个较为简易轻便的testing driver,用于模拟Fluentd的整个运行过程,其测试体系接近于集成测试的范畴。具体来说,这个driver主要向用户暴露了如下两个功能:

  • 模拟输入:用户可以通过driver.feed函数向Fluentd模拟发送数据;
  • 模拟路由:为用户提供一个默认的输出,用户可以通过driver.events来获取输出。

用户可以在测试文件中生成这个driver的实例,从而利用其完成测试,推荐的插件文件组织如下:

.

├── Gemfile

├── LICENSE

├── README.md

├── Rakefile

├── fluent-plugin-<your_fluentd_plugin_name>.gemspec

├── lib

│   └── fluent

│       └── plugin

│           └── <plugin_type>_<your_fluentd_plugin_name>.rb

└── test

   ├── helper.rb

   └── plugin

       └── test_<plugin_type>_<your_fluentd_plugin_name>.rb

输入模块

输入模块的测试需要外部输入的依赖,Fluentd并没有提供额外的机制来帮助用户建立这种依赖,用户需要在测试时自行搭建输入环境,并利用Fluentd采集数据或向其发送数据。在测试文件中,用户可以通过模拟路由获得输入模块接收到的数据,并进行验证,典型的测试文件如下所示:

require'fluent/test'require'fluent/test/driver/input'require'fluent/test/helpers'require'fluent/plugin/input_my'classMyInputTest<Test::Unit::TestCaseincludeFluent::Test::HelperssetupdoFluent::Test.setupenddefcreate_driver(conf= {})
Fluent::Test::Driver::Input.new(Fluent::Plugin::MyInput).configure(conf)
endtest'emit'dod=create_driver(config)
d.run(timeout:0.5)
d.events.eachdo |tag, time, record|
assert_equal('input.test', tag)
assert_equal({ 'foo'=>'bar' }, record)
assert(time.is_a?(Fluent::EventTime))
endendend

处理模块

处理模块的测试相对比较简单,既可以借助testing driver以集成测试的方式来进行验证(如Filter),也可通过单纯的单元测试来进行验证(如Parser)。对于集成测试,用户只需通过模拟输入向Fluentd输入数据,然后借助模拟路由来获得输出并验证,典型的测试文件如下所示:

require'fluent/test'require'fluent/test/driver/filter'require'fluent/test/helpers'require'fluent/plugin/filter_my'classMyInputTest<Test::Unit::TestCaseincludeFluent::Test::HelperssetupdoFluent::Test.setupenddefcreate_driver(conf= {})
Fluent::Test::Driver::Filter.new(Fluent::Plugin::MyFilter).configure(conf)
endtest'filter'dod=create_driver(config)
time=event_timed.rundod.feed('filter.test', time, { 'foo'=>'bar', 'message'=>msg })
endassert_equal(1, d.filtered_records.size)
endend

输出模块

理论上,输出模块的测试也需要外部输入的依赖,但是fluentd建议对输出模块只需进行format功能的验证即可。具体来说,用户可以通过模拟输入向Fluentd发送数据,然后借助模拟路由来验证输出模块format函数是否运行正常,典型的测试文件如下所示:

require'fluent/test'require'fluent/test/driver/output'require'fluent/test/helpers'require'fluent/plugin/output_my'classMyInputTest<Test::Unit::TestCaseincludeFluent::Test::HelperssetupdoFluent::Test.setupenddefcreate_driver(conf= {})
Fluent::Test::Driver::Output.new(Fluent::Plugin::MyOutput).configure(conf)
endtest'emit'dod=create_driver(config)
time=event_timed.rundod.feed('output.test', time, { 'foo'=>'bar', 'message'=>msg })
endassert_equal(1, d.events.size)
endend

如果要通过集成测试对输出模块进行验证,用户就必须在测试文件中自行编写从输出模块中拉取数据的功能,Kafka提供了一个样例。

Beats

Beats作为 Elastic技术栈中的采集端,是一个免费且开放的数据采集器,涵盖了包括日志采集(Filebeat)、指标采集(Metricbeat)、审计日志采集(Auditbeat)等在内的多类型数据采集。

Beats并没有提供类似于Fluentd一样的testing driver,对于输入和输出模块,它虽然支持集成测试,但还是以go单元测试的形式展现的。具体来说,对于输出插件,Beats建议在普通单元测试的基础上,额外提供以_integration_test.go结尾的测试文件,用于集成测试。但与Fluentd不同的是,Beats支持将外部依赖通过docker-compose来进行本地部署,从而方便了测试环境的搭建。

输入模块

输入模块的测试需要外部输入的依赖,用户可以借助docker- compose运行一个数据源容器和Beats容器,并指定二者之间的交互行为。在测试文件中,用户只需要只需以单元测试的形式编写测试用例,直接从函数返回值中获得模块的输出,从而进行验证。

输出模块

同输入模块一样,输出模块的测试也需要依赖外部环境,用户依然可以借助docker- compose运行一个输出目标容器和Beats容器,然后从Beats容器向目标容器发送数据。为了验证输出模块的正确性,还需要拉取已写入目标容器的数据,通过与写入前数据的对比来验证。以下是Kafka输出插件集成测试的主要框架,原始代码详见链接

funcTestKafkaPublish(t*testing.T) {
// 初始化工作...fori, test :=rangetests {
// 测试用例初始化...t.Run(name, func(t*testing.T) {
grp, err :=makeKafka(nil, beat.Info{Beat: "libbeat", IndexPrefix: "testbeat"}, outputs.NewNilObserver(), cfg)
output :=grp.Clients[0].(*client)
deferoutput.Close()
// publish test eventsvarwgsync.WaitGroupfori :=rangetest.events {
batch :=outest.NewBatch(test.events[i].events...)
batch.OnSignal=func(_outest.BatchSignal) {
wg.Done()
                }
wg.Add(1)
output.Publish(batch)
            }
// wait for all published batches to be ACKedwg.Wait()
expected :=flatten(test.events)
// check we can find all event in topictimeout :=20*time.Secondstored :=testReadFromKafkaTopic(t, test.topic, len(expected), timeout)
// validate messagesiflen(expected) !=len(stored) {
assert.Equal(t, len(stored), len(expected))
return            }
validate :=validateJSONiffmt, exists :=test.config["codec.format.string"]; exists {
validate=makeValidateFmtStr(fmt.(string))
            }
seenMsgs :=map[string]struct{}{}
for_, s :=rangestored {
msg :=validate(t, s.Value, expected)
seenMsgs[msg] =struct{}{}
            }
assert.Equal(t, len(expected), len(seenMsgs))
        })
    }
}

Telegraf

Telegraf的测试体系与Beats大体类似,虽然也支持集成测试,但还是以go单元测试的形式展现的。它借助于testcontainer这个包来提供外部环境的本地化部署,从而方便开发者进行本地测试。然而,与Beats不同的是,它并没有规定额外的集成测试文件,而是选择将集成测试和单元测试放置在一个测试文件中。对于输出模块,用户可以选择是否进行集成测试,如果仅需测试输出是否成功,则不用额外编写拉取数据的函数,则形式基本同单元测试;否则的话,用户需要自己编写拉取输出目标数据的功能,从而真正实现集成测试。有关Kafka输出插件的测试文件如链接所示。

Skywalking

Skywalking并不是严格意义上的端采集Agent,它是一个现代化的应用程序性能监控系统,尤其专为云原生、基于容器的分布式系统设计。尽管不是单纯的端采集Agent,Skywalking却提供了一个相对完整方便的端到端的infra测试体系,因此值得在这里进行探讨。

总体来说,Skywalking的测试引擎由一个控制器(controller)来负责调控,整个测试流程分为以下几个阶段:

  1. 初始化(Setup):构建E2E测试所需的环境依赖,如数据库、后端服务、API等。目前测试引擎提供两种初始化方式,即通过docker-compose或kinD来构建测试环境;
  2. 触发(Triger):为测试环境提供数据,包括向指定API发送请求和执行本地命令等;
  3. 验证(Verify):验证处理后的数据是否符合预期;
  4. 清理(Clenup):结束测试并清理环境。

测试引擎在启动时会读取yaml配置文件,从而决定后续相应行为,配置文件框架如下:

setup:# set up the environmentcleanup:# clean up the environmenttrigger:# generate trafficverify:# test cases

初始化

这里以Docker compose为例介绍初始化的相关配置,典型示例如下:

setup:  env: compose
  file: path/to/compose.yaml            # Specified docker-compose file path  timeout: 20m                          # Timeout duration  init-system-environment: path/to/env  # Import environment file  steps:                                # Customize steps for prepare the environment    - name: customize setups            # Step name      command: command lines            # Use command line to setup 

利用Docker compose进行环境初始化遵循以下步骤:

  1. 导入init-system-environment文件以帮助构建服务并执行步骤。文件内容的每一行都是一个环境变量,键值用“=”隔开;
  2. 启动docker-compose服务;
  3. 检查服务的健康状况;
  4. 等待一定时间直到所有的服务都已准备好;
  5. 执行命令设置测试环境。

触发

触发主要用于产生测试所需的数据,这里展示以http方式产生数据的配置示例:

trigger:  action: http      # The action of the trigger. support HTTP invoke.  interval: 3s      # Trigger the action every 3 seconds.  times: 5          # The retry count before the request success.  url: http://apache.skywalking.com/ # Http trigger url link.  method: GET       # Http trigger method.  headers:    "Content-Type": "application/json"    "Authorization": "Basic whatever"  body: '{"k1":"v1", "k2":"v2"}'

验证

验证阶段对测试数据根据预先设定的规则进行验证,其典型配置示例如下:

verify:  retry:            # verify with retry strategy    count: 10       # max retry count    interval: 10s   # the interval between two attempts, e.g. 10s, 1m.  cases:            # verify test cases    - actual: path/to/actual.yaml       # verify by actual file path      expected: path/to/expected.yaml   # excepted content file path    - query: echo 'foo'# verify by command execute output      expected: path/to/expected.yaml   # excepted content file path    - includes:      # including cases        - path/to/cases.yaml            # cases file path

其中,retry配置项规定了验证失败后的行为,例如,在上文中,如果验证失败,则会间隔10s进行第二次验证,如果第十次尝试依然失败,则认为验证失败。

cases配置项提供了两种验证方式:

  • 待验证数据写入文件,然后与预期数据文件进行匹配验证;
  • 通过命令来获取待验证的数据,然后与预期数据文件进行匹配验证。

其中,预期数据文件通过Go template来编写预期输出模版。

清理

清理阶段的典型配置如下:

cleanup:   on: always     # Clean up strategy

清理选项包括:

  • always:无论执行结果是成功还是失败,都会进行清理;
  • success: 仅当执行成功时进行清理;
  • failure: 仅当执行失败时进行清理;
  • never: 永远不要清理环境。

小结

本文总体偏向于对单元、集成等基础性测试的讨论,对于系统性测试(代码安全扫描、性能内存测试等),由于主流端采集Agent对于这方面的支持也基本缺失,因此不在本文的讨论范围之列。

总体来说,现有主流端采集Agent并没有提供太多辅助测试的工具,更多停留在单元测试或集成测试阶段,对输出插件的测试能力提供不足。相比之下,Skywalking提供了一个较为通用完整的测试体系,能够满足绝大多数的infra测试场景,但是它没有考虑到端采集Agent这种infra中存在的输出模块测试场景,因此直接使用现成的测试引擎依然无法快速测试。

相关文章
|
16天前
|
人工智能 测试技术 Windows
Windows 竞技场:面向下一代AI Agent的测试集
【10月更文挑战第25天】随着人工智能的发展,大型语言模型(LLMs)在多模态任务中展现出巨大潜力。为解决传统基准测试的局限性,研究人员提出了Windows Agent Arena,一个在真实Windows操作系统中评估AI代理性能的通用环境。该环境包含150多个多样化任务,支持快速并行化评估。研究团队还推出了多模态代理Navi,在Windows领域测试中成功率达到19.5%。尽管存在局限性,Windows Agent Arena仍为AI代理的评估和研究提供了新机遇。
37 3
|
1月前
|
存储 数据采集 分布式计算
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
44 1
|
6月前
|
数据挖掘
工程监测仪器振弦采集仪详细的稳定性测试
工程监测仪器振弦采集仪详细的稳定性测试 振弦采集仪是一种常用于工程监测的仪器,用于测量结构的振动和变形。稳定性测试是评估采集仪的测量稳定性和精度的一种方法,可以确保采集仪在长时间使用中的准确性和可靠性。
工程监测仪器振弦采集仪详细的稳定性测试
|
6月前
岩土工程监测振弦采集仪广泛应用于岩土工程中的土体动力特性的测试和分析
岩土工程监测振弦采集仪广泛应用于岩土工程中的土体动力特性的测试和分析。以下是一些岩土工程监测振弦采集仪的应用案例:
|
传感器 数据采集 数据挖掘
无线振弦采集仪高低温试验箱测试原理
无线振弦采集仪是一种用来测量结构物动力学特性的仪器,它可以通过振弦传感器采集到结构物的振动信号,并通过数据分析,得到结构物的自然频率、阻尼比、振型等信息。为了确保无线振弦采集仪的准确性和可靠性,需要进行高低温试验,以验证它在各种环境下的性能。
无线振弦采集仪高低温试验箱测试原理
|
传感器 数据采集
工程监测仪器无线振弦采集仪高低温试验箱测试原理
无线振弦采集仪和高低温试验箱是两个独立的设备,但是它们可以结合使用来进行振弦测试。高低温试验箱是一种可以控制温度和湿度的设备,用于模拟各种环境下的物体性能变化,包括机械性能、电气性能等。无线振弦采集仪则是一种用于测量物体振动状态的设备,通常包括振动传感器和数据采集模块。
工程监测仪器无线振弦采集仪高低温试验箱测试原理
|
6月前
|
监控 Java
Pinpoint【部署 02】Pinpoint Agent 安装启动及监控 SpringBoot 项目案例分享(添加快速测试math-game.jar包)
Pinpoint【部署 02】Pinpoint Agent 安装启动及监控 SpringBoot 项目案例分享(添加快速测试math-game.jar包)
238 0
|
Java 测试技术 Android开发
Sonic 开源移动端云真机测试平台 - 设备中心接入安卓设备实例演示,Agent端服务部署过程详解(下)
Sonic 开源移动端云真机测试平台 - 设备中心接入安卓设备实例演示,Agent端服务部署过程详解
420 0
|
Web App开发 JavaScript Java
Sonic 开源移动端云真机测试平台 - 设备中心接入安卓设备实例演示,Agent端服务部署过程详解(上)
Sonic 开源移动端云真机测试平台 - 设备中心接入安卓设备实例演示,Agent端服务部署过程详解
501 0
|
6天前
|
JSON Java 测试技术
SpringCloud2023实战之接口服务测试工具SpringBootTest
SpringBootTest同时集成了JUnit Jupiter、AssertJ、Hamcrest测试辅助库,使得更容易编写但愿测试代码。
34 3

热门文章

最新文章