点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
Hadoop(已更完)
HDFS(已更完)
MapReduce(已更完)
Hive(已更完)
Flume(已更完)
Sqoop(已更完)
Zookeeper(已更完)
HBase(已更完)
Redis (已更完)
Kafka(正在更新…)
章节内容
上节我们完成了如下内容:
生产者阶段丢失
生产者Broker阶段丢失
消费者丢失消息
延时队列
基本概念
两个Follower副本都已经拉取到了Leader副本的最新位置,此时又向Leader副本发送拉取请求,而Leader副本并没有新的消息写入,那么此时Leader副本应该如何处理问题呢?可以直接返回空的拉取结果给Follower副本,不过在Leader副本一直没有新消息写入的情况下,Follower副本会一直发送拉取请求,并且总收到空的拉取结果,消耗资源。
Kafka在处理拉取请求时,会先读取一次日志文件,如果收集不到足够多(fetchMinBytes),由参数(fetch.min.bytes配置,默认为1),那么就会创建一个延时拉取的操作(DelyedFetch)以等待拉取足够数量的消息。当延时拉取操作执行时,会再读取一次日志,然后将拉去结果返回给Follower副本。
延时操作不只是拉取消息时特有的操作,在Kafka中也有多种演示操作,比如延时数据删除、延时生产等。
对于延时生产而言,如果在使用生产者客户端发送消息的时候将acks设置为-1,那么意味着需要等待ISR集合中所有副本都确认收到消息之后才能正确的响应结果,或者捕获超时异常。
假设某个分区有3个副本,Leader、Follower1和Follower2,他们都在分区的ISR集合中。不考虑ISR变动的情况,Kafka在收到客户端的生产请求之后,将消息3和合消息4写入Leader副本的本地日志文件。
由于客户端设置了acks=-1,那么需要等到Follower1和Follower2两个副本都收到消息3和消息4才能告知客户端正确的接收了所发送的消息。如果在一定时间内,Follower1副本和Follower2副本没有能够完全拉取到消息3和消息4,那么就需要返回超时异常给客户端。生产请求的超时时间由参数:request.timeout.ms配置,默认值为3000,即30秒。
那么这里等待消息3和消息4写入Follower1副本和Follower2副本,并返回相应的响应结果给客户端的动作是由谁来执行的?在将消息写入Leader副本的本地日志文件之后,Kafka会创建一个延时的生产操作DelayedProduce,用来处理消息正常写入所有副本或超时情况,以返回相应的响应结果给客户端。
延时操作
延时操作需要延时反应响应的结果,首先它必须有一个超时时间(delayMs),如果在这个超时时间内没有完成既定的任务,那么就需要强制完成以返回响应结果给客户端。其次,延时操作不同与定时操作,定时操作是指在特定时间之后执行的操作,而延时操作可以在所设定的超时时间之前完成,所以延时操作能够支持外部事件的触发。
延时生产操作
它的外部事件是所要写入消息的某个分区HW(HighWatermark)发生增长。也就是说,随着Follower副本不断的与Leader副本进行消息同步,进而促使HW进一步增长,HW每增长一次都会检测是否能够完成次延时生产操作,如果可以就执行以此返回响应结果给客户端,如果在超时时间内始终无法完成,则强制执行。
延迟拉取操作
是由超时触发或外部事件触发而被执行的,超时触发很好理解,就要等到超时时间之后触发第二次读取日志文件的操作。外部事件触发就稍微复杂了一些,因为拉取请求不单单由Follower副本发起,也可以由消费者客户端发起,两种情况所对应的外部事件也不同的。如果是Follower副本的延时拉取,它的外部事件就是消息追加到了Leader副本的本地日志文件中,如果是消费者客户端的延时拉取,它的外部事件可以简单的理解为HW的增长。
实现方式
时间轮实现延时队列:TimeWheel,size,每个单元格的事件,每个单元格都代表一个时间,size*每个单元格的时间就是一个周期。
重试队列
基本概念
Kafka没有重试机制不支持消息重试,也没有死信队列,因此使用Kafka做消息队列时,需要自己实现消息重试的功能。
如何实现
创建新的Kafka主题作为重试队列:
创建一个topic作为重试topic,用于接收等待重试的消息
普通topic消费者设置等待重试消息的下一个重试topic
从重试topic获取等待重试消息存储到redis的zset中,并以下一次消费时间排序
定时任务从Redis获取到达消费事件的消息,并把消息发送到对应的topic
同一个消息重试次数过多则不再重试
代码实现
新建项目
由于重复了很多次,且没什么技术难度,这里跳过。
我们新建一个Maven项目。
POM.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>springboot-kafka</artifactId> <version>1.0-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.2.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.73</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
修改配置
spring: application: name: kafka-test kafka: bootstrap-servers: h121.wzk.icu:9092 producer: key-serializer: org.apache.kafka.common.serialization.IntegerSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer auto-offset-reset: earliest redis: database: 0 host: h121.wzk.icu port: 6379 password: lettuce: pool: min-idle: 8 max-idle: 500 max-active: 2000 max-wait: 10000 timeout: 5000 server: port: 8085
启动类
package icu.wzk; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class StartApp { public static void main(String[] args) { SpringApplication.run(StartApp.class, args); } }
AppConfig
package icu.wzk.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; @Configuration public class AppConfig { @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); // 配置连接⼯⼚ template.setConnectionFactory(factory); return template; } }
KafkaController
package icu.wzk.controller; import icu.wzk.service.KafkaService; import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.util.concurrent.ExecutionException; @RestController public class KafkaController { @Resource private KafkaService kafkaService; private static final String TOPIC = "tp_demo_retry_01"; @RequestMapping("/send/{message}") public String sendMessage(@PathVariable String message) throws ExecutionException, InterruptedException { ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, message); String result = kafkaService.sendMessage(record); return result; } }