SSM集成kafka——注解,xml配置两种方式实现

简介: SSM集成kafka——注解,xml配置两种方式实现

引言


最近在和甲方 对接数据的时候,甲方要求通过kafka将处理完成数据回传,所以我们需要在项目中集成kafka,由于之前项目采用的是SSM框架,并且么有集成过kafka,所以在这里分享一下。


一、XML配置文件方式实现


1、引入jar 这两有两个地方需要注意


1) kafka-clients 包版本与服务器端kafka-clients版本保持一致(查看服务器kafka版本方法 在kafka安装目录下libs 中查找kafka-clients开头的jar文件)


2)引入的spring-kafka 版本在2.0或者2.X 时Spring版本在5.0才能支持


<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.3.5.RELEASE</version>
    <exclusions>
         <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
          </exclusion>
    </exclusions>
</dependency

2、kafka.properties文件内容

# brokers集群
kafka.producer.bootstrap.servers = ip1:9092,ip2:9092,ip3:9092
kafka.producer.acks = all
#发送失败重试次数
kafka.producer.retries = 3
kafka.producer.linger.ms =  10
# 33554432 即32MB的批处理缓冲区
kafka.producer.buffer.memory = 40960
#批处理条数:当多个记录被发送到同一个分区时,生产者会尝试将记录合并到更少的请求中。这有助于客户端和服务器的性能
kafka.producer.batch.size = 4096
kafka.producer.defaultTopic = nwbs-eval-task
kafka.producer.key.serializer = org.apache.kafka.common.serialization.StringSerializer
kafka.producer.value.serializer = org.apache.kafka.common.serialization.StringSerializer
################# kafka consumer ################## ,
kafka.consumer.bootstrap.servers = ip1:9092,ip2,ip3:9092
# 如果为true,消费者的偏移量将在后台定期提交
kafka.consumer.enable.auto.commit = true
#如何设置为自动提交(enable.auto.commit=true),这里设置自动提交周期
kafka.consumer.auto.commit.interval.ms=1000
#order-beta 消费者群组ID,发布-订阅模式,即如果一个生产者,多个消费者都要消费,那么需要定义自己的群组,同一群组内的消费者只有一个能消费到消息
kafka.consumer.group.id = sccl-nwbs
#在使用Kafka的组管理时,用于检测消费者故障的超时
kafka.consumer.session.timeout.ms = 30000
kafka.consumer.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
kafka.consumer.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer

3.consumer-kafka.xml 配置如下

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">
        <!-- 1.定义consumer的参数 -->
        <!--<context:property-placeholder location="classpath*:kafka/kafka.properties" />-->
        <bean id="consumerProperties" class="java.util.HashMap">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="${kafka.consumer.bootstrap.servers}" />
                    <entry key="group.id" value="${kafka.consumer.group.id}" />
                    <entry key="enable.auto.commit" value="${kafka.consumer.enable.auto.commit}" />
                    <entry key="session.timeout.ms" value="${kafka.consumer.session.timeout.ms}" />
                    <entry key="auto.commit.interval.ms" value="${kafka.consumer.auto.commit.interval.ms}" />
                    <entry key="retry.backoff.ms" value="100" />
                    <entry key="key.deserializer"
                           value="${kafka.consumer.key.deserializer}" />
                    <entry key="value.deserializer"
                           value="${kafka.consumer.value.deserializer}" />
                </map>
            </constructor-arg>
        </bean>
        <!-- 2.创建consumerFactory bean -->
        <bean id="consumerFactory"
              class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" >
            <constructor-arg>
                <ref bean="consumerProperties" />
            </constructor-arg>
        </bean>
        <!--<!– 3.定义消费实现类 –>-->
        <bean id="kafkaConsumerService" class="cn.**.kafka.KafkaConsumerSerivceImpl" />
        <!-- 4.消费者容器配置信息 -->
        <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
            <!-- topic -->
            <constructor-arg name="topics">
                <list>
                    <value>${kafka.task.eval.topic}</value>
                    <value>${kafka.task.optimizeNetwork.topic}</value>
                    <value>${kafka.task.business.topic}</value>
                </list>
            </constructor-arg>
            <property name="messageListener" ref="kafkaConsumerService" />
        </bean>
        <!-- 5.消费者并发消息监听容器,执行doStart()方法 -->
        <bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart" >
            <constructor-arg ref="consumerFactory" />
            <constructor-arg ref="containerProperties" />
            <property name="concurrency" value="${kafka.consumer.concurrency}" />
        </bean>
</beans>

