Springboot最简单的实战介绍 整合kafka-生产者与消费者(消息推送与订阅获取)

简介: Springboot最简单的实战介绍 整合kafka-生产者与消费者(消息推送与订阅获取)

我们开始整合:


首先,先往pom.xml文件添加Kafka的依赖,


<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>


然后,接下来是配置文件(以下提供properties格式,yml格式 ,供大家随便自取),

当然注释也是要好好看看的,毕竟都一字字敲的。


application.properties


#============== kafka ===================
# 指定kafka 代理地址,可以多个
#spring.kafka.bootstrap-servers=123.xxx.x.xxx:19092,123.xxx.x.xxx:19093,123.xxx.x.xxx:19094
spring.kafka.bootstrap-servers=192.168.x.xxx:9092
#=============== producer生产者  =======================
spring.kafka.producer.retries=0
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
# 缓存容量
spring.kafka.producer.buffer-memory=33554432
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer消费者  =======================
# 指定默认消费者group id
spring.kafka.consumer.group-id=test-app
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100ms
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.consumer.bootstrap-servers=192.168.8.111:9092
#spring.kafka.consumer.zookeeper.connect=192.168.8.103:2181
#指定tomcat端口
server.port=8063 


application.yml:


spring:
  # KAFKA
  kafka:
    # ָkafka服务器地址,可以指定多个
    bootstrap-servers: 123.xxx.x.xxx:19092,123.xxx.x.xxx:19093,123.xxx.x.xxx:19094
    #=============== producer生产者配置 =======================
    producer:
      retries: 0
      # 每次批量发送消息的数量
      batch-size: 16384
      # 缓存容量
      buffer-memory: 33554432
      # ָ指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    #=============== consumer消费者配置  =======================
    consumer:
      #指定默认消费者的group id
      group-id: test-app
      #earliest
      #当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      #latest
      #当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      #none
      #topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: latest
      enable-auto-commit: true
      auto-commit-interval: 100ms
      #指定消费key和消息体的编解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer


好了,配置工作准备完毕。


我们先来搞Kafka的生产者,也就是负责推送消息的模块:

创建一个类, 叫KafkaSender(注解不能少,留意代码),


package com.kafkademo.producer;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.sun.scenario.effect.impl.sw.sse.SSEBlend_SRC_OUTPeer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
 * Hello!
 * Created By  JCccc on 2018/11/24
 * 11:25
 */
@Component
public class KafkaSender {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    private final Logger logger = LoggerFactory.getLogger(KafkaSender.class);
    public void send(String topic, String taskid, String jsonStr) {
        //发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, taskid, jsonStr);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            //推送成功
            public void onSuccess(SendResult<String, Object> result) {
                logger.info(topic + " 生产者 发送消息成功:" + result.toString());
            }
            @Override
            //推送失败
            public void onFailure(Throwable ex) {
                logger.info(topic + " 生产者 发送消息失败:" + ex.getMessage());
            }
        });
    }
}


以上就是kafka生产者了,到此刻,你已经可以开始往kafka服务器推送消息了

事不宜迟,我们立马试试:


创建个controller,搞个接口试试推送下消息,


@GetMapping("/sendMessageToKafka")
public  String sendMessageToKafka() {
    Map<String,String> messageMap=new HashMap();
    messageMap.put("message","我是一条消息");
    String taskid="123456";
    String jsonStr=JSONObject.toJSONString(messageMap);
//kakfa的推送消息方法有多种,可以采取带有任务key的,也可以采取不带有的(不带时默认为null)
    kafkaSender.send("testTopic",taskid,jsonStr);
    return "hi guy!";
}


用postman测一下(对了,这些推送的前提是你的kafka服务器是没问题的,能正常连接)


image.png看看控制台反应:


image.png


可以看到,我们的kafka生产者再推送消息成功后,成功进入了我们的回调函数onSuccess,也打印了日志。


没错,你已经掌握kafak生产者了,你已经掌握推送消息了。 那么接下来,我们继续搞下kafka的消费者。


我们创一个类,叫KafkaConsumer (同样,注意看代码,注解不能少) :


package com.kafkademo.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Controller;
import javax.servlet.http.HttpSession;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Hello!
 * Created By  JCccc on 2018/11/24
 * 13:13
 */
