kafka第三次课!

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: kafka第三次课!

1,课程回顾

2,本章重点

springboot整合kafka

springcloud整合kafka

3,具体内容

3.1 springboot整合kafka

3.1.1 pom.xml添加jar



org.springframework.kafka

spring-kafka

2.8.1

com.alibaba fastjson 1.2.79 注意:此处使用的springboot版本为2.4.1 kafka是编写课件时最新版本2.6.6,不是任意版本都兼容 3.1.2 配置文件application.properties #Kafka集群配置,注意如果集群搭建时用的是名字 需要配置对应的主机名称 C:\Windows\System32\drivers\etc\hosts

#生产者配置

#spring整合kafka配置

#连接集群配置

spring.kafka.bootstrap-servers=cluster1:9092,cluster2:9092,cluster3:9092

重试次数

spring.kafka.producer.retries=3

应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)

spring.kafka.producer.acks=-1

批量大小

spring.kafka.producer.batch-size=16384

提交延时

spring.kafka.producer.properties.linger.ms=10

当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka

linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了

生产端缓冲区大小

spring.kafka.producer.buffer-memory = 33554432

Kafka提供的序列化和反序列化类

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

自定义分区器

spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner

#自定义topic名称

topicName=topic-deptinfo

#消费者配置

#springboot整合 kafka

#消费者配置

#连接集群配置

spring.kafka.bootstrap-servers=cluster1:9092,cluster2:9092,cluster3:9092

默认的消费组ID

spring.kafka.consumer.properties.group.id=defaultConsumerGroup

是否自动提交offset

spring.kafka.consumer.enable-auto-commit=true

提交offset延时(接收到消息后多久提交offset)

spring.kafka.consumer.auto.commit.interval.ms=1000

当kafka中没有初始offset或offset超出范围时将自动重置offset

earliest:重置为分区中最小的offset;

latest:重置为分区中最新的offset(消费分区中新产生的数据);

none:只要有一个分区不存在已提交的offset,就抛出异常;

spring.kafka.consumer.auto-offset-reset=latest

消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)

spring.kafka.consumer.properties.session.timeout.ms=120000

消费请求超时时间

spring.kafka.consumer.properties.request.timeout.ms=180000

Kafka提供的序列化和反序列化类

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

消费端监听的topic不存在时,项目启动会报错(关掉)

spring.kafka.listener.missing-topics-fatal=false

设置批量消费

spring.kafka.listener.type=batch

批量消费每次最多消费多少条消息

spring.kafka.consumer.max-poll-records=50

#自定义topic名称

topicName=topic-deptinfo

3.1.3 生成者代码

package com.aaa.sbm.task;
import com.aaa.sbm.entity.Dept;
import com.aaa.sbm.service.DeptService;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import javax.annotation.Resource;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
• @ fileName:TimedSendDeptInfoTask
• @ description:
• @ author:zhz
• @ createTime:2022/1/13 9:37
• @ version:1.0.0/
@Component //不在3层之内,交给IOC处理
@EnableScheduling //开启定时任务
@EnableAsync //开启异步处理 可以在任务方法上使用@Async 该方法多线程处理时,可以异步处理,提高执行效率
@Slf4j
public class TimedSendDeptInfoTask {
//spring用到了哪些涉及模式 模板模式(封装出来一个通用的工具模板,供你完成什么功能,简化整个操作流程)
@Resource
private KafkaTemplate kafkaTemplate;
//juc包下线程安全的类,可以实现多线程同步自增
private AtomicInteger atomicInteger =new AtomicInteger();
//注入topic名称
@Value(“${topicName}”)
private String topicN;
/@Resourceprivate DeptService deptService;/
/@Resourceprivate RestTemplate restTemplate;/ //HttpClient
/@Resource //模板模式private RedisTemplate redisTemplate;*//**
• 定制执行任务
• 每隔3秒 使用多线程发送5条部门信息到kafka中
/
// cron=“秒 分 时 日 月 周” / 每 - 范围(3-10 每分钟的3秒到10秒每秒执行一次) , 选项 (3,10,15 每分钟第3秒第10秒第15秒执行)
@Scheduled(cron = "/3 * * * * ?")
public void timedExecute(){
//实例化固定线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);
//lambda表达式 简化代码写法 ()->sendDeptInfo() 左边是参数 右边是执行业务
/* executorService.execute(new Runnable() {
@Override
public void run() {
sendDeptInfo();
System.out.println(“1”);
System.out.println(“2”);
}
});/
/ executorService.execute(()-> {
sendDeptInfo();
System.out.println(“1”);
System.out.println(“2”);
});*/
//启动5个线程执行
executorService.execute(()->sendDeptInfo());
executorService.execute(()->sendDeptInfo());
executorService.execute(()->sendDeptInfo());
executorService.execute(()->sendDeptInfo());
executorService.execute(()->sendDeptInfo());
//关闭线程池
executorService.shutdown();
}
/**
• 发送部门信息
*/
// @Async //异步处理,提高效率
public void sendDeptInfo(){
log.info(“线程信息为:”+Thread.currentThread().getName()+“,正在执行。。。。。。。。。。。。。。”);
int getAndIncrement = atomicInteger.getAndIncrement();
Dept dept =new Dept(getAndIncrement,“dev”+getAndIncrement,“zz”+getAndIncrement);
log.info(“要发送的部门信息为:”+dept);
//发送部门信息到kafka中 一定要是字符串格式
kafkaTemplate.send(topicN, JSON.toJSONString(dept));
}
}

