Spring整合Kafka

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介: Spring整合Kafka

Kafka简介


-Apache Kafka发源于LinkedIn,于2011年成为Apache的孵化项目,随后于2012年成为Apache的主要项目之一。

-Kafka使用Scala和Java进行编写。Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统。Kafka具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。


• Kafka特点


  • 可靠性:Kafka是一个具有分区机制、副本机制和容错机制的分布式消息系统
  • 可扩展性:Kafka消息系统支持集群规模的热扩展
  • 高性能:Kafka在数据发布和订阅过程中都能保证数据的高吞吐量。即便在TB级数据存储的情况下,仍然能保证稳定的性能。


• Kafka术语


  • Topic:在Kafka中,使用一个类别属性来划分数据的所属类,划分数据的这个类称为topic。如果把Kafka看做为一个数据库,topic可以理解为数据库中的一张表,topic的名字即为表名。
  • Partition:topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。
  • Partition offset:每条消息都有一个当前Partition下唯一的64字节的offset,它指明了这条消息的起始位置。
  • Replicas of partition:副本是一个分区的备份。副本不会被消费者消费,副本只用于防止数据丢失,即消费者不从为follower的partition中消费数据,而是从为leader的partition中读取数据。
  • Broker:
  • Kafka 集群包含一个或多个服务器,服务器节点称为broker。
  • broker存储topic的数据。如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。
  • 如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。
  • 如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。
  • Producer:生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。
  • Consumer:消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。
  • Leader:每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。
  • Follower:Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。

  • Broker:Kafka的broker是无状态的,broker使用Zookeeper维护集群的状态。Leader的选举也由Zookeeper负责。
  • Zookeeper:Zookeeper负责维护和协调broker。当Kafka系统中新增了broker或者某个broker发生故障失效时,由ZooKeeper通知生产者和消费者。生产者和消费者依据Zookeeper的broker状态信息与broker协调数据的发布和订阅任务。
  • Producer:生产者将数据推送到broker上,当集群中出现新的broker时,所有的生产者将会搜寻到这个新的broker,并自动将数据发送到这个broker上。
  • Consumer:因为Kafka的broker是无状态的,所以consumer必须使用partition offset来记录消费了多少数据。如果一个consumer指定了一个topic的offset,意味着该consumer已经消费了该offset之前的所有数据。consumer可以通过指定offset,从topic的指定位置开始消费数据。consumer的offset存储在Zookeeper中。


• 引入依赖


- spring-kafka


1.    <dependency>
2.      <groupId>org.springframework.kafka</groupId>
3.      <artifactId>spring-kafka</artifactId>
4.    </dependency>

• 配置Kafka


- 配置server、consumer

1. # KafkaProperties
2. spring.kafka.bootstrap-servers=localhost:9092
3. spring.kafka.consumer.group-id=community-consumer-group
4. spring.kafka.consumer.enable-auto-commit=true
5. spring.kafka.consumer.auto-commit-interval=3000

- 生产者

kafkaTemplate.send(topic, data);

1. @Component
2. class KafkaProducer {
3. 
4. @Autowired
5. private KafkaTemplate kafkaTemplate;
6. 
7. public void sendMessage(String topic, String content) {
8.         kafkaTemplate.send(topic, content);
9.     }
10. 
11. }

- 消费者

@KafkaListener(topics = {"test"})

public void handleMessage(ConsumerRecord record) {}

1. @Component
2. class KafkaConsumer {
3. 
4. @KafkaListener(topics = {"test"})
5. public void handleMessage(ConsumerRecord record) {
6.         System.out.println(record.value());
7.     }
8. 
9. }

主函数调用

1. @Test
2. public void testKafka() {
3.         kafkaProducer.sendMessage("test", "你好");
4.         kafkaProducer.sendMessage("test", "在吗");
5. 
6. try {
7.             Thread.sleep(1000 * 10);
8.         } catch (InterruptedException e) {
9.             e.printStackTrace();
10.         }
11.     }

整体代码


