开发者学堂课程【SpringBoot 实战教程: SpringBoot 整合 Kafka】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/651/detail/10819
SpringBoot 整合 Kafka
1、配置好 kafka,并且成功启动,在 springboot 中如何整合 kafka,首先进行架包的依赖,以下是 kafka 的依赖。把它放入工程中,创建好工程,放入 pom 中,版本是1.1.1。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.1.RELEASE</version>
</dependency>
2、kafka 是在 linux 系统中进行安装,所以需要指定它的 IP 地址,端口号默认是9092,java 端访问是9092,创建全局配置文件,a pplication.yml。
IP 地址是192.168.25.130,全局配置:
spring:
kafka:
producer:
bootstrap-servers:192.168.25.130:9092
consumer:
group-id:springboot-group1
auto-offset-reset:earliest
3、使用 kafka 完成消息的发送,KafkaProducer,消息的发送者在 kafka 中称为 Producer。
发消息:
@Component
创建对象
@EnableScheduling
开启定时任务
Public
class
Kafka
Producer{
@Autowired
Private
KafkaTemplate
kafkaTemplate;
sprinbootg 在启动时进行了自动配置,它给创建了 Kafka 的模版对象,只需要自动注入即可,使用这个模版就可以实现消息发送。
@Scheduled(cron="00/1****?")
使用定时任务,在这个方法上使用 Scheduled。每隔一秒发送一个随机生成的 uuid,都是发送到同一个 topic 中。
/**
*定时任务
*/
Public
void
send(){
{
String
message=UUID.randomUUID().toString();
发送多个消息,每个消息是不一样的,用 uuid 生成的字符串作为要发送的消息。
//发送消息
Listenable
Future
future
=
kafkaTemplate . send("tests‘’
,
msg)
;
用 send 方法进行消息的发送,要注意 Kafka 是基于发布订阅模式的,所以都是基于 topic,首先指明 topic 的名称,叫 tests,第二个参数是要发送的消息。 send 方法的返回值,返回的是 ListenableFuture future 类型的,所以在整合时利用 springboot 提供的模版就可以很方便的实现消息的发送。
}
future.addCallback(o->System.out.println("send-消息发送成功:"+m
sg
),
throwable->System.out.println("消息发送失败:"+m
sg
));
}
}
把消息发送成功的结果发送到控制台,用 future 提供的工程进行实现,如果成功就显示消息发送成功,如果出现异常,就显示消息发送失败,借助于 future 进行控制台的打印。
4、在集成时 KafkaTemplate kafkaTemplate 依赖这个模版实现消息的发送,依然是定时任务,所以就不需要写 controller。
5、启动,查看控制台打印,如果成功,就会出现每隔一秒打印一次。
send-消息发送成功: c978f505-d9f7-4275-a453-44f46eff65fc
send-消息发送成功: b2606521- 6326- 4b8g-a3ed-226bc2bb2ce9
send-消息发送成功: d3380009-3186-4534-9932- 6c7ab2b39c4c
send-消息发送成功: 491 73d42- 29e4-42b7-811e-c4 7bea227f0f
send-消息发送成功: 6853f301- fb28-43c3-b4f1- 76b4901c27f4
send-消息发送成功: a03dca15- f7e4-4dbe- 9f35-7c9399620a53
send-消息发送成功: 707df1 65-89f5-46d8- 98d8-b87b61934086.
中间是随机生成的 uuid 的值,每隔一秒发送一次。这就是在 springboot 中如何集成Kafka。