【Kafka】(十五)流式计算 Kafka Streams 架构深入1

简介: 【Kafka】(十五)流式计算 Kafka Streams 架构深入1

Kafka Streams背景


Kafka Streams是什么


Kafka Streams是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。


Kafka Streams的特点如下:


Kafka Streams提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署


除了Kafka外,无任何外部依赖


充分利用Kafka分区机制实现水平扩展和顺序性保证


通过可容错的state store实现高效的状态操作(如windowed join和aggregation)


支持正好一次处理语义


提供记录级的处理能力,从而实现毫秒级的低延迟


支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)


同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)


什么是流式计算


一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算。


批量处理模型中,一般先有全量数据集,然后定义计算逻辑,并将计算应用于全量数据。特点是全量计算,并且计算结果一次性全量输出。


为什么要有Kafka Streams


当前已经有非常多的流式处理系统,最知名且应用最多的开源流式处理系统有Spark Streaming和Apache Storm。Apache Storm发展多年,应用广泛,提供记录级别的处理能力,当前也支持SQL on Stream。而Spark Streaming基于Apache Spark,可以非常方便与图计算,SQL处理等集成,功能强大,对于熟悉其它Spark应用开发的用户而言使用门槛低。另外,目前主流的Hadoop发行版,如MapR,Cloudera和Hortonworks,都集成了Apache Storm和Apache Spark,使得部署更容易。


既然Apache Spark与Apache Storm拥用如此多的优势,那为何还需要Kafka Streams呢?笔者认为主要有如下原因。


第一,Spark和Storm都是流式处理框架,而Kafka Streams提供的是一个基于Kafka的流式处理类库。框架要求开发者按照特定的方式去开发逻辑部分,供框架调用。开发者很难了解框架的具体运行方式,从而使得调试成本高,并且使用受限。而Kafka Streams作为流式处理类库,直接提供具体的类给开发者调用,整个应用的运行方式主要由开发者控制,方便使用和调试。


(点击放大图像)


第二,虽然Cloudera与Hortonworks方便了Storm和Spark的部署,但是这些框架的部署仍然相对复杂。而Kafka Streams作为类库,可以非常方便的嵌入应用程序中,它对应用的打包和部署基本没有任何要求。更为重要的是,Kafka Streams充分利用了Kafka的分区机制和Consumer的Rebalance机制,使得Kafka Stream可以非常方便的水平扩展,并且各个实例可以使用不同的部署方式。具体来说,每个运行Kafka Streams的应用程序实例都包含了Kafka Consumer实例,多个同一应用的实例之间并行处理数据集。而不同实例之间的部署方式并不要求一致,比如部分实例可以运行在Web容器中,部分实例可运行在Docker或Kubernetes中。


第三,就流式处理系统而言,基本都支持Kafka作为数据源。例如Storm具有专门的kafka-spout,而Spark也提供专门的spark-streaming-kafka模块。事实上,Kafka基本上是主流的流式处理系统的标准数据源。换言之,大部分流式系统中都已部署了Kafka,此时使用Kafka Streams的成本非常低。


第四,使用Storm或Spark Streaming时,需要为框架本身的进程预留资源,如Storm的supervisor和Spark on YARN的node manager。即使对于应用实例而言,框架本身也会占用部分资源,如Spark Streaming需要为shuffle和storage预留内存。


第五,由于Kafka本身提供数据持久化,因此Kafka Streams提供滚动部署和滚动升级以及重新计算的能力。


第六,由于Kafka Consumer Rebalance机制,Kafka Streams可以在线动态调整并行度。


Kafka Streams架构


Kafka Streams整体架构


Kafka Streams的整体架构图如下。


目前(Kafka 0.11.0.0)Kafka Streams的数据源只能如上图所示是Kafka。但是处理结果并不一定要如上图所示输出到Kafka。实际上KStream和Ktable的实例化都需要指定Topic。

KStream<String, String> stream = builder.stream("words-stream");
KTable<String, String> table = builder.table("words-table", "words-store");


另外,上图中的Consumer和Producer并不需要开发者在应用中显示实例化,而是由Kafka Stream根据参数隐式实例化和管理,从而降低了使用门槛。开发者只需要专注于开发核心业务逻辑,也即上图中Task内的部分。


Processor Topology


基于Kafka Streams的流式应用的业务逻辑全部通过一个被称为Processor Topology的地方执行。它与Storm的Topology和Spark的DAG类似,都定义了数据在各个处理单元(在Kafka Stream中被称作Processor)间的流动方式,或者说定义了数据的处理逻辑。


下面是一个Processor的示例,它实现了Word Count功能,并且每秒输出一次结果。