1. package com.nowcoder.community;
2. 
3. import org.apache.kafka.clients.consumer.ConsumerRecord;
4. import org.junit.Test;
5. import org.junit.runner.RunWith;
6. import org.springframework.beans.factory.annotation.Autowired;
7. import org.springframework.boot.test.context.SpringBootTest;
8. import org.springframework.kafka.annotation.KafkaListener;
9. import org.springframework.kafka.core.KafkaTemplate;
10. import org.springframework.stereotype.Component;
11. import org.springframework.test.context.ContextConfiguration;
12. import org.springframework.test.context.junit4.SpringRunner;
13. 
14. @RunWith(SpringRunner.class)
15. @SpringBootTest
16. @ContextConfiguration(classes = CommunityApplication.class)
17. public class KafkaTests {
18. 
19. @Autowired
20. private KafkaProducer kafkaProducer;
21. 
22. @Test
23. public void testKafka() {
24.         kafkaProducer.sendMessage("test", "你好");
25.         kafkaProducer.sendMessage("test", "在吗");
26. 
27. try {
28.             Thread.sleep(1000 * 10);
29.         } catch (InterruptedException e) {
30.             e.printStackTrace();
31.         }
32.     }
33. 
34. }
35. 
36. @Component
37. class KafkaProducer {
38. 
39. @Autowired
40. private KafkaTemplate kafkaTemplate;
41. public void sendMessage(String topic, String content) {
42.         kafkaTemplate.send(topic, content);
43.     }
44. 
45. }
46. 
47. @Component
48. class KafkaConsumer {
49. @KafkaListener(topics = {"test"})
50. public void handleMessage(ConsumerRecord record) {
51.         System.out.println(record.value());
52.     }
53. 
54. }
目录
相关文章
|
3月前
|
消息中间件 开发框架 Java
掌握这一招,Spring Boot与Kafka完美融合,顺序消费不再是难题,让你轻松应对业务挑战!
【8月更文挑战第29天】Spring Boot与Kafka集成广泛用于处理分布式消息队列。本文探讨了在Spring Boot中实现Kafka顺序消费的方法,包括使用单个Partition或消息Key确保消息路由到同一Partition,并设置Consumer并发数为1以保证顺序消费。通过示例代码展示了如何配置Kafka Producer和Consumer,并自定义Partitioner。为确保数据正确性,还建议在业务逻辑中增加顺序校验机制。
107 3
|
3月前
|
消息中间件 Java Kafka
|
3月前
|
消息中间件 Java Kafka
|
3月前
|
消息中间件 安全 Java
Spring Boot 基于 SCRAM 认证集成 Kafka 的详解
【8月更文挑战第4天】本文详解Spring Boot结合SCRAM认证集成Kafka的过程。SCRAM为Kafka提供安全身份验证。首先确认Kafka服务已启用SCRAM,并准备认证凭据。接着,在`pom.xml`添加`spring-kafka`依赖,并在`application.properties`中配置Kafka属性,包括SASL_SSL协议与SCRAM-SHA-256机制。创建生产者与消费者类以实现消息的发送与接收功能。最后,通过实际消息传递测试集成效果与认证机制的有效性。
145 4
|
3月前
|
消息中间件 Kafka Java
Spring 框架与 Kafka 联姻,竟引发软件世界的革命风暴!事件驱动架构震撼登场!
【8月更文挑战第31天】《Spring 框架与 Kafka 集成:实现事件驱动架构》介绍如何利用 Spring 框架的强大功能与 Kafka 分布式流平台结合,构建灵活且可扩展的事件驱动系统。通过添加 Spring Kafka 依赖并配置 Kafka 连接信息,可以轻松实现消息的生产和消费。文中详细展示了如何设置 `KafkaTemplate`、`ProducerFactory` 和 `ConsumerFactory`,并通过示例代码说明了生产者发送消息及消费者接收消息的具体实现。这一组合为构建高效可靠的分布式应用程序提供了有力支持。
110 0
|
4月前
|
消息中间件 Java Kafka
spring boot 整合kafka
spring boot 整合kafka
55 8
|
3月前
|
消息中间件 Java Kafka
SpringBoot Kafka SSL接入点PLAIN机制收发消息
SpringBoot Kafka SSL接入点PLAIN机制收发消息
38 0
|
4月前
|
消息中间件 Java Kafka
Spring Boot与Apache Kafka Streams的集成
Spring Boot与Apache Kafka Streams的集成
|
1月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
1月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
47 1

热门文章

最新文章

下一篇
无影云桌面