前言
目前正在出一个Kafka专题
系列教程, 篇幅会较多, 喜欢的话,给个关注❤️ ~
本节给大家讲一下Kafka如何整合SpringBoot
以及它的基本使用~
好了, 废话不多说直接开整吧~
项目搭建
同样的,需要我们搭建一个maven
工程,整合非常的简单,需要用到:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
来一起看下完整的pom.xml
:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>springboot-kafka-all</artifactId> <version>1.0-SNAPSHOT</version> <properties> <java.version>1.8</java.version> </properties> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.3.RELEASE</version> </parent> <dependencies> <!--web--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!--test--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <!-- kafka--> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <!--Hutool依赖--> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.8.4</version> </dependency> <!--fast-json--> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.58</version> </dependency> <dependency> <groupId> org.slf4j </groupId> <artifactId> slf4j-api </artifactId> <version> 1.6.4 </version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.25</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>2.1.3.RELEASE</version> </plugin> </plugins> </build> </project>
配置也很简单 application.yml
server: port: 8081 spring: kafka: producer: bootstrap-servers: 127.0.0.1:9092
然后新建一个启动类,看下控制台是否成功链接了Kafka
,在启动之前别忘了开启我们的Kafka集群
哦~
基本使用
先从一个简单的例子,来快速体验一下Kafka
,新建HelloController
@Slf4j @RestController public class HelloController { private static final String topic = "test"; @Autowired private KafkaTemplate<Object, Object> kafkaTemplate; // 接收消息 @KafkaListener(id = "helloGroup", topics = topic) public void listen(String msg) { log.info("hello receive value: {}" , msg); // hello receive value: hello kafka } @GetMapping("/hello") public String hello() { // 发送消息 kafkaTemplate.send(topic, "hello kafka"); return "hello"; } }
我们通过KafkaTemplate
进行消息的发送, 通过@KafkaListener
进行消息的消费,我们可以指定消费者ID
以及监听的topic
,请求localhost:8081/hello
观察控制台的变化。请求后,发现消息发送和接收的非常快,我们也可以观察UI
后台的消息详情,同步对比
topic创建
之前我们的topic
是在UI
后台创建的,那么在SpringBoot
中如何创建呢? 下面我们试着发送一个不存在的topic
// 当topic不存在时 会默认创建一个topic // num.partitions = 1 #默认Topic分区数 // num.replica.fetchers = 1 #默认副本数 @GetMapping("/hello1") public String hello1() { // 发送消息 kafkaTemplate.send("hello1", "hello1"); return "hello1"; } // 接收消息 @KafkaListener(id = "hello1Group", topics = "hello1") public void listen1(String msg) { log.info("hello1 receive value: {}" , msg); // hello1 receive value: hello1 }
请求之后,观察控制台以及管理后台,发现并没有报错,并且给我们自动创建了一个topic
,在自动创建下,默认的参数是:
num.partitions = 1 #默认Topic分区数 num.replica.fetchers = 1 #默认副本数
如果我想手动创建呢?我们可以通过NewTopic
来手动创建:
@Configuration public class KafkaConfig { @Bean public KafkaAdmin admin(KafkaProperties properties){ KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties()); // 默认False,在Broker不可用时,如果你觉得Broker不可用影响正常业务需要显示的将这个值设置为True admin.setFatalIfBrokerNotAvailable(true); // setAutoCreate(false) : 默认值为True,也就是Kafka实例化后会自动创建已经实例化的NewTopic对象 // initialize():当setAutoCreate为false时,需要我们程序显示的调用admin的initialize()方法来初始化NewTopic对象 return admin; } /** * 创建指定参数的 topic * @return */ @Bean public NewTopic topic() { return new NewTopic("hello2", 0, (short) 0); } }
如果要更新呢?也非常的简单
/** * 更新 topic * @return */ @Bean public NewTopic topicUpdate() { return new NewTopic("hello2", 1, (short) 1); }
注意这里的参数只能+
不能-
那么又有小伙伴问了,这种方式太简单了,如果我想在代码逻辑中来创建呢?我们可以通过AdminClient
来手动创建
/** * AdminClient 创建 */ @Autowired private KafkaProperties properties; @GetMapping("/create/{topicName}") public String createTopic(@PathVariable String topicName) { AdminClient client = AdminClient.create(properties.buildAdminProperties()); if(client !=null){ try { Collection<NewTopic> newTopics = new ArrayList<>(1); newTopics.add(new NewTopic(topicName,1,(short) 1)); client.createTopics(newTopics); }catch (Throwable e){ e.printStackTrace(); }finally { client.close(); } } return topicName; }
观察下管理后台,发现topic
都创建成功了~
获取消息发送的结果
有时候我们发送消息不知道是不是发成功了,需要有一个结果通知。有两种方式,一种是同步
一种是异步
同步获取结果
/** * 获取通知结果 * @return */ @GetMapping("/hello2") public String hello2() { // 同步获取结果 ListenableFuture<SendResult<Object,Object>> future = kafkaTemplate.send("hello2","hello2"); try { SendResult<Object,Object> result = future.get(); log.info("success >>> {}", result.getRecordMetadata().topic()); // success >>> hello2 }catch (Throwable e){ e.printStackTrace(); } return "hello2"; }
异步获取
/** * 获取通知结果 * @return */ @GetMapping("/hello2") public String hello2() { // 发送消息 - 异步获取通知结果 kafkaTemplate.send("hello2", "async hello2").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() { @Override public void onFailure(Throwable throwable) { log.error("fail >>>>{}", throwable.getMessage()); } @Override public void onSuccess(SendResult<Object, Object> objectObjectSendResult) { log.info("async success >>> {}", objectObjectSendResult.getRecordMetadata().topic()); // async success >>> hello2 } }); return "hello2"; }
Kafka事务
同样的,消息也会存在事务
,如果第一条消息发送成功,再发第二条消息的时候出现异常,那么就会抛出异常并回滚第一条消息,下面通过一个简单的例子体会一下
@GetMapping("/hello3") public String hello3() { kafkaTemplate.executeInTransaction(t -> { t.send("hello3","msg1"); if(true) throw new RuntimeException("failed"); t.send("hello3","msg2"); return true; }); return "hello3"; } // 接收消息 @KafkaListener(id = "hello3Group", topics = "hello3") public void listen3(String msg) { log.info("hello3 receive value: {}" , msg); }
默认情况下,Spring-kafka
自动生成的KafkaTemplate
实例,是不具有事务消息发送能力的。我们需要添加transaction-id-prefix
来激活它
spring: kafka: producer: bootstrap-servers: 127.0.0.1:9092 transaction-id-prefix: kafka_.
启动之后,观察控制台的变化~ ,除此之外,还可以使用注解的方式@Transactional
来开启事务
// 注解方式 @Transactional(rollbackFor = RuntimeException.class) @GetMapping("/hello4") public String hello4() { kafkaTemplate.send("hello3","msg1"); if(true) throw new RuntimeException("failed"); kafkaTemplate.send("hello3","msg2"); return "hello4"; }
结束语
下节带大家看下SpringBoot整合Kafka
深入使用~
本着把自己知道的都告诉大家,如果本文对您有所帮助,点赞+关注
鼓励一下呗~
相关文章
ElasticSearch 专题学习
- 利用docker搭建es集群
- 一起来学ElasticSearch(一)
- 一起来学ElasticSearch(二)
- 一起来学ElasticSearch(三)
- 一起来学ElasticSearch(四)
- 一起来学ElasticSearch(五)
- 一起来学ElasticSearch(六)
- 一起来学ElasticSearch(七)
- 一起来学ElasticSearch(八)
- 一起来学ElasticSearch(九)
- 一起来学ElasticSearch(十)
- 一起来学ElasticSearch之整合SpringBoot(一)
- 一起来学ElasticSearch之整合SpringBoot(二)
- 一起来学ElasticSearch之整合SpringBoot(三)
项目源码(源码已更新 欢迎star⭐️)
往期并发编程内容推荐
- Java多线程专题之线程与进程概述
- Java多线程专题之线程类和接口入门
- Java多线程专题之进阶学习Thread(含源码分析)
- Java多线程专题之Callable、Future与FutureTask(含源码分析)
- 面试官: 有了解过线程组和线程优先级吗
- 面试官: 说一下线程的生命周期过程
- 面试官: 说一下线程间的通信
- 面试官: 说一下Java的共享内存模型
- 面试官: 有了解过指令重排吗,什么是happens-before
- 面试官: 有了解过volatile关键字吗 说说看
- 面试官: 有了解过Synchronized吗 说说看
- Java多线程专题之Lock锁的使用
- 面试官: 有了解过ReentrantLock的底层实现吗?说说看
- 面试官: 有了解过CAS和原子操作吗?说说看
- Java多线程专题之线程池的基本使用
- 面试官: 有了解过线程池的工作原理吗?说说看
- 面试官: 线程池是如何做到线程复用的?有了解过吗,说说看
- 面试官: 阻塞队列有了解过吗?说说看
- 面试官: 阻塞队列的底层实现有了解过吗? 说说看
- 面试官: 同步容器和并发容器有用过吗? 说说看
- 面试官: CopyOnWrite容器有了解过吗? 说说看
- 面试官: Semaphore在项目中有使用过吗?说说看(源码剖析)
- 面试官: Exchanger在项目中有使用过吗?说说看(源码剖析)
- 面试官: CountDownLatch有了解过吗?说说看(源码剖析)
- 面试官: CyclicBarrier有了解过吗?说说看(源码剖析)
- 面试官: Phaser有了解过吗?说说看
- 面试官: Fork/Join 有了解过吗?说说看(含源码分析)
- 面试官: Stream并行流有了解过吗?说说看
推荐 SpringBoot & SpringCloud (源码已更新 欢迎star⭐️)
- springboot-all
地址
: github.com/qiuChenglei…- SpringBoot系列教程合集
- 一起来学SpringCloud合集
- SpringCloud整合 Oauth2+Gateway+Jwt+Nacos 实现授权码模式的服务认证(一)
- SpringCloud整合 Oauth2+Gateway+Jwt+Nacos 实现授权码模式的服务认证(二)