点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
Hadoop(已更完)
HDFS(已更完)
MapReduce(已更完)
Hive(已更完)
Flume(已更完)
Sqoop(已更完)
Zookeeper(已更完)
HBase(已更完)
Redis (已更完)
Kafka (正在更新…)
章节内容
上节我们完成了:
topics.sh、producer.sh、consumer.sh 脚本的基本使用
pom.xml 配置
JavaAPI的使用:producer 和 consumer
简单介绍
在Spring Boot中使用Kafka,是构建分布式消息驱动应用程序的一种常见方法。Kafka的强大之处在于其高吞吐量、低延迟和良好的可扩展性,非常适合处理大量实时数据。
Kafka的基本概念
Producer(生产者): 负责向Kafka的主题(topic)发送消息。
Consumer(消费者): 从Kafka的主题中读取消息。
Broker(代理): Kafka集群中的节点,负责消息的存储和传输。
Topic(主题): 类似于消息队列的概念,用于分类和组织消息。一个topic可以有多个分区(partition),每个分区是一个日志(log)。
Partition(分区): Kafka中的主题被分成多个分区,每个分区内部的消息是有序的,但分区之间是无序的。
Consumer Group(消费者组): 一组消费者组成的一个逻辑订阅者,保证每条消息在消费者组中只被一个消费者消费。
spring-kafka
Spring-Kafka 是 Spring 框架对 Apache Kafka 的集成,使得在 Spring 应用中使用 Kafka 更加简便和直观。它提供了一系列功能和配置选项来帮助开发者快速构建基于消息驱动的微服务架构。
KafkaTemplate
KafkaTemplate 是 Spring-Kafka 提供的用于发送消息的核心类。它简化了生产者与 Kafka 交互的过程。你可以通过这个类轻松地将消息发送到 Kafka 的主题中。
KafkaListener
@KafkaListener 是用于消费 Kafka 消息的注解。通过这个注解,可以非常方便地定义消息消费者,处理从指定主题接收到的消息。
Spring-Kafka 的配置
Spring-Kafka 支持通过配置文件来配置 Kafka 客户端的属性。这些配置可以在 application.properties 或 application.yml 中指定。
架构图
上节已经出现过了,这里再放一次
POM
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>springboot-kafka</artifactId> <version>1.0-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.2.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
配置文件
我们常见的配置文件如下图:
spring: kafka: bootstrap-servers: localhost:9092 consumer: group-id: my-group auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer template: default-topic: my-topic
Producer
编写代码
编写了一个KafkaProducerController
里边写了两个方法,都是使用了 KafkaTemplate 的工具。
@RestController public class KafkaProducerController { @Resource private KafkaTemplate<Integer, String> kafkaTemplate; @RequestMapping("/sendSync/{message}") public String sendSync(@PathVariable String message) { ProducerRecord<Integer, String> record = new ProducerRecord<>("wzk_topic_test", 0, 1, message); ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(record); try { SendResult<Integer, String> result = future.get(); System.out.println(result.getProducerRecord().key() + "->" + result.getProducerRecord().partition() + "->" + result.getProducerRecord().timestamp()); } catch (Exception e) { e.printStackTrace(); } return "Success"; } @RequestMapping("/sendAsync/{message}") public String sendAsync(@PathVariable String message) { ProducerRecord<Integer, String> record = new ProducerRecord<>("wzk_topic_test", 0, 2, message); ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(record); future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() { @Override public void onFailure(Throwable ex) { System.out.println("发送失败!"); ex.printStackTrace(); } @Override public void onSuccess(SendResult<Integer, String> result) { System.out.println("发送成功"); System.out.println(result.getProducerRecord().key() + "->" + result.getProducerRecord().partition() + "->" + result.getProducerRecord().timestamp()); } }); return "Success"; } }
测试结果
http://localhost:8085/sendSync/wzktest1 http://localhost:8085/sendAsync/wzktest2 http://localhost:8085/sendAsync/wzktest222222
我们观察控制台的效果如下:
Consumer
编写代码
编一个类来实现Consumer:
@Configuration public class KafkaConsumer { @KafkaListener(topics = {"wzk_topic_test"}) public void consume(ConsumerRecord<Integer, String> consumerRecord) { System.out.println( consumerRecord.topic() + "\t" + consumerRecord.partition() + "\t" + consumerRecord.offset() + "\t" + consumerRecord.key() + "\t" + consumerRecord.value()); } }
测试运行
2024-07-12 13:48:46.831 INFO 15352 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=wzk-test] Setting offset for partition wzk_topic_test-0 to the committed offset FetchPosition{offset=13, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=h121.wzk.icu:9092 (id: 0 rack: null), epoch=0}} 2024-07-12 13:48:46.926 INFO 15352 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : wzk-test: partitions assigned: [wzk_topic_test-0] wzk_topic_test 0 13 1 wzktest wzk_topic_test 0 14 2 wzktest222 wzk_topic_test 0 15 2 wzktest222222
控制台的截图如下: