引言
代码已提交至Github,有兴趣的同学可以下载来看看:https://github.com/ylw-github/SpringBoot-Kafka-Demo
搭建教程上两篇博客又讲,可以参考:
为了方便起见,建议参考: 《消息中间件系列教程(21) -Kafka- 集群搭建(自带Zookeeper)》
SpringBoot集成Kafka
1.新建Maven项目
2.添加Maven依赖:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>ylw</groupId> <artifactId>com.ylw.springboot.kafka</artifactId> <version>0.0.1-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.1.RELEASE</version> </parent> <dependencies> <!-- springBoot集成kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <!-- SpringBoot整合Web组件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies> </project>
3.添加application.yml
# kafka spring: kafka: # kafka服务器地址(可以多个) bootstrap-servers: 192.168.162.131:9092,192.168.162.132:9092,192.168.162.133:9092 consumer: # 指定一个默认的组名 group-id: kafka2 # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 auto-offset-reset: earliest # key/value的反序列化 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: # key/value的序列化 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer # 批量抓取 batch-size: 65536 # 缓存容量 buffer-memory: 524288 # 服务器地址 bootstrap-servers: 192.168.162.131:9092,192.168.162.132:9092,192.168.162.133:9092
4.Controller代码:
package com.ylw; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class KafkaController { /** * 注入kafkaTemplate */ @Autowired private KafkaTemplate<String, String> kafkaTemplate; /** * 发送消息的方法 * * @param key * 推送数据的key * @param data * 推送数据的data */ private void send(String key, String data) { // topic 名称 key data 消息数据 kafkaTemplate.send("my_test", key, data); } // test 主题 1 my_test 3 @RequestMapping("/kafka") public String testKafka() { int iMax = 6; for (int i = 1; i < iMax; i++) { send("key" + i, "data" + i); } return "success"; } /** * 消费者使用日志打印消息 */ @KafkaListener(topics = "my_test") public void receive(ConsumerRecord<?, ?> consumer) { System.out.println("topic名称:" + consumer.topic() + ",key:" + consumer.key() + ",分区位置:" + consumer.partition() + ", 下标" + consumer.offset()); } }
5.启动类:
@SpringBootApplication public class App { public static void main(String[] args) { SpringApplication.run(App.class, args); } }
6.启动程序,浏览器请求:http://localhost:8080/kafka
控制台收到消息,(消费者消费消息):
使用Zookeeper可视化工具可以看到新增了一个my_test
主题:
本文完!