@Component
public class KafkaConsumer  {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
//下面的主题是一个数组,可以同时订阅多主题,只需按数组格式即可,也就是用“,”隔开
    @KafkaListener(topics = {"testTopic"})
    public void receive(ConsumerRecord<?, ?> record){
        logger.info("消费得到的消息---key: " + record.key());
        logger.info("消费得到的消息---value: " + record.value().toString());
    }
}


好,到此,kafka的消费者就这么简单完成了。

 

那么,我们接下来验证下,生产者推送消息到主题“testTopic”,消费者订阅主题“testTopic”,把消息消费下来:


一样,用postman来模拟下第三方调用接口,


image.png


我们看看控制台,


没错,kafka生产者跟刚刚一样,成功把消息推送到了主题testTopic上去了,回调函数OnSuccess打印了相关日志;


而,我们的kafka消费者,也是很有效率,再检测到自己订阅的主题testTopic有消息,立马消费了下来。


image.png


好了,springboot整合kafka,生产者、消费者就是这么轻松简单结束了。


当然了,该篇案例的生产者和消费者都放在了一个demo去介绍了,实际上大家使用的是按照业务场景,数据量去选择是否需要分开生产者项目&消费者项目,哪些是同时有生产者和消费者的身份的,哪些是只有生产者身份的,哪些是只有消费者身份的。


相关文章
|
22天前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
52 2
|
3月前
|
消息中间件 负载均衡 大数据
揭秘Kafka背后的秘密!再均衡如何上演一场消费者组的‘权力游戏’,让消息处理秒变高能剧情?
【8月更文挑战第24天】Kafka是一款在大数据处理领域备受推崇的产品,以其出色的性能和可扩展性著称。本文通过一个具体案例介绍其核心机制之一——再均衡(Rebalancing)。案例中,“user_activity”主题下10个分区被3个消费者均衡消费。当新消费者加入或原有消费者离开时,Kafka将自动触发再均衡过程,确保所有消费者能有效处理分配给它们的分区。
136 62
|
3月前
|
消息中间件 Kafka API
【Kafka消费新风潮】告别复杂,迎接简洁之美——深度解析Kafka新旧消费者API大比拼!
【8月更文挑战第24天】Apache Kafka作为一个领先的分布式流处理平台,广泛用于实时数据管道和流式应用的构建。随着其发展,消费者API经历了重大更新。旧消费者API(包括“低级”和“高级”API)虽提供灵活性但在消息顺序处理上存在挑战。2017年引入的新消费者API简化了接口,自动管理偏移量,支持更强大的消费组功能,显著降低了开发复杂度。通过对比新旧消费者API的代码示例可以看出,新API极大提高了开发效率和系统可维护性。
132 58
|
1月前
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
34 1
|
2月前
|
消息中间件 Kafka
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
|
3月前
|
消息中间件 开发框架 Java
掌握这一招,Spring Boot与Kafka完美融合,顺序消费不再是难题,让你轻松应对业务挑战!
【8月更文挑战第29天】Spring Boot与Kafka集成广泛用于处理分布式消息队列。本文探讨了在Spring Boot中实现Kafka顺序消费的方法,包括使用单个Partition或消息Key确保消息路由到同一Partition,并设置Consumer并发数为1以保证顺序消费。通过示例代码展示了如何配置Kafka Producer和Consumer,并自定义Partitioner。为确保数据正确性,还建议在业务逻辑中增加顺序校验机制。
105 3
|
3月前
|
消息中间件 Kafka 测试技术
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
|
3月前
|
消息中间件 负载均衡 Kafka
【Kafka消费秘籍】深入了解消费者组与独立模式,掌握消息消费的两种超能力!
【8月更文挑战第24天】Apache Kafka是一款高性能的分布式消息系统,支持灵活多样的消费模型以适应不同的应用场景。消息按主题组织,每个主题可划分为多个分区,确保消息顺序性。本文深入探讨了Kafka中的两大核心消费模式:消费者组(Consumer Group)和独立消费者(Standalone Consumer)。消费者组允许多个消费者协同工作,实现负载均衡及故障恢复,是最常用的消费模式。独立消费者模式则适用于需要高度定制化处理逻辑的场景,如消息重放等。通过对比这两种模式的特点和提供的示例代码,开发者可以根据具体需求选择最合适的消费策略,从而更好地利用Kafka构建高效的数据流应用程序。
91 3
|
3月前
|
消息中间件 Java Kafka
|
3月前
|
消息中间件 Java Kafka