Spring Boot 整合 Kafka

简介: Spring Boot 整合 Kafka

Spring Boot 整合 Kafka


添加依赖


cmplie "org.springframework.kafka:spring-kafka"
complie "com.google.code.gson:gson"


添加YML配置


spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      bootstrap-servers: localhost:9092
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      missing-topics-fatal: false


业务代码


  • 创建domain


import java.io.Serializable;
import java.util.Date;
public class KafkaMessage implements Serializable {
    private Long id;
    private String username;
    private String password;
    private Date date;
    public Long getId() {
        return id;
    }
    public void setId(Long id) {
        this.id = id;
    }
    public String getUsername() {
        return username;
    }
    public void setUsername(String username) {
        this.username = username;
    }
    public String getPassword() {
        return password;
    }
    public void setPassword(String password) {
        this.password = password;
    }
    public Date getDate() {
        return date;
    }
    public void setDate(Date date) {
        this.date = date;
    }
}


  • 创建生产者


import com.google.gson.Gson;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    public void sendKafkaMessage(KafkaMessage message) {
        this.kafkaTemplate.send("myTopic", new Gson().toJson(message));
    }
}


  • 创建消费者


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
    private Logger logger = LoggerFactory.getLogger(getClass());
    @KafkaListener(topics = "myTopic", groupId = "myGroup")
    public void obtainMessage(ConsumerRecord<String, String> consumerRecord) {
        logger.info("obtainMessage invoked!");
        String topic = consumerRecord.topic();
        String key = consumerRecord.key();
        String value = consumerRecord.value();
        int partition = consumerRecord.partition();
        long timestamp = consumerRecord.timestamp();
        logger.info("topic: {}", topic);
        logger.info("key: {}", key);
        logger.info("value: {}", value);
        logger.info("partition: {}", partition);
        logger.info("timestamp: {}", timestamp);
        logger.info("==========================");
    }
}


  • 创建控制器


import cn.edu.cqvie.kafka.KafkaMessage;
import cn.edu.cqvie.kafka.KafkaProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.Date;
@RestController
@RequestMapping(value = "kafka")
public class KafkaController {
    private Logger logger = LoggerFactory.getLogger(getClass());
    @Autowired
    private KafkaProducer kafkaProducer;
    @RequestMapping(value = "message", method = RequestMethod.GET)
    public KafkaMessage sendMessage(
            @RequestParam(name = "id") Long id,
            @RequestParam(name = "username") String username,
            @RequestParam(name = "password") String password) {
        logger.info("sendMessage invoked!");
        KafkaMessage message = new KafkaMessage();
        message.setId(id);
        message.setUsername(username);
        message.setPassword(password);
        message.setDate(new Date());
        kafkaProducer.sendKafkaMessage(message);
        return message;
    }
    @RequestMapping(value = "message", method = RequestMethod.POST)
    public KafkaMessage sendMessage2(@RequestBody KafkaMessage message) {
        logger.info("sendMessage2 invoked!");
        kafkaProducer.sendKafkaMessage(message);
        return message;
    }
}


  • 测试


curl http://localhost:9090/kafka/message\?id\=1\&username\=zhangsan\&password\=abc
curl -X POST -H "Content-type:application/json" -d '{"id":5, "username":"zhangsan", "password":"789"}' http://localhost:9090/kafka/message


参考资料




相关文章
|
2月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
78 5
|
2月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
62 1
|
2月前
|
存储 运维 安全
Spring运维之boot项目多环境(yaml 多文件 proerties)及分组管理与开发控制
通过以上措施,可以保证Spring Boot项目的配置管理在专业水准上,并且易于维护和管理,符合搜索引擎收录标准。
55 2
|
3月前
|
SQL JSON Java
mybatis使用三:springboot整合mybatis,使用PageHelper 进行分页操作,并整合swagger2。使用正规的开发模式:定义统一的数据返回格式和请求模块
这篇文章介绍了如何在Spring Boot项目中整合MyBatis和PageHelper进行分页操作,并且集成Swagger2来生成API文档,同时定义了统一的数据返回格式和请求模块。
103 1
mybatis使用三:springboot整合mybatis,使用PageHelper 进行分页操作,并整合swagger2。使用正规的开发模式:定义统一的数据返回格式和请求模块
|
3月前
|
消息中间件 Java 大数据
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
82 2
|
3月前
|
缓存 NoSQL Java
Springboot自定义注解+aop实现redis自动清除缓存功能
通过上述步骤,我们不仅实现了一个高度灵活的缓存管理机制,还保证了代码的整洁与可维护性。自定义注解与AOP的结合,让缓存清除逻辑与业务逻辑分离,便于未来的扩展和修改。这种设计模式非常适合需要频繁更新缓存的应用场景,大大提高了开发效率和系统的响应速度。
91 2
|
5月前
|
消息中间件 开发框架 Java
掌握这一招,Spring Boot与Kafka完美融合,顺序消费不再是难题,让你轻松应对业务挑战!
【8月更文挑战第29天】Spring Boot与Kafka集成广泛用于处理分布式消息队列。本文探讨了在Spring Boot中实现Kafka顺序消费的方法,包括使用单个Partition或消息Key确保消息路由到同一Partition,并设置Consumer并发数为1以保证顺序消费。通过示例代码展示了如何配置Kafka Producer和Consumer,并自定义Partitioner。为确保数据正确性,还建议在业务逻辑中增加顺序校验机制。
216 3
|
5月前
|
消息中间件 Kafka Java
Spring 框架与 Kafka 联姻,竟引发软件世界的革命风暴!事件驱动架构震撼登场!
【8月更文挑战第31天】《Spring 框架与 Kafka 集成:实现事件驱动架构》介绍如何利用 Spring 框架的强大功能与 Kafka 分布式流平台结合,构建灵活且可扩展的事件驱动系统。通过添加 Spring Kafka 依赖并配置 Kafka 连接信息,可以轻松实现消息的生产和消费。文中详细展示了如何设置 `KafkaTemplate`、`ProducerFactory` 和 `ConsumerFactory`,并通过示例代码说明了生产者发送消息及消费者接收消息的具体实现。这一组合为构建高效可靠的分布式应用程序提供了有力支持。
125 0
|
3月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
127 1
|
3月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
66 1