Producer:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- Producer settings -->
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="10.0.1.72:9092,10.0.1.73:9092,10.0.1.74:9092,10.0.1.75:9092" />
                <!-- group.id is a consumer setting; the producer does not use it -->
                <entry key="group.id" value="0" />
                <entry key="retries" value="1" />
                <entry key="batch.size" value="16384" />
                <entry key="linger.ms" value="1" />
                <entry key="buffer.memory" value="33554432" />
                <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
                <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
            </map>
        </constructor-arg>
    </bean>

    <!-- The ProducerFactory bean that the KafkaTemplate needs -->
    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <ref bean="producerProperties" />
        </constructor-arg>
    </bean>

    <!-- The KafkaTemplate bean: inject it wherever it is needed and call the template's send method -->
    <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory" />
        <constructor-arg name="autoFlush" value="true" />
        <property name="defaultTopic" value="defaultTopic" />
        <!-- <property name="producerListener" ref="producerListener"/> -->
    </bean>

    <!-- <bean id="producerListener" class="KafkaTest.KafkaProducerListener" /> -->
</beans>
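Since the producerProperties bean is just a java.util.HashMap, the same settings can also be written in plain Java, which may be handy if you move to Java-based configuration later. The following is only a minimal sketch: the class name ProducerProps and its build() method are made up for illustration, the values are copied from the XML above, and group.id is left out because it is a consumer setting that the producer does not use.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class; the name is illustrative and not from the original post.
class ProducerProps {

    // Builds the same key/value pairs as the producerProperties bean in the XML.
    // Values stay as strings, exactly as they are written in the XML entries.
    static Map<String, Object> build() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers",
                "10.0.1.72:9092,10.0.1.73:9092,10.0.1.74:9092,10.0.1.75:9092");
        props.put("retries", "1");               // resend attempts after a failed send
        props.put("batch.size", "16384");        // per-partition batch size, in bytes
        props.put("linger.ms", "1");             // wait up to 1 ms to fill a batch
        props.put("buffer.memory", "33554432");  // 32 MB of buffer for unsent records
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Print every setting so the map can be compared with the XML by eye.
        build().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

With spring-kafka on the classpath, passing this map to new DefaultKafkaProducerFactory<String, String>(ProducerProps.build()) would play the same role as the constructor-arg of the producerFactory bean.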
Consumer:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/tx
           http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
           http://www.springframework.org/schema/jee
           http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.0.xsd">

    <!-- Consumer settings -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="10.0.1.72:9092,10.0.1.73:9092,10.0.1.74:9092,10.0.1.75:9092"/>
                <entry key="group.id" value="0"/>
                <entry key="enable.auto.commit" value="true"/>
                <entry key="auto.commit.interval.ms" value="1000"/>
                <entry key="session.timeout.ms" value="30000"/>
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
            </map>
        </constructor-arg>
    </bean>

    <!-- The ConsumerFactory bean -->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties"/>
        </constructor-arg>
    </bean>

    <!-- The class that actually consumes the messages -->
    <bean id="messageListernerConsumerService" class="KafkaTest.KafkaConsumerServer"/>

    <!-- Listener container settings -->
    <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg value="defaultTopic"/>
        <property name="messageListener" ref="messageListernerConsumerService"/>
    </bean>

    <!-- The messageListenerContainer bean: just inject it wherever it is needed -->
    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
          init-method="doStart">
        <constructor-arg ref="consumerFactory"/>
        <constructor-arg ref="containerProperties"/>
    </bean>
</beans>
Producer code for sending messages:
First, inject the template:
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
Then send the message:
ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topic, key, valueString);
Consumer side:
package KafkaTest;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.log4j.Logger;
import org.springframework.kafka.listener.MessageListener;

/**
 * Kafka consumer
 * Created by liuhuichao on 2017/5/12.
 */
public class KafkaConsumerServer implements MessageListener<String, String> {

    private Logger logger = Logger.getLogger(KafkaConsumerServer.class);

    // Called by the listener container once for every record it receives.
    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        logger.info("KafkaConsumerServer=============kafkaConsumer started consuming=============");
        String topic = record.topic();
        String key = record.key();
        String value = record.value();
        long offset = record.offset();
        int partition = record.partition();
        logger.info("KafkaConsumerServer-------------topic:" + topic);
        logger.info("KafkaConsumerServer-------------value:" + value);
        logger.info("KafkaConsumerServer-------------key:" + key);
        logger.info("KafkaConsumerServer-------------offset:" + offset);
        logger.info("KafkaConsumerServer-------------partition:" + partition);
        logger.info("~~~~~~~~~~~~~kafkaConsumer finished consuming~~~~~~~~~~~~~");
        System.out.println("Consumed successfully***************************************************************");
    }
}
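Besides Kafka, those of you who know Spring have probably also used it to integrate Redis, HBase, and the like. You can safely hand connection management over to Spring and focus only on the parts you actually need to care about.
P.S.:
Docs: http://docs.spring.io/spring-kafka/docs/1.0.4.RELEASE/reference/html/index.html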