分布式实时消息队列Kafka(三)生产分区规则

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: 分布式实时消息队列Kafka(三)生产分区规则

分布式实时消息队列Kafka(三)

知识点01:课程回顾

  1. 请简述Kafka的集群架构及角色功能?
  • Kafka:分布式主从架构
  • 主: Controller:管理集群中的Topic、分区、副本选举
  • 从:Broker:对外接受读写请求,存储分区数据
  • Zookeeper
  • 辅助选举Active的主节点:Crontroller
  • 存储核心元数据
  1. 请简述Kafka中Topic管理的脚本及常用选项参数?
  • 使用命令行中的脚本命令实现管理
  • 脚本:kafka-topics.sh
  • 常用选项
  • –topic
  • –create
  • –list
  • –describe
  • –delete
  • –alter:调整Topic的配置
  • –bootstrap-server / --broker-list
  1. 请简述如何使用Kafka Simple Java API 实现数据生产?描述具体的类及方法
  • step1:构建生产者连接对象:KafkaProducer
  • 需要配置对象:管理配置,例如连接地址:Properties
  • step2:KafkaProducer:send:生产数据到Kafka中
  • 需要构建一个生产的数据对象:ProducerRecord
  • ProducerRecord(Topic,Value)
  • ProducerRecord(Topic,Key,Value)
  • ProducerRecord(Topic,Partition,Key,Value)
  1. 请简述如何使用Kafka Simple Java API 实现数据消费?描述具体的类及方法
  • step1:构建消费者连接对象:KafkaConsumer
  • 需要配置对象:管理配置,例如连接地址:Properties
  • step2:消费者需要订阅Topic
  • KafkaConsumer:subscribe(List)
  • step3:消费数据
  • KafkaConsumer:poll
  • ConsumerRecords:拉取到的所有数据
  • ConsumerRecord:消费到的每一条数据
  • topic
  • partition
  • offset
  • key
  • value
  1. 请简述Kafka生产数据时如何保证生产数据不丢失?
  • acks:返回的确认,当接收方收到数据以后,就会返回一个确认的消息
  • 生产者向Kafka生产数据,根据配置要求Kafka返回ACK
  • ack=0:生产者不管Kafka有没有收到,直接发送下一条
  • 优点:快
  • 缺点:容易导致数据丢失,概率比较高
  • ack=1:生产者将数据发送给Kafka,Kafka等待这个分区leader副本写入成功,返回ack确认,生产者发送下一条
  • 优点:性能和安全上做了平衡
  • 缺点:依旧存在数据丢失的概率,但是概率比较小
  • ack=all/-1:生产者将数据发送给Kafka,Kafka等待这个分区所有副本全部写入,返回ack确认,生产者发送下一条
  • 优点:数据安全
  • 缺点:慢
  • 如果Kafka没有返回ACK怎么办?
  • 生产者会等待Kafka返回ACK,有一个超时时间,如果Kafka在规定时间内没有返回ACK,说明数据丢失了
  • 生产者有重试机制,重新发送这条数据给Kafka
  • 问题:如果ack在中途丢失,Kafkahi导致数据重复问题,怎么解决?

知识点02:课程目标

  1. 生产数据的分区规则?【重点】
  • 常见的规则
  • MapReduce:Hash分区
  • 优点:相同的Key会进入同一个分区
  • reduce0:part-r-00000
hadoop    6
spark   7
  • reduce1:part-r-00001
hbase   9
hive    10
  • 缺点:数据倾斜的问题
  • 如果所有Key的Hash取余的结果一样,导致数据分配不均衡的问题
  • Hbase:范围分区
  • 轮询分区
  • 优点:数据分配更加均衡
  • 缺点:相同Key的数据进入不同的分区中
  • 随机分区
  • 槽位分区
  • 有几种规则?
  • 每种规则的优缺点是什么?
  1. Kafka中消费数据如何保证不丢失不重复【重点】
  • Kafka中的消费者是如何消费数据?
  • 消费者根据每个分区的Offset来进行消费
  • 问题:数据丢失和数据重复的问题
  • 如何解决数据安全的问题?
  • 如果保证每次消费的offset是正确的offset?