3.1.4 消费者代码

package com.aaa.sbm.util;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
• @ fileName:KafkaConsumer
• @ description:工具类用来监控topic-deptinof,不停的获取message
• @ author:zhz
• @ createTime:2022/1/13 10:26
• @ version:1.0.0*/@Component@Slf4jpublic class KafkaConsumer {/**
• 消费消息方法 借助 @KafkaListener指定消费的topic 如果该topic有信息都回被拉取pull 到参数中
• @param record
*/
@KafkaListener(topics = {“${topicName}”}) //监听注解 监听指定的topic
public void pullKafkaMsg(ConsumerRecord record){
//jdk8之后封装的专门处理空值一个类,有效防止空指针异常
Optional optional = Optional.ofNullable(record.value());
// isPresent等同于if(record!=null)
if(optional.isPresent()){
log.info(“接受到的信息为:”+ record);
log.info(“接受到的部门信息为:”+ optional.get());
}
}
}

3.1.5 测试

消费消息

启动消费者项目,观察控制台

生产信息

直接启动生成者,观察控制台

3.2 springcloud整合kafka(以121讲课项目为例子)

3.2.1 添加jar

父项目:



org.springframework.kafka

spring-kafka

2.2.14.RELEASE

<!-- fastjson的jar包-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.18</version>
        </dependency>

注意:这里的springboot2.1.11只可以和spring-kafka2.2.*的版本匹配,否则会报异常

微服务:


org.projectlombok

lombok




org.springframework.kafka

spring-kafka




com.alibaba

fastjson


3.2.2 生成者配置application.properties

图片: https://shimo.im/fake.png

#springboot 整合kafka

#Kafka集群配置,注意如果集群搭建时用的是名字 需要配置对应的主机名称 C:\Windows\System32\drivers\etc\hosts

spring.kafka.bootstrap-servers=kafka1:9092,kafka2:9092,kafka3:9092

#生产者配置

重试次数

spring.kafka.producer.retries=0

应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)

spring.kafka.producer.acks=1

批量大小

spring.kafka.producer.batch-size=16384

提交延时

spring.kafka.producer.properties.linger.ms=0

当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka

linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了

生产端缓冲区大小

spring.kafka.producer.buffer-memory = 33554432

Kafka提供的序列化和反序列化类

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

自定义分区器

spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner

3.2.3 消费者配置application.properties

#springboot 整合kafka

#Kafka集群配置,注意如果集群搭建时用的是名字 需要配置对应的主机名称 C:\Windows\System32\drivers\etc\hosts

spring.kafka.bootstrap-servers=kafka1:9092,kafka2:9092,kafka3:9092

#消费者配置

默认的消费组ID

spring.kafka.consumer.properties.group.id=defaultConsumerGroup

是否自动提交offset

spring.kafka.consumer.enable-auto-commit=true

提交offset延时(接收到消息后多久提交offset)

spring.kafka.consumer.auto.commit.interval.ms=1000

当kafka中没有初始offset或offset超出范围时将自动重置offset

earliest:重置为分区中最小的offset;

latest:重置为分区中最新的offset(消费分区中新产生的数据);

none:只要有一个分区不存在已提交的offset,就抛出异常;

