kafka快速入门2

简介: kafka快速入门2
1.3.6 消费者详解-理解

(1)消费者工作原理

(2)其他参数详解

  • enable.auto.commit

该属性指定了消费者是否自动提交偏移量,默认值是true。为了尽量避免出现重复数据和数据丢失,可以把它设置为false,由自己控制何时提交偏移量。如果把它设置为true,还可以通过配置auto.commit.interval.ms属性来控制提交的频率。

auto.offset.reset

  • earliest
    当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
  • latest
  • 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
  • none
    topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
  • anything else
    向consumer抛出异常

(3)提交和偏移量

每次调用poll()方法,它会返回由生产者写入kafka但还没有被消费者读取过来的记录,我们由此可以追踪到哪些记录是被群组里的哪个消费者读取的,kafka不会像其他JMS队列那样需要得到消费者的确认,这是kafka的一个独特之处,相反,消费者可以使用kafka来追踪消息在分区的位置(偏移量)

消费者会往一个叫做_consumer_offset的特殊主题发送消息,消息里包含了每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没有什么用处。不过,如果消费者发生崩溃或有新的消费者加入群组,就会触发再均衡,完成再均衡之后,每个消费者可能分配到新的分区,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。

如果提交偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。

如下图:

如果提交的偏移量大于客户端的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。

如下图:

(4)自动提交偏移量

enable.auto.commit被设置为true,提交方式就是让消费者自动提交偏移量,每隔5秒消费者会自动把从poll()方法接收的最大偏移量提交上去。提交时间间隔有auto.commot.interval.ms控制,默认值是5秒。

需要注意到,这种方式可能会导致消息重复消费。假如,某个消费者poll消息后,应用正在处理消息,在3秒后Kafka进行了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费。

(5)提交当前偏移量(同步提交)

enable.auto.commit设置为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量,commitSync()将会提交poll返回的最新的偏移量,所以在处理完所有记录后要确保调用了commitSync()方法。否则还是会有消息丢失的风险。

只要没有发生不可恢复的错误,commitSync()方法会一直尝试直至提交成功,如果提交失败也可以记录到错误日志里。

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
while (true){
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
        System.out.println(record.key());
        try {
            consumer.commitSync();//同步提交当前最新的偏移量
        }catch (CommitFailedException e){
            System.out.println("记录提交失败的异常:"+e);
        }
    }
}

(6)异步提交

手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的API。

while (true){
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
        System.out.println(record.key());
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
            if(e!=null){
                System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e);
            }
        }
    });
}

(7)同步和异步组合提交

异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。

举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。

try {
    while (true){
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
            System.out.println(record.key());
        }
        consumer.commitAsync();
    }
}catch (Exception e){
    e.printStackTrace();
    System.out.println("记录错误信息:"+e);
}finally {
    try {
        consumer.commitSync();
    }finally {
        consumer.close();
    }
}

1.4 spring boot集成kafka收发消息

1.4.1 环境搭建

(1)pom依赖,最终的依赖信息

<!-- 继承Spring boot工程 -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.5.RELEASE</version>
</parent>
<properties>
    <kafka.version>2.2.7.RELEASE</kafka.version>
    <kafka.client.version>2.0.1</kafka.client.version>
    <fastjson.version>1.2.58</fastjson.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- kafkfa -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>${kafka.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>${kafka.client.version}</version>
        <exclusions>
            <exclusion>
                <artifactId>connect-json</artifactId>
                <groupId>org.apache.kafka</groupId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.client.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
</dependencies>

(2)在resources下创建文件application.yml

server:
  port: 9991
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test-hello-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

(3)引导类

package com.oldlu.kafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class,args);
    }
}
1.4.2 消息生产者

新建controller

package com.oldlu.kafka.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HelloController {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;
    @GetMapping("/hello")
    public String hello(){
        //第一个参数:topics  
        //第二个参数:消息内容
        kafkaTemplate.send("kafka-hello","程序员");
        return "ok";
    }
}
1.4.3 消息消费者

新建监听类:

package com.oldlu.kafka.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
@Component
public class HelloListener {
    @KafkaListener(topics = {"hello-oldlu"})
    public void receiverMessage(ConsumerRecord<?,?> record){
        Optional<? extends ConsumerRecord<?, ?>> optional = Optional.ofNullable(record);
        if(optional.isPresent()){
            Object value = record.value();
            System.out.println(value);
        }
    }
}
1.4.4 测试

启动项目访问:http://localhost:9991/hello

控制台打印,效果如下

1.5 传递消息为对象

目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式

方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,本章节不介绍

方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,本项目采用这种方式

(1)新建类User

package com.oldlu.kafka.pojo;
public class User {
    private String username;
    private Integer age;
    //setter  getter
}

(2)修改消息发送

@RestController
public class HelloController {
    @Autowired
    private KafkaTemplate<String,Object> kafkaTemplate;
    @GetMapping("/hello")
    public String hello(){
        //发送消息
        User user = new User();
        user.setUsername("zhangsan");
        user.setAge(18);
        kafkaTemplate.send("hello-oldlu", JSON.toJSONString(user));
        return "ok";
    }
}

(4)修改消费者

@Component
public class HelloListener {
    @KafkaListener(topics = {"hello-oldlu"})
    public void receiverMessage(ConsumerRecord<?,?> record){
        Optional<? extends ConsumerRecord<?, ?>> optional = Optional.ofNullable(record);
        if(optional.isPresent()){
            Object value = record.value();
            User user = JSON.parseObject((String) value, User.class);
            System.out.println(user);
        }
    }
}

测试效果如下:


目录
相关文章
|
7月前
|
消息中间件 存储 安全
kafka快速入门1
kafka快速入门1
93 0
|
8月前
|
消息中间件 缓存 大数据
Kafka学习---1、Kafka 概述、Kafka快速入门
Kafka学习---1、Kafka 概述、Kafka快速入门
Kafka学习---1、Kafka 概述、Kafka快速入门
|
9月前
|
消息中间件 存储 传感器
macOS 系统 安装 Kafka 快速入门
macOS 系统 安装 Kafka 快速入门
160 0
|
消息中间件 存储 Java
Kafka快速入门
Kafka快速入门
|
消息中间件 存储 缓存
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本(下)
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本(下)
|
消息中间件 存储 缓存
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本(上)
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本(上)
|
消息中间件 存储 缓存
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
|
消息中间件 Kafka
Kafka快速入门(命令行操作)
Kafka快速入门(命令行操作)
Kafka快速入门(命令行操作)
|
消息中间件 Kafka
Kafka快速入门(安装集群)
Kafka快速入门(安装集群)
|
消息中间件 缓存 Kafka
Kafka快速入门(介绍)
Kafka快速入门(介绍)
Kafka快速入门(介绍)

热门文章

最新文章