【KafkaStream】微服务学习笔记十一:流式计算概述&KafkaStream入门

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介: 【KafkaStream】微服务学习笔记十一:流式计算概述&KafkaStream入门

一: 流式计算

1.概述

       一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算。

       可能上面的话看起来不好理解,我举个简单的栗子,批量计算就相当于我们平时搭乘的升降式电梯,运送人是一批一批的,中间会等待人流的积累;而流式计算则相当于我们商场里面搭乘的扶梯,人是源源不断进行运送的,它会不断接收人流,你把人理解成数据就可以理解什么是数据的流式计算了。

2.应用场景

流式计算其实在我们生活中十分常见,下面我举几个例子:

    • 日志分析
      网站的用户访问日志进行实时的分析,计算访问量,用户画像,留存率等等,实时的进行数据分析,帮助企业进行决策
    • 大屏看板统计
      可以实时的查看网站注册数量,订单数量,购买数量,金额等。
    • 公交实时数据
      可以随时更新公交车方位,计算多久到达站牌等
    • 实时文章分值计算
      头条类文章的分值计算,通过用户的行为实时文章的分值,分值越高就越被推荐。

    3.技术方案选型

    常用的流式计算技术方案选型有如下几种:

      • Hadoop
      • Apche Storm
      • Flink
      • Kafka Stream

             前面三种都是大数据领域常用的技术方案,因此学习成本会比较高,其相关功能大家可以自行查阅官方文档或者相关教程,这里就不多做介绍了。我的项目是基于Java开发的,需求是实现文章分值实时计算,而Kafka Stream可以轻松地将其嵌入任何Java应用程序中,并与用户为其流应用程序所拥有的任何现有打包,部署和操作工具集成。 所以我这里选用的技术方案的Kafka Stream来实现流式计算。

      二:Kafka Stream

      1.概述

      Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。

      Kafka Stream的特点如下:

        • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
        • 除了Kafka外,无任何外部依赖
        • 充分利用Kafka分区机制实现水平扩展和顺序性保证
        • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
        • 支持正好一次处理语义
        • 提供记录级的处理能力,从而实现毫秒级的低延迟
        • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
        • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

        2.Kafka Streams的关键概念

          • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。
          • Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题

          下面提供一幅官方的图供大家参考帮助理解:

          image.gif编辑

          3.KStream

          3.1:KStream数据结构

          KStream数据结构有点像Map结构,都是Key-Value键值对,见下图:

          image.gif编辑

          3.2:KStream数据流

          KStream数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。 数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。

          KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。

          可能上面的话不太好理解,可以看下图:

          image.gif编辑

          你可能会问,消息是按序发送,为什么处理结果不是3而是5呢?这就是上面提到的第二条数据记录不被视为先前记录的更新,而是新增

          三:入门案例编写

          1.需求分析

                 我们需要利用Kafka Stream实现对单词数量的统计,假如多个生产者发送多条数据,数据是一段英文,那么消费者将统计这些生产者发送的消息中不同单词的个数进行输出。

          2.引入依赖

          <!-- kafkfa -->
          <dependency>
              <groupId>org.springframework.kafka</groupId>
              <artifactId>spring-kafka</artifactId>
              <exclusions>
                  <exclusion>
                      <groupId>org.apache.kafka</groupId>
                      <artifactId>kafka-clients</artifactId>
                  </exclusion>
              </exclusions>
          </dependency>
          <dependency>
              <groupId>org.apache.kafka</groupId>
              <artifactId>kafka-clients</artifactId>
          </dependency>
          <dependency>
              <groupId>org.apache.kafka</groupId>
              <artifactId>kafka-streams</artifactId>
              <exclusions>
                  <exclusion>
                      <groupId>org.apache.kafka</groupId>
                      <artifactId>connect-json</artifactId>
                  </exclusion>
                  <exclusion>
                      <groupId>org.apache.kafka</groupId>
                      <artifactId>kafka-clients</artifactId>
                  </exclusion>
              </exclusions>
          </dependency>

          image.gif

          3.创建原生的kafka staream入门案例

          3.1:开启流式计算

          package com.my.kafka.demo;
          import org.apache.kafka.common.serialization.Serdes;
          import org.apache.kafka.streams.KafkaStreams;
          import org.apache.kafka.streams.KeyValue;
          import org.apache.kafka.streams.StreamsBuilder;
          import org.apache.kafka.streams.StreamsConfig;
          import org.apache.kafka.streams.kstream.KStream;
          import org.apache.kafka.streams.kstream.TimeWindows;
          import org.apache.kafka.streams.kstream.ValueMapper;
          import java.time.Duration;
          import java.util.Arrays;
          import java.util.Properties;
          /**
           * 流式处理
           */
          public class KafkaStreamQuickStart {
              public static void main(String[] args) {
                  //kafka的配置信息
                  Properties prop = new Properties();
                  prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"49.234.48.192:9092");
                  prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                  prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
                  prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");
                  //stream 构建器
                  StreamsBuilder streamsBuilder = new StreamsBuilder();
                  //流式计算
                  streamProcessor(streamsBuilder);
                  //创建kafkaStream对象
                  KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);
                  //开启流式计算
                  kafkaStreams.start();
              }
              /**
               * 流式计算
               * 消息的内容:hello kafka  hello tbug
               * @param streamsBuilder
               */
              private static void streamProcessor(StreamsBuilder streamsBuilder) {
                  //创建kstream对象,同时指定从哪个topic中接收消息
                  KStream<String, String> stream = streamsBuilder.stream("tbug-topic-input");
                  /**
                   * 处理消息的value
                   */
                  stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
                      @Override
                      public Iterable<String> apply(String value) {
                          return Arrays.asList(value.split(" "));
                      }
                  })
                          //按照value进行聚合处理
                          .groupBy((key,value)->value)
                          //时间窗口
                          .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                          //统计单词的个数
                          .count()
                          //转换为kStream
                          .toStream()
                          .map((key,value)->{
                              System.out.println("key:"+key+",value:"+value);
                              return new KeyValue<>(key.key().toString(),value.toString());
                          })
                          //发送消息
                          .to("tbug-topic-out");
              }
          }

          image.gif

          3.2:生产者

          package com.my.kafka.demo;
          import org.apache.kafka.clients.producer.KafkaProducer;
          import org.apache.kafka.clients.producer.ProducerConfig;
          import org.apache.kafka.clients.producer.ProducerRecord;
          import java.util.Properties;
          /**
           * 生产者
           */
          public class ProducerDemo {
              public static void main(String[] args) {
                  //1.kafka的配置信息
                  Properties pro = new Properties();
                  //Kafka的连接地址
                  pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"49.234.48.192:9092");
                  //发送失败,失败重连次数
                  pro.put(ProducerConfig.RETRIES_CONFIG,5);
                  //消息key的序列化器
                  pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
                  //消息value的序列化器
                  pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
                  //数据压缩
                  pro.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
                  //2.生产者对象
                  KafkaProducer<String,String> producer = new KafkaProducer<String, String>(pro);
                  for (int i = 0; i < 5; i++) {
                      //3.封装发送消息
                      ProducerRecord<String, String> message = new ProducerRecord<>("tbug-topic-input", "ni gan ma ai yuo");
                      //4.发送消息
                      producer.send(message);
                  }
                  //5.关闭消息通道(必选)
                  producer.close();
              }
          }

          image.gif

          3.3:消费者

          package com.my.kafka.demo;
          import org.apache.kafka.clients.consumer.ConsumerConfig;
          import org.apache.kafka.clients.consumer.ConsumerRecord;
          import org.apache.kafka.clients.consumer.ConsumerRecords;
          import org.apache.kafka.clients.consumer.KafkaConsumer;
          import java.time.Duration;
          import java.util.Collections;
          import java.util.Properties;
          /**
           * 消费者
           */
          public class ConsumerDemo {
              public static void main(String[] args) {
                  //1.添加Kafka配置信息
                  Properties pro = new Properties();
                  //Kafka的连接地址
                  pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"49.234.48.192:9092");
                  //消费者组
                  pro.put(ConsumerConfig.GROUP_ID_CONFIG,"group2");
                  //消息key的反序列化器
                  pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
                  //消息value的反序列化器
                  pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
                  //手动提交偏移量
                  pro.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
                  //2.消费者对象
                  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(pro);
                  //3.订阅主题
                  consumer.subscribe(Collections.singletonList("tbug-topic-out"));
                  //4.设置线程一种处于监听状态
                  //同步提交和异步提交偏移量
                  try {
                      while(true) {
                          //5.获取消息
                          ConsumerRecords<String, String> messages = consumer.poll(Duration.ofMillis(1000)); //设置每秒钟拉取一次
                          for (ConsumerRecord<String, String> message : messages) {
                              System.out.print(message.key() + ":");
                              System.out.println(message.value());
                          }
                      }
                  } catch (Exception e) {
                      e.printStackTrace();
                      System.out.println("记录错误的信息:"+e);
                  } finally {
                      //同步
                      consumer.commitSync();
                  }
              }
          }

          image.gif

          4.测试结果

          先启动消费者和KafkaStream,然后启动消费者发送消息:

          image.gif编辑

          可以看到成功利用流式计算实现了单词数量的统计,当然你也可以设置多个生产者发送消息。

          要注意的是,假如你是第一次启动程序,那么你先启动KafkaStream是会报错的,这时候你需要先启动生产者创建一个topic,然后再启动KafkaStream,等待一会就能接收到消息。

          相关文章
          |
          2月前
          |
          存储 Java 关系型数据库
          微服务概述
          微服务概述
          43 1
          微服务概述
          |
          2月前
          |
          Kubernetes Cloud Native 开发者
          云原生入门:从容器到微服务
          本文将带你走进云原生的世界,从容器技术开始,逐步深入到微服务架构。我们将通过实际代码示例,展示如何利用云原生技术构建和部署应用。无论你是初学者还是有经验的开发者,这篇文章都将为你提供有价值的信息和启示。
          |
          2月前
          |
          Cloud Native 持续交付 云计算
          云原生入门指南:从容器到微服务
          【10月更文挑战第28天】在数字化转型的浪潮中,云原生技术成为推动现代软件开发的关键力量。本篇文章将带你了解云原生的基本概念,探索它如何通过容器化、微服务架构以及持续集成和持续部署(CI/CD)的实践来提升应用的可伸缩性、灵活性和可靠性。你将学习到如何利用这些技术构建和部署在云端高效运行的应用,并理解它们对DevOps文化的贡献。
          69 2
          |
          2月前
          |
          Kubernetes 关系型数据库 MySQL
          Kubernetes入门:搭建高可用微服务架构
          【10月更文挑战第25天】在快速发展的云计算时代,微服务架构因其灵活性和可扩展性备受青睐。本文通过一个案例分析,展示了如何使用Kubernetes将传统Java Web应用迁移到Kubernetes平台并改造成微服务架构。通过定义Kubernetes服务、创建MySQL的Deployment/RC、改造Web应用以及部署Web应用,最终实现了高可用的微服务架构。Kubernetes不仅提供了服务发现和负载均衡的能力,还通过各种资源管理工具,提升了系统的可扩展性和容错性。
          149 3
          |
          3月前
          |
          Dubbo Java 应用服务中间件
          Dubbo学习圣经:从入门到精通 Dubbo3.0 + SpringCloud Alibaba 微服务基础框架
          尼恩团队的15大技术圣经,旨在帮助开发者系统化、体系化地掌握核心技术,提升技术实力,从而在面试和工作中脱颖而出。本文介绍了如何使用Dubbo3.0与Spring Cloud Gateway进行整合,解决传统Dubbo架构缺乏HTTP入口的问题,实现高性能的微服务网关。
          |
          2月前
          |
          监控 API 持续交付
          后端开发中的微服务架构:从入门到精通
          【10月更文挑战第26天】 在当今的软件开发领域,微服务架构已经成为了众多企业和开发者的首选。本文将深入探讨微服务架构的核心概念、优势以及实施过程中可能遇到的挑战。我们将从基础开始,逐步深入了解如何构建、部署和管理微服务。无论你是初学者还是有经验的开发者,这篇文章都将为你提供有价值的见解和实用的建议。
          51 0
          |
          5月前
          |
          Cloud Native Java Nacos
          微服务注册中心-Nacos概述
          该博客文章提供了对Nacos的全面概述,包括其基本介绍、与Spring Cloud集成的优势、主要功能以及如何在Spring Cloud Alibaba项目中作为服务注册中心使用Nacos。文章解释了Nacos是一个动态服务发现、配置管理和服务管理平台,支持服务发现、健康监测、动态配置、DNS服务和元数据管理。还介绍了如何下载和启动Nacos服务器,以及如何将微服务注册到Nacos中,包括修改pom.xml文件引入依赖、配置application.properties文件和使用@EnableDiscoveryClient注解开启服务注册发现功能。
          微服务注册中心-Nacos概述
          |
          5月前
          |
          运维 Cloud Native Android开发
          云原生之旅:容器化与微服务架构的融合之道安卓应用开发入门指南
          本文将深入探讨云原生技术的核心要素——容器化和微服务架构,并揭示它们如何共同推动现代软件的开发与部署。通过实际案例分析,我们将看到这两种技术如何相辅相成,助力企业实现敏捷、可扩展的IT基础设施。文章旨在为读者提供一条清晰的道路,指引如何在云原生时代利用这些技术构建和优化应用。 本文将引导初学者了解安卓应用开发的基本概念和步骤,从安装开发环境到编写一个简单的“Hello World”程序。通过循序渐进的讲解,让读者快速掌握安卓开发的核心技能,为进一步深入学习打下坚实基础。
          58 1
          |
          5月前
          |
          存储 Java 关系型数据库
          微服务概述
          本文介绍了微服务基本概念、演变过程,主要的原则及优缺点,最后谈到了微服务的实现方案之一,作为引文此章节偏理论知识一些。后续笔者将借助具体技术实现带领读者一起逐步实现微服务架构体系。
          |
          8月前
          |
          SpringCloudAlibaba Java 网络架构
          【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(七)Spring Cloud Gateway服务网关
          【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(七)Spring Cloud Gateway服务网关
          324 0