〇、前言
便于大家对本章内容的理解,我重新整理了一下Kafka中的部分重要概念,以表格的方式呈现出来,请见下表所示:
名词 | 解释 |
Broker 节点 | 一个Kafka节点就是一个Broker,一个和多个Broker可以组成一个Kafka集群 |
Topic 主题 | Kafka根据topic对消息进行归类,发布到kafka集群的每套消息都需要指定一个topic,topic是一个逻辑概念,物理上是不存在的 |
Producer 生产者 | 用于向Kafka中发送消息 |
Consumer 消费者 | 从Kafka中获取消息 |
Consumer Group 消费组 | 每个Consumer都会归属于一个消费组,一条消息可以同时被多个不同的消费组消费,但是只能被一个消费组中的消费者消费 |
Partition 分片 | 物理上的概念,可以将一个topic上的数据拆分为多分放到Partition中,每个Patition内部的消息是有序的。 |
本篇文章的主要目的就是操作一下Kafka,从直观感受上面使用一下它,而不是让它仅仅存在于我们理论和想象中的认知上。
那么对于这种中间件的操作,我们一般来说普遍会采用两种方式:
【方式1】通过bin路径下的脚本指令,在控制台端进行使用操作;
【方式2】通过对jar包的引用,在代码层面上进行使用操作;
在下面章节中,我们就分别针对控制台层面操作
和代码层面操作
这两个方面,对Kafka进行第一次亲密的接触。
一、控制台层面操作
对于Kafka支持多少控制台指令,在其官网(https://kafka.apache.org/documentation/#quickstart
)中就已经详细的列举出来了,我们可以很方面的从官网中获得对某个指令的解释和使用说明,如下所示:
同样,在我们的安装了Kafka的bin
目录下,也存在着对应这些指令的sh脚本文件, 也是它构成了我们可以非常方便的在控制台这一层面操作Kafka的可能性,如下所示:
虽然指令和脚本文件挺多的,但是我们没有必要从头学一遍,毕竟是第一次操作Kafka,我们只做3件事:创建Topic、通过Producer发送消息和通过Consumer接受消息。
1.1> 创建/查看主题(kafka-topics.sh)
当我们发送消息的时候,主题topic
是我们重要的参数之一,我们可以针对不同业务创建不同的topic,从而达到对消息的隔离性。在bin目录下,负责管理Topic的脚本是kafka-topics.sh,我们可以通过对它操作来实现topic的管理。下面,我们就来尝试创建一个名称为“muse
”的Topic。
kafka_2.13-3.0.0> bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic muse --partitions 1 --replication-factor 1 Created topic muse.
【 --bootstrap-server 】待链接到的Kafka服务地址,此处我们指定
localhost:9092
;【 --create 】执行创建Topic主题指令;
【 --topic 】指定待创建的主题名称,此处我们指定创建名称为“
muse
”的topic;【 --partitions 】指定分区个数,由于我们采用单机模式,即只有1个Broker,所以指定创建
1个分区
;【 --replication-factor 】指定创建副本的个数,此处我们指定创建
1个副本
,即主副本;
创建完主题Topic之后,我们也可以通过 --list指令 查看Kafka下所有主题列表,如下所示:
kafka_2.13-3.0.0> bin/kafka-topics.sh --list --bootstrap-server localhost:9092 muse
同样的,我们也可以通过 --describe指令 查看刚刚创建的那个名称为“muse”的Topic内的具体描述信息:
kafka_2.13-3.0.0> bin/kafka-topics.sh --describe --topic muse --bootstrap-server localhost:9092 Topic: muse TopicId: iDQpnERjSI2IvaB2kaB2aQ PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824 Topic: muse Partition: 0 Leader: 0 Replicas: 0 Isr: 0
1.2> 生产端(kafka-console-producer.sh)
在上面,我们已经创建好了名称为“muse
”的主题Topic了,那么我们就可以尝试向这个Topic发送消息了。此时,我们可以通过使用kafka-console-producer.sh
来发送消息,它可以从本地文件中读取内容,或者我们也可以从命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每一行都会被当做一个独立的消息。具体发送语句如下所示:
kafka_2.13-3.0.0> bin/kafka-console-producer.sh --topic muse --bootstrap-server localhost:9092 >message1 >message2 >
其中,通过使用
--bootstrap-server
来指定Kafka服务地址;如果配置了Kafka集群,用逗号分割即可。
1.3> 消费端(kafka-console-consumer.sh)
上面我们虽然向Kafka中发送了两条消息——message1
和message2
,但是由于此时并没有任何消费者Consumer,所以这两个消息也无法被读取。那么,我们可以利用kafka-console-consumer.sh
来执行消息消费操作,具体消费指令如下所示:
kafka_2.13-3.0.0> bin/kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092
我们发现执行了上面的指令,控制台没有输入任何内容,那么,我们切换到Producer段,再发送两条消息——message3
和message4
,我们发现,此时Consumer端有消息输出了,如下所示:
kafka_2.13-3.0.0> bin/kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092 message3 message4
发生上面情况的原因就是,在默认情况下,消费者是从最后一条消息的偏移量+1开始消费,即:Consumer客户端启动之前的消息是不会被消费的。那如果我们想要把Consumer客户端启动之前的消息也获取到,则可以添加--from-beginning
参数即可,如下所示:
kafka_2.13-3.0.0> bin/kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092 --from-beginning message1 message2 message3 message4
二、代码层面操作
项目中引入kafka-clients
的依赖(也可以直接引入spring-kafka
的依赖,里面内嵌了kafka-clients)
2.1> 编写生产者端
2.1.1> 初始化配置
创建配置对象Properties
Properties properties = new Properties();
配置kafka的Broker列表
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095");
发出消息持久化机制参数
properties.put(ProducerConfig.ACKS_CONFIG, "1");
ACKS_CONFIG的类型有如下3种:
【acks=0】表示producer不需要等待任何broker确认收到消息的ACK回复,就可以继续发送下一条消息。性能最高,但是最容易丢失消息。
【acks=1】表示至少等待leader已经成功将数据写入本地log,但是不需要等待所有follower都写入成功,就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息就会丢失。
【acks=-1/all】需要等待所有min.insync.replicas(默认为1,推荐配置>=2)这个参数配置的副本个数都成功写入日志。这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。
配置失败重试机制
properties.put(ProducerConfig.RETRIES_CONFIG, 3); // 失败重试3次
properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300); // 重试间隔300ms
配置缓存相关信息
Producer的消息会先发送到本地缓冲区(BUFFER_MEMORY_CONFIG),而不是发送一次消息连接一次kafka。
kafka本地线程会从缓冲区去取数据(BATCH_SIZE_CONFIG),然后批量发送到Broker,即:一个批次满足16KB就会发送出去。
LINGER_MS_CONFIG的默认值为0,表示消息必须立即被发送,但这样会影响性能。 设置10ms也就是说Producer消息发送完后会进入本地的batch中;如果10ms内,这个batch满足了16KB,那么就会随着batch一起被发送出去。如果10ms内,batch没满,那么也必须要把消息发送出去,不能让消息的发送延迟时间太长。
配置key和value的序列化实现类
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
2.1.2> 同步消息发送
同步消息发送代码如下所示
2.1.3> 异步消息发送
2.2> 编写消费者端
2.2.1> 初始化配置
创建配置对象Properties
Properties properties = new Properties();
配置kafka的Broker列表
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095");
配置消费组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "museGroup");
offset的重置策略
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
【解释】offset的重置策略——例如:创建一个新的消费组,offset是不存在的,如何对offset赋值消费。
latest:默认值,只消费自己启动之后发送到主题的消息。
earliest:第一次从头开始消费,以后按照消费offset记录继续消费。
心跳相关配置
/** Consumer给Broker发送心跳的时间间隔 */ properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000); /** 如果超过10秒没有接收到消费者的心跳,则会把消费者踢出消费组,然后重新进行rebalance操作,把分区分配给其他消费者 */ properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10*1000);
poll相关配置
/** 一次poll最大拉取消息的条数,可以根据消费速度的快慢来设置 */ properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); /** 如果两次poll的时间超出了30秒的时间间隔,kafka会认为整个Consumer的消费能力太弱,会将它踢出消费组。将分区分配给其他消费者 */ properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30*1000);
配置key和value的反序列化实现类
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
2.2.2> 自动提交offset
自动提交offset
当消费者向Broker的log中poll到消息后,默认情况下,会向broker中名称为“__consumer_offsets”的Topic发送offset偏移量。
自动提交会出现丢失消息的情况。
因为如果Consumer还没消费完poll下来的消息就自动提交了偏移量,那么此时如果Consumer挂掉了,那么下一个消费者会从已经提交的offset的下一个位置开始消费消息。那么之前没有被消费的消息就丢失了。
2.2.3> 手动提交offset
手动提交offset
当消费者从kafka的Broker日志文件中poll到消息并且消费完毕之后。再手动提交当前的offset。
今天的文章内容就这些了:
写作不易,笔者几个小时甚至数天完成的一篇文章,只愿换来您几秒钟的 点赞 & 分享 。
更多技术干货,欢迎大家关注公众号“爪哇缪斯” ~ \(^o^)/ ~ 「干货分享,每天更新」