Kafka与Spring的整合使用

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Kafka与Spring的整合使用

Kafka是Apache下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统,完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。

今天我们去看看kafka去整合spring看看需要哪些步骤吧:1.首先引入我们需要的jar包(spring的基本jar这里不再赘述了,只介绍kafka的了)

<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.6.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- 这个版本最好和服务端保持一致,否则可能会出现兼容问题 -->
    <version>2.4.1</version>
</dependency>
<!-- kafka -->

2.配置生产者xml(很多配置可以放到配置文件或者阿波罗里,由于是举例,所以都直接写死在代码里,方便大家理解了)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd">
    <!-- 定义producer的参数 -->
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="127.0.0.1:9092" />
                <entry key="group.id" value="test_group1"/>
                <entry key="retries" value="1" />
                <entry key="batch.size" value="16384" />
                <entry key="linger.ms" value="1" />
                <entry key="buffer.memory" value="33554432" />
                <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
                <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
            </map>
        </constructor-arg>
    </bean>
    <!-- 创建kafkatemplate需要使用的producerfactory bean -->
    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <ref bean="producerProperties" />
        </constructor-arg>
    </bean>
    <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
    <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory" />
        <constructor-arg name="autoFlush" value="true" />
        <property name="defaultTopic" value="test" />
        <!--启用生产者-监听器-->
        <property name="producerListener" ref="producerListener"/>
    </bean>
    <bean id="producerListener" class="com.demo.kafka.KafkaProducerListener" />
</beans>

3.消费者xml配置,bootstrap.servers是kafka服务器地址,group.id要和生产者保持一致:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
    <!-- 1、定义consumer的参数 -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <!-- 集群的时候多个逗号隔开 -->
                <entry key="bootstrap.servers" value="127.0.0.1:9092"/>
                <entry key="group.id" value="test_group1"/>
                <entry key="enable.auto.commit" value="true"/>
                <entry key="auto.commit.interval.ms" value="1000"/>
                <entry key="session.timeout.ms" value="30000"/>
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
            </map>
        </constructor-arg>
    </bean>
    <!-- 2、创建kafkatemplate需要使用的consumerFactory bean -->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties"/>
        </constructor-arg>
    </bean>
    <!-- 3、定义实际执行消息消费的类 -->
    <bean id="messageListenerConsumerService" class="com.demo.kafka.KafkaConsumerServer"/>
    <!-- 4、消费者容器配置信息 -->
    <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <!-- topic 可以是多个 -->
        <constructor-arg value="test"/>
        <property name="messageListener" ref="messageListenerConsumerService"/>
    </bean>
    <!-- 5.消费者并发消息监听容器,执行doStart()方法 -->
    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart" >
        <constructor-arg ref="consumerFactory" />
        <constructor-arg ref="containerProperties" />
        <!--#消费监听器容器并发数-->
        <!--concurrency = 3-->
        <property name="concurrency" value="3" />
    </bean>
</beans>

4.创建一个生产者发送消息后的监听类KafkaProducerListener:

package com.demo.kafka;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.Logger;
import org.springframework.kafka.support.ProducerListener;
public class KafkaProducerListener implements ProducerListener{
    private static final Logger LOG = Logger.getLogger(KafkaProducerListener.class);
    @Override
    public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) {
        LOG.info("==========kafka发送数据成功(日志开始)==========");
        LOG.info("----------topic:"+topic);
        LOG.info("----------partition:"+partition);
        LOG.info("----------key:"+key);
        LOG.info("----------value:"+value);
        LOG.info("----------RecordMetadata:"+recordMetadata);
        LOG.info("~~~~~~~~~~kafka发送数据成功(日志结束)~~~~~~~~~~");
    }
    /**
     * 发送消息错误后调用
     */
    @Override
    public void onError(String topic, Integer partition, Object key, Object value, Exception exception) {
        LOG.info("==========kafka发送数据错误(日志开始)==========");
        LOG.info("----------topic:"+topic);
        LOG.info("----------partition:"+partition);
        LOG.info("----------key:"+key);
        LOG.info("----------value:"+value);
        LOG.info("----------Exception:"+exception);
        LOG.info("~~~~~~~~~~kafka发送数据错误(日志结束)~~~~~~~~~~");
        exception.printStackTrace();
    }
    /**
     * 方法返回值代表是否启动kafkaProducer监听器
     */
    @Override
    public boolean isInterestedInSuccess() {
        LOG.info("--------------✨ kafkaProducer监听器启动 ✨-------------");
        return true;
    }
}

