【Kafka】(十五)流式计算 Kafka Streams 架构深入2

简介: 【Kafka】(十五)流式计算 Kafka Streams 架构深入2

Kafka Stream如何解决流式系统中关键问题


时间


在流式数据处理中,时间是数据的一个非常重要的属性。从Kafka 0.10开始,每条记录除了Key和Value外,还增加了timestamp属性。目前Kafka Stream支持三种时间


事件发生时间。事件发生的时间,包含在数据记录中。发生时间由Producer在构造ProducerRecord时指定。并且需要Broker或者Topic将message.timestamp.type设置为CreateTime(默认值)才能生效。


消息接收时间,也即消息存入Broker的时间。当Broker或Topic将message.timestamp.type设置为LogAppendTime时生效。此时Broker会在接收到消息后,存入磁盘前,将其timestamp属性值设置为当前机器时间。一般消息接收时间比较接近于事件发生时间,部分场景下可代替事件发生时间。


消息处理时间,也即Kafka Stream处理消息时的时间。


注:Kafka Stream允许通过实现org.apache.kafka.streams.processor.TimestampExtractor接口自定义记录时间。


窗口


前文提到,流式数据是在时间上无界的数据。而聚合操作只能作用在特定的数据集,也即有界的数据集上。因此需要通过某种方式从无界的数据集上按特定的语义选取出有界的数据。窗口是一种非常常用的设定计算边界的方式。不同的流式处理系统支持的窗口类似,但不尽相同。


Kafka Stream支持的窗口如下。


1.Hopping Time Window 该窗口定义如下图所示。它有两个属性,一个是Window size,一个是Advance interval。Window size指定了窗口的大小,也即每次计算的数据集的大小。而Advance interval定义输出的时间间隔。一个典型的应用场景是,每隔5秒钟输出一次过去1个小时内网站的PV或者UV。


2.Tumbling Time Window该窗口定义如下图所示。可以认为它是Hopping Time Window的一种特例,也即Window size和Advance interval相等。它的特点是各个Window之间完全不相交。


3.Sliding Window该窗口只用于2个KStream进行Join计算时。该窗口的大小定义了Join两侧KStream的数据记录被认为在同一个窗口的最大时间差。假设该窗口的大小为5秒,则参与Join的2个KStream中,记录时间差小于5的记录被认为在同一个窗口中,可以进行Join计算。


4.Session Window该窗口用于对Key做Group后的聚合操作中。它需要对Key做分组,然后对组内的数据根据业务需求定义一个窗口的起始点和结束点。一个典型的案例是,希望通过Session Window计算某个用户访问网站的时间。对于一个特定的用户(用Key表示)而言,当发生登录操作时,该用户(Key)的窗口即开始,当发生退出操作或者超时时,该用户(Key)的窗口即结束。窗口结束时,可计算该用户的访问时间或者点击次数等。


Join


Kafka Stream由于包含KStream和Ktable两种数据集,因此提供如下Join计算


KTable Join KTable 结果仍为KTable。任意一边有更新,结果KTable都会更新。


KStream Join KStream 结果为KStream。必须带窗口操作,否则会造成Join操作一直不结束。


KStream Join KTable / GlobakKTable 结果为KStream。只有当KStream中有新数据时,才会触发Join计算并输出结果。KStream无新数据时,KTable的更新并不会触发Join计算,也不会输出数据。并且该更新只对下次Join生效。一个典型的使用场景是,KStream中的订单信息与KTable中的用户信息做关联计算。


对于Join操作,如果要得到正确的计算结果,需要保证参与Join的KTable或KStream中Key相同的数据被分配到同一个Task。具体方法是


参与Join的KTable或KStream的Key类型相同(实际上,业务含意也应该相同)


参与Join的KTable或KStream对应的Topic的Partition数相同


Partitioner策略的最终结果等效(实现不需要完全一样,只要效果一样即可),也即Key相同的情况下,被分配到ID相同的Partition内


如果上述条件不满足,可通过调用如下方法使得它满足上述条件。

KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic)


聚合与乱序处理


聚合操作可应用于KStream和KTable。当聚合发生在KStream上时必须指定窗口,从而限定计算的目标数据集。


