【Kafka】(二十四)轻量级流计算 Kafka Streams 实践总结

简介: 【Kafka】(二十四)轻量级流计算 Kafka Streams 实践总结

文章目录


一、概述

1.1 Kafka Streams

1.2 Kafka Streams 特点

1.3 为什么要有 Kafka Streams


二、Kafka Streams 数据清洗案例

0)需求

1)需求分析

2)案例实操


三、总结


一、概述


1.1 Kafka Streams

Kafka Streams。Apache Kafka 开源项目的一个组成部分。是一个功能强大,易于使用的 库。用于在 Kafka 上构建高可分布式、拓展性,容错的应用程序。


1.2 Kafka Streams 特点


1. 功能强大


高扩展性,弹性,容错


2. 轻量级


无需专门的集群

一个库,而不是框架


3. 完全集成


100%的 Kafka 0.10.0 版本兼容

易于集成到现有的应用程序


4. 实时性


毫秒级延迟

并非微批处理

窗口允许乱序数据

允许迟到数据


1.3 为什么要有 Kafka Streams


当前已经有非常多的流式处理系统,最知名且应用最多的开源流式处理系统有 Spark Streaming 和 Apache Storm。Apache Storm 发展多年,应用广泛,提供记录级别的处理能力, 当前也支持 SQL on Stream。而 Spark Streaming 基于 Apache Spark,可以非常方便与图计算, SQL 处理等集成,功能强大,对于熟悉其它 Spark 应用开发的用户而言使用门槛低。另外, 目前主流的 Hadoop 发行版,如 Cloudera 和 Hortonworks,都集成了 Apache Storm 和 Apache Spark,使得部署更容易。


既然 Apache Spark 与 Apache Storm 拥用如此多的优势,那为何还需要 Kafka Stream 呢?


主要有如下原因。


第一,Spark 和 Storm 都是流式处理框架,而 Kafka Streams 提供的是一个基于 Kafka 的 流式处理类库。框架要求开发者按照特定的方式去开发逻辑部分,供框架调用。开发者很难 了解框架的具体运行方式,从而使得调试成本高,并且使用受限。而 Kafka Streams 作为流式处理类库,直接提供具体的类给开发者调用,整个应用的运行方式主要由开发者控制,方便使用和调试。



第二,虽然 Cloudera 与 Hortonworks 方便了 Storm 和 Spark 的部署,但是这些框架的部署仍然相对复杂。而 Kafka Streams 作为类库,可以非常方便的嵌入应用程序中,它对应用的打包和部署基本没有任何要求。


第三,就流式处理系统而言,基本都支持 Kafka 作为数据源。例如 Storm 具有专门的 kafka-spout,而 Spark 也提供专门的 spark-streaming-kafka 模块。事实上,Kafka 基本上是主流的流式处理系统的标准数据源。换言之,大部分流式系统中都已部署了 Kafka,此时使用 Kafka Streams 的成本非常低。


第四,使用 Storm 或 Spark Streaming 时,需要为框架本身的进程预留资源,如 Storm 的 supervisor 和 Spark on YARN 的 node manager。即使对于应用实例而言,框架本身也会占 用部分资源,如 Spark Streaming 需要为 shuffle 和 storage 预留内存。但是 Kafka 作为类库不 占用系统资源。


第五,由于 Kafka 本身提供数据持久化,因此 Kafka Streams 提供滚动部署和滚动升级以 及重新计算的能力。


第六,由于 Kafka Consumer Rebalance 机制,Kafka Stream 可以在线动态调整并行度。


二、Kafka Streams 数据清洗


案例


0)需求


实时处理单词带有”>>>”前缀的内容。例如输入”aaa>>>bbb”,最终处理成 “bbb”


1)需求分析



2)案例实操


创建一个工程,并添加 jar 包


创建主类

package com.atguigu.kafka.stream; 
import java.util.Properties; 
import org.apache.kafka.streams.KafkaStreams; 
import org.apache.kafka.streams.StreamsConfig; 
import org.apache.kafka.streams.processor.Processor; 
import org.apache.kafka.streams.processor.ProcessorSupplier; 
import org.apache.kafka.streams.processor.TopologyBuilder; 
public class Application { 
  public static void main(String[] args) { 
  // 定义输入的 
  topic String from = "first"; 
  // 定义输出的 
  topic String to = "second"; 
  // 设置参数 
  Properties settings = new Properties();
  settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
  settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
  StreamsConfig config = new StreamsConfig(settings); 
  // 构建拓扑 
  TopologyBuilder builder = new TopologyBuilder(); 
  builder.addSource("SOURCE", from) 
  .addProcessor("PROCESS", new ProcessorSupplier<byte[], byte[]>() { 
    @Override 
    public Processor<byte[], byte[]> get() { 
  // 具体分析处理
    return new LogProcessor();
    } 
   }, "SOURCE")
  .addSink("SINK", to, "PROCESS"); 
  // 创建 kafka streams 
  KafkaStreams streams = new KafkaStreams(builder, config); 
  streams.start(); 
  } 
}

