SpringBoot使用Kafka生产者、消费者

简介: SpringBoot使用Kafka生产者、消费者

image.png

@[TOC]

依赖

<!--kafka-->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>xxx</version>
</dependency>
AI 代码解读

配置文件

image.png

spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    bootstrap-servers: 192.168.20.75:9907

kafka:
  spark:
    task:
      topic: platform-model-spark-topic1
AI 代码解读

生产者

方法一:添加@RunWith(SpringRunner.class)@SpringBootTest(classes = DataComputingModelApplication.class)实现初始化配置注入kafkaTemplate,调用send()
@Autowired
private KafkaTemplate kafkaTemplate;

    @Test
    public void kafkaSend() {
   
        final ProducerRecord<String, String> record = new ProducerRecord("test20201228", "{\"key\":\"27\"}");
        kafkaTemplate.send(record);
        log.info("------------send success!----------------");
    }

方法二:不需要注解@RunWith@SpringBootTest,但是初始化Properties,同样调用send()
@Test
    public void kafkaSend2() {
   
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.20.75:9907");
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);
        KafkaProducer<String,String> producer = new KafkaProducer<>(props);
        ProducerRecord record = new ProducerRecord<String, String>("test20201228", "key", "{\"key\":\"20\"}");
        producer.send(record);
        log.info("------------send success!----------------");
        producer.close();
    }
AI 代码解读

消费者

==说明:
① Topic主题用来区分不同类型的消息
② GroupId用来解决同一个Topic主题下重复消费问题,比如一条消费需要多个消费者接收到,就可以通过设置不同的GroupId实现,实际消息是存一份的,只是通过逻辑上设置标识来区分,系统会记录Topic主题下--》GroupId分组下--》partition分区下的offsert,来标识是否消费过。==

@KafkaListener(topics = "big_data_task_state", groupId = "bigDataTaskState")
    public void taskStateConsumer(String msg) {
   
        log.info("----receive:{}----", msg);
    }
AI 代码解读

image.png

重要信息

image.png
image.png
image.png

目录
打赏
0
9
10
0
230
分享
相关文章
【赵渝强老师】Kafka生产者的执行过程
Kafka生产者(Producer)将消息序列化后发送到指定主题的分区。整个过程由主线程和Sender线程协调完成。主线程创建KafkaProducer对象及ProducerRecord,经过拦截器、序列化器和分区器处理后,消息进入累加器。Sender线程负责从累加器获取消息并发送至KafkaBroker,Broker返回响应或错误信息,生产者根据反馈决定是否重发。视频和图片详细展示了这一流程。
110 61
【赵渝强老师】Kafka生产者的消息发送方式
Kafka生产者支持三种消息发送方式:1. **fire-and-forget**:发送后不关心结果,适用于允许消息丢失的场景;2. **同步发送**:通过Future对象确保消息成功送达,适用于高可靠性需求场景;3. **异步发送**:使用回调函数处理结果,吞吐量较高但牺牲部分可靠性。视频和代码示例详细讲解了这三种方式的具体实现。
【赵渝强老师】Kafka的消费者与消费者组
Kafka消费者是从Kafka集群中消费数据的客户端。单消费者模型在数据生产速度超过消费速度时会导致数据堆积。为解决此问题,Kafka引入了消费者组的概念,允许多个消费者共同消费同一主题的消息。消费者组由一个或多个消费者组成,它们动态分配和重新分配主题分区,确保消息处理的高效性和可靠性。视频讲解及示意图详细展示了这一机制。
【Azure Kafka】使用Spring Cloud Stream Binder Kafka 发送并接收 Event Hub 消息及解决并发报错
reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
143 5
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
103 1
基于SpringBoot+Vue实现的留守儿童爱心网站设计与实现(计算机毕设项目实战+源码+文档)
博主是一位全网粉丝超过100万的CSDN特邀作者、博客专家,专注于Java、Python、PHP等技术领域。提供SpringBoot、Vue、HTML、Uniapp、PHP、Python、NodeJS、爬虫、数据可视化等技术服务,涵盖免费选题、功能设计、开题报告、论文辅导、答辩PPT等。系统采用SpringBoot后端框架和Vue前端框架,确保高效开发与良好用户体验。所有代码由博主亲自开发,并提供全程录音录屏讲解服务,保障学习效果。欢迎点赞、收藏、关注、评论,获取更多精品案例源码。
基于SpringBoot+Vue实现的家政服务管理平台设计与实现(计算机毕设项目实战+源码+文档)
面向大学生毕业选题、开题、任务书、程序设计开发、论文辅导提供一站式服务。主要服务:程序设计开发、代码修改、成品部署、支持定制、论文辅导,助力毕设!
基于SpringBoot+Vue实现的家乡特色推荐系统设计与实现(源码+文档+部署)
面向大学生毕业选题、开题、任务书、程序设计开发、论文辅导提供一站式服务。主要服务:程序设计开发、代码修改、成品部署、支持定制、论文辅导,助力毕设!
基于SpringBoot+Vue实现的大学生就业服务平台设计与实现(系统源码+文档+数据库+部署等)
面向大学生毕业选题、开题、任务书、程序设计开发、论文辅导提供一站式服务。主要服务:程序设计开发、代码修改、成品部署、支持定制、论文辅导,助力毕设!

物联网

+关注

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等