Spring Boot整合Kafka

简介: Spring Boot整合Kafka

Spring Boot整合Kafka

pom.xml

<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

默认使用

配置

prop新增kafka配置 详细配置可参考org.springframework.boot.autoconfigure.kafka.KafkaProperties中属性 生产者,消费者默认序列化都是String格式message

#kafka默认消费者配置
spring.kafka.consumer.bootstrap-servers=10.111.7.124:9092
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=earliest
#kafka默认生产者配置
spring.kafka.producer.bootstrap-servers=10.111.7.124:9092
spring.kafka.producer.acks=-1
spring.kafka.client-id=kafka-producer
spring.kafka.producer.batch-size=5

使用

//生产者
@Resource
private KafkaTemplate kafkaTemplate;
public void send() {
    HashMap<String, String> map = new HashMap<>();
    map.put("sendType","send");
    kafkaTemplate.send("test01", JSONUtil.toJsonStr(map));
}
//消费者
@Slf4j
@Component
public class KafkaConsumer {
    @KafkaListener(topics = "test01",groupId = "group01")
    public void listen(String message) {
        log.info("message:{}",message);
    }
}

使用Json格式化

配置

自定义消费者工厂: 配置全部消费者参数,包含转换器,使用此项可删除转换器配置2

**转换器配置2:**仅配置json转换器,不需要配置全部消费者参数时可删除自定义消费者配置

addTrustedPackages("*"):默认Json转换仅信任java.util,java.lang包下类,增加后不进行类型检查

@Configuration
public class KafkaCustomConfig {
    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;
    //自定义消费者工厂
    @Bean("customContainerFactory")
    public ConcurrentKafkaListenerContainerFactory customContainerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "customGroup01");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        //转换器配置1
        DefaultKafkaConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory(props);
        JsonDeserializer jsonDeserializer = new JsonDeserializer();
        jsonDeserializer.getTypeMapper().addTrustedPackages("*");
        consumerFactory.setValueDeserializer(jsonDeserializer);
        //指定使用DefaultKafkaConsumerFactory
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        //设置可批量拉取消息消费,拉取数量一次3,看需求设置
        factory.setConcurrency(3);
        factory.setBatchListener(true);
        return factory;
    }
    //转换器配置2
    @Bean
    public RecordMessageConverter converter() {
        ByteArrayJsonMessageConverter converter = new ByteArrayJsonMessageConverter();
        DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
        typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
        typeMapper.addTrustedPackages("*");
        converter.setTypeMapper(typeMapper);
        return converter;
    }
    /**
     * 不使用spring boot的KafkaAutoConfiguration默认方式创建的KafkaTemplate,重新定义
     * 与默认配置只能存在一个
     * @return
     */
    @Bean("custiomKafkaTemplate")
    public KafkaTemplate custiomKafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        //0 producer不等待broker同步完成的确认,继续发送下一条(批)信息
        //1 producer要等待leader成功收到数据并得到确认,才发送下一条message。
        //-1 producer得到follwer确认,才发送下一条数据
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 5);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 500);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        DefaultKafkaProducerFactory produceFactory = new DefaultKafkaProducerFactory(props);
        return new KafkaTemplate(produceFactory);
    }
}

使用

vo

@Data
public class TestVo {
    private String key;
    private String value;
}

生产者

//注入自定义KafkaTemplate
@Resource(name = "custiomKafkaTemplate")
private KafkaTemplate custiomKafkaTemplate;
public void customSend() {
    TestVo vo = new TestVo();
    vo.setKey("sendType");
    vo.setValue("send");
    custiomKafkaTemplate.send("custom04", vo);
}

消费者

containerFactory:指定自定义消费者工厂beanName

@Slf4j
@Component
public class KafkaConsumer {
    //自定义消费者
    @KafkaListener(topics = "custom04",containerFactory = "customContainerFactory")
    public void customListen(TestVo message) {
        log.info("message:{}",message.toString());
    }
    //仅自定义转换器
    @KafkaListener(topics = "custom04",groupId = "custom04Group")
    public void customListen(TestVo message) {
        log.info("message:{}",message.toString());
    }
}

参考资料

https://github.com/spring-projects/spring-kafka/tree/master/samples

https://blog.csdn.net/u012045045/article/details/111034500

https://www.tutorialspoint.com/spring_boot/spring_boot_apache_kafka.htm

目录
相关文章
|
12天前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
43 5
|
14天前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
22 1
|
25天前
|
存储 运维 安全
Spring运维之boot项目多环境(yaml 多文件 proerties)及分组管理与开发控制
通过以上措施,可以保证Spring Boot项目的配置管理在专业水准上,并且易于维护和管理,符合搜索引擎收录标准。
39 2
|
2月前
|
SQL JSON Java
mybatis使用三:springboot整合mybatis,使用PageHelper 进行分页操作,并整合swagger2。使用正规的开发模式:定义统一的数据返回格式和请求模块
这篇文章介绍了如何在Spring Boot项目中整合MyBatis和PageHelper进行分页操作,并且集成Swagger2来生成API文档,同时定义了统一的数据返回格式和请求模块。
65 1
mybatis使用三:springboot整合mybatis,使用PageHelper 进行分页操作,并整合swagger2。使用正规的开发模式:定义统一的数据返回格式和请求模块
|
2月前
|
消息中间件 Java 大数据
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
73 2
|
2月前
|
缓存 NoSQL Java
Springboot自定义注解+aop实现redis自动清除缓存功能
通过上述步骤,我们不仅实现了一个高度灵活的缓存管理机制,还保证了代码的整洁与可维护性。自定义注解与AOP的结合,让缓存清除逻辑与业务逻辑分离,便于未来的扩展和修改。这种设计模式非常适合需要频繁更新缓存的应用场景,大大提高了开发效率和系统的响应速度。
72 2
|
4月前
|
消息中间件 开发框架 Java
掌握这一招,Spring Boot与Kafka完美融合,顺序消费不再是难题,让你轻松应对业务挑战!
【8月更文挑战第29天】Spring Boot与Kafka集成广泛用于处理分布式消息队列。本文探讨了在Spring Boot中实现Kafka顺序消费的方法,包括使用单个Partition或消息Key确保消息路由到同一Partition,并设置Consumer并发数为1以保证顺序消费。通过示例代码展示了如何配置Kafka Producer和Consumer,并自定义Partitioner。为确保数据正确性,还建议在业务逻辑中增加顺序校验机制。
145 3
|
4月前
|
消息中间件 Kafka Java
Spring 框架与 Kafka 联姻,竟引发软件世界的革命风暴!事件驱动架构震撼登场!
【8月更文挑战第31天】《Spring 框架与 Kafka 集成:实现事件驱动架构》介绍如何利用 Spring 框架的强大功能与 Kafka 分布式流平台结合,构建灵活且可扩展的事件驱动系统。通过添加 Spring Kafka 依赖并配置 Kafka 连接信息,可以轻松实现消息的生产和消费。文中详细展示了如何设置 `KafkaTemplate`、`ProducerFactory` 和 `ConsumerFactory`,并通过示例代码说明了生产者发送消息及消费者接收消息的具体实现。这一组合为构建高效可靠的分布式应用程序提供了有力支持。
114 0
|
2月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
2月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
52 1