5.创建一个消费者消息的处理类KafkaConsumerServer:

package com.demo.kafka;
import org.springframework.kafka.listener.MessageListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumerServer implements MessageListener<Integer,String> {
    @Override
    public void onMessage(ConsumerRecord<Integer, String> consumerRecord) {
        Object o = consumerRecord.value();
        System.out.println("---收到topic:test的kafka消息---");
        System.out.println(String.valueOf(o));
    }
}

6.定义一个接口和其实现类用来发送消息 test是topic的名字,messge是消息内容:

package com.demo.service;
public interface KafkaProducerServer {
    /**
     * 发送消息
     * @param message
     * @return
     */
    void sendMesage(String message);
}
package com.demo.service.impl;
import com.demo.service.KafkaProducerServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerServerImpl implements KafkaProducerServer {
    private Logger logger = LoggerFactory.getLogger(KafkaProducerServerImpl.class);
    @Autowired
    private KafkaTemplate kafkaTemplate;
    @Override
    public void sendMesage(String message) {
        kafkaTemplate.send("test",message);
    }
}
7.spring的主启动xml里加载一下生产者消费者的xml:
<!--加载kafka消费者配置-->
<import resource="classpath:kafka/consumer.xml" />
<!--加载kafka生产者配置-->
<import resource="classpath:kafka/producer.xml" />
8.写一个controller测试下发送消息是否能被消费者消费:
/**
 * 测试kafka
 *
 * @return
 */
@RequestMapping(value = "sendKafkaMessage", method = RequestMethod.GET)
@ResponseBody
public void sendKafkaMessage() {
    kafkaProducerServer.sendMesage("hello world kafka");
}


源码地址:https://gitee.com/fengyamin/FrameDemo.git

相关文章
|
2月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
91 5
|
2月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
72 1
|
5月前
|
消息中间件 开发框架 Java
掌握这一招,Spring Boot与Kafka完美融合,顺序消费不再是难题,让你轻松应对业务挑战!
【8月更文挑战第29天】Spring Boot与Kafka集成广泛用于处理分布式消息队列。本文探讨了在Spring Boot中实现Kafka顺序消费的方法,包括使用单个Partition或消息Key确保消息路由到同一Partition,并设置Consumer并发数为1以保证顺序消费。通过示例代码展示了如何配置Kafka Producer和Consumer,并自定义Partitioner。为确保数据正确性,还建议在业务逻辑中增加顺序校验机制。
245 3
|
5月前
|
消息中间件 Java Kafka
|
5月前
|
消息中间件 Java Kafka
|
5月前
|
消息中间件 安全 Java
Spring Boot 基于 SCRAM 认证集成 Kafka 的详解
【8月更文挑战第4天】本文详解Spring Boot结合SCRAM认证集成Kafka的过程。SCRAM为Kafka提供安全身份验证。首先确认Kafka服务已启用SCRAM,并准备认证凭据。接着,在`pom.xml`添加`spring-kafka`依赖,并在`application.properties`中配置Kafka属性,包括SASL_SSL协议与SCRAM-SHA-256机制。创建生产者与消费者类以实现消息的发送与接收功能。最后,通过实际消息传递测试集成效果与认证机制的有效性。
216 4
|
5月前
|
消息中间件 Kafka Java
Spring 框架与 Kafka 联姻,竟引发软件世界的革命风暴!事件驱动架构震撼登场!
【8月更文挑战第31天】《Spring 框架与 Kafka 集成:实现事件驱动架构》介绍如何利用 Spring 框架的强大功能与 Kafka 分布式流平台结合,构建灵活且可扩展的事件驱动系统。通过添加 Spring Kafka 依赖并配置 Kafka 连接信息,可以轻松实现消息的生产和消费。文中详细展示了如何设置 `KafkaTemplate`、`ProducerFactory` 和 `ConsumerFactory`,并通过示例代码说明了生产者发送消息及消费者接收消息的具体实现。这一组合为构建高效可靠的分布式应用程序提供了有力支持。
131 0
|
5月前
|
消息中间件 Java Kafka
SpringBoot Kafka SSL接入点PLAIN机制收发消息
SpringBoot Kafka SSL接入点PLAIN机制收发消息
50 0
|
3月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
147 1
|
3月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
68 1