具体业务处理


package com.atguigu.kafka.stream;
import org.apache.kafka.streams.processor.Processor; 
import org.apache.kafka.streams.processor.ProcessorContext; 
public class LogProcessor implements Processor<byte[], byte[]> { 
  private ProcessorContext context; 
  @Override 
  public void init(ProcessorContext context) { 
  this.context = context; 
  }
  @Override 
  public void process(byte[] key, byte[] value) { 
  String input = new String(value); 
  // 如果包含“>>>”则只保留该标记后面的内容 
  if (input.contains(">>>")) { 
  input = input.split(">>>")[1].trim(); 
  // 输出到下一个topic 
    context.forward("logProcessor".getBytes(), input.getBytes());
  }else{
    context.forward("logProcessor".getBytes(), input.getBytes()); 
    } 
  }
  @Override 
  public void punctuate(long timestamp) { 
  }
  @Override 
  public void close() { 
  } 
}


(4)运行程序


(5)在 hadoop104 上启动生产者


[root@hadoop104 kafka]$ bin/kafka-console-producer.sh \ 
--broker-list hadoop102:9092 --topic first 
>hello>>>world 
>>aaa>>>bbb
>>hahaha

(6)在 hadoop103 上启动消费者


[root@hadoop103 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --from-beginning --topic second 
world 
bbb
hahaha


三、总结


  • Kafka Streams的并行模型完全基于Kafka的分区机制和Rebalance机制,实现了在线动态调整并行度;


  • 同一Task包含了一个子Topology的所有Processor,使得所有处理逻辑都在同一线程内完成,避免了不必的网络通信开销,从而提高了效率;


  • through方法提供了类似Spark的Shuffle机制,为使用不同分区策略的数据提供了Join的可能;


  • log compact提高了基于Kafka的state store的加载效率;


  • state store为状态计算提供了可能;


  • 基于offset的计算进度管理以及基于state store的中间状态管理为发生Consumer rebalance或Failover时从断点处继续处理提供了可能,并为系统容错性提供了保障;


  • KTable的引入,使得聚合计算拥用了处理乱序问题的能力;

目录
相关文章
|
10月前
|
消息中间件 运维 Kafka
|
10月前
|
消息中间件 存储 负载均衡
Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
【2月更文挑战第21天】Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
526 4
|
4月前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
160 4
|
7月前
|
分布式计算 搜索推荐 物联网
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
|
7月前
|
消息中间件 负载均衡 Kafka
Kafka 实现负载均衡与故障转移:深入分析 Kafka 的架构特点与实践
【8月更文挑战第24天】Apache Kafka是一款专为实时数据处理和流传输设计的高性能消息系统。其核心设计注重高吞吐量、低延迟与可扩展性,并具备出色的容错能力。Kafka采用分布式日志概念,通过数据分区及副本机制确保数据可靠性和持久性。系统包含Producer(消息生产者)、Consumer(消息消费者)和Broker(消息服务器)三大组件。Kafka利用独特的分区机制实现负载均衡,每个Topic可以被划分为多个分区,每个分区可以被复制到多个Broker上,确保数据的高可用性和可靠性。
245 2
|
7月前
|
消息中间件 Java Kafka
|
7月前
|
消息中间件 存储 算法
时间轮在Kafka的实践:技术深度剖析
【8月更文挑战第13天】在分布式消息系统Kafka中,时间轮(Timing Wheel)作为一种高效的时间调度机制,被广泛应用于处理各种延时操作,如延时生产、延时拉取和延时删除等。本文将深入探讨时间轮在Kafka中的实践应用,解析其技术原理、优势及具体实现方式。
206 2
|
7月前
|
消息中间件 安全 Kafka
"深入实践Kafka多线程Consumer:案例分析、实现方式、优缺点及高效数据处理策略"
【8月更文挑战第10天】Apache Kafka是一款高性能的分布式流处理平台,以高吞吐量和可扩展性著称。为提升数据处理效率,常采用多线程消费Kafka数据。本文通过电商订单系统的案例,探讨了多线程Consumer的实现方法及其利弊,并提供示例代码。案例展示了如何通过并行处理加快订单数据的处理速度,确保数据正确性和顺序性的同时最大化资源利用。多线程Consumer有两种主要模式:每线程一个实例和单实例多worker线程。前者简单易行但资源消耗较大;后者虽能解耦消息获取与处理,却增加了系统复杂度。通过合理设计,多线程Consumer能够有效支持高并发数据处理需求。
238 4
|
8月前
|
消息中间件 分布式计算 NoSQL
EMR-Kafka Connect:高效数据迁移的革新实践与应用探索
Kafka Connect是Kafka官方提供的一个可扩展的数据传输框架,它允许用户以声明式的方式在Kafka与其他数据源之间进行数据迁移,无需编写复杂的数据传输代码。
|
7月前
|
消息中间件 Kafka Apache
流计算引擎数据问题之Apache Kafka Streams 没有采用低水印方案如何解决
流计算引擎数据问题之Apache Kafka Streams 没有采用低水印方案如何解决
74 0