Spring boot 自定义kafkaTemplate的bean实例进行生产消息和发送消息

简介: Spring boot 自定义kafkaTemplate的bean实例进行生产消息和发送消息

本文为博主原创,未经允许不得转载:

目录:

  1.  自定义生产消息 kafkaTemplate 实例

  2.  封装 kafka 发送消息的service 方法

  3.  测试 kafka 发送消息service 的方法

    4.  自定义 kafka 消费消息的工厂 bean

    5.  kafka 监听消费消息

      

  1.  自定义 kafkaTemplate 实例

    a : 使用 @ConditionalOnProperty 注解属性控制是否加载 kafka 相关初始化配置,因为在项目开发过程中,如kafka 或redis 等工具容易封装为

    工具类,被各微服务引用并进行加载。使用 @ConditionalOnProperty 注解的 havingValue 属性可以控制服务中是否进行加载对应的配置。

    该属性的值,可在 yaml 配置文件中指定: kafka.used = true 。如果为true 则加载,false则不加载

    b.  使用工厂实例生成指定的 kafkaTemplate 实例


package com.example.demo.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
@ConditionalOnProperty(prefix="kafka",name = "used",havingValue = "true")
public class KafkaTemplateConfig {
    /**
     * Producer Template 配置
     */
    @Bean(name="kafkaTemplate")
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    /**
     * Producer 工厂配置
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    /**
     * Producer 参数配置
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // 指定多个kafka集群多个地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "112.125.26.68:9092");
        // 重试次数,0为不启用重试机制
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        //同步到副本, 默认为1
        // acks=0 把消息发送到kafka就认为发送成功
        // acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
        // acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
        props.put(ProducerConfig.ACKS_CONFIG, 1);
        // 生产者空间不足时,send()被阻塞的时间,默认60s
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
        // 控制批处理大小,单位为字节
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
        // 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
        // 消息的最大大小限制,也就是说send的消息大小不能超过这个限制, 默认1048576(1MB)
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576);
        // 键的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 值的序列化方式
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 压缩消息,支持四种类型,分别为:none、lz4、gzip、snappy,默认为none。
        // 消费者默认支持解压,所以压缩设置在生产者,消费者无需设置。
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"none");
        return props;
    }
}

 

  2.  封装 kafka 发送消息的service 方法:


package com.example.demo.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@Service
public class KafkaProduceService {
    @Autowired
    private KafkaTemplate kafkaTemplate;
    /**
     * producer 同步方式发送数据
     *
     * @param topic   topic名称
     * @param message producer发送的数据
     */
    public void sendMessageSync(String topic, String message) throws InterruptedException, ExecutionException, TimeoutException {
        kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
    }
    /**
     * producer 异步方式发送数据
     *
     * @param topic   topic名称
     * @param message producer发送的数据
     */
    public void sendMessageAsync(String topic, String message) {
        kafkaTemplate.send(topic, message).addCallback(new ListenableFutureCallback() {
            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("success");
            }
            @Override
            public void onSuccess(Object o) {
                System.out.println("failure");
            }
        });
    }
}


  3. 测试 kafka 发送消息service 的方法:


package com.example.demo;
import com.example.demo.service.KafkaProduceService;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ProduceServiceTest {
    @Autowired
    private KafkaProduceService producerService;
    @Test
    public void sendMessageSync() throws InterruptedException, ExecutionException, TimeoutException {
        producerService.sendMessageSync("test","同步发送消息测试");
    }
    @Test
    public void sendMessageAsync() {
        producerService.sendMessageAsync("test","异步发送消息测试");
    }
}

  

  4. 自定义 kafka 消费消息的工厂 bean :


package com.example.demo.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
public class KafkaConsumerConfig {
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String>
                factory = new ConcurrentKafkaListenerContainerFactory<>();
        // 设置消费者工厂
        factory.setConsumerFactory(consumerFactory());
        // 消费者组中线程数量
        factory.setConcurrency(3);
        // 拉取超时时间
        factory.getContainerProperties().setPollTimeout(3000);
        // 当使用批量监听器时需要设置为true
        factory.setBatchListener(true);
        return factory;
    }
//    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
//    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        // Kafka地址
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "112.125.26.68:9092");
        //配置默认分组,这里没有配置+在监听的地方没有设置groupId,多个服务会出现收到相同消息情况
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "defaultGroup");
        // 是否自动提交offset偏移量(默认true)
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // 自动提交的频率(ms)
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        // Session超时设置
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        // 键的反序列化方式
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 值的反序列化方式
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // offset偏移量规则设置:
        // (1)、earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
        // (2)、latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
        // (3)、none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return propsMap;
    }
}


  5. kafka 监听消费消息:

package com.example.demo.service;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumerListener {
    @KafkaListener(topics = {"test"},groupId = "group1",
            containerFactory="kafkaListenerContainerFactory")
    public void kafkaListener(String message){
        System.out.println(message);
    }
}

 

标签: kafka

目录
打赏
0
5
5
1
77
分享
相关文章
|
11天前
|
编写SpringBoot的自定义starter包
通过本文的介绍,我们详细讲解了如何创建一个Spring Boot自定义Starter包,包括自动配置类、配置属性类、`spring.factories`文件的创建和配置。通过自定义Starter,可以有效地复用公共配置和组件,提高开发效率。希望本文能帮助您更好地理解和应用Spring Boot自定义Starter,在实际项目中灵活使用这一强大的功能。
39 17
使用idea中的Live Templates自定义自动生成Spring所需的XML配置文件格式
本文介绍了在使用Spring框架时,如何通过创建`applicationContext.xml`配置文件来管理对象。首先,在resources目录下新建XML配置文件,并通过IDEA自动生成部分配置。为完善配置,特别是添加AOP支持,可以通过IDEA的Live Templates功能自定义XML模板。具体步骤包括:连续按两次Shift搜索Live Templates,配置模板内容,输入特定前缀(如spring)并按Tab键即可快速生成完整的Spring配置文件。这样可以大大提高开发效率,减少重复工作。
使用idea中的Live Templates自定义自动生成Spring所需的XML配置文件格式
【23种设计模式·全精解析 | 自定义Spring框架篇】Spring核心源码分析+自定义Spring的IOC功能,依赖注入功能
本文详细介绍了Spring框架的核心功能,并通过手写自定义Spring框架的方式,深入理解了Spring的IOC(控制反转)和DI(依赖注入)功能,并且学会实际运用设计模式到真实开发中。
【23种设计模式·全精解析 | 自定义Spring框架篇】Spring核心源码分析+自定义Spring的IOC功能,依赖注入功能
Spring Boot 自动配置机制:从原理到自定义
Spring Boot 的自动配置机制通过 `spring.factories` 文件和 `@EnableAutoConfiguration` 注解,根据类路径中的依赖和条件注解自动配置所需的 Bean,大大简化了开发过程。本文深入探讨了自动配置的原理、条件化配置、自定义自动配置以及实际应用案例,帮助开发者更好地理解和利用这一强大特性。
163 14
|
3月前
|
SpringBoot构建Bean(RedisConfig + RestTemplateConfig)
SpringBoot构建Bean(RedisConfig + RestTemplateConfig)
61 2
SpringBoot中定义Bean的几种方式
本文介绍了Spring Boot中定义Bean的多种方式,包括使用@Component、@Bean、@Configuration、@Import等注解及Java配置类。每种方式适用于不同的场景,帮助开发者高效管理和组织应用组件。
如何将Spring Boot应用程序运行到自定义端口
如何将Spring Boot应用程序运行到自定义端口
119 0
得物面试:Springboot自动装配机制是什么?如何控制一个bean 是否加载,使用什么注解?
在40岁老架构师尼恩的读者交流群中,近期多位读者成功获得了知名互联网企业的面试机会,如得物、阿里、滴滴等。然而,面对“Spring Boot自动装配机制”等核心面试题,部分读者因准备不足而未能顺利通过。为此,尼恩团队将系统化梳理和总结这一主题,帮助大家全面提升技术水平,让面试官“爱到不能自已”。
得物面试:Springboot自动装配机制是什么?如何控制一个bean 是否加载,使用什么注解?
SpringBoot项目打包成war包
通过上述步骤,我们成功地将一个Spring Boot应用打包成WAR文件,并部署到外部的Tomcat服务器中。这种方式适用于需要与传统Servlet容器集成的场景。
25 8
Spring Boot 两种部署到服务器的方式
本文介绍了Spring Boot项目的两种部署方式:jar包和war包。Jar包方式使用内置Tomcat,只需配置JDK 1.8及以上环境,通过`nohup java -jar`命令后台运行,并开放服务器端口即可访问。War包则需将项目打包后放入外部Tomcat的webapps目录,修改启动类继承`SpringBootServletInitializer`并调整pom.xml中的打包类型为war,最后启动Tomcat访问应用。两者各有优劣,jar包更简单便捷,而war包适合传统部署场景。需要注意的是,war包部署时,内置Tomcat的端口配置不会生效。
267 17
Spring Boot 两种部署到服务器的方式
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等