Spring Boot 整合 Kafka

简介: Spring Boot 整合 Kafka

Spring Boot 整合 Kafka


添加依赖


cmplie "org.springframework.kafka:spring-kafka"
complie "com.google.code.gson:gson"


添加YML配置


spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      bootstrap-servers: localhost:9092
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      missing-topics-fatal: false


业务代码


  • 创建domain


import java.io.Serializable;
import java.util.Date;
public class KafkaMessage implements Serializable {
    private Long id;
    private String username;
    private String password;
    private Date date;
    public Long getId() {
        return id;
    }
    public void setId(Long id) {
        this.id = id;
    }
    public String getUsername() {
        return username;
    }
    public void setUsername(String username) {
        this.username = username;
    }
    public String getPassword() {
        return password;
    }
    public void setPassword(String password) {
        this.password = password;
    }
    public Date getDate() {
        return date;
    }
    public void setDate(Date date) {
        this.date = date;
    }
}


  • 创建生产者


import com.google.gson.Gson;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    public void sendKafkaMessage(KafkaMessage message) {
        this.kafkaTemplate.send("myTopic", new Gson().toJson(message));
    }
}


  • 创建消费者


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
    private Logger logger = LoggerFactory.getLogger(getClass());
    @KafkaListener(topics = "myTopic", groupId = "myGroup")
    public void obtainMessage(ConsumerRecord<String, String> consumerRecord) {
        logger.info("obtainMessage invoked!");
        String topic = consumerRecord.topic();
        String key = consumerRecord.key();
        String value = consumerRecord.value();
        int partition = consumerRecord.partition();
        long timestamp = consumerRecord.timestamp();
        logger.info("topic: {}", topic);
        logger.info("key: {}", key);
        logger.info("value: {}", value);
        logger.info("partition: {}", partition);
        logger.info("timestamp: {}", timestamp);
        logger.info("==========================");
    }
}


  • 创建控制器


import cn.edu.cqvie.kafka.KafkaMessage;
import cn.edu.cqvie.kafka.KafkaProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.Date;
@RestController
@RequestMapping(value = "kafka")
public class KafkaController {
    private Logger logger = LoggerFactory.getLogger(getClass());
    @Autowired
    private KafkaProducer kafkaProducer;
    @RequestMapping(value = "message", method = RequestMethod.GET)
    public KafkaMessage sendMessage(
            @RequestParam(name = "id") Long id,
            @RequestParam(name = "username") String username,
            @RequestParam(name = "password") String password) {
        logger.info("sendMessage invoked!");
        KafkaMessage message = new KafkaMessage();
        message.setId(id);
        message.setUsername(username);
        message.setPassword(password);
        message.setDate(new Date());
        kafkaProducer.sendKafkaMessage(message);
        return message;
    }
    @RequestMapping(value = "message", method = RequestMethod.POST)
    public KafkaMessage sendMessage2(@RequestBody KafkaMessage message) {
        logger.info("sendMessage2 invoked!");
        kafkaProducer.sendKafkaMessage(message);
        return message;
    }
}


  • 测试


curl http://localhost:9090/kafka/message\?id\=1\&username\=zhangsan\&password\=abc
curl -X POST -H "Content-type:application/json" -d '{"id":5, "username":"zhangsan", "password":"789"}' http://localhost:9090/kafka/message


参考资料




相关文章
|
5月前
|
消息中间件 Java Kafka
Spring Boot整合kafka
本文简要记录了Spring Boot与Kafka的整合过程。首先通过Docker搭建Kafka环境,包括Zookeeper和Kafka服务的配置文件。接着引入Spring Kafka依赖,并在`application.properties`中配置生产者和消费者参数。随后创建Kafka配置类,定义Topic及重试机制。最后实现生产者发送消息和消费者监听消息的功能,支持手动ACK确认。此方案适用于快速构建基于Spring Boot的Kafka消息系统。
866 7
|
6月前
|
消息中间件 Java Kafka
SpringBoot使用Kafka生产者、消费者
SpringBoot使用Kafka生产者、消费者
173 10
|
8月前
|
XML Java 应用服务中间件
Spring Boot 两种部署到服务器的方式
本文介绍了Spring Boot项目的两种部署方式:jar包和war包。Jar包方式使用内置Tomcat,只需配置JDK 1.8及以上环境,通过`nohup java -jar`命令后台运行,并开放服务器端口即可访问。War包则需将项目打包后放入外部Tomcat的webapps目录,修改启动类继承`SpringBootServletInitializer`并调整pom.xml中的打包类型为war,最后启动Tomcat访问应用。两者各有优劣,jar包更简单便捷,而war包适合传统部署场景。需要注意的是,war包部署时,内置Tomcat的端口配置不会生效。
2080 17
Spring Boot 两种部署到服务器的方式
|
7月前
|
消息中间件 Java Kafka
【Azure Kafka】使用Spring Cloud Stream Binder Kafka 发送并接收 Event Hub 消息及解决并发报错
reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
115 5
|
6月前
|
Java 数据库 微服务
微服务——SpringBoot使用归纳——Spring Boot中的项目属性配置——指定项目配置文件
在实际项目中,开发环境和生产环境的配置往往不同。为简化配置切换,可通过创建 `application-dev.yml` 和 `application-pro.yml` 分别管理开发与生产环境配置,如设置不同端口(8001/8002)。在 `application.yml` 中使用 `spring.profiles.active` 指定加载的配置文件,实现环境快速切换。本节还介绍了通过配置类读取参数的方法,适用于微服务场景,提升代码可维护性。课程源码可从 [Gitee](https://gitee.com/eson15/springboot_study) 下载。
192 0
|
10月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
502 5
|
10月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
253 1
深入实践springboot实战 蓄势待发 我不是雷锋 我是知识搬运工
springboot,说白了就是一个集合了功能的大类库,包括springMVC,spring,spring data,spring security等等,并且提供了很多和可以和其他常用框架,插件完美整合的接口(只能说是一些常用框架,基本在github上能排上名次的都有完美整合,但如果是自己写的一个框架就无法实现快速整合)。
|
Java 数据安全/隐私保护
Neo4j【付诸实践 01】SpringBoot集成报错org.neo4j.driver.exceptions.ClientException:服务器不支持此驱动程序支持的任何协议版本(解决+源代码)
Neo4j【付诸实践 01】SpringBoot集成报错org.neo4j.driver.exceptions.ClientException:服务器不支持此驱动程序支持的任何协议版本(解决+源代码)
693 1
|
缓存 Java Maven
Java本地高性能缓存实践问题之SpringBoot中引入Caffeine作为缓存库的问题如何解决
Java本地高性能缓存实践问题之SpringBoot中引入Caffeine作为缓存库的问题如何解决
468 1