怎么使用Kafka?收藏这篇短文就可以了

简介: 怎么使用Kafka?收藏这篇短文就可以了

〇、前言

便于大家对本章内容的理解,我重新整理了一下Kafka中的部分重要概念,以表格的方式呈现出来,请见下表所示:

名词 解释
Broker 节点 一个Kafka节点就是一个Broker,一个和多个Broker可以组成一个Kafka集群
Topic 主题 Kafka根据topic对消息进行归类,发布到kafka集群的每套消息都需要指定一个topic,topic是一个逻辑概念,物理上是不存在的
Producer 生产者 用于向Kafka中发送消息
Consumer 消费者 从Kafka中获取消息
Consumer Group 消费组 每个Consumer都会归属于一个消费组,一条消息可以同时被多个不同的消费组消费,但是只能被一个消费组中的消费者消费
Partition 分片 物理上的概念,可以将一个topic上的数据拆分为多分放到Partition中,每个Patition内部的消息是有序的。

本篇文章的主要目的就是操作一下Kafka,从直观感受上面使用一下它,而不是让它仅仅存在于我们理论和想象中的认知上。

那么对于这种中间件的操作,我们一般来说普遍会采用两种方式:

方式1】通过bin路径下的脚本指令,在控制台端进行使用操作;

方式2】通过对jar包的引用,在代码层面上进行使用操作;

在下面章节中,我们就分别针对控制台层面操作代码层面操作这两个方面,对Kafka进行第一次亲密的接触。

一、控制台层面操作

对于Kafka支持多少控制台指令,在其官网(https://kafka.apache.org/documentation/#quickstart)中就已经详细的列举出来了,我们可以很方面的从官网中获得对某个指令的解释和使用说明,如下所示:

同样,在我们的安装了Kafka的bin目录下,也存在着对应这些指令的sh脚本文件, 也是它构成了我们可以非常方便的在控制台这一层面操作Kafka的可能性,如下所示:

虽然指令和脚本文件挺多的,但是我们没有必要从头学一遍,毕竟是第一次操作Kafka,我们只做3件事:创建Topic、通过Producer发送消息和通过Consumer接受消息。

1.1> 创建/查看主题(kafka-topics.sh)

当我们发送消息的时候,主题topic是我们重要的参数之一,我们可以针对不同业务创建不同的topic,从而达到对消息的隔离性。在bin目录下,负责管理Topic的脚本是kafka-topics.sh,我们可以通过对它操作来实现topic的管理。下面,我们就来尝试创建一个名称为“muse”的Topic。

kafka_2.13-3.0.0> bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic muse --partitions 1 --replication-factor 1
Created topic muse.

--bootstrap-server 】待链接到的Kafka服务地址,此处我们指定localhost:9092

--create 】执行创建Topic主题指令;

--topic 】指定待创建的主题名称,此处我们指定创建名称为“muse”的topic;

--partitions 】指定分区个数,由于我们采用单机模式,即只有1个Broker,所以指定创建1个分区

--replication-factor 】指定创建副本的个数,此处我们指定创建1个副本,即主副本;

创建完主题Topic之后,我们也可以通过 --list指令 查看Kafka下所有主题列表,如下所示:

kafka_2.13-3.0.0> bin/kafka-topics.sh --list --bootstrap-server localhost:9092                
muse

同样的,我们也可以通过 --describe指令 查看刚刚创建的那个名称为“muse”的Topic内的具体描述信息:

kafka_2.13-3.0.0> bin/kafka-topics.sh --describe --topic muse --bootstrap-server localhost:9092                            
Topic: muse TopicId: iDQpnERjSI2IvaB2kaB2aQ PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: muse Partition: 0 Leader: 0 Replicas: 0 Isr: 0

1.2> 生产端(kafka-console-producer.sh)

在上面,我们已经创建好了名称为“muse”的主题Topic了,那么我们就可以尝试向这个Topic发送消息了。此时,我们可以通过使用kafka-console-producer.sh来发送消息,它可以从本地文件中读取内容,或者我们也可以从命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每一行都会被当做一个独立的消息。具体发送语句如下所示:

kafka_2.13-3.0.0> bin/kafka-console-producer.sh --topic muse --bootstrap-server localhost:9092                
>message1
>message2
>

其中,通过使用--bootstrap-server来指定Kafka服务地址;如果配置了Kafka集群,用逗号分割即可。

1.3> 消费端(kafka-console-consumer.sh)

上面我们虽然向Kafka中发送了两条消息——message1message2,但是由于此时并没有任何消费者Consumer,所以这两个消息也无法被读取。那么,我们可以利用kafka-console-consumer.sh来执行消息消费操作,具体消费指令如下所示:

kafka_2.13-3.0.0> bin/kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092

我们发现执行了上面的指令,控制台没有输入任何内容,那么,我们切换到Producer段,再发送两条消息——message3message4,我们发现,此时Consumer端有消息输出了,如下所示:

kafka_2.13-3.0.0> bin/kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092                
message3
message4

发生上面情况的原因就是,在默认情况下,消费者是从最后一条消息的偏移量+1开始消费,即:Consumer客户端启动之前的消息是不会被消费的。那如果我们想要把Consumer客户端启动之前的消息也获取到,则可以添加--from-beginning参数即可,如下所示:

kafka_2.13-3.0.0> bin/kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092 --from-beginning
message1
message2
message3
message4

二、代码层面操作

项目中引入kafka-clients的依赖(也可以直接引入spring-kafka的依赖,里面内嵌了kafka-clients)

2.1> 编写生产者端

2.1.1> 初始化配置

创建配置对象Properties

Properties properties = new Properties();

配置kafka的Broker列表

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095");

发出消息持久化机制参数

properties.put(ProducerConfig.ACKS_CONFIG, "1");

ACKS_CONFIG的类型有如下3种:

【acks=0】表示producer不需要等待任何broker确认收到消息的ACK回复,就可以继续发送下一条消息。性能最高,但是最容易丢失消息。

【acks=1】表示至少等待leader已经成功将数据写入本地log,但是不需要等待所有follower都写入成功,就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息就会丢失。

【acks=-1/all】需要等待所有min.insync.replicas(默认为1,推荐配置>=2)这个参数配置的副本个数都成功写入日志。这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。

配置失败重试机制

properties.put(ProducerConfig.RETRIES_CONFIG, 3); // 失败重试3次

properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300); // 重试间隔300ms

