SSM集成kafka——注解,xml配置两种方式实现

简介: SSM集成kafka——注解,xml配置两种方式实现

引言


最近在和甲方 对接数据的时候,甲方要求通过kafka将处理完成数据回传,所以我们需要在项目中集成kafka,由于之前项目采用的是SSM框架,并且么有集成过kafka,所以在这里分享一下。


一、XML配置文件方式实现


1、引入jar 这两有两个地方需要注意


1) kafka-clients 包版本与服务器端kafka-clients版本保持一致(查看服务器kafka版本方法 在kafka安装目录下libs 中查找kafka-clients开头的jar文件)


2)引入的spring-kafka 版本在2.0或者2.X 时Spring版本在5.0才能支持


<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.3.5.RELEASE</version>
    <exclusions>
         <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
          </exclusion>
    </exclusions>
</dependency

2、kafka.properties文件内容

# brokers集群
kafka.producer.bootstrap.servers = ip1:9092,ip2:9092,ip3:9092
kafka.producer.acks = all
#发送失败重试次数
kafka.producer.retries = 3
kafka.producer.linger.ms =  10
# 33554432 即32MB的批处理缓冲区
kafka.producer.buffer.memory = 40960
#批处理条数:当多个记录被发送到同一个分区时,生产者会尝试将记录合并到更少的请求中。这有助于客户端和服务器的性能
kafka.producer.batch.size = 4096
kafka.producer.defaultTopic = nwbs-eval-task
kafka.producer.key.serializer = org.apache.kafka.common.serialization.StringSerializer
kafka.producer.value.serializer = org.apache.kafka.common.serialization.StringSerializer
################# kafka consumer ################## ,
kafka.consumer.bootstrap.servers = ip1:9092,ip2,ip3:9092
# 如果为true,消费者的偏移量将在后台定期提交
kafka.consumer.enable.auto.commit = true
#如何设置为自动提交(enable.auto.commit=true),这里设置自动提交周期
kafka.consumer.auto.commit.interval.ms=1000
#order-beta 消费者群组ID,发布-订阅模式,即如果一个生产者,多个消费者都要消费,那么需要定义自己的群组,同一群组内的消费者只有一个能消费到消息
kafka.consumer.group.id = sccl-nwbs
#在使用Kafka的组管理时,用于检测消费者故障的超时
kafka.consumer.session.timeout.ms = 30000
kafka.consumer.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
kafka.consumer.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer

3.consumer-kafka.xml 配置如下

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">
        <!-- 1.定义consumer的参数 -->
        <!--<context:property-placeholder location="classpath*:kafka/kafka.properties" />-->
        <bean id="consumerProperties" class="java.util.HashMap">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="${kafka.consumer.bootstrap.servers}" />
                    <entry key="group.id" value="${kafka.consumer.group.id}" />
                    <entry key="enable.auto.commit" value="${kafka.consumer.enable.auto.commit}" />
                    <entry key="session.timeout.ms" value="${kafka.consumer.session.timeout.ms}" />
                    <entry key="auto.commit.interval.ms" value="${kafka.consumer.auto.commit.interval.ms}" />
                    <entry key="retry.backoff.ms" value="100" />
                    <entry key="key.deserializer"
                           value="${kafka.consumer.key.deserializer}" />
                    <entry key="value.deserializer"
                           value="${kafka.consumer.value.deserializer}" />
                </map>
            </constructor-arg>
        </bean>
        <!-- 2.创建consumerFactory bean -->
        <bean id="consumerFactory"
              class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" >
            <constructor-arg>
                <ref bean="consumerProperties" />
            </constructor-arg>
        </bean>
        <!--<!– 3.定义消费实现类 –>-->
        <bean id="kafkaConsumerService" class="cn.**.kafka.KafkaConsumerSerivceImpl" />
        <!-- 4.消费者容器配置信息 -->
        <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
            <!-- topic -->
            <constructor-arg name="topics">
                <list>
                    <value>${kafka.task.eval.topic}</value>
                    <value>${kafka.task.optimizeNetwork.topic}</value>
                    <value>${kafka.task.business.topic}</value>
                </list>
            </constructor-arg>
            <property name="messageListener" ref="kafkaConsumerService" />
        </bean>
        <!-- 5.消费者并发消息监听容器,执行doStart()方法 -->
        <bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart" >
            <constructor-arg ref="consumerFactory" />
            <constructor-arg ref="containerProperties" />
            <property name="concurrency" value="${kafka.consumer.concurrency}" />
        </bean>
</beans>