知识点03:生产分区规则

  • 目标掌握Kafka生产者生产数据的分区规则
  • 路径
  • 问题:为什么生产数据的方式不同,分区的规则就不一样?
- ProducerRecord(Topic,Value)
- ProducerRecord(Topic,Key,Value)
- ProducerRecord(Topic,Partition,Key,Value)
  • step1:先判断是否指定了分区
- ProducerRecord(Topic,Partition,Key,Value)
  • step2:再判断是否给定了Key
  • 指定了Key
- ProducerRecord(Topic,Key,Value)
  • 没有指定Key
- ProducerRecord(Topic,Value)
  • 实施
  • 如果指定了分区的规则:写入所指定的分区中

  • 如果没指定分区
  • 默认调用的是DefaultPartitioner分区器中partition这个方法
  • 如果指定了Key:按照Key的Hash取余分区的个数,来写入对应的分区
  • 如果没有指定Key
  • 2.x之前:轮询分区
  • 优点:数据分配相对均衡
Topic   part    key   value
topic   0     1   itcast1
topic   1     2   itcast2
topic   2     3   itcast3
topic   0     4   itcast4
topic   1     5   itcast5
topic   2     6   itcast6
topic   0     7   itcast7
topic   1     8   itcast8
topic   2     9   itcast9
  • 缺点:性能非常差
  • Kafka生产者写入数据:先将数据放入一个缓存中,与分区构建一个连接,发送一个批次的数据
  • 第一条数据:先构建0分区的连接,第二条不是0分区的,所以直接构建一个批次,发送第一条
  • 第二条数据:先构建1分区的连接,第三条不是1分区的,所以直接构建一个批次,发送第二条
  • ……
  • 每条数据需要构建一个批次,9条数据,9个批次,每个批次一条数据
  • 批次多,每个批次数据量少,性能比较差
  • 希望:批次少,每个批次数据量多,性能比较好
  • 2.x之后:黏性分区
  • 设计:让数据尽量的更加均衡,实现少批次多数据
  • 规则
  • 第一次:将所有数据随机选择一个分区,全部写入这个分区中,将这次的分区编号放入缓存中
bigdata01 1 37  null  itcast0
bigdata01 1 38  null  itcast1
bigdata01 1 39  null  itcast2
bigdata01 1 40  null  itcast3
bigdata01 1 41  null  itcast4
bigdata01 1 42  null  itcast5
bigdata01 1 43  null  itcast6
bigdata01 1 44  null  itcast7
bigdata01 1 45  null  itcast8
bigdata01 1 46  null  itcast9
  • 第二次开始根据缓存中是否有上一次的编号
  • 有:直接使用上一次的编号
  • 如果没有:重新随机选择一个
  • 小结
  • Kafka中生产数据的分区规则是什么?
  • 是否指定了分区
  • 就写入对应的分区
  • 如果没有执行分区
  • 是否指定了Key:Key的Hash取余分区
  • 没有指定Key:黏性分区
  • 尽量保证数据均衡前提下,实现少批次多数据

知识点04:自定义开发生产分区器

  • 目标掌握Kafka自定义开发生产分区器,以随机分区为例
  • 路径
  • step1:开发一个类实现Partitioner接口
  • step2:实现partition方法
  • step3:生产者加载分区器
  • 实施
  • 开发一个随机分区器
package bigdata.itcast.cn.kafka.partition;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
import java.util.Random;
/**
 * @ClassName UserPartition
 * @Description TODO 自定义分区器,实现随机分区
 * @Date 2021/3/31 9:21
 * @Create By     Frank
 */
