Spring Boot整合Kafka

简介: Spring Boot整合Kafka

Spring Boot整合Kafka

pom.xml

<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

默认使用

配置

prop新增kafka配置 详细配置可参考org.springframework.boot.autoconfigure.kafka.KafkaProperties中属性 生产者,消费者默认序列化都是String格式message

#kafka默认消费者配置
spring.kafka.consumer.bootstrap-servers=10.111.7.124:9092
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=earliest
#kafka默认生产者配置
spring.kafka.producer.bootstrap-servers=10.111.7.124:9092
spring.kafka.producer.acks=-1
spring.kafka.client-id=kafka-producer
spring.kafka.producer.batch-size=5

使用

//生产者
@Resource
private KafkaTemplate kafkaTemplate;
public void send() {
    HashMap<String, String> map = new HashMap<>();
    map.put("sendType","send");
    kafkaTemplate.send("test01", JSONUtil.toJsonStr(map));
}
//消费者
@Slf4j
@Component
public class KafkaConsumer {
    @KafkaListener(topics = "test01",groupId = "group01")
    public void listen(String message) {
        log.info("message:{}",message);
    }
}

使用Json格式化

配置

自定义消费者工厂: 配置全部消费者参数,包含转换器,使用此项可删除转换器配置2

**转换器配置2:**仅配置json转换器,不需要配置全部消费者参数时可删除自定义消费者配置

addTrustedPackages("*"):默认Json转换仅信任java.util,java.lang包下类,增加后不进行类型检查

@Configuration
public class KafkaCustomConfig {
    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;
    //自定义消费者工厂
    @Bean("customContainerFactory")
    public ConcurrentKafkaListenerContainerFactory customContainerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "customGroup01");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        //转换器配置1
        DefaultKafkaConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory(props);
        JsonDeserializer jsonDeserializer = new JsonDeserializer();
        jsonDeserializer.getTypeMapper().addTrustedPackages("*");
        consumerFactory.setValueDeserializer(jsonDeserializer);
        //指定使用DefaultKafkaConsumerFactory
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        //设置可批量拉取消息消费,拉取数量一次3,看需求设置
        factory.setConcurrency(3);
        factory.setBatchListener(true);
        return factory;
    }
    //转换器配置2
    @Bean
    public RecordMessageConverter converter() {
        ByteArrayJsonMessageConverter converter = new ByteArrayJsonMessageConverter();
        DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
        typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
        typeMapper.addTrustedPackages("*");
        converter.setTypeMapper(typeMapper);
        return converter;
    }
    /**
     * 不使用spring boot的KafkaAutoConfiguration默认方式创建的KafkaTemplate,重新定义
     * 与默认配置只能存在一个
     * @return
     */
    @Bean("custiomKafkaTemplate")
    public KafkaTemplate custiomKafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        //0 producer不等待broker同步完成的确认,继续发送下一条(批)信息
        //1 producer要等待leader成功收到数据并得到确认,才发送下一条message。
        //-1 producer得到follwer确认,才发送下一条数据
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 5);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 500);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        DefaultKafkaProducerFactory produceFactory = new DefaultKafkaProducerFactory(props);
        return new KafkaTemplate(produceFactory);
    }
}

使用

vo

@Data
public class TestVo {
    private String key;
    private String value;
}

生产者

//注入自定义KafkaTemplate
@Resource(name = "custiomKafkaTemplate")
private KafkaTemplate custiomKafkaTemplate;
public void customSend() {
    TestVo vo = new TestVo();
    vo.setKey("sendType");
    vo.setValue("send");
    custiomKafkaTemplate.send("custom04", vo);
}

消费者

containerFactory:指定自定义消费者工厂beanName

@Slf4j
@Component
public class KafkaConsumer {
    //自定义消费者
    @KafkaListener(topics = "custom04",containerFactory = "customContainerFactory")
    public void customListen(TestVo message) {
        log.info("message:{}",message.toString());
    }
    //仅自定义转换器
    @KafkaListener(topics = "custom04",groupId = "custom04Group")
    public void customListen(TestVo message) {
        log.info("message:{}",message.toString());
    }
}