public class WordCountProcessor implements Processor<String, String> {
  private ProcessorContext context;
  private KeyValueStore<String, Integer> kvStore;
  @SuppressWarnings("unchecked")
  @Override
  public void init(ProcessorContext context) {
    this.context = context;
    this.context.schedule(1000);
    this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
  }
  @Override
  public void process(String key, String value) {
    Stream.of(value.toLowerCase().split(" ")).forEach((String word) -> {
      Optional<Integer> counts = Optional.ofNullable(kvStore.get(word));
      int count = counts.map(wordcount -> wordcount + 1).orElse(1);
      kvStore.put(word, count);
    });
  }
  @Override
  public void punctuate(long timestamp) {
    KeyValueIterator<String, Integer> iterator = this.kvStore.all();
    iterator.forEachRemaining(entry -> {
      context.forward(entry.key, entry.value);
      this.kvStore.delete(entry.key);
    });
    context.commit();
  }
  @Override
  public void close() {
    this.kvStore.close();
  }
}


从上述代码中可见


process定义了对每条记录的处理逻辑,也印证了Kafka可具有记录级的数据处理能力。

context.scheduler定义了punctuate被执行的周期,从而提供了实现窗口操作的能力。

context.getStateStore提供的状态存储为有状态计算(如窗口,聚合)提供了可能。


Kafka Streams并行模型


Kafka Streams的并行模型中,最小粒度为Task,而每个Task包含一个特定子Topology的所有Processor。因此每个Task所执行的代码完全一样,唯一的不同在于所处理的数据集互补。这一点跟Storm的Topology完全不一样。Storm的Topology的每一个Task只包含一个Spout或Bolt的实例。因此Storm的一个Topology内的不同Task之间需要通过网络通信传递数据,而Kafka Stream的Task包含了完整的子Topology,所以Task之间不需要传递数据,也就不需要网络通信。这一点降低了系统复杂度,也提高了处理效率。


如果某个Streams的输入Topic有多个(比如2个Topic,1个Partition数为4,另一个Partition数为3),则总的Task数等于Partition数最多的那个Topic的Partition数(max(4,3)=4)。这是因为Kafka Streams使用了Consumer的Rebalance机制,每个Partition对应一个Task。


下图展示了在一个进程(Instance)中以2个Topic(Partition数均为4)为数据源的Kafka Streams应用的并行模型。从图中可以看到,由于Kafka Streams应用的默认线程数为1,所以4个Task全部在一个线程中运行。


为了充分利用多线程的优势,可以设置Kafka Stream的线程数。下图展示了线程数为2时的并行模型。


前文有提到,Kafka Streams可被嵌入任意Java应用(理论上基于JVM的应用都可以)中,下图展示了在同一台机器的不同进程中同时启动同一Kafka Streams应用时的并行模型。注意,这里要保证两个进程的StreamsConfig.APPLICATION_ID_CONFIG完全一样。因为Kafka Streams将APPLICATION_ID_CONFI作为隐式启动的Consumer的Group ID。只有保证APPLICATION_ID_CONFI相同,才能保证这两个进程的Consumer属于同一个Group,从而可以通过Consumer Rebalance机制拿到互补的数据集。


既然实现了多进程部署,可以以同样的方式实现多机器部署。该部署方式也要求所有进程的APPLICATION_ID_CONFIG完全一样。从图上也可以看到,每个实例中的线程数并不要求一样。但是无论如何部署,Task总数总会保证一致。


注意:Kafka Stream的并行模型,非常依赖于《Kafka设计解析(一)- Kafka背景及架构介绍》一文中介绍的Kafka分区机制和《Kafka设计解析(四)- Kafka Consumer设计解析》中介绍的Consumer的Rebalance机制。强烈建议不太熟悉这两种机制的朋友,先行阅读这两篇文章。


这里对比一下Kafka Stream的Processor Topology与Storm的Topology。


Storm的Topology由Spout和Bolt组成,Spout提供数据源,而Bolt提供计算和数据导出。Kafka Stream的Processor Topology完全由Processor组成,因为它的数据固定由Kafka的Topic提供。


Storm的不同Bolt运行在不同的Executor中,很可能位于不同的机器,需要通过网络通信传输数据。而Kafka Stream的Processor Topology的不同Processor完全运行于同一个Task中,也就完全处于同一个线程,无需网络通信。


Storm的Topology可以同时包含Shuffle部分和非Shuffle部分,并且往往一个Topology就是一个完整的应用。而Kafka Stream的一个物理Topology只包含非Shuffle部分,而Shuffle部分需要通过through操作显示完成,该操作将一个大的Topology分成了2个子Topology。


Storm的Topology内,不同Bolt/Spout的并行度可以不一样,而Kafka Stream的子Topology内,所有Processor的并行度完全一样。


Storm的一个Task只包含一个Spout或者Bolt的实例,而Kafka Stream的一个Task包含了一个子Topology的所有Processor。


KTable vs. KStream


