SSM集成kafka——注解,xml配置两种方式实现

简介: SSM集成kafka——注解,xml配置两种方式实现

引言


最近在和甲方 对接数据的时候,甲方要求通过kafka将处理完成数据回传,所以我们需要在项目中集成kafka,由于之前项目采用的是SSM框架,并且么有集成过kafka,所以在这里分享一下。


一、XML配置文件方式实现


1、引入jar 这两有两个地方需要注意


1) kafka-clients 包版本与服务器端kafka-clients版本保持一致(查看服务器kafka版本方法 在kafka安装目录下libs 中查找kafka-clients开头的jar文件)


2)引入的spring-kafka 版本在2.0或者2.X 时Spring版本在5.0才能支持


<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.3.5.RELEASE</version>
    <exclusions>
         <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
          </exclusion>
    </exclusions>
</dependency

2、kafka.properties文件内容

# brokers集群
kafka.producer.bootstrap.servers = ip1:9092,ip2:9092,ip3:9092
kafka.producer.acks = all
#发送失败重试次数
kafka.producer.retries = 3
kafka.producer.linger.ms =  10
# 33554432 即32MB的批处理缓冲区
kafka.producer.buffer.memory = 40960
#批处理条数:当多个记录被发送到同一个分区时,生产者会尝试将记录合并到更少的请求中。这有助于客户端和服务器的性能
kafka.producer.batch.size = 4096
kafka.producer.defaultTopic = nwbs-eval-task
kafka.producer.key.serializer = org.apache.kafka.common.serialization.StringSerializer
kafka.producer.value.serializer = org.apache.kafka.common.serialization.StringSerializer
################# kafka consumer ################## ,
kafka.consumer.bootstrap.servers = ip1:9092,ip2,ip3:9092
# 如果为true,消费者的偏移量将在后台定期提交
kafka.consumer.enable.auto.commit = true
#如何设置为自动提交(enable.auto.commit=true),这里设置自动提交周期
kafka.consumer.auto.commit.interval.ms=1000
#order-beta 消费者群组ID,发布-订阅模式,即如果一个生产者,多个消费者都要消费,那么需要定义自己的群组,同一群组内的消费者只有一个能消费到消息
kafka.consumer.group.id = sccl-nwbs
#在使用Kafka的组管理时,用于检测消费者故障的超时
kafka.consumer.session.timeout.ms = 30000
kafka.consumer.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
kafka.consumer.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer

3.consumer-kafka.xml 配置如下

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">
        <!-- 1.定义consumer的参数 -->
        <!--<context:property-placeholder location="classpath*:kafka/kafka.properties" />-->
        <bean id="consumerProperties" class="java.util.HashMap">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="${kafka.consumer.bootstrap.servers}" />
                    <entry key="group.id" value="${kafka.consumer.group.id}" />
                    <entry key="enable.auto.commit" value="${kafka.consumer.enable.auto.commit}" />
                    <entry key="session.timeout.ms" value="${kafka.consumer.session.timeout.ms}" />
                    <entry key="auto.commit.interval.ms" value="${kafka.consumer.auto.commit.interval.ms}" />
                    <entry key="retry.backoff.ms" value="100" />
                    <entry key="key.deserializer"
                           value="${kafka.consumer.key.deserializer}" />
                    <entry key="value.deserializer"
                           value="${kafka.consumer.value.deserializer}" />
                </map>
            </constructor-arg>
        </bean>
        <!-- 2.创建consumerFactory bean -->
        <bean id="consumerFactory"
              class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" >
            <constructor-arg>
                <ref bean="consumerProperties" />
            </constructor-arg>
        </bean>
        <!--<!– 3.定义消费实现类 –>-->
        <bean id="kafkaConsumerService" class="cn.**.kafka.KafkaConsumerSerivceImpl" />
        <!-- 4.消费者容器配置信息 -->
        <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
            <!-- topic -->
            <constructor-arg name="topics">
                <list>
                    <value>${kafka.task.eval.topic}</value>
                    <value>${kafka.task.optimizeNetwork.topic}</value>
                    <value>${kafka.task.business.topic}</value>
                </list>
            </constructor-arg>
            <property name="messageListener" ref="kafkaConsumerService" />
        </bean>
        <!-- 5.消费者并发消息监听容器,执行doStart()方法 -->
        <bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart" >
            <constructor-arg ref="consumerFactory" />
            <constructor-arg ref="containerProperties" />
            <property name="concurrency" value="${kafka.consumer.concurrency}" />
        </bean>
</beans>

4.consumer-kafka.xml 配置如下

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">
    <!--<context:property-placeholder location="classpath:kafka/kafka.properties" />-->
    <!-- 定义producer的参数 -->
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${kafka.producer.bootstrap.servers}" />
                <!--<entry key="group.id" value="${group.id}" />-->
                <entry key="retries" value="${kafka.producer.retries}" />
                <entry key="batch.size" value="${kafka.producer.batch.size}" />
                <entry key="linger.ms" value="${kafka.producer.linger.ms}" />
                <entry key="buffer.memory" value="${kafka.producer.buffer.memory}" />
                <entry key="acks" value="${kafka.producer.acks}" />
                <entry key="key.serializer"
                       value="${kafka.producer.key.serializer}" />
                <entry key="value.serializer"
                       value="${kafka.producer.value.serializer}"/>
            </map>
        </constructor-arg>
    </bean>
    <!-- 创建kafkatemplate需要使用的producerfactory bean -->
    <bean id="producerFactory"
          class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <ref bean="producerProperties" />
        </constructor-arg>
    </bean>
    <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
    <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory" />
        <constructor-arg name="autoFlush" value="true" />
        <property name="defaultTopic" value="${kafka.producer.defaultTopic}" />
    </bean>
</beans>

5. 调用Controller -这里 向kafka 中的 3个topic 发送了消息

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
    @Autowired
    KafkaTemplate kafkaTemplate;
    @Value("nwbs-optimizeNetwork-task")
    private  String optimizeTopic ;
    @Value("nwbs-business-task")
    private String businessTopic;
    @RequestMapping(value = "/producer" , method = RequestMethod.POST)
    public void producer(@RequestBody JSONObject params){
        kafkaTemplate.send(optimizeTopic,params.toJSONString()+"optimizeTopic");
        kafkaTemplate.send(businessTopic,params.toJSONString()+"businessTopic");
        ListenableFuture<SendResult<String, String>> listenableFuture =  kafkaTemplate.sendDefault(params.toJSONString());;
        //发送成功回调
        SuccessCallback<SendResult<String, String>> successCallback = new SuccessCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                //成功业务逻辑
                System.out.println("onSuccess");
            }
        };
        //发送失败回调
        FailureCallback failureCallback = new FailureCallback() {
            @Override
            public void onFailure(Throwable ex) {
                //失败业务逻辑
                System.out.println("onFailure");
            }
        };
        listenableFuture.addCallback(successCallback, failureCallback);
    }
}


二 注解方式实现


参考spring-kafka官方文档 https://docs.spring.io/spring-kafka/reference/htmlsingle/


1. 文件整体结构如图


b09e374551ac30956cf794276b0292c3.png


2. KafKaConsumerConfig.java代码


/**
 * @author: hsc
 * @date: 2018/6/21 15:58
 * @description kafka 消费者配置
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    public KafkaConsumerConfig(){
        System.out.println("kafka消费者配置加载...");
    }
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory(consumerProperties());
    }
    @Bean
    public Map<String, Object> consumerProperties() {
        Map<String, Object> props= new HashMap<String, Object>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, PropertiesUtil.getInstance().getString("kafka.consumer.bootstrap.servers"));
        props.put(ConsumerConfig.GROUP_ID_CONFIG,  PropertiesUtil.getInstance().getString("kafka.consumer.group.id"));
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,  PropertiesUtil.getInstance().getString("kafka.consumer.enable.auto.commit"));
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, PropertiesUtil.getInstance().getString("kafka.consumer.auto.commit.interval.ms"));
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,  PropertiesUtil.getInstance().getString("kafka.consumer.session.timeout.ms"));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,  PropertiesUtil.getInstance().getString("kafka.consumer.key.deserializer"));
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,  PropertiesUtil.getInstance().getString("kafka.consumer.value.deserializer"));
        return props;
    }
    @Bean
    public KafkaConsumerListener kafkaConsumerListener(){
        return new KafkaConsumerListener();
    }
}


3.KafKaProducerConfig.java

/**
 * @author: hsc
 * @date: 2018/6/21 21:30
 * @description kafka 生产者配置
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {
    public KafkaProducerConfig(){
        System.out.println("kafka生产者配置");
    }
    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory(producerProperties());
    }
    @Bean
    public Map<String, Object> producerProperties() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, PropertiesUtil.getInstance().getString("kafka.producer.bootstrap.servers"));
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, PropertiesUtil.getInstance().getString("kafka.producer.key.serializer"));
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,PropertiesUtil.getInstance().getString("kafka.producer.value.serializer"));
        props.put(ProducerConfig.RETRIES_CONFIG,PropertiesUtil.getInstance().getInt("kafka.producer.retries"));
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,PropertiesUtil.getInstance().getInt("kafka.producer.batch.size",1048576));
        props.put(ProducerConfig.LINGER_MS_CONFIG,PropertiesUtil.getInstance().getInt("kafka.producer.linger.ms"));
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,PropertiesUtil.getInstance().getLong("kafka.producer.buffer.memory",33554432L));
        props.put(ProducerConfig.ACKS_CONFIG,PropertiesUtil.getInstance().getString("kafka.producer.acks","all"));
        return props;
    }
    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        KafkaTemplate kafkaTemplate = new KafkaTemplate<Integer, String>(producerFactory(),true);
        kafkaTemplate.setDefaultTopic(PropertiesUtil.getInstance().getString("kafka.producer.defaultTopic","default"));
        return kafkaTemplate;
    }
}

4.KafkaConsumerListenser

/**
 * @author: hsc
 * @date: 2018/6/21 16:33
 * @description 消费者listener
 */
