【KafkaStream】微服务学习笔记十一:流式计算概述&KafkaStream入门

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【KafkaStream】微服务学习笔记十一:流式计算概述&KafkaStream入门

一: 流式计算

1.概述

       一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算。

       可能上面的话看起来不好理解,我举个简单的栗子,批量计算就相当于我们平时搭乘的升降式电梯,运送人是一批一批的,中间会等待人流的积累;而流式计算则相当于我们商场里面搭乘的扶梯,人是源源不断进行运送的,它会不断接收人流,你把人理解成数据就可以理解什么是数据的流式计算了。

2.应用场景

流式计算其实在我们生活中十分常见,下面我举几个例子:

    • 日志分析
      网站的用户访问日志进行实时的分析,计算访问量,用户画像,留存率等等,实时的进行数据分析,帮助企业进行决策
    • 大屏看板统计
      可以实时的查看网站注册数量,订单数量,购买数量,金额等。
    • 公交实时数据
      可以随时更新公交车方位,计算多久到达站牌等
    • 实时文章分值计算
      头条类文章的分值计算,通过用户的行为实时文章的分值,分值越高就越被推荐。

    3.技术方案选型

    常用的流式计算技术方案选型有如下几种:

      • Hadoop
      • Apche Storm
      • Flink
      • Kafka Stream

             前面三种都是大数据领域常用的技术方案,因此学习成本会比较高,其相关功能大家可以自行查阅官方文档或者相关教程,这里就不多做介绍了。我的项目是基于Java开发的,需求是实现文章分值实时计算,而Kafka Stream可以轻松地将其嵌入任何Java应用程序中,并与用户为其流应用程序所拥有的任何现有打包,部署和操作工具集成。 所以我这里选用的技术方案的Kafka Stream来实现流式计算。

      二:Kafka Stream

      1.概述

      Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。

      Kafka Stream的特点如下:

        • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
        • 除了Kafka外,无任何外部依赖
        • 充分利用Kafka分区机制实现水平扩展和顺序性保证
        • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
        • 支持正好一次处理语义
        • 提供记录级的处理能力,从而实现毫秒级的低延迟
        • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
        • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

        2.Kafka Streams的关键概念

          • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。
          • Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题

          下面提供一幅官方的图供大家参考帮助理解:

          image.gif编辑

          3.KStream

          3.1:KStream数据结构

          KStream数据结构有点像Map结构,都是Key-Value键值对,见下图:

          image.gif编辑

          3.2:KStream数据流

          KStream数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。 数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。

          KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。

          可能上面的话不太好理解,可以看下图:

          image.gif编辑

          你可能会问,消息是按序发送,为什么处理结果不是3而是5呢?这就是上面提到的第二条数据记录不被视为先前记录的更新,而是新增

          三:入门案例编写

          1.需求分析

                 我们需要利用Kafka Stream实现对单词数量的统计,假如多个生产者发送多条数据,数据是一段英文,那么消费者将统计这些生产者发送的消息中不同单词的个数进行输出。

          2.引入依赖

          <!-- kafkfa -->
          <dependency>
              <groupId>org.springframework.kafka</groupId>
              <artifactId>spring-kafka</artifactId>
              <exclusions>
                  <exclusion>
                      <groupId>org.apache.kafka</groupId>
                      <artifactId>kafka-clients</artifactId>
                  </exclusion>
              </exclusions>
          </dependency>
          <dependency>
              <groupId>org.apache.kafka</groupId>
              <artifactId>kafka-clients</artifactId>
          </dependency>
          <dependency>
              <groupId>org.apache.kafka</groupId>
              <artifactId>kafka-streams</artifactId>
              <exclusions>
                  <exclusion>
                      <groupId>org.apache.kafka</groupId>
                      <artifactId>connect-json</artifactId>
                  </exclusion>
                  <exclusion>
                      <groupId>org.apache.kafka</groupId>
                      <artifactId>kafka-clients</artifactId>
                  </exclusion>
              </exclusions>
          </dependency>

          image.gif

          3.创建原生的kafka staream入门案例

          3.1:开启流式计算

          package com.my.kafka.demo;
          import org.apache.kafka.common.serialization.Serdes;
          import org.apache.kafka.streams.KafkaStreams;
          import org.apache.kafka.streams.KeyValue;
          import org.apache.kafka.streams.StreamsBuilder;
          import org.apache.kafka.streams.StreamsConfig;
          import org.apache.kafka.streams.kstream.KStream;
          import org.apache.kafka.streams.kstream.TimeWindows;
          import org.apache.kafka.streams.kstream.ValueMapper;
          import java.time.Duration;
          import java.util.Arrays;
          import java.util.Properties;
          /**
           * 流式处理
           */
          public class KafkaStreamQuickStart {
              public static void main(String[] args) {
                  //kafka的配置信息
                  Properties prop = new Properties();
                  prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"49.234.48.192:9092");
                  prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                  prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                  prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");
                  //stream 构建器
                  StreamsBuilder streamsBuilder = new StreamsBuilder();
                  //流式计算
                  streamProcessor(streamsBuilder);
                  //创建kafkaStream对象
                  KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);
                  //开启流式计算
                  kafkaStreams.start();
              }
              /**
               * 流式计算
               * 消息的内容:hello kafka  hello tbug
               * @param streamsBuilder
               */
              private static void streamProcessor(StreamsBuilder streamsBuilder) {
                  //创建kstream对象,同时指定从哪个topic中接收消息
                  KStream<String, String> stream = streamsBuilder.stream("tbug-topic-input");
                  /**
                   * 处理消息的value
                   */
                  stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
                      @Override
                      public Iterable<String> apply(String value) {
                          return Arrays.asList(value.split(" "));
                      }
                  })
                          //按照value进行聚合处理
                          .groupBy((key,value)->value)
                          //时间窗口
                          .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                          //统计单词的个数
                          .count()
                          //转换为kStream
                          .toStream()
                          .map((key,value)->{
                              System.out.println("key:"+key+",value:"+value);
                              return new KeyValue<>(key.key().toString(),value.toString());
                          })
                          //发送消息
                          .to("tbug-topic-out");
              }
          }

          image.gif

          3.2:生产者

          package com.my.kafka.demo;
          import org.apache.kafka.clients.producer.KafkaProducer;
          import org.apache.kafka.clients.producer.ProducerConfig;
          import org.apache.kafka.clients.producer.ProducerRecord;
          import java.util.Properties;
          /**
           * 生产者
           */
          public class ProducerDemo {
              public static void main(String[] args) {
                  //1.kafka的配置信息
                  Properties pro = new Properties();
                  //Kafka的连接地址
                  pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"49.234.48.192:9092");
                  //发送失败,失败重连次数
                  pro.put(ProducerConfig.RETRIES_CONFIG,5);
                  //消息key的序列化器
                  pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
                  //消息value的序列化器
                  pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
                  //数据压缩
                  pro.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
                  //2.生产者对象
                  KafkaProducer<String,String> producer = new KafkaProducer<String, String>(pro);
                  for (int i = 0; i < 5; i++) {
                      //3.封装发送消息
                      ProducerRecord<String, String> message = new ProducerRecord<>("tbug-topic-input", "ni gan ma ai yuo");
                      //4.发送消息
                      producer.send(message);
                  }
                  //5.关闭消息通道(必选)
                  producer.close();
              }
          }

          image.gif

          3.3:消费者

          package com.my.kafka.demo;
          import org.apache.kafka.clients.consumer.ConsumerConfig;
          import org.apache.kafka.clients.consumer.ConsumerRecord;
          import org.apache.kafka.clients.consumer.ConsumerRecords;
          import org.apache.kafka.clients.consumer.KafkaConsumer;
          import java.time.Duration;
          import java.util.Collections;
          import java.util.Properties;
          /**
           * 消费者
           */
          public class ConsumerDemo {
              public static void main(String[] args) {
                  //1.添加Kafka配置信息
                  Properties pro = new Properties();
                  //Kafka的连接地址
                  pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"49.234.48.192:9092");
                  //消费者组
                  pro.put(ConsumerConfig.GROUP_ID_CONFIG,"group2");
                  //消息key的反序列化器
                  pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
                  //消息value的反序列化器
                  pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
                  //手动提交偏移量
                  pro.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
                  //2.消费者对象
                  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(pro);
                  //3.订阅主题
                  consumer.subscribe(Collections.singletonList("tbug-topic-out"));
                  //4.设置线程一种处于监听状态
                  //同步提交和异步提交偏移量
                  try {
                      while(true) {
                          //5.获取消息
                          ConsumerRecords<String, String> messages = consumer.poll(Duration.ofMillis(1000)); //设置每秒钟拉取一次
                          for (ConsumerRecord<String, String> message : messages) {
                              System.out.print(message.key() + ":");
                              System.out.println(message.value());
                          }
                      }
                  } catch (Exception e) {
                      e.printStackTrace();
                      System.out.println("记录错误的信息:"+e);
                  } finally {
                      //同步
                      consumer.commitSync();
                  }
              }
          }

          image.gif

          4.测试结果

          先启动消费者和KafkaStream,然后启动消费者发送消息:

          image.gif编辑

          可以看到成功利用流式计算实现了单词数量的统计,当然你也可以设置多个生产者发送消息。

          要注意的是,假如你是第一次启动程序,那么你先启动KafkaStream是会报错的,这时候你需要先启动生产者创建一个topic,然后再启动KafkaStream,等待一会就能接收到消息。

          相关文章
          |
          2月前
          |
          SpringCloudAlibaba Java 网络架构
          【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(二)Rest微服务工程搭建
          【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(二)Rest微服务工程搭建
          56 0
          |
          4月前
          |
          消息中间件 RocketMQ 微服务
          分布式事物【库存微服务业务层实现、实现充值微服务、充值微服务之业务层实现、账户微服务之业务层实现】(九)-全面详解(学习总结---从入门到深化)(下)
          分布式事物【库存微服务业务层实现、实现充值微服务、充值微服务之业务层实现、账户微服务之业务层实现】(九)-全面详解(学习总结---从入门到深化)
          37 0
          |
          2月前
          |
          SpringCloudAlibaba Java 网络架构
          【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(七)Spring Cloud Gateway服务网关
          【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(七)Spring Cloud Gateway服务网关
          108 0
          |
          1天前
          |
          监控 Java 应用服务中间件
          【微服务系列笔记】Sentinel入门-微服务保护
          Sentinel是一个开源的分布式系统和应用程序的运维监控平台。它提供了实时数据收集、可视化、告警和自动化响应等功能,帮助用户监控和管理复杂的IT环境。本文简单介绍了微服务保护以及常见雪崩问题,解决方案。以及利用sentinel进行入门案例。
          13 3
          |
          1天前
          |
          存储 Java 数据库
          【微服务系列笔记】微服务概述
          本文对比了单体应用和微服务架构。单体应用中所有功能模块在一个工程中,而微服务则按领域模型拆分为独立服务,每个服务有明确边界,可独立开发、部署和扩展。微服务允许使用不同语言和技术栈,每个服务有自己的数据库。微服务架构的优点包括易于开发维护、技术栈开放和错误隔离,但缺点包括增加运维成本、调用链路复杂、分布式事务处理困难以及学习成本高。实现微服务通常涉及SpringCloud等开发框架和Docker等运行平台。
          15 2
          |
          2月前
          |
          SpringCloudAlibaba 负载均衡 Java
          【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(目录大纲)
          【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(目录大纲)
          68 1
          |
          2月前
          |
          Java Nacos Sentinel
          【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(九)Nacos+Sentinel+Seata
          【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(九)Nacos+Sentinel+Seata
          215 0
          |
          2月前
          |
          消息中间件 SpringCloudAlibaba Java
          【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(八)Config服务配置+bus消息总线+stream消息驱动+Sleuth链路追踪
          【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(八)Config服务配置+bus消息总线+stream消息驱动+Sleuth链路追踪
          786 0
          |
          2月前
          |
          SpringCloudAlibaba Java 测试技术
          【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(六)Hystrix(豪猪哥)的使用
          【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(六)Hystrix(豪猪哥)的使用
          43 1
          |
          2月前
          |
          SpringCloudAlibaba 负载均衡 Java
          【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(五)OpenFeign的使用
          【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(五)OpenFeign的使用
          45 0