spring-boot-route(十四)整合Kafka

简介: kafka是用Scala和Java语言开发的,高吞吐量的分布式消息中间件。高吞吐量使它在大数据领域具有天然的优势,被广泛用来记录日志。

在上一章中SpringBoot整合RabbitMQ,已经详细介绍了消息队列的作用,这一种我们直接来学习SpringBoot如何整合kafka发送消息。

kafka简介

kafka是用Scala和Java语言开发的,高吞吐量的分布式消息中间件。高吞吐量使它在大数据领域具有天然的优势,被广泛用来记录日志。

kafka架构分析

注1:图中的红色箭头表示消息的流动过程,蓝色表示分区备份,绿色表示kafka集群注册到zookeeper。

注2:在kafka0.9版本之前,消费者消费消息的位置记录在zookeeper中,在0.9版本之后,消费消息的位置记录在kafka的一个topic上。

kafka名词简介

  1. Producer:消息生产者
  2. Consumer:消息消费者
  3. Consumer Group(CG):消费者组,一个topic可以有多个CG,每个Partition只会把消息发送给GG中的一个Consumer
  4. Broker:一台kafka服务器就是一个broker,一个broker有多个topic
  5. Topic:消息主题,消息分类,可看作队列
  6. Partition:分区,为了实现扩展,一个大的topic可能分布到多个broker上,一个topic可以分为多个partition,partition中的每条消息都会被分配一个有序的id(offset),每个partiton中的消息是有序的。
  7. Offset:kafka的存储文件都是按照offset.kafka来命名的,方便查找,第一个offset为0000000000.kafka。
  8. Leader:分区具有被备份,主分区
  9. Follower:从分区

1. 生产者分区策略

  1. 指定分区。
  2. 没有指定分区但有key值,将key的hash值与当前topic的分区个数进行取余得到分区。
  3. 如果既没有指定分区又没有指定key,第一次调用时随机生成一个整数(以后调用每次在这个整数上自增),将这个随机数与该topic的分区数取余得到分区。

2. 消息可靠性问题

采用ack确认机制来保证消息的可靠性。

kafka在发送消息后会同步到其他分区副本,等所有副本都接收到消息后,kafka才会发送ack进行确认。采用这种模式的劣势就是当其中一个副本宕机后,则消息生产者就不会收到kafka的ack。

kafka采用ISR来解决这个问题。

ISR:Leader维护的一个和leader保持同步的follower集合。

当ISR中的folower完成数据同步之后,leader就会向follower发送ack,如果follower长时间未向leader同步数据,则该follower就会被踢出ISR,该时间阀值的设置参数为replica.lag.time.max.ms,默认时间为10s,leader发生故障后,就会从ISR中选举新的leader。

注:本文所讲的kafka版本为0.11,在0.9版本以前成为ISR还有一个条件,就是同步消息的条数。

ack参数配置

0:生产者不等待broker的ack。

1:leader分区接收到消息向生产者发送ack。

-1(all):ISR中的leader和follower同步成功后,向生产者发送ack。

3. 消息一致性问题

假如leader中有10条消息,向两个follower同步数据,follower A同步了8条,follower B同步了9条。这时候leader宕机了,follower A和follower B中的消息是不一致的,剩下两个follower就会重新选举出一个leader。

  • LEO(log end offset):每个副本的最后一个offset

  • HW(high watermark):所有副本中最小的offset

为了保证数据的一致性,所有的follower会将各自的log文件高出HW的部分截掉,然后再从新的leader中同步数据。

4. 消息重复性问题

在kafka0.11版本中引入了一个新特性:幂等性。启用幂等性后,ack默认为-1。将生产者中的enable.idompotence设置为true,即启用了幂等性。

开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一Partition的消息会附带Sequence Number。Broker端会对做缓存,当具有相同主键的消息提交时,Broker只会缓存一条。但是每次重启PID就会发生变化,因此只能保证一次会话同一分区的消息不重复。

5. 消费者组分区分配策略

kafka有两种分配策略,一种是RoundRobin,另一种是Range

RoundRobin是按照消费者组以轮询的方式去给消费者分配分区的方式,前提条件是消费者组中的消费者需要订阅同一个topic。

Range是kafka默认的分配策略,它是通过当前的topic按照一定范围来分配的,假如有3个分区,消费者组有两个消费者,则消费者A去消费1和2分区,消费者B去消费3分区。

6. 消费者offset维护

Kafka 0.9 版本之前,consumer默认将offset保存在zookeeper中,0.9 版本开始,offset保存在kafka的一个内置topic中,该topic为_consumer_offsets

7. 生产者事务

