分布式实时消息队列Kafka(三)
知识点01:课程回顾
- 请简述Kafka的集群架构及角色功能?
- Kafka:分布式主从架构
- 主: Controller:管理集群中的Topic、分区、副本选举
- 从:Broker:对外接受读写请求,存储分区数据
- Zookeeper
- 辅助选举Active的主节点:Crontroller
- 存储核心元数据
- 请简述Kafka中Topic管理的脚本及常用选项参数?
- 使用命令行中的脚本命令实现管理
- 脚本:kafka-topics.sh
- 常用选项
- –topic
- –create
- –list
- –describe
- –delete
- –alter:调整Topic的配置
- –bootstrap-server / --broker-list
- 请简述如何使用Kafka Simple Java API 实现数据生产?描述具体的类及方法
- step1:构建生产者连接对象:KafkaProducer
- 需要配置对象:管理配置,例如连接地址:Properties
- step2:KafkaProducer:send:生产数据到Kafka中
- 需要构建一个生产的数据对象:ProducerRecord
- ProducerRecord(Topic,Value)
- ProducerRecord(Topic,Key,Value)
- ProducerRecord(Topic,Partition,Key,Value)
- 请简述如何使用Kafka Simple Java API 实现数据消费?描述具体的类及方法
- step1:构建消费者连接对象:KafkaConsumer
- 需要配置对象:管理配置,例如连接地址:Properties
- step2:消费者需要订阅Topic
- KafkaConsumer:subscribe(List)
- step3:消费数据
- KafkaConsumer:poll
- ConsumerRecords:拉取到的所有数据
- ConsumerRecord:消费到的每一条数据
- topic
- partition
- offset
- key
- value
- 请简述Kafka生产数据时如何保证生产数据不丢失?
- acks:返回的确认,当接收方收到数据以后,就会返回一个确认的消息
- 生产者向Kafka生产数据,根据配置要求Kafka返回ACK
- ack=0:生产者不管Kafka有没有收到,直接发送下一条
- 优点:快
- 缺点:容易导致数据丢失,概率比较高
- ack=1:生产者将数据发送给Kafka,Kafka等待这个分区leader副本写入成功,返回ack确认,生产者发送下一条
- 优点:性能和安全上做了平衡
- 缺点:依旧存在数据丢失的概率,但是概率比较小
- ack=all/-1:生产者将数据发送给Kafka,Kafka等待这个分区所有副本全部写入,返回ack确认,生产者发送下一条
- 优点:数据安全
- 缺点:慢
- 如果Kafka没有返回ACK怎么办?
- 生产者会等待Kafka返回ACK,有一个超时时间,如果Kafka在规定时间内没有返回ACK,说明数据丢失了
- 生产者有重试机制,重新发送这条数据给Kafka
- 问题:如果ack在中途丢失,Kafkahi导致数据重复问题,怎么解决?
知识点02:课程目标
- 生产数据的分区规则?【重点】
- 常见的规则
- MapReduce:Hash分区
- 优点:相同的Key会进入同一个分区
- reduce0:part-r-00000
hadoop 6 spark 7
- reduce1:part-r-00001
hbase 9 hive 10
- 缺点:数据倾斜的问题
- 如果所有Key的Hash取余的结果一样,导致数据分配不均衡的问题
- Hbase:范围分区
- 轮询分区
- 优点:数据分配更加均衡
- 缺点:相同Key的数据进入不同的分区中
- 随机分区
- 槽位分区
- 有几种规则?
- 每种规则的优缺点是什么?
- Kafka中消费数据如何保证不丢失不重复【重点】
- Kafka中的消费者是如何消费数据?
- 消费者根据每个分区的Offset来进行消费
- 问题:数据丢失和数据重复的问题
- 如何解决数据安全的问题?
- 如果保证每次消费的offset是正确的offset?
知识点03:生产分区规则
- 目标:掌握Kafka生产者生产数据的分区规则
- 路径
- 问题:为什么生产数据的方式不同,分区的规则就不一样?
- ProducerRecord(Topic,Value) - ProducerRecord(Topic,Key,Value) - ProducerRecord(Topic,Partition,Key,Value)
- step1:先判断是否指定了分区
- ProducerRecord(Topic,Partition,Key,Value)
- step2:再判断是否给定了Key
- 指定了Key
- ProducerRecord(Topic,Key,Value)
- 没有指定Key
- ProducerRecord(Topic,Value)
- 实施
- 如果指定了分区的规则:写入所指定的分区中
- 如果没指定分区
- 默认调用的是DefaultPartitioner分区器中partition这个方法
- 如果指定了Key:按照Key的Hash取余分区的个数,来写入对应的分区
- 如果没有指定Key
- 2.x之前:轮询分区
- 优点:数据分配相对均衡
Topic part key value topic 0 1 itcast1 topic 1 2 itcast2 topic 2 3 itcast3 topic 0 4 itcast4 topic 1 5 itcast5 topic 2 6 itcast6 topic 0 7 itcast7 topic 1 8 itcast8 topic 2 9 itcast9
- 缺点:性能非常差
- Kafka生产者写入数据:先将数据放入一个缓存中,与分区构建一个连接,发送一个批次的数据
- 第一条数据:先构建0分区的连接,第二条不是0分区的,所以直接构建一个批次,发送第一条
- 第二条数据:先构建1分区的连接,第三条不是1分区的,所以直接构建一个批次,发送第二条
- ……
- 每条数据需要构建一个批次,9条数据,9个批次,每个批次一条数据
- 批次多,每个批次数据量少,性能比较差
- 希望:批次少,每个批次数据量多,性能比较好
- 2.x之后:黏性分区
- 设计:让数据尽量的更加均衡,实现少批次多数据
- 规则
- 第一次:将所有数据随机选择一个分区,全部写入这个分区中,将这次的分区编号放入缓存中
bigdata01 1 37 null itcast0 bigdata01 1 38 null itcast1 bigdata01 1 39 null itcast2 bigdata01 1 40 null itcast3 bigdata01 1 41 null itcast4 bigdata01 1 42 null itcast5 bigdata01 1 43 null itcast6 bigdata01 1 44 null itcast7 bigdata01 1 45 null itcast8 bigdata01 1 46 null itcast9
- 第二次开始根据缓存中是否有上一次的编号
- 有:直接使用上一次的编号
- 如果没有:重新随机选择一个
- 小结
- Kafka中生产数据的分区规则是什么?
- 是否指定了分区
- 就写入对应的分区
- 如果没有执行分区
- 是否指定了Key:Key的Hash取余分区
- 没有指定Key:黏性分区
- 尽量保证数据均衡前提下,实现少批次多数据
知识点04:自定义开发生产分区器
- 目标:掌握Kafka自定义开发生产分区器,以随机分区为例
- 路径
- step1:开发一个类实现Partitioner接口
- step2:实现partition方法
- step3:生产者加载分区器
- 实施
- 开发一个随机分区器
package bigdata.itcast.cn.kafka.partition; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map; import java.util.Random; /** * @ClassName UserPartition * @Description TODO 自定义分区器,实现随机分区 * @Date 2021/3/31 9:21 * @Create By Frank */ public class UserPartition implements Partitioner { /** * 返回这条数据对应的分区编号 * @param topic:Topic的名 * @param key:key的值 * @param keyBytes:key的字节 * @param value:value的值 * @param valueBytes:value的字节 * @param cluster:集群对象 * @return */ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //获取Topic的分区数 Integer count = cluster.partitionCountForTopic(topic); //构建一个随机对象 Random random = new Random(); //随机得到一个分区编号 int part = random.nextInt(count); return part; } @Override public void close() { //释放资源 } @Override public void configure(Map<String, ?> configs) { //获取配置 } }
- 加载分区器
//指定分区器的类 props.put("partitioner.class","bigdata.itcast.cn.kafka.partition.UserPartition");
- 小结
- 如何构建一个自定义分区器
- step1:构建一个类实现Partitioner接口
- step2:实现partition方法:定义分区逻辑
- step3:加载指定分区器即可
知识点05:消费者消费过程及问题
- 目标:掌握Kafka消费者消费过程及消费问题
- 路径
- step1:消费者是如何消费Topic中的数据的?
- step2:如果消费者故障重启,消费者怎么知道自己上次消费的位置的?
- 实施
- Kafka中消费者消费数据的规则
- 消费者消费Kafka中的Topic根据Offset进行消费,每次从上一次的位置继续消费
- 第一次消费规则:由属性决定
auto.offset.reset=latest | earliest latest:默认的值,从Topic每个分区的最新的位置开始消费 earliest:从最早的位置开始消费,每个分区的offset为0开始消费
- 第二次消费开始:根据上一次消费的Offset位置+1继续进行消费
- 问题1:消费者如何知道上一次消费的位置是什么?
- 每个消费者都将自己上一次消费的offset记录自己的内存中
- 问题2:如果因为网络资源原因,消费者故障了,重启消费者,原来内存中offset就没有了,消费者怎么知道上一次消费的位置?
- Kafka Offset偏移量管理
- Kafka将每个消费者消费的位置主动记录在一个Topic中:__consumer_offsets
- 如果下次消费者没有给定请求offset,kafka就根据自己记录的offset来提供消费的位置
- 提交的规则:根据时间自动提交
props.setProperty("enable.auto.commit", "true");//是否自动提交offset props.setProperty("auto.commit.interval.ms", "1000");//提交的间隔时间
- 小结
- 消费者如何消费kafka中的Topic的数据
- 第一次消费:根据属性
- auto.offset.reset:lastest/earliest
- 第二次消费开始:消费者在内存中记录上一次消费的offset + 1 = 这一次要消费的位置
- 问题:如果消费者故障,重启,不知道上一次的位置怎么办?
- Kafka中记录了每个消费者这一次要消费的位置
- 由消费者定期的自动提交
知识点06:自动提交问题
- 目标:了解Kafka自动提交Offset存在的问题
- 路径
- step1:自动提交是否会出现数据丢失问题
- step2:自动提交是否会出现数据重复问题
- 实施
- 自动提交的规则
- 根据时间周期来提交下一次要消费的offset
props.setProperty("enable.auto.commit", "true");//是否自动提交offset props.setProperty("auto.commit.interval.ms", "1000");//提交的间隔时间
- 数据丢失的情况
- 如果刚消费,还没处理,就达到提交周期,记录了当前 的offset
- 最后处理失败,需要重启,重新消费处理
- Kafka中已经记录消费过了,从上次消费的后面进行消费
- 数据重复的情况
- 如果消费并处理成功,但是没有提交offset,程序故障
- 重启以后,kafka中记录的还是之前的offset,重新又消费一遍
- 数据重复问题
- 小结
- 消费是否成功,是根据处理的结果来决定的
- 提交offset是根据时间来决定了
- 需要:根据处理的结果来决定是否提交offset
- 如果消费并处理成功:提交offset
- 如果消费处理失败:不提交offset
知识点07:实现手动提交Topic的Offset
- 目标:了解Kafka如何实现手动提交Topic的Offset实现
- 路径
- step1:关闭自动提交
- step2:消费完整后手动提交
- 实施
- 关闭自动提交
props.setProperty("enable.auto.commit", "false");//是否自动提交offset // props.setProperty("auto.commit.interval.ms", "1000");//提交的间隔时间
- 手动提交Offset
//处理完成以后,手动提交offset consumer.commitSync();
- 小结
- 关闭自动提交
- 根据处理的结果来实现手动提交
- 如果成功以后,再提交
知识点08:手动提交Topic Offset的问题
- 目标:了解Kafka实现手动提交Topic的Offset的问题
- 路径
- step1:Offset的设计层次
- Offset是什么级别的概念?
- 分区级别的概念
- step2:手动提交Topic Offset出现数据重复问题
- step3:解决方案是什么?
- 实施
- Offset的设计
- Offset是分区级别,每个分区单独管理一套offset
- 手动提交Topic Offset的过程中会出现数据重复
- 举个栗子
- 一个消费者,消费一个Topic,Topic有三个分区
- 第一次消费
- part0
0 hadoop 1 hive
- part1
0 hive 1 spark 2 hue
- part2
0 spark 1 hadoop
- 问题:part0和part1都处理成功,当处理part2时候,程序故障,处理使用
- offset有没有提交?没有提交
- 重启消费者:Kafka中没有消费记录,但是消费者刚刚分区0和分区1已经消费成功了
- 所有分区都重新消费
- 原因
- Offset是分区级别的
- 提交offset是按照整个Topic级别来提交的
- 解决
- 提交offset的时候,按照分区来提交
- 消费成功一个分区,就提交一个分区的offset
- 小结
- 导致问题:数据重复
- 导致原因:offset是分区级别,提交时topic级别,只要有一个分区失败,整个提交失败,实际上部分分区已经处理成功了
知识点09:手动提交分区Offset的实现
- 目标:掌握Kafka实现手动提交Partition的Offset
- 路径
- step1:消费每个分区的数据
- step2:处理输出每个分区的数据
- step3:手动提交每个分区的Offset
- 实施
- 获取所有数据中的分区
Set<TopicPartition> partitions = records.partitions();
- 从所有数据中取出每个分区的数据,输出,手动提交分区的Offset
//取出每个Partition的数据 for (TopicPartition partition : partitions) { //将这个partition的数据从records中取出 List<ConsumerRecord<String, String>> partRecords = records.records(partition); //遍历这个分区的每一条数据 //取出每一条数据 long offset = 0; for (ConsumerRecord<String, String> record : partRecords) { //获取topic String topic = record.topic(); //获取分区 int part= record.partition(); //获取offset offset = record.offset(); //获取Key String key = record.key(); //获取Value String value = record.value(); System.out.println(topic+"\t"+part+"\t"+offset+"\t"+key+"\t"+value); } //分区数据处理结束,提交分区的offset Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(partition,new OffsetAndMetadata(offset+1)); consumer.commitSync(offsets); }
- 观察结果
- 每个分区处理成功,就提交offset
- 小结
- 怎么实现基于分区提交offset
- step1:获取所有数据
- step2:获取所有分区
- step3:处理每个分区的数据
- step4:处理成功,提交分区处理的offset + 1
- 注意:工作中:一般不将Offset由Kafka存储,一般自己存储
- 如果处理成功:将offset存储在MySQL或者Zookeeper中
- 如果重启程序:从MySQL或者Zookeeper读取上一次的offset来实现
知识点10:指定消费Topic分区的数据
- 目标:掌握Kafka如何实现消费指定分区的数据
- 路径
- step1:构建Topic分区对象
- step2:指定消费Topic的分区
- step3:输出消费结果
- 实施
- 构建Topic分区对象
//构建分区对象 TopicPartition part0 = new TopicPartition("bigdata01", 0); TopicPartition part1 = new TopicPartition("bigdata01", 1);
- 指定消费Topic分区
//指定消费某些分区的数据 consumer.assign(Arrays.asList(part0,part1));
- 观察结果
- 小结
- 构建Topic的分区对象:TopicPartition
- 消费者指定消费分区:consumer.assign(Collection)
附录一:Maven依赖
<repositories> <repository> <id>aliyun</id> <url>http://maven.aliyun.com/nexus/content/groups/public/</url> </repository> </repositories> <dependencies> <!-- Kafka的依赖 --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.4.1</version> </dependency> </dependencies>
7943)]
- 小结
- 构建Topic的分区对象:TopicPartition
- 消费者指定消费分区:consumer.assign(Collection)
附录一:Maven依赖
<repositories> <repository> <id>aliyun</id> <url>http://maven.aliyun.com/nexus/content/groups/public/</url> </repository> </repositories> <dependencies> <!-- Kafka的依赖 --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.4.1</version> </dependency> </dependencies>