4.consumer-kafka.xml 配置如下

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">
    <!--<context:property-placeholder location="classpath:kafka/kafka.properties" />-->
    <!-- 定义producer的参数 -->
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${kafka.producer.bootstrap.servers}" />
                <!--<entry key="group.id" value="${group.id}" />-->
                <entry key="retries" value="${kafka.producer.retries}" />
                <entry key="batch.size" value="${kafka.producer.batch.size}" />
                <entry key="linger.ms" value="${kafka.producer.linger.ms}" />
                <entry key="buffer.memory" value="${kafka.producer.buffer.memory}" />
                <entry key="acks" value="${kafka.producer.acks}" />
                <entry key="key.serializer"
                       value="${kafka.producer.key.serializer}" />
                <entry key="value.serializer"
                       value="${kafka.producer.value.serializer}"/>
            </map>
        </constructor-arg>
    </bean>
    <!-- 创建kafkatemplate需要使用的producerfactory bean -->
    <bean id="producerFactory"
          class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <ref bean="producerProperties" />
        </constructor-arg>
    </bean>
    <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
    <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory" />
        <constructor-arg name="autoFlush" value="true" />
        <property name="defaultTopic" value="${kafka.producer.defaultTopic}" />
    </bean>
</beans>

5. 调用Controller -这里 向kafka 中的 3个topic 发送了消息

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
    @Autowired
    KafkaTemplate kafkaTemplate;
    @Value("nwbs-optimizeNetwork-task")
    private  String optimizeTopic ;
    @Value("nwbs-business-task")
    private String businessTopic;
    @RequestMapping(value = "/producer" , method = RequestMethod.POST)
    public void producer(@RequestBody JSONObject params){
        kafkaTemplate.send(optimizeTopic,params.toJSONString()+"optimizeTopic");
        kafkaTemplate.send(businessTopic,params.toJSONString()+"businessTopic");
        ListenableFuture<SendResult<String, String>> listenableFuture =  kafkaTemplate.sendDefault(params.toJSONString());;
        //发送成功回调
        SuccessCallback<SendResult<String, String>> successCallback = new SuccessCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                //成功业务逻辑
                System.out.println("onSuccess");
            }
        };
        //发送失败回调
        FailureCallback failureCallback = new FailureCallback() {
            @Override
            public void onFailure(Throwable ex) {
                //失败业务逻辑
                System.out.println("onFailure");
            }
        };
        listenableFuture.addCallback(successCallback, failureCallback);
    }
}


二 注解方式实现


参考spring-kafka官方文档 https://docs.spring.io/spring-kafka/reference/htmlsingle/


1. 文件整体结构如图


b09e374551ac30956cf794276b0292c3.png


2. KafKaConsumerConfig.java代码


/**
 * @author: hsc
 * @date: 2018/6/21 15:58
 * @description kafka 消费者配置
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    public KafkaConsumerConfig(){
        System.out.println("kafka消费者配置加载...");
    }
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory(consumerProperties());
    }
    @Bean
    public Map<String, Object> consumerProperties() {
        Map<String, Object> props= new HashMap<String, Object>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, PropertiesUtil.getInstance().getString("kafka.consumer.bootstrap.servers"));
        props.put(ConsumerConfig.GROUP_ID_CONFIG,  PropertiesUtil.getInstance().getString("kafka.consumer.group.id"));
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,  PropertiesUtil.getInstance().getString("kafka.consumer.enable.auto.commit"));
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, PropertiesUtil.getInstance().getString("kafka.consumer.auto.commit.interval.ms"));
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,  PropertiesUtil.getInstance().getString("kafka.consumer.session.timeout.ms"));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,  PropertiesUtil.getInstance().getString("kafka.consumer.key.deserializer"));
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,  PropertiesUtil.getInstance().getString("kafka.consumer.value.deserializer"));
        return props;
    }
    @Bean
    public KafkaConsumerListener kafkaConsumerListener(){
        return new KafkaConsumerListener();
    }
}


3.KafKaProducerConfig.java

/**
 * @author: hsc
 * @date: 2018/6/21 21:30
 * @description kafka 生产者配置
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {
    public KafkaProducerConfig(){
        System.out.println("kafka生产者配置");
    }
    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory(producerProperties());
    }
    @Bean
    public Map<String, Object> producerProperties() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, PropertiesUtil.getInstance().getString("kafka.producer.bootstrap.servers"));
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, PropertiesUtil.getInstance().getString("kafka.producer.key.serializer"));
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,PropertiesUtil.getInstance().getString("kafka.producer.value.serializer"));
        props.put(ProducerConfig.RETRIES_CONFIG,PropertiesUtil.getInstance().getInt("kafka.producer.retries"));
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,PropertiesUtil.getInstance().getInt("kafka.producer.batch.size",1048576));
        props.put(ProducerConfig.LINGER_MS_CONFIG,PropertiesUtil.getInstance().getInt("kafka.producer.linger.ms"));
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,PropertiesUtil.getInstance().getLong("kafka.producer.buffer.memory",33554432L));
        props.put(ProducerConfig.ACKS_CONFIG,PropertiesUtil.getInstance().getString("kafka.producer.acks","all"));
        return props;
    }
    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        KafkaTemplate kafkaTemplate = new KafkaTemplate<Integer, String>(producerFactory(),true);
        kafkaTemplate.setDefaultTopic(PropertiesUtil.getInstance().getString("kafka.producer.defaultTopic","default"));
        return kafkaTemplate;
    }
}

4.KafkaConsumerListenser

/**
 * @author: hsc
 * @date: 2018/6/21 16:33
 * @description 消费者listener
 */
public class KafkaConsumerListener {
    /**
     * @param data
     */
    @KafkaListener(groupId="xxx" ,topics = "xxx")
    void listener(ConsumerRecord<String, String> data){
        System.out.println("消费者线程:"+Thread.currentThread().getName()+"[ 消息 来自kafkatopic:"+data.topic()+",分区:"+data.partition()
                +" ,委托时间:"+data.timestamp()+"]消息内容如下:");
        System.out.println(data.value());
    }
}


参考文章(结合官方文档一起看)

http://www.cnblogs.com/dennyzhangdd/p/7759875.html

目录
相关文章
|
5月前
|
SQL Java 关系型数据库
Dataphin功能Tips系列(53)-离线集成任务如何合理配置JVM资源
本文探讨了将MySQL数据同步至Hive时出现OOM问题的解决方案。
122 5
|
7月前
|
缓存 Java API
微服务——SpringBoot使用归纳——Spring Boot集成 Swagger2 展现在线接口文档——Swagger2 的配置
本文介绍了在Spring Boot中配置Swagger2的方法。通过创建一个配置类,添加`@Configuration`和`@EnableSwagger2`注解,使用Docket对象定义API文档的详细信息,包括标题、描述、版本和包路径等。配置完成后,访问`localhost:8080/swagger-ui.html`即可查看接口文档。文中还提示了可能因浏览器缓存导致的问题及解决方法。
746 0
微服务——SpringBoot使用归纳——Spring Boot集成 Swagger2 展现在线接口文档——Swagger2 的配置
|
7月前
|
XML Java 数据库连接
微服务——SpringBoot使用归纳——Spring Boot集成MyBatis——基于 xml 的整合
本教程介绍了基于XML的MyBatis整合方式。首先在`application.yml`中配置XML路径,如`classpath:mapper/*.xml`,然后创建`UserMapper.xml`文件定义SQL映射,包括`resultMap`和查询语句。通过设置`namespace`关联Mapper接口,实现如`getUserByName`的方法。Controller层调用Service完成测试,访问`/getUserByName/{name}`即可返回用户信息。为简化Mapper扫描,推荐在Spring Boot启动类用`@MapperScan`注解指定包路径避免逐个添加`@Mapper`
318 0
|
7月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
667 0
|
5月前
|
JSON JavaScript API
MCP 实战:用配置与真实代码玩转 GitHub 集成
MCP 实战:用配置与真实代码玩转 GitHub 集成
1222 4
|
6月前
|
缓存 前端开发 API
(网页系统集成CAD功能)在线CAD中配置属性的使用教程
本文介绍了Mxcad SDK在线预览和编辑CAD图纸的功能及配置方法。通过Vite、CDN或Webpack实现集成,用户可自定义设置以满足项目需求。主要内容包括:1)`createMxCad()`方法的初始属性配置,如画布ID、WASM文件路径、字体加载路径等;2)`MxFun.setIniset()`方法提供的更多CAD初始配置;3)`McObject`对象API用于动态调整视图背景色、浏览模式等。此外,还提供了在线Demo(https://demo2.mxdraw3d.com:3000/mxcad/)供用户测试实时效果。
|
8月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
596 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
|
8月前
|
Cloud Native Java Nacos
springcloud/springboot集成NACOS 做注册和配置中心以及nacos源码分析
通过本文,我们详细介绍了如何在 Spring Cloud 和 Spring Boot 中集成 Nacos 进行服务注册和配置管理,并对 Nacos 的源码进行了初步分析。Nacos 作为一个强大的服务注册和配置管理平台,为微服务架构提供
3039 14
|
7月前
|
Java 数据库连接 数据库
微服务——SpringBoot使用归纳——Spring Boot集成MyBatis——MyBatis 介绍和配置
本文介绍了Spring Boot集成MyBatis的方法,重点讲解基于注解的方式。首先简述MyBatis作为持久层框架的特点,接着说明集成时的依赖导入,包括`mybatis-spring-boot-starter`和MySQL连接器。随后详细展示了`properties.yml`配置文件的内容,涵盖数据库连接、驼峰命名规范及Mapper文件路径等关键设置,帮助开发者快速上手Spring Boot与MyBatis的整合开发。
881 0
|
7月前
|
缓存 Java 应用服务中间件
微服务——SpringBoot使用归纳——Spring Boot集成Thymeleaf模板引擎——依赖导入和Thymeleaf相关配置
在Spring Boot中使用Thymeleaf模板,需引入依赖`spring-boot-starter-thymeleaf`,并在HTML页面标签中声明`xmlns:th=&quot;http://www.thymeleaf.org&quot;`。此外,Thymeleaf默认开启页面缓存,开发时建议关闭缓存以实时查看更新效果,配置方式为`spring.thymeleaf.cache: false`。这可避免因缓存导致页面未及时刷新的问题。
287 0