public class UserPartition implements Partitioner {
    /**
     * 返回这条数据对应的分区编号
     * @param topic:Topic的名
     * @param key:key的值
     * @param keyBytes:key的字节
     * @param value:value的值
     * @param valueBytes:value的字节
     * @param cluster:集群对象
     * @return
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //获取Topic的分区数
        Integer count = cluster.partitionCountForTopic(topic);
        //构建一个随机对象
        Random random = new Random();
        //随机得到一个分区编号
        int part = random.nextInt(count);
        return part;
    }
    @Override
    public void close() {
        //释放资源
    }
    @Override
    public void configure(Map<String, ?> configs) {
        //获取配置
    }
}
  • 加载分区器
//指定分区器的类
props.put("partitioner.class","bigdata.itcast.cn.kafka.partition.UserPartition");
  • 小结
  • 如何构建一个自定义分区器
  • step1:构建一个类实现Partitioner接口
  • step2:实现partition方法:定义分区逻辑
  • step3:加载指定分区器即可

知识点05:消费者消费过程及问题

  • 目标掌握Kafka消费者消费过程及消费问题
  • 路径
  • step1:消费者是如何消费Topic中的数据的?
  • step2:如果消费者故障重启,消费者怎么知道自己上次消费的位置的?
  • 实施
  • Kafka中消费者消费数据的规则
  • 消费者消费Kafka中的Topic根据Offset进行消费,每次从上一次的位置继续消费
  • 第一次消费规则:由属性决定
auto.offset.reset=latest | earliest
latest:默认的值,从Topic每个分区的最新的位置开始消费
earliest:从最早的位置开始消费,每个分区的offset为0开始消费
  • 第二次消费开始:根据上一次消费的Offset位置+1继续进行消费

  • 问题1:消费者如何知道上一次消费的位置是什么?
  • 每个消费者都将自己上一次消费的offset记录自己的内存中
  • 问题2:如果因为网络资源原因,消费者故障了,重启消费者,原来内存中offset就没有了,消费者怎么知道上一次消费的位置?
  • Kafka Offset偏移量管理
  • Kafka将每个消费者消费的位置主动记录在一个Topic中:__consumer_offsets
  • 如果下次消费者没有给定请求offset,kafka就根据自己记录的offset来提供消费的位置

  • 提交的规则:根据时间自动提交
props.setProperty("enable.auto.commit", "true");//是否自动提交offset
props.setProperty("auto.commit.interval.ms", "1000");//提交的间隔时间
  • 小结
  • 消费者如何消费kafka中的Topic的数据
  • 第一次消费:根据属性
  • auto.offset.reset:lastest/earliest
  • 第二次消费开始:消费者在内存中记录上一次消费的offset + 1 = 这一次要消费的位置
  • 问题:如果消费者故障,重启,不知道上一次的位置怎么办?
  • Kafka中记录了每个消费者这一次要消费的位置
  • 由消费者定期的自动提交

知识点06:自动提交问题

  • 目标了解Kafka自动提交Offset存在的问题
  • 路径
  • step1:自动提交是否会出现数据丢失问题
  • step2:自动提交是否会出现数据重复问题
  • 实施
  • 自动提交的规则
  • 根据时间周期来提交下一次要消费的offset
props.setProperty("enable.auto.commit", "true");//是否自动提交offset
props.setProperty("auto.commit.interval.ms", "1000");//提交的间隔时间
  • 数据丢失的情况
  • 如果刚消费,还没处理,就达到提交周期,记录了当前 的offset
  • 最后处理失败,需要重启,重新消费处理
  • Kafka中已经记录消费过了,从上次消费的后面进行消费
  • 数据重复的情况
  • 如果消费并处理成功,但是没有提交offset,程序故障
  • 重启以后,kafka中记录的还是之前的offset,重新又消费一遍
  • 数据重复问题
  • 小结
  • 消费是否成功,是根据处理的结果来决定的
  • 提交offset是根据时间来决定了
  • 需要:根据处理的结果来决定是否提交offset
  • 如果消费并处理成功:提交offset
  • 如果消费处理失败:不提交offset

知识点07:实现手动提交Topic的Offset

  • 目标了解Kafka如何实现手动提交Topic的Offset实现
  • 路径
  • step1:关闭自动提交
  • step2:消费完整后手动提交
  • 实施
  • 关闭自动提交
props.setProperty("enable.auto.commit", "false");//是否自动提交offset
//        props.setProperty("auto.commit.interval.ms", "1000");//提交的间隔时间
  • 手动提交Offset
//处理完成以后,手动提交offset
            consumer.commitSync();
  • 小结
  • 关闭自动提交
  • 根据处理的结果来实现手动提交
  • 如果成功以后,再提交

知识点08:手动提交Topic Offset的问题

  • 目标了解Kafka实现手动提交Topic的Offset的问题
  • 路径
  • step1:Offset的设计层次
  • Offset是什么级别的概念?
  • 分区级别的概念
  • step2:手动提交Topic Offset出现数据重复问题
  • step3:解决方案是什么?
  • 实施
  • Offset的设计
  • Offset是分区级别,每个分区单独管理一套offset
  • 手动提交Topic Offset的过程中会出现数据重复
  • 举个栗子
  • 一个消费者,消费一个Topic,Topic有三个分区
  • 第一次消费
  • part0
0 hadoop
1 hive
  • part1
0 hive
1 spark
2 hue
  • part2
0 spark
1 hadoop
  • 问题:part0和part1都处理成功,当处理part2时候,程序故障,处理使用
  • offset有没有提交?没有提交
  • 重启消费者:Kafka中没有消费记录,但是消费者刚刚分区0和分区1已经消费成功了
  • 所有分区都重新消费
  • 原因
  • Offset是分区级别的
  • 提交offset是按照整个Topic级别来提交的
  • 解决
  • 提交offset的时候,按照分区来提交
  • 消费成功一个分区,就提交一个分区的offset
  • 小结
  • 导致问题:数据重复
  • 导致原因:offset是分区级别,提交时topic级别,只要有一个分区失败,整个提交失败,实际上部分分区已经处理成功了

知识点09:手动提交分区Offset的实现

  • 目标掌握Kafka实现手动提交Partition的Offset
  • 路径
  • step1:消费每个分区的数据
  • step2:处理输出每个分区的数据
  • step3:手动提交每个分区的Offset
  • 实施
  • 获取所有数据中的分区
Set<TopicPartition> partitions = records.partitions();
  • 从所有数据中取出每个分区的数据,输出,手动提交分区的Offset
//取出每个Partition的数据
            for (TopicPartition partition : partitions) {
                //将这个partition的数据从records中取出
                List<ConsumerRecord<String, String>> partRecords = records.records(partition);
                //遍历这个分区的每一条数据
                //取出每一条数据
                long offset = 0;
                for (ConsumerRecord<String, String> record : partRecords) {
                    //获取topic
                    String topic = record.topic();
                    //获取分区
                    int part= record.partition();
                    //获取offset
                    offset = record.offset();
                    //获取Key
                    String key = record.key();
                    //获取Value
                    String value = record.value();
                    System.out.println(topic+"\t"+part+"\t"+offset+"\t"+key+"\t"+value);
                }
                //分区数据处理结束,提交分区的offset
                Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(partition,new OffsetAndMetadata(offset+1));
                consumer.commitSync(offsets);
            }
  • 观察结果
  • 每个分区处理成功,就提交offset
  • 小结
  • 怎么实现基于分区提交offset
  • step1:获取所有数据
  • step2:获取所有分区
  • step3:处理每个分区的数据
  • step4:处理成功,提交分区处理的offset + 1
  • 注意:工作中:一般不将Offset由Kafka存储,一般自己存储
  • 如果处理成功:将offset存储在MySQL或者Zookeeper中
  • 如果重启程序:从MySQL或者Zookeeper读取上一次的offset来实现

知识点10:指定消费Topic分区的数据

  • 目标掌握Kafka如何实现消费指定分区的数据
  • 路径
  • step1:构建Topic分区对象
  • step2:指定消费Topic的分区
  • step3:输出消费结果
  • 实施
  • 构建Topic分区对象
//构建分区对象
TopicPartition part0 = new TopicPartition("bigdata01", 0);
TopicPartition part1 = new TopicPartition("bigdata01", 1);
  • 指定消费Topic分区
//指定消费某些分区的数据
consumer.assign(Arrays.asList(part0,part1));
  • 观察结果

  • 小结
  • 构建Topic的分区对象:TopicPartition
  • 消费者指定消费分区:consumer.assign(Collection)

附录一:Maven依赖

<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
</repositories>
<dependencies>
    <!-- Kafka的依赖 -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.4.1</version>
    </dependency>
</dependencies>

7943)]

  • 小结
  • 构建Topic的分区对象:TopicPartition
  • 消费者指定消费分区:consumer.assign(Collection)

附录一:Maven依赖

<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
</repositories>
<dependencies>
    <!-- Kafka的依赖 -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.4.1</version>
    </dependency>
</dependencies>


目录
相关文章
|
13天前
|
消息中间件 分布式计算 算法
大数据-63 Kafka 高级特性 分区 副本机制 宕机恢复 Leader选举
大数据-63 Kafka 高级特性 分区 副本机制 宕机恢复 Leader选举
43 5
大数据-63 Kafka 高级特性 分区 副本机制 宕机恢复 Leader选举
|
13天前
|
消息中间件 SQL 分布式计算
大数据-64 Kafka 高级特性 分区Partition 分区重新分配 实机实测重分配
大数据-64 Kafka 高级特性 分区Partition 分区重新分配 实机实测重分配
47 7
|
2天前
|
消息中间件 监控 负载均衡
在Kafka中,如何进行主题的分区和复制?
在Kafka中,如何进行主题的分区和复制?
|
13天前
|
消息中间件 JSON 大数据
大数据-65 Kafka 高级特性 分区 Broker自动再平衡 ISR 副本 宕机恢复再重平衡 实测
大数据-65 Kafka 高级特性 分区 Broker自动再平衡 ISR 副本 宕机恢复再重平衡 实测
36 4
|
13天前
|
消息中间件 分布式计算 算法
大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器
大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器
34 3
|
13天前
|
消息中间件 JSON 大数据
大数据-66 Kafka 高级特性 分区Partition 副本因子Replication Factor replicas动态修改 线上动态修改副本数
大数据-66 Kafka 高级特性 分区Partition 副本因子Replication Factor replicas动态修改 线上动态修改副本数
24 1
|
13天前
|
消息中间件 存储 分布式计算
大数据-61 Kafka 高级特性 消息消费02-主题与分区 自定义反序列化 拦截器 位移提交 位移管理 重平衡
大数据-61 Kafka 高级特性 消息消费02-主题与分区 自定义反序列化 拦截器 位移提交 位移管理 重平衡
16 1
|
4天前
|
消息中间件 存储 监控
消息队列系统中的确认机制在分布式系统中如何实现?
消息队列系统中的确认机制在分布式系统中如何实现?
|
6天前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
2月前
|
NoSQL Redis
基于Redis的高可用分布式锁——RedLock
这篇文章介绍了基于Redis的高可用分布式锁RedLock的概念、工作流程、获取和释放锁的方法,以及RedLock相比单机锁在高可用性上的优势,同时指出了其在某些特殊场景下的不足,并提到了ZooKeeper作为另一种实现分布式锁的方案。
94 2
基于Redis的高可用分布式锁——RedLock