1,课程回顾
2,本章重点
springboot整合kafka
springcloud整合kafka
3,具体内容
3.1 springboot整合kafka
3.1.1 pom.xml添加jar
org.springframework.kafka
spring-kafka
2.8.1
com.alibaba fastjson 1.2.79 注意:此处使用的springboot版本为2.4.1 kafka是编写课件时最新版本2.6.6,不是任意版本都兼容 3.1.2 配置文件application.properties #Kafka集群配置,注意如果集群搭建时用的是名字 需要配置对应的主机名称 C:\Windows\System32\drivers\etc\hosts
#生产者配置
#spring整合kafka配置
#连接集群配置
spring.kafka.bootstrap-servers=cluster1:9092,cluster2:9092,cluster3:9092
重试次数
spring.kafka.producer.retries=3
应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=-1
批量大小
spring.kafka.producer.batch-size=16384
提交延时
spring.kafka.producer.properties.linger.ms=10
当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
自定义分区器
spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
#自定义topic名称
topicName=topic-deptinfo
#消费者配置
#springboot整合 kafka
#消费者配置
#连接集群配置
spring.kafka.bootstrap-servers=cluster1:9092,cluster2:9092,cluster3:9092
默认的消费组ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
当kafka中没有初始offset或offset超出范围时将自动重置offset
earliest:重置为分区中最小的offset;
latest:重置为分区中最新的offset(消费分区中新产生的数据);
none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
设置批量消费
spring.kafka.listener.type=batch
批量消费每次最多消费多少条消息
spring.kafka.consumer.max-poll-records=50
#自定义topic名称
topicName=topic-deptinfo
3.1.3 生成者代码
package com.aaa.sbm.task; import com.aaa.sbm.entity.Dept; import com.aaa.sbm.service.DeptService; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; import javax.annotation.Resource; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; /** • @ fileName:TimedSendDeptInfoTask • @ description: • @ author:zhz • @ createTime:2022/1/13 9:37 • @ version:1.0.0/ @Component //不在3层之内,交给IOC处理 @EnableScheduling //开启定时任务 @EnableAsync //开启异步处理 可以在任务方法上使用@Async 该方法多线程处理时,可以异步处理,提高执行效率 @Slf4j public class TimedSendDeptInfoTask { //spring用到了哪些涉及模式 模板模式(封装出来一个通用的工具模板,供你完成什么功能,简化整个操作流程) @Resource private KafkaTemplate kafkaTemplate; //juc包下线程安全的类,可以实现多线程同步自增 private AtomicInteger atomicInteger =new AtomicInteger(); //注入topic名称 @Value(“${topicName}”) private String topicN; /@Resourceprivate DeptService deptService;/ /@Resourceprivate RestTemplate restTemplate;/ //HttpClient /@Resource //模板模式private RedisTemplate redisTemplate;*//** • 定制执行任务 • 每隔3秒 使用多线程发送5条部门信息到kafka中 / // cron=“秒 分 时 日 月 周” / 每 - 范围(3-10 每分钟的3秒到10秒每秒执行一次) , 选项 (3,10,15 每分钟第3秒第10秒第15秒执行) @Scheduled(cron = "/3 * * * * ?") public void timedExecute(){ //实例化固定线程池 ExecutorService executorService = Executors.newFixedThreadPool(5); //lambda表达式 简化代码写法 ()->sendDeptInfo() 左边是参数 右边是执行业务 /* executorService.execute(new Runnable() { @Override public void run() { sendDeptInfo(); System.out.println(“1”); System.out.println(“2”); } });/ / executorService.execute(()-> { sendDeptInfo(); System.out.println(“1”); System.out.println(“2”); });*/ //启动5个线程执行 executorService.execute(()->sendDeptInfo()); executorService.execute(()->sendDeptInfo()); executorService.execute(()->sendDeptInfo()); executorService.execute(()->sendDeptInfo()); executorService.execute(()->sendDeptInfo()); //关闭线程池 executorService.shutdown(); } /** • 发送部门信息 */ // @Async //异步处理,提高效率 public void sendDeptInfo(){ log.info(“线程信息为:”+Thread.currentThread().getName()+“,正在执行。。。。。。。。。。。。。。”); int getAndIncrement = atomicInteger.getAndIncrement(); Dept dept =new Dept(getAndIncrement,“dev”+getAndIncrement,“zz”+getAndIncrement); log.info(“要发送的部门信息为:”+dept); //发送部门信息到kafka中 一定要是字符串格式 kafkaTemplate.send(topicN, JSON.toJSONString(dept)); } }
3.1.4 消费者代码
package com.aaa.sbm.util; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.util.Optional; /** • @ fileName:KafkaConsumer • @ description:工具类用来监控topic-deptinof,不停的获取message • @ author:zhz • @ createTime:2022/1/13 10:26 • @ version:1.0.0*/@Component@Slf4jpublic class KafkaConsumer {/** • 消费消息方法 借助 @KafkaListener指定消费的topic 如果该topic有信息都回被拉取pull 到参数中 • @param record */ @KafkaListener(topics = {“${topicName}”}) //监听注解 监听指定的topic public void pullKafkaMsg(ConsumerRecord record){ //jdk8之后封装的专门处理空值一个类,有效防止空指针异常 Optional optional = Optional.ofNullable(record.value()); // isPresent等同于if(record!=null) if(optional.isPresent()){ log.info(“接受到的信息为:”+ record); log.info(“接受到的部门信息为:”+ optional.get()); } } }
3.1.5 测试
消费消息
启动消费者项目,观察控制台
生产信息
直接启动生成者,观察控制台
3.2 springcloud整合kafka(以121讲课项目为例子)
3.2.1 添加jar
父项目:
org.springframework.kafka
spring-kafka
2.2.14.RELEASE
<!-- fastjson的jar包--> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.75</version> </dependency> <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.18</version> </dependency>
注意:这里的springboot2.1.11只可以和spring-kafka2.2.*的版本匹配,否则会报异常
微服务:
org.projectlombok
lombok
org.springframework.kafka
spring-kafka
com.alibaba
fastjson
3.2.2 生成者配置application.properties
#springboot 整合kafka
#Kafka集群配置,注意如果集群搭建时用的是名字 需要配置对应的主机名称 C:\Windows\System32\drivers\etc\hosts
spring.kafka.bootstrap-servers=kafka1:9092,kafka2:9092,kafka3:9092
#生产者配置
重试次数
spring.kafka.producer.retries=0
应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=1
批量大小
spring.kafka.producer.batch-size=16384
提交延时
spring.kafka.producer.properties.linger.ms=0
当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
自定义分区器
spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
3.2.3 消费者配置application.properties
#springboot 整合kafka
#Kafka集群配置,注意如果集群搭建时用的是名字 需要配置对应的主机名称 C:\Windows\System32\drivers\etc\hosts
spring.kafka.bootstrap-servers=kafka1:9092,kafka2:9092,kafka3:9092
#消费者配置
默认的消费组ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
当kafka中没有初始offset或offset超出范围时将自动重置offset
earliest:重置为分区中最小的offset;
latest:重置为分区中最新的offset(消费分区中新产生的数据);
none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
设置批量消费
spring.kafka.listener.type=batch
批量消费每次最多消费多少条消息
spring.kafka.consumer.max-poll-records=50
3.2.4 生产者代码
@Autowired private KafkaTemplate kafkaTemplate; //发送消息方法 @GetMapping(“productOrder”) public String send() { Order order =new Order(); order.setId(100); order.setMemberUsername(“测试生产者”); order.setShopId(1001); //log.info(“+++++++++++++++++++++ message = {}”, JSON.toJSONString(dept)); //topic-dept为主题 kafkaTemplate.send(“topic-order”, JSON.toJSONString(order)); return “suc”; }
3.2.5 消费者代码
package com.aaa.ss.util; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.util.Optional; /** • @ fileName:KafkaConsumer • @ description: • @ author:zhz • @ createTime:2021/2/20 17:20 */ @Component @Slf4j public class KafkaConsumer { @KafkaListener(topics = {“topic-order”}) public void consumer(ConsumerRecord record){ Optional kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); log.info(“----------------- record =” + record); log.info(“------------------ message =” + message); } } }
3.2.6 测试 1,生产者,地址栏请求(具体要看业务需求,讲课只位了测试效果) http://localhost:2221/order/productOrder 2,观察消费者项目
3,还可以使用命令查看后台topic
4,知识点总结
5,本章面试题