配置缓存相关信息

Producer的消息会先发送到本地缓冲区(BUFFER_MEMORY_CONFIG),而不是发送一次消息连接一次kafka。

kafka本地线程会从缓冲区去取数据(BATCH_SIZE_CONFIG),然后批量发送到Broker,即:一个批次满足16KB就会发送出去。

LINGER_MS_CONFIG的默认值为0,表示消息必须立即被发送,但这样会影响性能。 设置10ms也就是说Producer消息发送完后会进入本地的batch中;如果10ms内,这个batch满足了16KB,那么就会随着batch一起被发送出去。如果10ms内,batch没满,那么也必须要把消息发送出去,不能让消息的发送延迟时间太长。

配置key和value的序列化实现类

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

2.1.2> 同步消息发送

同步消息发送代码如下所示

2.1.3> 异步消息发送

2.2> 编写消费者端

2.2.1> 初始化配置

创建配置对象Properties

Properties properties = new Properties();

配置kafka的Broker列表

properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095");

配置消费组

properties.put(ConsumerConfig.GROUP_ID_CONFIG, "museGroup");

offset的重置策略

properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

【解释】offset的重置策略——例如:创建一个新的消费组,offset是不存在的,如何对offset赋值消费。

latest:默认值,只消费自己启动之后发送到主题的消息。

earliest:第一次从头开始消费,以后按照消费offset记录继续消费。

心跳相关配置

/** Consumer给Broker发送心跳的时间间隔 */
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
/** 如果超过10秒没有接收到消费者的心跳,则会把消费者踢出消费组,然后重新进行rebalance操作,把分区分配给其他消费者 */
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10*1000);

poll相关配置