需要说明的是,聚合操作的结果肯定是KTable。因为KTable是可更新的,可以在晚到的数据到来时(也即发生数据乱序时)更新结果KTable。


这里举例说明。假设对KStream以5秒为窗口大小,进行Tumbling Time Window上的Count操作。并且KStream先后出现时间为1秒, 3秒, 5秒的数据,此时5秒的窗口已达上限,Kafka Stream关闭该窗口,触发Count操作并将结果3输出到KTable中(假设该结果表示为<1-5,3>)。若1秒后,又收到了时间为2秒的记录,由于1-5秒的窗口已关闭,若直接抛弃该数据,则可认为之前的结果<1-5,3>不准确。而如果直接将完整的结果<1-5,4>输出到KStream中,则KStream中将会包含该窗口的2条记录,<1-5,3>, <1-5,4>,也会存在肮数据。因此Kafka Stream选择将聚合结果存于KTable中,此时新的结果<1-5,4>会替代旧的结果<1-5,3>。用户可得到完整的正确的结果。


这种方式保证了数据准确性,同时也提高了容错性。


但需要说明的是,Kafka Stream并不会对所有晚到的数据都重新计算并更新结果集,而是让用户设置一个retention period,将每个窗口的结果集在内存中保留一定时间,该窗口内的数据晚到时,直接合并计算,并更新结果KTable。超过retention period后,该窗口结果将从内存中删除,并且晚到的数据即使落入窗口,也会被直接丢弃。


容错


Kafka Stream从如下几个方面进行容错


高可用的Partition保证无数据丢失。每个Task计算一个Partition,而Kafka数据复制机制保证了Partition内数据的高可用性,故无数据丢失风险。同时由于数据是持久化的,即使任务失败,依然可以重新计算。


状态存储实现快速故障恢复和从故障点继续处理。对于Join和聚合及窗口等有状态计算,状态存储可保存中间状态。即使发生Failover或Consumer Rebalance,仍然可以通过状态存储恢复中间状态,从而可以继续从Failover或Consumer Rebalance前的点继续计算。


KTable与retention period提供了对乱序数据的处理能力。


Kafka Stream应用示例


下面结合一个案例来讲解如何开发Kafka Stream应用。本例完整代码可从作者Github获取。


订单KStream(名为orderStream),底层Topic的Partition数为3,Key为用户名,Value包含用户名,商品名,订单时间,数量。用户KTable(名为userTable),底层Topic的Partition数为3,Key为用户名,Value包含性别,地址和年龄。商品KTable(名为itemTable),底层Topic的Partition数为6,Key为商品名,价格,种类和产地。现在希望计算每小时购买产地与自己所在地相同的用户总数。


首先由于希望使用订单时间,而它包含在orderStream的Value中,需要通过提供一个实现TimestampExtractor接口的类从orderStream对应的Topic中抽取出订单时间。

public class OrderTimestampExtractor implements TimestampExtractor {
  @Override
  public long extract(ConsumerRecord<Object, Object> record) {
    if(record instanceof Order) {
      return ((Order)record).getTS();
    } else {
      return 0;
    }
  }
}


接着通过将orderStream与userTable进行Join,来获取订单用户所在地。由于二者对应的Topic的Partition数相同,且Key都为用户名,再假设Producer往这两个Topic写数据时所用的Partitioner实现相同,则此时上文所述Join条件满足,可直接进行Join。

orderUserStream = orderStream
    .leftJoin(userTable, 
         // 该lamda表达式定义了如何从orderStream与userTable生成结果集的Value
        (Order order, User user) -> OrderUser.fromOrderUser(order, user), 
         // 结果集Key序列化方式
        Serdes.String(),
         // 结果集Value序列化方式
         SerdesFactory.serdFrom(Order.class))
    .filter((String userName, OrderUser orderUser) -> orderUser.userAddress != null)


从上述代码中,可以看到,Join时需要指定如何从参与Join双方的记录生成结果记录的Value。Key不需要指定,因为结果记录的Key与Join Key相同,故无须指定。Join结果存于名为orderUserStream的KStream中。


接下来需要将orderUserStream与itemTable进行Join,从而获取商品产地。此时orderUserStream的Key仍为用户名,而itemTable对应的Topic的Key为产品名,并且二者的Partition数不一样,因此无法直接Join。此时需要通过through方法,对其中一方或双方进行重新分区,使得二者满足Join条件。这一过程相当于Spark的Shuffle过程和Storm的FieldGrouping。

