Spring整合Kafka

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Spring整合Kafka

Kafka简介


-Apache Kafka发源于LinkedIn,于2011年成为Apache的孵化项目,随后于2012年成为Apache的主要项目之一。

-Kafka使用Scala和Java进行编写。Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统。Kafka具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。


• Kafka特点


  • 可靠性:Kafka是一个具有分区机制、副本机制和容错机制的分布式消息系统
  • 可扩展性:Kafka消息系统支持集群规模的热扩展
  • 高性能:Kafka在数据发布和订阅过程中都能保证数据的高吞吐量。即便在TB级数据存储的情况下,仍然能保证稳定的性能。


• Kafka术语


  • Topic:在Kafka中,使用一个类别属性来划分数据的所属类,划分数据的这个类称为topic。如果把Kafka看做为一个数据库,topic可以理解为数据库中的一张表,topic的名字即为表名。
  • Partition:topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。
  • Partition offset:每条消息都有一个当前Partition下唯一的64字节的offset,它指明了这条消息的起始位置。
  • Replicas of partition:副本是一个分区的备份。副本不会被消费者消费,副本只用于防止数据丢失,即消费者不从为follower的partition中消费数据,而是从为leader的partition中读取数据。
  • Broker:
  • Kafka 集群包含一个或多个服务器,服务器节点称为broker。
  • broker存储topic的数据。如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。
  • 如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。
  • 如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。
  • Producer:生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。
  • Consumer:消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。
  • Leader:每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。
  • Follower:Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。

  • Broker:Kafka的broker是无状态的,broker使用Zookeeper维护集群的状态。Leader的选举也由Zookeeper负责。
  • Zookeeper:Zookeeper负责维护和协调broker。当Kafka系统中新增了broker或者某个broker发生故障失效时,由ZooKeeper通知生产者和消费者。生产者和消费者依据Zookeeper的broker状态信息与broker协调数据的发布和订阅任务。
  • Producer:生产者将数据推送到broker上,当集群中出现新的broker时,所有的生产者将会搜寻到这个新的broker,并自动将数据发送到这个broker上。
  • Consumer:因为Kafka的broker是无状态的,所以consumer必须使用partition offset来记录消费了多少数据。如果一个consumer指定了一个topic的offset,意味着该consumer已经消费了该offset之前的所有数据。consumer可以通过指定offset,从topic的指定位置开始消费数据。consumer的offset存储在Zookeeper中。


• 引入依赖


- spring-kafka


1.    <dependency>
2.      <groupId>org.springframework.kafka</groupId>
3.      <artifactId>spring-kafka</artifactId>
4.    </dependency>

• 配置Kafka


- 配置server、consumer

1. # KafkaProperties
2. spring.kafka.bootstrap-servers=localhost:9092
3. spring.kafka.consumer.group-id=community-consumer-group
4. spring.kafka.consumer.enable-auto-commit=true
5. spring.kafka.consumer.auto-commit-interval=3000

- 生产者

kafkaTemplate.send(topic, data);

1. @Component
2. class KafkaProducer {
3. 
4. @Autowired
5. private KafkaTemplate kafkaTemplate;
6. 
7. public void sendMessage(String topic, String content) {
8.         kafkaTemplate.send(topic, content);
9.     }
10. 
11. }

- 消费者

@KafkaListener(topics = {"test"})

public void handleMessage(ConsumerRecord record) {}

1. @Component
2. class KafkaConsumer {
3. 
4. @KafkaListener(topics = {"test"})
5. public void handleMessage(ConsumerRecord record) {
6.         System.out.println(record.value());
7.     }
8. 
9. }

主函数调用

1. @Test
2. public void testKafka() {
3.         kafkaProducer.sendMessage("test", "你好");
4.         kafkaProducer.sendMessage("test", "在吗");
5. 
6. try {
7.             Thread.sleep(1000 * 10);
8.         } catch (InterruptedException e) {
9.             e.printStackTrace();
10.         }
11.     }

整体代码


1. package com.nowcoder.community;
2. 
3. import org.apache.kafka.clients.consumer.ConsumerRecord;
4. import org.junit.Test;
5. import org.junit.runner.RunWith;
6. import org.springframework.beans.factory.annotation.Autowired;
7. import org.springframework.boot.test.context.SpringBootTest;
8. import org.springframework.kafka.annotation.KafkaListener;
9. import org.springframework.kafka.core.KafkaTemplate;
10. import org.springframework.stereotype.Component;
11. import org.springframework.test.context.ContextConfiguration;
12. import org.springframework.test.context.junit4.SpringRunner;
13. 
14. @RunWith(SpringRunner.class)
15. @SpringBootTest
16. @ContextConfiguration(classes = CommunityApplication.class)
17. public class KafkaTests {
18. 
19. @Autowired
20. private KafkaProducer kafkaProducer;
21. 
22. @Test
23. public void testKafka() {
24.         kafkaProducer.sendMessage("test", "你好");
25.         kafkaProducer.sendMessage("test", "在吗");
26. 
27. try {
28.             Thread.sleep(1000 * 10);
29.         } catch (InterruptedException e) {
30.             e.printStackTrace();
31.         }
32.     }
33. 
34. }
35. 
36. @Component
37. class KafkaProducer {
38. 
39. @Autowired
40. private KafkaTemplate kafkaTemplate;
41. public void sendMessage(String topic, String content) {
42.         kafkaTemplate.send(topic, content);
43.     }
44. 
45. }
46. 
47. @Component
48. class KafkaConsumer {
49. @KafkaListener(topics = {"test"})
50. public void handleMessage(ConsumerRecord record) {
51.         System.out.println(record.value());
52.     }
53. 
54. }
目录
相关文章
|
1天前
|
消息中间件 Java Kafka
Spring整合kafka
Spring整合kafka
|
1天前
|
消息中间件 Java Kafka
spring kafka的问题集锦
spring kafka的问题集锦
21 0
|
1天前
|
消息中间件 Java Kafka
【Spring底层原理高级进阶】Spring Kafka:实时数据流处理,让业务风起云涌!️
【Spring底层原理高级进阶】Spring Kafka:实时数据流处理,让业务风起云涌!️
|
1天前
|
消息中间件 Java Kafka
玩转Kafka—Spring&Go整合Kafka
玩转Kafka—Spring&Go整合Kafka
38 0
|
1天前
|
消息中间件 Java Kafka
spring boot 集成kafka
spring boot 集成kafka
56 0
spring boot 集成kafka
|
1天前
|
消息中间件 Java 关系型数据库
【Spring Boot+Kafka+Mysql+HBase】实现分布式优惠券后台应用系统(附源码)
【Spring Boot+Kafka+Mysql+HBase】实现分布式优惠券后台应用系统(附源码)
99 2
|
8月前
|
消息中间件 XML Java
Kafka与Spring的整合使用
Kafka与Spring的整合使用
82 0
|
消息中间件 Java Kafka
Spring Boot集成Kafka动态创建消费者与动态删除主题(实现多消费者的发布订阅模型)
Spring Boot集成Kafka动态创建消费者与动态删除主题(实现多消费者的发布订阅模型)
17132 1
Spring Boot集成Kafka动态创建消费者与动态删除主题(实现多消费者的发布订阅模型)
|
XML 消息中间件 Apache
spring集成kafka
一、添加依赖项 compile 'org.springframework.kafka:spring-kafka:1.2.2.RELEASE'   二、发消息(生产者) 2.1 xml配置 1 2 6 7 8 9 ...
1560 0
|
1天前
|
Java 应用服务中间件 Maven
SpringBoot 项目瘦身指南
SpringBoot 项目瘦身指南
56 0