KTable和KStream是Kafka Stream中非常重要的两个概念,它们是Kafka实现各种语义的基础。因此这里有必要分析下二者的区别。


KStream是一个数据流,可以认为所有记录都通过Insert only的方式插入进这个数据流里。而KTable代表一个完整的数据集,可以理解为数据库中的表。由于每条记录都是Key-Value对,这里可以将Key理解为数据库中的Primary Key,而Value可以理解为一行记录。可以认为KTable中的数据都是通过Update only的方式进入的。也就意味着,如果KTable对应的Topic中新进入的数据的Key已经存在,那么从KTable只会取出同一Key对应的最后一条数据,相当于新的数据更新了旧的数据。


以下图为例,假设有一个KStream和KTable,基于同一个Topic创建,并且该Topic中包含如下图所示5条数据。此时遍历KStream将得到与Topic内数据完全一样的所有5条数据,且顺序不变。而此时遍历KTable时,因为这5条记录中有3个不同的Key,所以将得到3条记录,每个Key对应最新的值,并且这三条数据之间的顺序与原来在Topic中的顺序保持一致。这一点与Kafka的日志compact相同。


此时如果对该KStream和KTable分别基于key做Group,对Value进行Sum,得到的结果将会不同。对KStream的计算结果是<Jack,4>,<Lily,7>,<Mike,4>。而对Ktable的计算结果是<Mike,4>,<Jack,3>,<Lily,5>。


State store


流式处理中,部分操作是无状态的,例如过滤操作(Kafka Stream DSL中用filer方法实现)。而部分操作是有状态的,需要记录中间状态,如Window操作和聚合计算。State store被用来存储中间状态。它可以是一个持久化的Key-Value存储,也可以是内存中的HashMap,或者是数据库。Kafka提供了基于Topic的状态存储。


Topic中存储的数据记录本身是Key-Value形式的,同时Kafka的log compaction机制可对历史数据做compact操作,保留每个Key对应的最后一个Value,从而在保证Key不丢失的前提下,减少总数据量,从而提高查询效率。


构造KTable时,需要指定其state store name。默认情况下,该名字也即用于存储该KTable的状态的Topic的名字,遍历KTable的过程,实际就是遍历它对应的state store,或者说遍历Topic的所有key,并取每个Key最新值的过程。为了使得该过程更加高效,默认情况下会对该Topic进行compact操作。


另外,除了KTable,所有状态计算,都需要指定state store name,从而记录中间状态。



目录
相关文章
|
6天前
|
消息中间件 监控 Java
使用Kafka实现分布式事件驱动架构
使用Kafka实现分布式事件驱动架构
|
24天前
|
消息中间件 负载均衡 Kafka
一文读懂Kafka API:Producer、Consumer和Streams全解析
大家好,今天我们将深入探讨Kafka的三大核心API。通过这篇文章,你将了解如何使用Producer API发布记录流,利用Consumer API订阅和处理数据,以及通过Streams API实现复杂的流处理。一起开启Kafka的探索之旅吧!
38 2
|
5天前
|
消息中间件 Java Kafka
Spring Boot与Apache Kafka Streams的集成
Spring Boot与Apache Kafka Streams的集成
|
13天前
|
消息中间件 Java Kafka
Java中的流处理框架:Kafka Streams与Flink
Java中的流处理框架:Kafka Streams与Flink
|
17天前
|
消息中间件 Java Kafka
教程:Spring Boot集成Kafka Streams流处理框架
教程:Spring Boot集成Kafka Streams流处理框架
|
18天前
|
消息中间件 存储 SQL
RocketMQ与Kafka架构深度对比
RocketMQ与Kafka架构深度对比
|
2月前
|
消息中间件 Kafka Apache
Kafka 架构深入介绍 及搭建Filebeat+Kafka+ELK
Kafka 架构深入介绍 及搭建Filebeat+Kafka+ELK
|
2月前
|
消息中间件 缓存 安全
Kafka 的生产者优秀架构设计
Kafka 的生产者优秀架构设计
42 0
|
3天前
|
运维 Kubernetes 监控
深入解析微服务架构的演进与实践
本文旨在探究微服务架构从诞生到成熟的发展历程,分析其背后的技术推动力和业务需求,并结合具体案例,揭示实施微服务过程中的挑战与解决策略。通过对微服务架构与传统单体架构的对比,阐明微服务如何优化现代应用开发流程,提高系统的可扩展性、可维护性和敏捷性。
14 0
|
1天前
|
监控 负载均衡 安全
探索微服务架构中的API网关模式
【7月更文挑战第13天】在微服务架构的海洋中,API网关犹如一座灯塔,指引着服务间的通信和客户端请求。本文将深入剖析API网关的核心作用、设计考量以及实现策略,为构建高效、可靠的分布式系统提供实践指南。
17 10