我们开始整合:
首先,先往pom.xml文件添加Kafka的依赖,
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.0.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency>
然后,接下来是配置文件(以下提供properties格式,yml格式 ,供大家随便自取),
当然注释也是要好好看看的,毕竟都一字字敲的。
application.properties
#============== kafka =================== # 指定kafka 代理地址,可以多个 #spring.kafka.bootstrap-servers=123.xxx.x.xxx:19092,123.xxx.x.xxx:19093,123.xxx.x.xxx:19094 spring.kafka.bootstrap-servers=192.168.x.xxx:9092 #=============== producer生产者 ======================= spring.kafka.producer.retries=0 # 每次批量发送消息的数量 spring.kafka.producer.batch-size=16384 # 缓存容量 spring.kafka.producer.buffer-memory=33554432 # 指定消息key和消息体的编解码方式 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #=============== consumer消费者 ======================= # 指定默认消费者group id spring.kafka.consumer.group-id=test-app spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=100ms # 指定消息key和消息体的编解码方式 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer #spring.kafka.consumer.bootstrap-servers=192.168.8.111:9092 #spring.kafka.consumer.zookeeper.connect=192.168.8.103:2181 #指定tomcat端口 server.port=8063
application.yml:
spring: # KAFKA kafka: # ָkafka服务器地址,可以指定多个 bootstrap-servers: 123.xxx.x.xxx:19092,123.xxx.x.xxx:19093,123.xxx.x.xxx:19094 #=============== producer生产者配置 ======================= producer: retries: 0 # 每次批量发送消息的数量 batch-size: 16384 # 缓存容量 buffer-memory: 33554432 # ָ指定消息key和消息体的编解码方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer #=============== consumer消费者配置 ======================= consumer: #指定默认消费者的group id group-id: test-app #earliest #当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 #latest #当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 #none #topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 auto-offset-reset: latest enable-auto-commit: true auto-commit-interval: 100ms #指定消费key和消息体的编解码方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
好了,配置工作准备完毕。
我们先来搞Kafka的生产者,也就是负责推送消息的模块:
创建一个类, 叫KafkaSender(注解不能少,留意代码),
package com.kafkademo.producer; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.sun.scenario.effect.impl.sw.sse.SSEBlend_SRC_OUTPeer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; /** * Hello! * Created By JCccc on 2018/11/24 * 11:25 */ @Component public class KafkaSender { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; private final Logger logger = LoggerFactory.getLogger(KafkaSender.class); public void send(String topic, String taskid, String jsonStr) { //发送消息 ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, taskid, jsonStr); future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() { @Override //推送成功 public void onSuccess(SendResult<String, Object> result) { logger.info(topic + " 生产者 发送消息成功:" + result.toString()); } @Override //推送失败 public void onFailure(Throwable ex) { logger.info(topic + " 生产者 发送消息失败:" + ex.getMessage()); } }); } }
以上就是kafka生产者了,到此刻,你已经可以开始往kafka服务器推送消息了
事不宜迟,我们立马试试:
创建个controller,搞个接口试试推送下消息,
@GetMapping("/sendMessageToKafka") public String sendMessageToKafka() { Map<String,String> messageMap=new HashMap(); messageMap.put("message","我是一条消息"); String taskid="123456"; String jsonStr=JSONObject.toJSONString(messageMap); //kakfa的推送消息方法有多种,可以采取带有任务key的,也可以采取不带有的(不带时默认为null) kafkaSender.send("testTopic",taskid,jsonStr); return "hi guy!"; }
用postman测一下(对了,这些推送的前提是你的kafka服务器是没问题的,能正常连接)
看看控制台反应:
可以看到,我们的kafka生产者再推送消息成功后,成功进入了我们的回调函数onSuccess,也打印了日志。
没错,你已经掌握kafak生产者了,你已经掌握推送消息了。 那么接下来,我们继续搞下kafka的消费者。
我们创一个类,叫KafkaConsumer (同样,注意看代码,注解不能少) :
package com.kafkademo.consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import org.springframework.stereotype.Controller; import javax.servlet.http.HttpSession; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * Hello! * Created By JCccc on 2018/11/24 * 13:13 */ @Component public class KafkaConsumer { private final Logger logger = LoggerFactory.getLogger(this.getClass()); //下面的主题是一个数组,可以同时订阅多主题,只需按数组格式即可,也就是用“,”隔开 @KafkaListener(topics = {"testTopic"}) public void receive(ConsumerRecord<?, ?> record){ logger.info("消费得到的消息---key: " + record.key()); logger.info("消费得到的消息---value: " + record.value().toString()); } }
好,到此,kafka的消费者就这么简单完成了。
那么,我们接下来验证下,生产者推送消息到主题“testTopic”,消费者订阅主题“testTopic”,把消息消费下来:
一样,用postman来模拟下第三方调用接口,
我们看看控制台,
没错,kafka生产者跟刚刚一样,成功把消息推送到了主题testTopic上去了,回调函数OnSuccess打印了相关日志;
而,我们的kafka消费者,也是很有效率,再检测到自己订阅的主题testTopic有消息,立马消费了下来。
好了,springboot整合kafka,生产者、消费者就是这么轻松简单结束了。
当然了,该篇案例的生产者和消费者都放在了一个demo去介绍了,实际上大家使用的是按照业务场景,数据量去选择是否需要分开生产者项目&消费者项目,哪些是同时有生产者和消费者的身份的,哪些是只有生产者身份的,哪些是只有消费者身份的。