orderUserStrea
    .through(
        // Key的序列化方式
        Serdes.String(),
        // Value的序列化方式 
        SerdesFactory.serdFrom(OrderUser.class), 
        // 重新按照商品名进行分区,具体取商品名的哈希值,然后对分区数取模
        (String key, OrderUser orderUser, int numPartitions) -> (orderUser.getItemName().hashCode() & 0x7FFFFFFF) % numPartitions, 
        "orderuser-repartition-by-item")
    .leftJoin(itemTable, (OrderUser orderUser, Item item) -> OrderUserItem.fromOrderUser(orderUser, item), Serdes.String(), SerdesFactory.serdFrom(OrderUser.class))


从上述代码可见,through时需要指定Key的序列化器,Value的序列化器,以及分区方式和结果集所在的Topic。这里要注意,该Topic(orderuser-repartition-by-item)的Partition数必须与itemTable对应Topic的Partition数相同,并且through使用的分区方法必须与iteamTable对应Topic的分区方式一样。经过这种through操作,orderUserStream与itemTable满足了Join条件,可直接进行Join。


总结


Kafka Stream的并行模型完全基于Kafka的分区机制和Rebalance机制,实现了在线动态调整并行度


同一Task包含了一个子Topology的所有Processor,使得所有处理逻辑都在同一线程内完成,避免了不必的网络通信开销,从而提高了效率。


through方法提供了类似Spark的Shuffle机制,为使用不同分区策略的数据提供了Join的可能


log compact提高了基于Kafka的state store的加载效率


state store为状态计算提供了可能


基于offset的计算进度管理以及基于state store的中间状态管理为发生Consumer rebalance或Failover时从断点处继续处理提供了可能,并为系统容错性提供了保障


KTable的引入,使得聚合计算拥用了处理乱序问题的能力

目录
相关文章
|
5月前
|
消息中间件 Java Kafka
Java 事件驱动架构设计实战与 Kafka 生态系统组件实操全流程指南
本指南详解Java事件驱动架构与Kafka生态实操,涵盖环境搭建、事件模型定义、生产者与消费者实现、事件测试及高级特性,助你快速构建高可扩展分布式系统。
298 7
|
8月前
|
消息中间件 数据可视化 Kafka
docker arm架构部署kafka要点
本内容介绍了基于 Docker 的容器化解决方案,包含以下部分: 1. **Docker 容器管理**:通过 Portainer 可视化管理工具实现对主节点和代理节点的统一管理。 2. **Kafka 可视化工具**:部署 Kafka-UI 以图形化方式监控和管理 Kafka 集群,支持动态配置功能, 3. **Kafka 安装与配置**:基于 Bitnami Kafka 镜像,提供完整的 Kafka 集群配置示例,涵盖 KRaft 模式、性能调优参数及数据持久化设置,适用于高可用生产环境。 以上方案适合 ARM64 架构,为用户提供了一站式的容器化管理和消息队列解决方案。
754 10
|
7月前
|
消息中间件 存储 大数据
阿里云消息队列 Kafka 架构及典型应用场景
阿里云消息队列 Kafka 是一款基于 Apache Kafka 的分布式消息中间件,支持消息发布与订阅模型,满足微服务解耦、大数据处理及实时流数据分析需求。其通过存算分离架构优化成本与性能,提供基础版、标准版和专业版三种 Serverless 版本,分别适用于不同业务场景,最高 SLA 达 99.99%。阿里云 Kafka 还具备弹性扩容、多可用区部署、冷热数据缓存隔离等特性,并支持与 Flink、MaxCompute 等生态工具无缝集成,广泛应用于用户行为分析、数据入库等场景,显著提升数据处理效率与实时性。
|
消息中间件 缓存 架构师
关于 Kafka 高性能架构,这篇说得最全面,建议收藏!
Kafka 是一个高吞吐量、高性能的消息中间件,关于 Kafka 高性能背后的实现,是大厂面试高频问题。本篇全面详解 Kafka 高性能背后的实现。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
关于 Kafka 高性能架构,这篇说得最全面,建议收藏!
|
12月前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
507 1
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
395 1
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
1386 9
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。