spring.kafka.consumer.auto-offset-reset=latest

消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)

spring.kafka.consumer.properties.session.timeout.ms=120000

消费请求超时时间

spring.kafka.consumer.properties.request.timeout.ms=180000

Kafka提供的序列化和反序列化类

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

消费端监听的topic不存在时,项目启动会报错(关掉)

spring.kafka.listener.missing-topics-fatal=false

设置批量消费

spring.kafka.listener.type=batch

批量消费每次最多消费多少条消息

spring.kafka.consumer.max-poll-records=50

3.2.4 生产者代码

图片: https://shimo.im/fake.png

@Autowired
private KafkaTemplate kafkaTemplate;
//发送消息方法
@GetMapping(“productOrder”)
public String send() {
Order order =new Order();
order.setId(100);
order.setMemberUsername(“测试生产者”);
order.setShopId(1001);
//log.info(“+++++++++++++++++++++ message = {}”, JSON.toJSONString(dept));
//topic-dept为主题
kafkaTemplate.send(“topic-order”, JSON.toJSONString(order));
return “suc”;
}

3.2.5 消费者代码

图片: https://shimo.im/fake.png

package com.aaa.ss.util;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
• @ fileName:KafkaConsumer
• @ description:
• @ author:zhz
• @ createTime:2021/2/20 17:20
*/
@Component
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics = {“topic-order”})
public void consumer(ConsumerRecord record){
Optional kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
log.info(“----------------- record =” + record);
log.info(“------------------ message =” + message);
}
}
}
3.2.6  测试
            1,生产者,地址栏请求(具体要看业务需求,讲课只位了测试效果)
             http://localhost:2221/order/productOrder
            2,观察消费者项目

图片: https://shimo.im/fake.png

3,还可以使用命令查看后台topic

图片: https://shimo.im/fake.png

4,知识点总结

5,本章面试题


目录
相关文章
|
存储 测试技术 编译器
芯片测试:万字长文一起聊聊IC测试机-ATE
芯片测试:万字长文一起聊聊IC测试机-ATE
1876 0
|
6月前
|
机器学习/深度学习 算法 关系型数据库
强化学习:动态规划求解最优状态价值函数——手把手教你入门强化学习(四)
本文介绍了基于模型的强化学习算法,重点讲解动态规划(DP)。动态规划通过分解问题为子问题求解状态价值函数,利用贝尔曼期望方程迭代更新。其核心性质包括最优子结构和重叠子问题,适用于已知转移概率和奖励的MDP场景。文章回顾了前期强化学习基础,并展望了后续内容如蒙特卡罗法。适合初学者系统了解强化学习算法原理与应用。
185 7
|
4月前
|
监控 网络协议 Linux
硬件厂商的MIB文档详解 | 如何查询OID? | MIB Browser实战指南-优雅草卓伊凡
硬件厂商的MIB文档详解 | 如何查询OID? | MIB Browser实战指南-优雅草卓伊凡
451 12
硬件厂商的MIB文档详解 | 如何查询OID? | MIB Browser实战指南-优雅草卓伊凡
|
存储 Kubernetes 调度
k8s常见的排错指南Node,svc,Pod等以及K8s网络不通问题
k8s常见的排错指南Node,svc,Pod等以及K8s网络不通问题
4761 1
|
11月前
|
JSON JavaScript 前端开发
js如何格式化一个JSON对象?
js如何格式化一个JSON对象?
386 3
|
10月前
|
弹性计算 Kubernetes Perl
k8s 设置pod 的cpu 和内存
在 Kubernetes (k8s) 中,设置 Pod 的 CPU 和内存资源限制和请求是非常重要的,因为这有助于确保集群资源的合理分配和有效利用。你可以通过定义 Pod 的 `resources` 字段来设置这些限制。 以下是一个示例 YAML 文件,展示了如何为一个 Pod 设置 CPU 和内存资源请求(requests)和限制(limits): ```yaml apiVersion: v1 kind: Pod metadata: name: example-pod spec: containers: - name: example-container image:
1374 2
|
网络虚拟化 网络架构
路由器实现 IP 子网之间的通信
路由器实现 IP 子网之间的通信
187 2
|
监控 安全 Java
在Java中如何优雅的停止一个线程?可别再用Thread.stop()了!
在Java中如何优雅的停止一个线程?可别再用Thread.stop()了!
220 2
|
Kubernetes API Python

热门文章

最新文章