public class KafkaConsumerListener {
    /**
     * @param data
     */
    @KafkaListener(groupId="xxx" ,topics = "xxx")
    void listener(ConsumerRecord<String, String> data){
        System.out.println("消费者线程:"+Thread.currentThread().getName()+"[ 消息 来自kafkatopic:"+data.topic()+",分区:"+data.partition()
                +" ,委托时间:"+data.timestamp()+"]消息内容如下:");
        System.out.println(data.value());
    }
}


参考文章(结合官方文档一起看)

http://www.cnblogs.com/dennyzhangdd/p/7759875.html

目录
相关文章
|
2月前
|
XML 前端开发 Java
讲解SSM的xml文件
本文详细介绍了SSM框架中的xml配置文件,包括springMVC.xml和applicationContext.xml,涉及组件扫描、数据源配置、事务管理、MyBatis集成以及Spring MVC的视图解析器配置。
79 1
|
6天前
|
XML Java 数据格式
Spring容器Bean之XML配置方式
通过对以上内容的掌握,开发人员可以灵活地使用Spring的XML配置方式来管理应用程序的Bean,提高代码的模块化和可维护性。
33 6
|
3月前
|
XML Java 数据格式
Spring IOC—基于XML配置Bean的更多内容和细节(通俗易懂)
Spring 第二节内容补充 关于Bean配置的更多内容和细节 万字详解!
264 18
|
3月前
|
XML Java 应用服务中间件
springMVC01,springMVC的执行流程【第一个springMVC例子(XML配置版本):HelloWorld】
通过一个HelloWorld实例,介绍了SpringMVC的基本概念、执行流程,并详细讲解了如何创建和配置第一个SpringMVC项目(基于XML)。
springMVC01,springMVC的执行流程【第一个springMVC例子(XML配置版本):HelloWorld】
|
2月前
|
XML 分布式计算 资源调度
大数据-02-Hadoop集群 XML配置 超详细 core-site.xml hdfs-site.xml 3节点云服务器 2C4G HDFS Yarn MapRedece(一)
大数据-02-Hadoop集群 XML配置 超详细 core-site.xml hdfs-site.xml 3节点云服务器 2C4G HDFS Yarn MapRedece(一)
183 5
|
2月前
|
XML 资源调度 网络协议
大数据-02-Hadoop集群 XML配置 超详细 core-site.xml hdfs-site.xml 3节点云服务器 2C4G HDFS Yarn MapRedece(二)
大数据-02-Hadoop集群 XML配置 超详细 core-site.xml hdfs-site.xml 3节点云服务器 2C4G HDFS Yarn MapRedece(二)
157 4
|
2月前
|
分布式计算 资源调度 Hadoop
大数据-01-基础环境搭建 超详细 Hadoop Java 环境变量 3节点云服务器 2C4G XML 集群配置 HDFS Yarn MapRedece
大数据-01-基础环境搭建 超详细 Hadoop Java 环境变量 3节点云服务器 2C4G XML 集群配置 HDFS Yarn MapRedece
93 4
|
2月前
|
XML Java 数据格式
手动开发-简单的Spring基于XML配置的程序--源码解析
手动开发-简单的Spring基于XML配置的程序--源码解析
88 0
|
3月前
|
SQL XML Java
mybatis :sqlmapconfig.xml配置 ++++Mapper XML 文件(sql/insert/delete/update/select)(增删改查)用法
当然,这些仅是MyBatis功能的初步介绍。MyBatis还提供了高级特性,如动态SQL、类型处理器、插件等,可以进一步提供对数据库交互的强大支持和灵活性。希望上述内容对您理解MyBatis的基本操作有所帮助。在实际使用中,您可能还需要根据具体的业务要求调整和优化SQL语句和配置。
70 1
|
4月前
|
持续交付 jenkins Devops
WPF与DevOps的完美邂逅:从Jenkins配置到自动化部署,全流程解析持续集成与持续交付的最佳实践
【8月更文挑战第31天】WPF与DevOps的结合开启了软件生命周期管理的新篇章。通过Jenkins等CI/CD工具,实现从代码提交到自动构建、测试及部署的全流程自动化。本文详细介绍了如何配置Jenkins来管理WPF项目的构建任务,确保每次代码提交都能触发自动化流程,提升开发效率和代码质量。这一方法不仅简化了开发流程,还加强了团队协作,是WPF开发者拥抱DevOps文化的理想指南。
101 1