参考资料

https://github.com/spring-projects/spring-kafka/tree/master/samples

https://blog.csdn.net/u012045045/article/details/111034500

https://www.tutorialspoint.com/spring_boot/spring_boot_apache_kafka.htm

目录
相关文章
|
3月前
|
消息中间件 Java Kafka
Spring Boot整合kafka
本文简要记录了Spring Boot与Kafka的整合过程。首先通过Docker搭建Kafka环境,包括Zookeeper和Kafka服务的配置文件。接着引入Spring Kafka依赖,并在`application.properties`中配置生产者和消费者参数。随后创建Kafka配置类,定义Topic及重试机制。最后实现生产者发送消息和消费者监听消息的功能,支持手动ACK确认。此方案适用于快速构建基于Spring Boot的Kafka消息系统。
182 7
|
4月前
|
消息中间件 Java Kafka
SpringBoot使用Kafka生产者、消费者
SpringBoot使用Kafka生产者、消费者
131 10
|
6月前
|
XML Java 应用服务中间件
Spring Boot 两种部署到服务器的方式
本文介绍了Spring Boot项目的两种部署方式:jar包和war包。Jar包方式使用内置Tomcat,只需配置JDK 1.8及以上环境,通过`nohup java -jar`命令后台运行,并开放服务器端口即可访问。War包则需将项目打包后放入外部Tomcat的webapps目录,修改启动类继承`SpringBootServletInitializer`并调整pom.xml中的打包类型为war,最后启动Tomcat访问应用。两者各有优劣,jar包更简单便捷,而war包适合传统部署场景。需要注意的是,war包部署时,内置Tomcat的端口配置不会生效。
1729 17
Spring Boot 两种部署到服务器的方式
|
5月前
|
消息中间件 Java Kafka
【Azure Kafka】使用Spring Cloud Stream Binder Kafka 发送并接收 Event Hub 消息及解决并发报错
reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
|
4月前
|
Java 数据库 微服务
微服务——SpringBoot使用归纳——Spring Boot中的项目属性配置——指定项目配置文件
在实际项目中,开发环境和生产环境的配置往往不同。为简化配置切换,可通过创建 `application-dev.yml` 和 `application-pro.yml` 分别管理开发与生产环境配置,如设置不同端口(8001/8002)。在 `application.yml` 中使用 `spring.profiles.active` 指定加载的配置文件,实现环境快速切换。本节还介绍了通过配置类读取参数的方法,适用于微服务场景,提升代码可维护性。课程源码可从 [Gitee](https://gitee.com/eson15/springboot_study) 下载。
112 0
|
8月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
408 5
|
8月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
178 1
深入实践springboot实战 蓄势待发 我不是雷锋 我是知识搬运工
springboot,说白了就是一个集合了功能的大类库,包括springMVC,spring,spring data,spring security等等,并且提供了很多和可以和其他常用框架,插件完美整合的接口(只能说是一些常用框架,基本在github上能排上名次的都有完美整合,但如果是自己写的一个框架就无法实现快速整合)。
|
11月前
|
缓存 Java Maven
Java本地高性能缓存实践问题之SpringBoot中引入Caffeine作为缓存库的问题如何解决
Java本地高性能缓存实践问题之SpringBoot中引入Caffeine作为缓存库的问题如何解决
305 1
|
Java 数据安全/隐私保护
Neo4j【付诸实践 01】SpringBoot集成报错org.neo4j.driver.exceptions.ClientException:服务器不支持此驱动程序支持的任何协议版本(解决+源代码)
Neo4j【付诸实践 01】SpringBoot集成报错org.neo4j.driver.exceptions.ClientException:服务器不支持此驱动程序支持的任何协议版本(解决+源代码)
615 1