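Kafka is an Apache project: a high-performance, cross-language, distributed publish/subscribe message queue system. It is fully distributed: brokers, producers, and consumers all support distributed deployment natively, with automatic load balancing. Compared with ActiveMQ, Apache Kafka is a very lightweight messaging system; besides performing very well, it also behaves well as a distributed system.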
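Today let's walk through the steps needed to integrate Kafka with Spring.

1. First, add the required jars (the basic Spring jars are not covered again here; only the Kafka ones are shown):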
<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.6.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- Ideally keep this version the same as the broker's; otherwise compatibility problems may occur -->
    <version>2.4.1</version>
</dependency>
<!-- kafka -->
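2. Configure the producer XML (many of these settings could live in a properties file or in Apollo; since this is only an example, they are hard-coded here so it is easier to follow):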
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Producer parameters -->
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="127.0.0.1:9092" />
                <!-- group.id is a consumer-only setting, so it is not set on the producer -->
                <entry key="retries" value="1" />
                <entry key="batch.size" value="16384" />
                <entry key="linger.ms" value="1" />
                <entry key="buffer.memory" value="33554432" />
                <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
                <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
            </map>
        </constructor-arg>
    </bean>

    <!-- The ProducerFactory bean that the KafkaTemplate needs -->
    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <ref bean="producerProperties" />
        </constructor-arg>
    </bean>

    <!-- The KafkaTemplate bean; to use it, inject this bean and call the template's send methods -->
    <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory" />
        <constructor-arg name="autoFlush" value="true" />
        <property name="defaultTopic" value="test" />
        <!-- Enable the producer listener -->
        <property name="producerListener" ref="producerListener" />
    </bean>

    <bean id="producerListener" class="com.demo.kafka.KafkaProducerListener" />
</beans>
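The numeric producer settings are easier to read once converted to familiar units. The following is a minimal sketch using only the JDK; the class name `ProducerSizing` is hypothetical, and the values are simply copied from the producer XML. `batch.size` and `buffer.memory` are in bytes, `linger.ms` is in milliseconds. The "batches that fit" figure is a rough upper bound that ignores per-record overhead and compression.

```java
class ProducerSizing {

    // Values copied from the producer XML in this step.
    static final long BATCH_SIZE_BYTES = 16384L;        // batch.size
    static final long LINGER_MS = 1L;                   // linger.ms
    static final long BUFFER_MEMORY_BYTES = 33554432L;  // buffer.memory

    /** batch.size expressed in KiB. */
    static long batchSizeKiB() {
        return BATCH_SIZE_BYTES / 1024;
    }

    /** buffer.memory expressed in MiB. */
    static long bufferMemoryMiB() {
        return BUFFER_MEMORY_BYTES / (1024 * 1024);
    }

    /** Rough upper bound on how many full batches fit into the send buffer. */
    static long maxBufferedBatches() {
        return BUFFER_MEMORY_BYTES / BATCH_SIZE_BYTES;
    }

    public static void main(String[] args) {
        System.out.println("batch.size    = " + batchSizeKiB() + " KiB per batch");
        System.out.println("buffer.memory = " + bufferMemoryMiB() + " MiB in total");
        System.out.println("linger.ms     = " + LINGER_MS + " ms wait to fill a batch");
        System.out.println("full batches that fit in the buffer: " + maxBufferedBatches());
    }
}
```

In other words, the producer waits up to 1 ms to collect records into batches of at most 16 KiB, and may hold up to 32 MiB of unsent records in memory.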
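3. Configure the consumer XML. bootstrap.servers is the address of the Kafka server, and group.id names the consumer group. It is a consumer-only setting and does not need to match anything on the producer side; consumers that should share the work of reading a topic use the same group.id: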
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- 1. Consumer parameters -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <!-- For a cluster, list several addresses separated by commas -->
                <entry key="bootstrap.servers" value="127.0.0.1:9092" />
                <entry key="group.id" value="test_group1" />
                <entry key="enable.auto.commit" value="true" />
                <entry key="auto.commit.interval.ms" value="1000" />
                <entry key="session.timeout.ms" value="30000" />
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
            </map>
        </constructor-arg>
    </bean>

    <!-- 2. The ConsumerFactory bean that the listener container needs -->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties" />
        </constructor-arg>
    </bean>

    <!-- 3. The class that actually consumes the messages -->
    <bean id="messageListenerConsumerService" class="com.demo.kafka.KafkaConsumerServer" />

    <!-- 4. Consumer container properties -->
    <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <!-- More than one topic may be given -->
        <constructor-arg value="test" />
        <property name="messageListener" ref="messageListenerConsumerService" />
    </bean>

    <!-- 5. Concurrent message listener container. It implements SmartLifecycle and starts
         automatically with the application context, so no init-method="doStart" is needed -->
    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer">
        <constructor-arg ref="consumerFactory" />
        <constructor-arg ref="containerProperties" />
        <!-- Number of concurrent consumers in the listener container -->
        <property name="concurrency" value="3" />
    </bean>
</beans>
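With concurrency set to 3, the container runs three consumers in the same group, and Kafka divides the topic's partitions among them. The sketch below is a simplified, JDK-only illustration of how a range-style assignment splits one topic's partitions. It assumes the default range assignment strategy of this client version, and the class name `RangeAssignmentSketch` is hypothetical. Real Kafka orders consumers by member id; here they are simply indexed 0, 1, 2.

```java
import java.util.ArrayList;
import java.util.List;

class RangeAssignmentSketch {

    /**
     * Splits partitions 0..numPartitions-1 of a single topic into contiguous
     * ranges, one per consumer. The first (numPartitions % numConsumers)
     * consumers receive one extra partition.
     */
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        int perConsumer = numPartitions / numConsumers;
        int withExtra = numPartitions % numConsumers;
        List<List<Integer>> result = new ArrayList<>();
        int next = 0;
        for (int c = 0; c < numConsumers; c++) {
            int count = perConsumer + (c < withExtra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // Six partitions, three consumers: two partitions each.
        System.out.println(assign(6, 3)); // [[0, 1], [2, 3], [4, 5]]
        // Two partitions, three consumers: the third consumer stays idle.
        System.out.println(assign(2, 3)); // [[0], [1], []]
    }
}
```

The practical consequence is that a concurrency higher than the topic's partition count leaves some consumers without any partitions, so the topic "test" needs at least three partitions for concurrency 3 to pay off.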
4. Create KafkaProducerListener, a listener class that is invoked after the producer sends a message:
package com.demo.kafka;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.Logger;
import org.springframework.kafka.support.ProducerListener;

public class KafkaProducerListener implements ProducerListener<String, String> {

    private static final Logger LOG = Logger.getLogger(KafkaProducerListener.class);

    /**
     * Called after a message has been sent successfully.
     */
    @Override
    public void onSuccess(String topic, Integer partition, String key, String value, RecordMetadata recordMetadata) {
        LOG.info("========== kafka send succeeded (log start) ==========");
        LOG.info("----------topic:" + topic);
        LOG.info("----------partition:" + partition);
        LOG.info("----------key:" + key);
        LOG.info("----------value:" + value);
        LOG.info("----------RecordMetadata:" + recordMetadata);
        LOG.info("~~~~~~~~~~ kafka send succeeded (log end) ~~~~~~~~~~");
    }

    /**
     * Called after sending a message has failed.
     */
    @Override
    public void onError(String topic, Integer partition, String key, String value, Exception exception) {
        LOG.error("========== kafka send failed (log start) ==========");
        LOG.error("----------topic:" + topic);
        LOG.error("----------partition:" + partition);
        LOG.error("----------key:" + key);
        LOG.error("----------value:" + value);
        // Passing the exception as the second argument logs its stack trace.
        LOG.error("----------Exception:", exception);
        LOG.error("~~~~~~~~~~ kafka send failed (log end) ~~~~~~~~~~");
    }

    /**
     * The return value tells the template whether onSuccess should be invoked;
     * returning false limits this listener to failed sends.
     */
    @Override
    public boolean isInterestedInSuccess() {
        return true;
    }
}
5. Create KafkaConsumerServer, the class that handles the messages received by the consumer:
package com.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;

// Declared as a bean in the consumer XML, so no @Component annotation is needed here.
// The key and value types match the StringDeserializer settings in the consumer configuration.
public class KafkaConsumerServer implements MessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> consumerRecord) {
        String value = consumerRecord.value();
        System.out.println("--- received Kafka message from topic: " + consumerRecord.topic() + " ---");
        System.out.println(value);
    }
}
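The listener's type parameters should agree with the deserializers in the consumer configuration (both are StringDeserializer in this example). A declaration such as MessageListener<Integer, String> would still compile and start, because generic types are erased at runtime, and it only fails once the key is actually read. The sketch below shows that effect with plain JDK code. `Record` is a hypothetical stand-in for ConsumerRecord and `deliver` is a hypothetical stand-in for the container handing over deserialized objects; neither is part of any Kafka API.

```java
class KeyTypeMismatch {

    /** Hypothetical stand-in for ConsumerRecord: a typed key/value pair. */
    static final class Record<K, V> {
        private final K key;
        private final V value;

        Record(K key, V value) {
            this.key = key;
            this.value = value;
        }

        K key() { return key; }
        V value() { return value; }
    }

    /** Hands over already-deserialized objects; the casts are unchecked and erased at runtime. */
    @SuppressWarnings("unchecked")
    static <K, V> Record<K, V> deliver(Object key, Object value) {
        return new Record<>((K) key, (V) value);
    }

    /** Reading the value works, because it really is a String. */
    static String readValue() {
        Record<Integer, String> record = deliver("user-1", "hello");
        return record.value();
    }

    /** Reading the key as an Integer fails, because it is really a String. */
    static boolean readingKeyFails() {
        Record<Integer, String> record = deliver("user-1", "hello");
        try {
            Integer key = record.key();
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(readValue());       // hello
        System.out.println(readingKeyFails()); // true
    }
}
```

A listener that only reads the value, as in this step, would never notice such a mismatch, which is why it is worth keeping the declared types and the configured deserializers aligned from the start.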
6. Define an interface and its implementation class for sending messages; test is the topic name and message is the message content:
package com.demo.service;

public interface KafkaProducerServer {

    /**
     * Sends a message.
     *
     * @param message the message content
     */
    void sendMessage(String message);
}

package com.demo.service.impl;

import com.demo.service.KafkaProducerServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerServerImpl implements KafkaProducerServer {

    private static final Logger logger = LoggerFactory.getLogger(KafkaProducerServerImpl.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public void sendMessage(String message) {
        logger.debug("sending message to topic test: {}", message);
        kafkaTemplate.send("test", message);
    }
}

7. Load the producer and consumer XML files from Spring's main startup XML:

<!-- Load the Kafka consumer configuration -->
<import resource="classpath:kafka/consumer.xml" />
<!-- Load the Kafka producer configuration -->
<import resource="classpath:kafka/producer.xml" />

8. Write a controller to test whether a sent message is consumed by the consumer:

/**
 * Tests Kafka by sending one message.
 */
@RequestMapping(value = "sendKafkaMessage", method = RequestMethod.GET)
@ResponseBody
public void sendKafkaMessage() {
    kafkaProducerServer.sendMessage("hello world kafka");
}