探究Kafka原理-2.Kafka基本命令实操(上):https://developer.aliyun.com/article/1413712
生产者:kafka-console-producer
bin/kafka-console-producer.sh --broker-list doitedu01:9092 --topic test
>hello word >kafka >nihao
其实存在着一些思考和问题,比如我们根本不知道到底是不是写进去了,那么我们应该怎么办?
消费者:kafka-console-consumer
消费者在消费的时候,需要指定要订阅的主题,还可以指定消费的起始偏移量 起始偏移量的指定策略有 3 种: earliest 从最早的开始消费 latest 从最新的开始消费 指定的 offset( 分区号:偏移量) 从你指定的位置开始消费 从之前所记录的偏移量开始消费 kafka 的 topic 中的消息,是有序号的(序号叫消息偏移量),而且消息的偏移量是在各个 partition 中 独立维护的,在各个分区内,都是从 0 开始递增编号!
消费消息(从开始的开始消费)
bin/kafka-console-consumer.sh --bootstrap-server doitedu01:9092 --from-beginning --topic test
但是会存在一种情况,比如说 先生产了很多消息进集群中,然后开始消费的话,可能不会保证有序,因为数据是存储在不同的分区中的,消费者在消费的时候,是先把一个分区的数据消费完,然后再去消息其他分区。所以这也就导致了全局顺序不一致的情况。
如果不加 --from-beginning 默认从最新的开始消费 当再次执行消费者的时候,会返回0条,因为已经没有最新的了,已经存在的都叫老数据了。
如果此时还想让消费者 消费到数据,那就去生产新的数据。
指定要消费的分区,和要消费的起始 offset
bin/kafka-console-consumer.sh --bootstrap-server doitedu01:9092,doitedu02:9092,doitedu03:9092 --topic doit14 --offset 2 --partition 0
在这里其实要明白的一个点就是,生产者把数据写入topic的时候,默认是把数据在多个分区间轮询写入。
每一个消息都有一个序号,对应的消息的序号(offset)递增都是每个分区内管理的,消息的offset在topic中并不会有全局的递增号。所以offset是在各个分区内独立维护的,那么也就意味着每个分区中,都有offset=0的消息
消费组
消费组是 kafka 为了提高消费并行度的一种机制!
如果只有一个消费者,那么就会是这样
消费者轮询消费对应的分区。
而如果topic中数据量太大,而你需要多个并行处理任务去处理topic中的数据,那么就需要消费组。
消费组内的各个消费者之间,分担数据读取任务的最小单位是分区。
同一个分区只会被消费组内某一个消费者来负责读取。
而如果出现,消费者组 中消费者 大于 分区数,那么就会剩下来。
在kafka的底层逻辑中,任何一个消费者都有自己所属的组
组和组之间,没有任何关系,大家都可以消费到目标topic中所有的数据,但是组内的各个消费者,就只能读到自己所分配的 分区
如何让多个消费者组成一个组: 就是让这些消费者的 groupId 相同即可!
KAFKA 中的消费组,可以动态增减消费者
而且消费组中的消费者数量发生任意变动,都会重新分配分区消费任务
消费位移的记录
kafka 的消费者,可以记录自己所消费到的消息偏移量,记录的这个偏移量就叫(消费位移);
记录这个消费到的位置,作用就在于消费者重启后可以接续上一次消费到位置来继续往后面消费;
其实讲白了就是为了断点续传。
例如上图 消费者A,正在读,突然消费者组里新增了一个消费者,那么这个程序,读的进程会被中断,先停,重新分配一下分区,然后再来。
kafka消费者是有这个功能的,它会自己去记消费到那条消息了,万一消费者崩了,重启也知道从哪里继续消费。
其消费的本质是按照组来记偏移量,整个偏移量组内共享,并不是按照单个消费者来记,毕竟消费者组里的消费者可以动态收缩!!!
如果此时又来了一个新的消费者组来消费 topic,那么就没有对应的偏移量。
有一个比较经典的问题:
如果我们消费一个数据,已经读到了,但是还没有来得及更新偏移量,正要更新偏移量的时候,崩溃了,那么此时重启之后会发生什么?
此时就会被重复消费。
该模式主要是 先读后记,如果是先记后读,可能连读都读不到!!!
但是还有另外的一个情况,就是可能重复的数据不止一条。
比如 消费了好几条,再记录!一次去读一批,然后去更新偏移量。
kafka的消费者去读取数据,是消费者主动向broker去请求拉取,而不是broker服务器来推送(具体拉取多少条是有参数配置的)
如果拉取的速度比进行的速度要快的话,那么消费者就经常的处于饥饿的状态,如果进来的速度比我拉取的要快,那么就会造成数据大量的积压。
如果在不同机器启动同一个消费者组里的消息者,还是能够共享偏移量的
是因为偏移量数据并没有存储在本地磁盘上,在0.11.x之前,消费者确实是把自己消费到的位置(消费位移)记录到zk上,之后,是记录在kafka的一个内部的topic中( __consumer_offsets)。
类似于mysql,也是一样的逻辑,内部也有一些系统内部表
通过指定 formatter 工具类,来对__consumer_offsets 主题中的数据进行解析;
bin/kafka-console-consumer.sh --bootstrap-server doitedu01:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
consumer去记录偏移量的时候,不是 读到一批数据就记录一次,也不是记录一次后再去读数据,而是周期性定期去提交当前的位移。
如果真的发生了更新,那就去改数字,没发生更新,就和原来一样。(周期性 5s 去提交当前的位移)
其实也可以从上面的记录中就可以看到,kafka的消费者,记录新的消费位移,并不是去修改上一次的,而是重新记录(追加新记录,像日志一样)。
所以就像之前说的kafka的配置文件里面,数据存储目录,不叫data.dirs 而是叫 log.dirs,kafka之所以把自己的数据存储目录称之为 log目录,是因为,他底层存储数据的特性,类似于 “日志” 数据只能不断追加。
这种日志也不能删除,只能将超过日期的日志进行截断,留下的各个消费者组的都有。当然针对这一点,因为消费者组再启动消费的时候,是可以显示指定起始偏移量,也就是说,可以忽略之前所记录的偏移量。
如果需要获取某个特定 consumer-group 的消费偏移量信息,则需要计算该消费组的偏移量记录所在
分区: Math.abs(groupID.hashCode()) % numPartitions __consumer_offsets 的分区数为:50
配置管理 kafka-config
kafka-configs.sh 脚本是专门用来进行动态参数配置操作的,这里的操作是运行状态修改原有的配置,
如此可以达到动态变更的目的;
动态配置的参数,会被存储在 zookeeper 上,因而是持久生效的
可用参数的查阅地址: https://kafka.apache.org/documentation/#configuration
kafka-configs.sh 脚本包含:变更 alter、查看 describe 这两种指令类型;
kafka-configs. sh 支持主题、 broker 、用户和客户端这 4 个类型的配置。
kafka-configs.sh 脚本使用 entity-type 参数来指定操作配置的类型,并且使 entity-name 参数来指定操
作配置的名称。
比如查看 topic 的配置可以按如下方式执行
bin/kafka-configs.sh --zookeeper doit01:2181 --describe --entity-type topics --entity-name tpc_2
比如查看 broker 的动态配置可以按如下方式执行:
bin/kafka-configs.sh --describe --entity-type brokers --entity-name 0 --zookeeper doit01:2181
entity-type 和 entity-name 的对应关系
示例:添加 topic 级别参数
bin/kafka-configs.sh --zookeeper doit:2181 --alter --entity-type topics --entity-name tpc22 --add-config cleanup.policy=compact,max.message.bytes=10000
使用 kafka-configs.sh 脚本来变更( alter )配置时,会在 ZooKeeper 中创建一个命名形式为:
/config//的节点,并将变更的配置写入这个节点
示例:添加 broker
kafka-configs.sh --entity-type brokers --entity-name 0 --alter --add-config log.flush.interval.ms=1000 --bootstrap-server doit01:9092,doit02:9092,doit03:9092
动态配置 topic 参数
通过管理命令,可以为已创建的 topic 增加、修改、删除 topic level 参
添加/修改 指定 topic 的配置参数:
kafka-topics.sh --topic doitedu-tpc2 --alter --config compression.type=gzip --zookeeper doit01:2181
如果利用 kafka-configs.sh 脚本来对 topic、producer、consumer、broker 等进行参数动态
添加、修改配置参数
bin/kafka-configs.sh --zookeeper doitedu01:2181 --entity-type topics --entity-name tpc_1 --alter --add-config compression.type=gzip
删除配置参数
bin/kafka-configs.sh --zookeeper doitedu01:2181 --entity-type topics --entity-name tpc_1 --alter --delete-config compression.type
kafka是如何做到可以动态修改配置的?
Kafka之所以能够动态配置,是因为它设计时考虑到了在运行时动态更改配置的需求。Kafka的配置信息存储在Zookeeper中,而不是像传统的配置文件那样静态地存储在本地磁盘上。这样一来,当需要更改配置时,只需要在Zookeeper上修改对应的配置节点,Kafka会自动检测到变化并按照新的配置进行运行。
Kafka实现动态配置的原理是基于Zookeeper的Watcher机制。当Kafka启动时,会将配置信息存储在Zookeeper的一个特定目录下,并且通过Watcher监听该目录的变化。当配置信息发生变化时,Zookeeper会通知Kafka,Kafka会重新加载新的配置并应用到运行中的服务。
以下是一个简化的伪代码示例,展示了Kafka动态配置的实现原理:
# Kafka启动时初始化配置 def initialize_config(): config = load_config_from_zookeeper() # 从Zookeeper加载配置 apply_config(config) # 应用配置 # 从Zookeeper加载配置 def load_config_from_zookeeper(): config_data = zookeeper.get('/kafka/config') # 从Zookeeper获取配置数据 return parse_config(config_data) # 解析配置数据 # 解析配置数据 def parse_config(config_data): # 将配置数据解析为可用的配置对象 config = Config() config.load_from_dict(config_data) return config # 应用配置 def apply_config(config): # 根据配置更新Kafka的运行时参数 update_kafka_config(config) # 监听配置变化 def watch_config_changes(): while True: changes = zookeeper.watch('/kafka/config') # 监听配置目录 config = parse_config(changes) # 解析配置变化 apply_config(config) # 应用配置 # 修改配置 def modify_config(config_changes): zookeeper.set('/kafka/config', config_changes) # 更新Zookeeper上的配置数据 # Kafka启动时初始化配置 initialize_config() # 启动监听配置变化的线程 start_thread(watch_config_changes) # 修改配置的示例 modify_config(new_config_changes)