/** 一次poll最大拉取消息的条数,可以根据消费速度的快慢来设置 */
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
/** 如果两次poll的时间超出了30秒的时间间隔,kafka会认为整个Consumer的消费能力太弱,会将它踢出消费组。将分区分配给其他消费者 */
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30*1000);

配置key和value的反序列化实现类

properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

2.2.2> 自动提交offset

自动提交offset

当消费者向Broker的log中poll到消息后,默认情况下,会向broker中名称为“__consumer_offsets”的Topic发送offset偏移量。

自动提交会出现丢失消息的情况

因为如果Consumer还没消费完poll下来的消息就自动提交了偏移量,那么此时如果Consumer挂掉了,那么下一个消费者会从已经提交的offset的下一个位置开始消费消息。那么之前没有被消费的消息就丢失了。

2.2.3> 手动提交offset

手动提交offset

当消费者从kafka的Broker日志文件中poll到消息并且消费完毕之后。再手动提交当前的offset。

今天的文章内容就这些了:

写作不易,笔者几个小时甚至数天完成的一篇文章,只愿换来您几秒钟的 点赞 & 分享

更多技术干货,欢迎大家关注公众号“爪哇缪斯” ~ \(^o^)/ ~ 「干货分享,每天更新」

相关文章
|
8月前
|
消息中间件 存储 监控
Kafka 面试题及答案整理,最新面试题
Kafka 面试题及答案整理,最新面试题
230 3
|
消息中间件 存储 缓存
Kafka 架构和原理机制 (图文全面详解)
一文了解掌握 Kafka 的基本架构、原理、特性、应用场景,以及Zookeeper 在 kafka 的作用。
Kafka 架构和原理机制 (图文全面详解)
|
2月前
|
消息中间件 存储 Kafka
RocketMQ 工作原理图解,看这篇就够了!
本文详细解析了 RocketMQ 的核心架构、消息领域模型、关键特性和应用场景,帮助深入理解消息中间件的工作原理。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
RocketMQ 工作原理图解,看这篇就够了!
|
3月前
|
消息中间件 存储 Java
Kafka核心知识点整理,收藏再看!
Kafka核心知识点整理,收藏再看!
Kafka核心知识点整理,收藏再看!
|
8月前
|
消息中间件 存储 设计模式
Kafka原理篇:图解kakfa架构原理
Kafka原理篇:图解kakfa架构原理
520 1
|
8月前
|
消息中间件 JSON Kafka
【十九】初学Kafka并实战整合SpringCloudStream进行使用
【十九】初学Kafka并实战整合SpringCloudStream进行使用
159 1
【十九】初学Kafka并实战整合SpringCloudStream进行使用
|
消息中间件 存储 Kafka
这是面试官最想听到的回答:谈谈你对Kafka数据存储原理的理解?
一位5年工作经验的小伙伴面试的时候被问到这样一个问题,说”谈谈你对Kafka数据存储原理的理解“。然后,这位小伙伴突然愣住了,什么是零拷贝,零拷贝跟Kafka有关系吗? 那么今天,我给大家来聊一聊我对Kafka零拷贝原理的理解。
106 0
|
消息中间件 安全 Java
kafka入门必备知识
Kafka是一个分布式流处理平台: 1. 可以让你发布和订阅流式的记录。这一方面与消息队列或者企业消息系统类似。 2. 可以储存流式的记录,并且有较好的容错性。 3. 可以在流式记录产生时就进行处理。
117 1
|
消息中间件 NoSQL 中间件
Kafka 实战开篇-讲解架构模型、基础概念以及集群搭建(上)
Kafka 实战开篇-讲解架构模型、基础概念以及集群搭建
357 0
|
消息中间件 存储 运维
【深入浅出之透析RocketMQ原理及实战指南】RocketMQ学习入门指南 | ​​RocketMQ物理和逻辑架构结构精讲​
【深入浅出之透析RocketMQ原理及实战指南】RocketMQ学习入门指南 | ​​RocketMQ物理和逻辑架构结构精讲​
239 0
【深入浅出之透析RocketMQ原理及实战指南】RocketMQ学习入门指南 | ​​RocketMQ物理和逻辑架构结构精讲​

热门文章

最新文章

下一篇
开通oss服务