4.consumer-kafka.xml 配置如下

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">
    <!--<context:property-placeholder location="classpath:kafka/kafka.properties" />-->
    <!-- 定义producer的参数 -->
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${kafka.producer.bootstrap.servers}" />
                <!--<entry key="group.id" value="${group.id}" />-->
                <entry key="retries" value="${kafka.producer.retries}" />
                <entry key="batch.size" value="${kafka.producer.batch.size}" />
                <entry key="linger.ms" value="${kafka.producer.linger.ms}" />
                <entry key="buffer.memory" value="${kafka.producer.buffer.memory}" />
                <entry key="acks" value="${kafka.producer.acks}" />
                <entry key="key.serializer"
                       value="${kafka.producer.key.serializer}" />
                <entry key="value.serializer"
                       value="${kafka.producer.value.serializer}"/>
            </map>
        </constructor-arg>
    </bean>
    <!-- 创建kafkatemplate需要使用的producerfactory bean -->
    <bean id="producerFactory"
          class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <ref bean="producerProperties" />
        </constructor-arg>
    </bean>
    <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
    <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory" />
        <constructor-arg name="autoFlush" value="true" />
        <property name="defaultTopic" value="${kafka.producer.defaultTopic}" />
    </bean>
</beans>

5. 调用Controller -这里 向kafka 中的 3个topic 发送了消息

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
    @Autowired
    KafkaTemplate kafkaTemplate;
    @Value("nwbs-optimizeNetwork-task")
    private  String optimizeTopic ;
    @Value("nwbs-business-task")
    private String businessTopic;
    @RequestMapping(value = "/producer" , method = RequestMethod.POST)
    public void producer(@RequestBody JSONObject params){
        kafkaTemplate.send(optimizeTopic,params.toJSONString()+"optimizeTopic");
        kafkaTemplate.send(businessTopic,params.toJSONString()+"businessTopic");
        ListenableFuture<SendResult<String, String>> listenableFuture =  kafkaTemplate.sendDefault(params.toJSONString());;
        //发送成功回调
        SuccessCallback<SendResult<String, String>> successCallback = new SuccessCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                //成功业务逻辑
                System.out.println("onSuccess");
            }
        };
        //发送失败回调
        FailureCallback failureCallback = new FailureCallback() {
            @Override
            public void onFailure(Throwable ex) {
                //失败业务逻辑
                System.out.println("onFailure");
            }
        };
        listenableFuture.addCallback(successCallback, failureCallback);
    }
}


二 注解方式实现


参考spring-kafka官方文档 https://docs.spring.io/spring-kafka/reference/htmlsingle/


1. 文件整体结构如图


b09e374551ac30956cf794276b0292c3.png


2. KafKaConsumerConfig.java代码


/**
 * @author: hsc
 * @date: 2018/6/21 15:58
 * @description kafka 消费者配置
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    public KafkaConsumerConfig(){
        System.out.println("kafka消费者配置加载...");
    }
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory(consumerProperties());
    }
    @Bean
    public Map<String, Object> consumerProperties() {
        Map<String, Object> props= new HashMap<String, Object>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, PropertiesUtil.getInstance().getString("kafka.consumer.bootstrap.servers"));
        props.put(ConsumerConfig.GROUP_ID_CONFIG,  PropertiesUtil.getInstance().getString("kafka.consumer.group.id"));
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,  PropertiesUtil.getInstance().getString("kafka.consumer.enable.auto.commit"));
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, PropertiesUtil.getInstance().getString("kafka.consumer.auto.commit.interval.ms"));
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,  PropertiesUtil.getInstance().getString("kafka.consumer.session.timeout.ms"));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,  PropertiesUtil.getInstance().getString("kafka.consumer.key.deserializer"));
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,  PropertiesUtil.getInstance().getString("kafka.consumer.value.deserializer"));
        return props;
    }
    @Bean
    public KafkaConsumerListener kafkaConsumerListener(){
        return new KafkaConsumerListener();
    }
}


3.KafKaProducerConfig.java

/**
 * @author: hsc
 * @date: 2018/6/21 21:30
 * @description kafka 生产者配置
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {
    public KafkaProducerConfig(){
        System.out.println("kafka生产者配置");
    }
    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory(producerProperties());
    }
    @Bean
    public Map<String, Object> producerProperties() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, PropertiesUtil.getInstance().getString("kafka.producer.bootstrap.servers"));
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, PropertiesUtil.getInstance().getString("kafka.producer.key.serializer"));
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,PropertiesUtil.getInstance().getString("kafka.producer.value.serializer"));
        props.put(ProducerConfig.RETRIES_CONFIG,PropertiesUtil.getInstance().getInt("kafka.producer.retries"));
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,PropertiesUtil.getInstance().getInt("kafka.producer.batch.size",1048576));
        props.put(ProducerConfig.LINGER_MS_CONFIG,PropertiesUtil.getInstance().getInt("kafka.producer.linger.ms"));
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,PropertiesUtil.getInstance().getLong("kafka.producer.buffer.memory",33554432L));
        props.put(ProducerConfig.ACKS_CONFIG,PropertiesUtil.getInstance().getString("kafka.producer.acks","all"));
        return props;
    }
    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        KafkaTemplate kafkaTemplate = new KafkaTemplate<Integer, String>(producerFactory(),true);
        kafkaTemplate.setDefaultTopic(PropertiesUtil.getInstance().getString("kafka.producer.defaultTopic","default"));
        return kafkaTemplate;
    }
}

4.KafkaConsumerListenser

/**
 * @author: hsc
 * @date: 2018/6/21 16:33
 * @description 消费者listener
 */
public class KafkaConsumerListener {
    /**
     * @param data
     */
    @KafkaListener(groupId="xxx" ,topics = "xxx")
    void listener(ConsumerRecord<String, String> data){
        System.out.println("消费者线程:"+Thread.currentThread().getName()+"[ 消息 来自kafkatopic:"+data.topic()+",分区:"+data.partition()
                +" ,委托时间:"+data.timestamp()+"]消息内容如下:");
        System.out.println(data.value());
    }
}


参考文章(结合官方文档一起看)

http://www.cnblogs.com/dennyzhangdd/p/7759875.html

目录
相关文章
|
1月前
|
XML 前端开发 Java
讲解SSM的xml文件
本文详细介绍了SSM框架中的xml配置文件,包括springMVC.xml和applicationContext.xml,涉及组件扫描、数据源配置、事务管理、MyBatis集成以及Spring MVC的视图解析器配置。
57 1
|
15天前
|
SQL 缓存 Java
MyBatis如何关闭一级缓存(分注解和xml两种方式)
MyBatis如何关闭一级缓存(分注解和xml两种方式)
43 5
|
19天前
|
消息中间件 存储 Prometheus
Kafka集群如何配置高可用性
Kafka集群如何配置高可用性
|
1月前
|
消息中间件 监控 Ubuntu
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
76 3
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
|
1月前
|
消息中间件 分布式计算 Java
大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送
大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送
31 2
|
1月前
|
消息中间件 Java 大数据
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
65 2
|
1月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
131 0
|
3月前
|
消息中间件 Java 大数据
"深入理解Kafka单线程Consumer:核心参数配置、Java实现与实战指南"
【8月更文挑战第10天】在大数据领域,Apache Kafka以高吞吐和可扩展性成为主流数据流处理平台。Kafka的单线程Consumer因其实现简单且易于管理而在多种场景中受到欢迎。本文解析单线程Consumer的工作机制,强调其在错误处理和状态管理方面的优势,并通过详细参数说明及示例代码展示如何有效地使用KafkaConsumer类。了解这些内容将帮助开发者优化实时数据处理系统的性能与可靠性。
89 7
|
3月前
|
XML Java 数据库
Spring5入门到实战------15、事务操作---概念--场景---声明式事务管理---事务参数--注解方式---xml方式
这篇文章是Spring5框架的实战教程,详细介绍了事务的概念、ACID特性、事务操作的场景,并通过实际的银行转账示例,演示了Spring框架中声明式事务管理的实现,包括使用注解和XML配置两种方式,以及如何配置事务参数来控制事务的行为。
Spring5入门到实战------15、事务操作---概念--场景---声明式事务管理---事务参数--注解方式---xml方式
|
3月前
|
XML JSON Java
使用IDEA+Maven搭建整合一个Struts2+Spring4+Hibernate4项目,混合使用传统Xml与@注解,返回JSP视图或JSON数据,快来给你的SSH老项目翻新一下吧
本文介绍了如何使用IntelliJ IDEA和Maven搭建一个整合了Struts2、Spring4、Hibernate4的J2EE项目,并配置了项目目录结构、web.xml、welcome.jsp以及多个JSP页面,用于刷新和学习传统的SSH框架。
88 0
使用IDEA+Maven搭建整合一个Struts2+Spring4+Hibernate4项目,混合使用传统Xml与@注解,返回JSP视图或JSON数据,快来给你的SSH老项目翻新一下吧