Kafka端到端审计

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 概述Kafka端到端审计是指生产者生产的消息存入至broker,以及消费者从broker中消费消息这个过程之间消息个数及延迟的审计,以此可以检测是否有数据丢失,是否有数据重复以及端到端的延迟等。

概述

Kafka端到端审计是指生产者生产的消息存入至broker,以及消费者从broker中消费消息这个过程之间消息个数及延迟的审计,以此可以检测是否有数据丢失,是否有数据重复以及端到端的延迟等。
目前主要调研了3个产品:

  1. Chaperone (Uber)
  2. Confluent Control Center(非开源,收费)
  3. Kafka Monitor (LinkedIn)

对于Kafka端到端的审计主要通过:

  1. 消息payload中内嵌时间戳timestamp
  2. 消息payload中内嵌全局index
  3. 消息payload中内嵌timestamp和index

内嵌timestamp的方式
主要是通过设置一个审计的时间间隔(这里称之为time_bucket_interval,可以设置几秒或者几分钟,这个可以自定义), 每个timestamp都会被分配到相应的桶中,算法有:

  1. timestamp - timestamp%time_bucket_interval
  2. floor((timestamp /15)*15)

这样可以获得相应time_bucket的起始时间time_bucket_start,一个time_bucket的区间可以记录为[time_bucket_start, time_bucket_start+time_bucket_interval]。

每发送或者消费一条消息可以根据消息payload内嵌的时间戳,分配到相应桶中,然后对桶进行计数,之后进行存储,简单的可以存储到,比如:Map<long time_bucket_start, long count>之中。

内嵌index的方式
这种方式就更容易理解了,对于每条消息都分配一个全局唯一的index,如果topic及相应的partition固定的话,可以为每一个topic-partition设置一个全局的index,当有消息发送到某个topic-partition中,那么首先获取其topic-partition对应的index, 然后内嵌到payload中,之后再发送到broker。消费者进行消费审计,可以判断出哪个消息丢失,哪个消息重复等等。如果要计算端到端延迟的话,还需要在payload中内嵌timestamp以作相应的计算。

下面来简要分析下三个产品。


Chaperone

github地址:https://github.com/uber/chaperone
官方介绍(中文):http://www.infoq.com/cn/news/2016/12/Uber-Chaperone-Kafka
官方介绍(英文):https://eng.uber.com/chaperone/

Chaperone进行消息端到端的校验主要是基于message内置timestamp实现的,根据timestamp将message分配到不同的bucket中。之后就是对这个bucket中的消息进行计数等一系列的audit操作,然后将这个audit操作之后的信息auditMessage保存起来,auditMessage的内容:

  • topicName:被audit的topic
  • time_bucket_start:bucket的起始时间
  • time_bucket_end
  • metrics_count:time_bucket中的个数
  • metrics_mean_latency, metrics_p95_latency, metrics_p99_latency,metrics_max_latency:延迟
  • tier
  • hostname
  • datacenter
  • uuid

注意这里的latency的计算规则是:currentTimeMillis - (timestamp*1000)。

Chaperone的架构

这里写图片描述

Chaperone的整体架构分为:AuditLibrary, ChaperoneService, ChaperoneCollector和WebService, 它们会收集数据,并进行相关计算,自动检测出丢失和延迟的数据,并展示审计结果。

从Chaperone的github上的源码来看:
Chaperone分为ChaperoneClient, ChaperoneCollector, ChaperoneDistribution, ChaperoneServiceController, ChaperoneServiceWorker这5个子项目。对比着上面的架构图来分析。

  • ChaperoneClient对应着AuditLibrary,主要是用来audit message的库(library),并不以实际服务运行,可以在Producer或者Consumer客户端中调用,默认使用10mins的滚动时间bucket来不断地从每个主题收集消息。然后发送到kafka的chaperone-audit这个topic中。官方文档介绍说AuditLibrary会被ChaperoneService, ChaperoneCollector和WebService这三个组件所依赖,但代码中来看并非完全如此,略有出入。
  • ChaperoneDistribution可以忽略
  • ChaperoneServiceController和ChaperoneServiceWorker对应架构图中的ChaperoneService,ChaperoneServiceController主要用来检测topics并分配topic-partitions给ChaperoneServiceWorker用以审计(audit)。ChaperoneServiceWorker主要是audit message的一个服务。
    • ChaperoneServiceWorker采用scala语言编写,内部又将ChaperoneClient或者说AuditLibrary又重新用Scala实现了一番,并丰富了一下应用,比如采用hsqldb存储数据,zk存取offsets来实现WAL(预写式日志,具体可见下段介绍)
    • Chaperone认为message中内嵌timestamp是十分必须的,但是从ChaperoneServiceWorker的代码来看消息没有timestamp也能运行,当消息没有时间戳,那么会记录noTimeMsgCount,Chaperone介绍会有一个牛逼的算法来分析消息中的timestamp(其实就是读取消息的开头部分,而不是全部整条消息,类似报文截断解析,下面也有涉及介绍),如果解析timestamp失败,会记录malformedMsgCount。
  • ChaperoneCollector对是用来读取audit的数据,然后持久化操作,默认存入mysql中,看代码也可选存入redis中。
  • 源码中没有WebService这个东西,估计是uber内部的web系统,读取下mysql中的内容展示到页面而已。

如果程序段内嵌Audit Library(ChaperoneClient),那么整个audit过程如下:

这里写图片描述

如果producer端或者consumer端需要进行消息审计,那么可以内嵌Audit Library。就以发送端为例,消息先发送到kafka中,然后对这条消息进行审计,将审计的信息存入到kafka中,之后有专门的ChaperoneServiceCollector进行数据消费,之后存入mysql中,也可以选择存入redis中。页面系统webService可以查询mysql(redis)中的数据,之后进而在页面中展示。

如果使用ChanperoneServiceWork,整个流转过程如下:

这里写图片描述

上面是对broker端进行审计的过程。首先从存储消息的kafka(图中上面的kafka)中消费数据,之后对收到的消息进行审计操作,之后将审计消息auditmsg以及相应的offset存储起来(auditmsg存入hsqldb中,offset存到用来存储审计数据的kafka的zk之中),之后再将审计消息auditmsg存入kafka(图中下面的kafka)中,最后成功存储并返回给消费端(Consumer1,即ChaperoneServiceWork),之后再把hsqldb中的auditmsg标记为已统计。之后ChaperoneServiceCollector和producer端(consumer端)内嵌Audit Library时相同。

官方文档部分介绍如下:
每个消息只被审计一次
为了确保每个消息只被审计一次,ChaperoneService使用了预写式日志(WAL)。ChaperoneService每次在触发Kafka审计消息时,会往审计消息里添加一个UUID。这个带有相关偏移量的消息在发送到Kafka之前被保存在WAL里。在得到Kafka的确认之后,WAL里的消息被标记为已完成。如果ChaperoneService崩溃,在重启后它可以重新发送WAL里未被标记的审计消息,并定位到最近一次的审计偏移量,然后继续消费。WAL确保了每个Kafka消息只被审计一次,而且每个审计消息至少会被发送一次。
接下来,ChaperoneCollector使用ChaperoneService之前添加过的UUID来移除重复消息。有了UUID和WAL,我们可以确保审计的一次性。在代理客户端和服务器端难以实现一次性保证,因为这样会给它们带来额外的开销。我们依赖它们的优雅关闭操作,这样它们的状态才会被冲刷出去。

在层间使用一致性的时间戳
因为Chaperone可以在多个层里看到相同的Kafka消息,所以为消息内嵌时间戳是很有必要的。如果没有这些时间戳,在计数时会发生时间错位。在Uber,大部分发送到Kafka的数据要么使用avro风格的schema编码,要么使用JSON格式。对于使用schema编码的消息,可以直接获取时间戳。而对于JSON格式的消息,需要对JSON数据进行解码才能拿到时间戳。为了加快这个过程,我们实现了一个基于流的JSON消息解析器,这个解析器无需预先解码整个消息就可以扫描到时间戳。这个解析器用在ChaperoneService里是很高效的,不过对代理客户端和服务器来说仍然需要付出很高代价。所以在这两个层里,我们使用的是消息的处理时间戳。因为时间戳的不一致造成的层间计数差异可能会触发错误的数据丢失警告。我们正在着手解决时间戳不一致问题,之后也会把解决方案公布出来。

温馨提示: github上的quickstart中,如果不能根据脚本自动安装kafka和zk,而是自己安装kafka和zk的话,需要改动脚本、配置文件甚至源码才能将服务运行起来。另外需要安装hsqldb和mysql(redis)。


Confluent Control Center

文档地址:http://docs.confluent.io/3.0.0/control-center/docs/index.html

这是个收费产品,文档中介绍的并不多。和Chaperone相同,主要也是根据消息payload内嵌timestamp来实现,计算time_bucket的算法是:floor((timestamp /15)*15)。
架构图如下:

这里写图片描述

主要是在producer端或者consumer端内嵌审计程序(相当于Chaperone的Audit Library)继续审计,最终将审计消息同样存入kafka中,最后的web系统是直接消费kafka中的审计消息进行内容呈现。

web系统部分呈现如下:
这里写图片描述


Kafka Monitor

github地址:https://github.com/linkedin/kafka-monitor

Kafka Monitor是基于在消息payload内嵌index和timestamp来实现审计:消息丢失,消息重复以及端到端延迟等。

这里写图片描述

web系统部分呈现如下:

这里写图片描述

几种典型的metrics解释:

name description
produce-avaliablility-avg The average produce availability
consume-avaliability-avg The average consume availability
records-produced-total The total number of records that are produced
records-consumed-total The total number of records that are consumed
records-lost-total The total number of records that are lost
records-duplicated-total The total number of records that are duplicated
records-delay-ms-avg The average latency of records from producer to consumer
records-produced-rate The average number of records per second that are produced
produce-error-rate The average number of errors per second
consume-error-rate The average number of errors per second
records-delay-ms-99th The 99th percentile latency of records from producer to consu
records-delay-ms-999th The 999th percentile latency of records from producer to consumer
records-delay-ms-max The maximum latency of records from producer to consumer
目录
相关文章
|
消息中间件 缓存 负载均衡
【日志架构】ELK Stack + Kafka 端到端练习
【日志架构】ELK Stack + Kafka 端到端练习
|
3月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
158 1
|
3月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
71 1
|
5月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
402 9
|
5月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
91 3
|
5月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
223 0
|
5月前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
消息中间件 存储 Kafka
kafka 在 zookeeper 中保存的数据内容
kafka 在 zookeeper 中保存的数据内容
65 3

热门文章

最新文章