为了实现跨分区会话的事务,需要引入一个全局唯一的Tracscation ID,并将Producer 获得的PID与之绑定。这样当Producer重启后就可以通过正在进行的Transaction ID获得原来的PID。

为了管理Transcation ID,kafka引入了一个新的组件Transcation Coordinator。Producer就是通过和Transcation Coordinator交互获得Transction ID对应的任务状态。

Spring Boot 整合kafka

1. 引入kafka依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. 配置kafka服务信息

spring:
  kafka:
    # kafka服务地址
    bootstrap-servers: 47.104.155.182:9092
    producer:
      # 生产者消息key序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 生产者消息value序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # 消费者组
      group-id: test-consumer-group
      # 消费者消息value反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 消费者消息value反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3. 消费者

@Component
@Slf4j
@KafkaListener(topics = {
   "first-topic"},groupId = "test-consumer-group")
public class Consumer {
   

    @KafkaHandler
    public void receive(String message){
   

        log.info("我是消费者,我接收到的消息是:"+message);
    }
}

4. 生产者

@RestController
public class Producer {
   

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @GetMapping("send")
    public void send(){
   

        String message = "你好,我是Java旅途";
        // 第一个参数 topic
        // 第二个参数 消息
        kafkaTemplate.send("first-topic",message);
    }
}

更多优质内容推荐访问 毕设侠

目录
相关文章
|
容器
Echarts报错 Cant read property getWidth of null的解决方案
Echarts报错 Cant read property getWidth of null的解决方案
277 0
|
物联网 Java Linux
Linux安装与配置Eclipse Paho库:实现MQTT通信
Eclipse Paho是一个开源的MQTT(Message Queuing Telemetry Transport)实现,提供了多种编程语言的客户端库,包括C、C++、Java、Python等。在Linux系统中,通过安装和配置Eclipse Paho库,我们可以方便地实现MQTT通信功能。本文将详细介绍在Linux系统中安装和配置Eclipse Paho库的步骤,以便于开发者在物联网项目中使用MQTT协议进行通信。
2064 0
Server-Sent Events 和 WebSocket 之间有什么区别
Server-Sent Events (SSE) 和 WebSocket 分别代表单向和双向通信机制。SSE,基于 HTTP,仅允许服务器向客户端发送事件流;而 WebSocket 是双向实时通信协议,支持客户端与服务器的双向交互。SSE适合低实时性场景,依赖长轮询或流传输;WebSocket 提供更低延迟,适用于高实时性应用。两者在现代浏览器中普遍被支持,但旧版浏览器或特定网络环境可能影响兼容性。选择哪种机制取决于实际需求,如通信方向、实时性要求及目标浏览器支持。
|
6月前
|
人工智能 数据可视化 数据挖掘
工业零件不良率、残次率的智能数据分析和数字化管理
在传统工业领域,我们通过引入DataV-Note平台,成功实现了企业智能数据分析与数字化管理的初步目标。这一平台不仅显著提升了数据处理的效率和准确性,还为我们的日常运营提供了更加科学、直观的决策支持。然而,这只是智能化转型的第一步。展望未来,我们期望能够进一步深化技术应用,推动企业管理向更高层次的智能化方向迈进。通过持续优化数据分析能力、完善数字化管理体系,我们致力于将企业的运营模式从传统的经验驱动转变为数据驱动,从而全面提升管理效能和市场竞争力,为企业创造更大的长期价值
|
存储 缓存 Kubernetes
在K8S中,有状态服务与无状态服务都是怎么使用pv和pvc?(可以通过应用场景说明一下)
在K8S中,有状态服务与无状态服务都是怎么使用pv和pvc?(可以通过应用场景说明一下)
|
Java Linux 容器
JVM内存问题之什么是OOM-Killer,它通常会在什么情况下触发
JVM内存问题之什么是OOM-Killer,它通常会在什么情况下触发
320 2
|
算法 Java 程序员
【C++专栏】C++入门 | 类和对象 | 面向过程与面向对象的初步认识
【C++专栏】C++入门 | 类和对象 | 面向过程与面向对象的初步认识
146 0
|
Linux 数据安全/隐私保护 网络虚拟化
centos7部署Pritunl
centos7部署Pritunl
|
SQL 前端开发 关系型数据库
芋道框架万字详解(前后端分离)、若依框架、yudao-cloud保姆级攻略
芋道框架万字详解(前后端分离)、若依框架、yudao-cloud保姆级攻略
15713 4
|
Java Maven
ProtostuffUtil—快速序列化和反序列化对象工具
ProtostuffUtil—快速序列化和反序列化对象工具
247 0