探究Kafka原理-6.CAP理论实践(上)

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: 探究Kafka原理-6.CAP理论实践

消息的精准消费


在前面学到的手动提交位移的时机选择的时候


  • 数据处理完成之前先提交偏移量

可能会发生漏处理的现象(数据丢失)


反过来说,这种方式实现了: at most once 的数据处理(传递)语义


  • 数据处理完成之后再提交偏移量

可能会发生重复处理的现象(数据重复)


反过来说,这种方式实现了: at least once 的数据处理(传递)语义


当然,数据处理(传递)的理想语义是: exactly once(精确一次)


Kafka 也能做到 exactly once(基于 kafka 的事务机制)


实现中,可以记录为 消息存储在一张表,然后偏移量存储在一张表,但是还是有可能出现问题,除非绑定为原子操作。

相当于 偏移量的更新 和 业务数据的落地绑定成一个事务

begin transaction
insert into tb1 values();
insert into t_offset values();
commit

还有一种办法就是 利用幂等性,重复就重复,但是插入数据库中的机会就只有一次,那么就能达到最终一致的效果。


所以解决数据重复的问题,有两种解决办法:


1.利用事务

2.利用幂等性


而解决数据丢失的问题,主要有三种解决办法:


  1. 启用Kafka的事务机制:Kafka提供了事务机制,可以将消息的处理和偏移量的提交放在同一个事务中进行,确保消息的处理和偏移量的提交是原子性的。通过事务机制,可以避免在数据处理完成之前就提交偏移量而导致数据丢失的问题。
  2. 手动控制偏移量的提交:可以在应用程序中手动控制偏移量的提交时机。例如,可以在消息处理完成并且已经被确认成功后再提交偏移量。这样可以确保消息得到正确处理后再进行偏移量的提交,避免数据丢失的问题。
  3. 使用At-Least-Once语义:在消费者的配置中设置enable.auto.commit为false,然后手动提交偏移量。在消息处理过程中,确保消息处理的幂等性,即多次处理同一条消息的结果是一致的。这样即使存在重复消息的情况,也能保证数据最终被处理一次。
public class Consumer实现ExactlyOnce手段1{
    public static void main(String[] args){
        // 定义 kakfa 服务的地址,不需要将所有 broker 指定上
    props.put("bootstrap.servers", "doitedu01:9092");
        // 制定 consumer group
    props.put("group.id", "g1");
        // key 的反序列化类
    props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
        // value 的反序列化类
    props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
        // 是否自动提交 offset 
    props.put("enable.auto.commit", "false");
        // 如果没有消费偏移量记录,则自动重设为起始 offset:latest, earliest, none
    props.put("auto.offset.reset","earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 创建一个jdbc连接
        Connection conn = DriverManager.getConnection("jdbc:mysql://test:3306/abc","root","123456");
        conn.setAutoCommit(false);
        PreparedStatement pstData = conn.prepareStatement("insert into stu_info values (?,?,?,?)");
        PreparedStatement pstOffset = conn.prepareStatement("insert into t_offsets values (?,?) on DUPLICATE KEY UPDATE offset = ?");
        // 需要把消费起始位置,初始化成上一次运行所记录的消费位移中去。
        // 而且还需要考虑一个问题,消费者再均衡时会发生什么(这是一个非常严重的问题!!!)启动的时候也是要发生再均衡的。
        // 这才是真正意义上的生产意义的代码
        consumer.subscribe(Arrays.asList("user-info"));
        boolean run = true;
        while(run){
            ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records)
                try{
                    String data = record.value();
                // 解析原始数据
                String[] fields = data.split(",");
                // 插入mysql
                ......
                  // 执行业务数据插入语句
                  pstData.execute();
                  // 更新mysql中记录的偏移量
                pstOffset.setString(1,record.topic+":"+record.partition());
                pstOffset.setLong(2,record.offset()+1);
                pstOffset.setLong(3,record.offset()+1);
                pstOffset.execute();
                // 数据没提交,mysql自动回滚
                conn.commit();
                }catch(Exception e){
                    conn.rollback();
                }
      }
        }
    }
}

起始位置的初始化、接续,要考虑两个环节:


1.程序启动时初始化

2.程序正常运行过程中发生了消费再均衡的过程,也需要进行起始位置的重新初始化

PreparedStatement pstQueryOffset = conn.prepareStatement("select offset from t_offsets where topic_partition = ?");
consumer.subscribe(Arrays.asList("user-info"),new ConsumerRebalanceListener(){
    // 被剥夺分区消费权后会调用下面的方法
    public void onPartitionsRevoked(Collection<TopicPartition> partitions){
        // 如果某些场景下,不能用事务去收拾残局的话,可以在这个方法里面收拾
    }
    // 被分配了新的分区消费权后调用的方法
    public void onPartitionsAssigned(Collection<TopicPartition> partitions){
        try{
            // 去查询mysql 中的 t_offsets表,得到自己拥有消费权的分区的消费位移记录
            for(TopicPartition topicPartition : partitions){
                pstQueryOffset.setString(1,topicPartition.topic() + ":" + topicPartition.partition());
                ResultSet resultSet = pstQueryOffset.executeQuery();
                resultSet.next();
                long offset = resultSet.getLong("offset");
                // 将消费初始位置初始化为 数据库中查询到的偏移量
                consumer.seek(topicPartition,offset);
            }
        }catch(Exception e){
            e.printStackTrace();
        }
    }
});

我们的这种方法比较彻底,根本就不用kafka去提交偏移量,而是将偏移量存储在mysql中。


consumer的消费位移提交方式:


1.全自动 auto.offset.commit = true; 定时提交到 consumer_offsets中。


2.半自动 auto.offset.commit = false; 然后手动触发提交,然后手动触发提交到 consumer.commitSync() -> 提交到consumer_offset中去。


提交到consumer_offset中的好处是初始化的时候会自动去找上一个offset


3.全手动 auto.offset.commit = false; 写自己的代码,去把消费者位移保存到你自己的地方 mysql等,将来初始化也需要自己自己去从自定义存储中查询到消费者的位移。


kafka 系统的 CAP 保证


分布式系统的 CAP 理论


CAP 理论作为分布式系统的基础理论,它描述的是一个分布式系统在以下三个特性中:


  • 一致性(Consistency)
  • 可用性(Availability)
  • 分区容错性(Partition tolerance)


最多满足其中的两个特性。也就是下图所描述的。分布式系统要么满足 CA,要么 CP,要么 AP。无法同时满足 CAP。


分区容错性:


指的分布式系统中的某个节点或者网络分区出现了故障的时候,整个系统仍然能对外提供满足一致性和可用性的服务。也就是说部分故障不影响整体使用。


事实上我们在设计分布式系统是都会考虑到 bug,硬件,网络等各种原因造成的故障,所以即使部分节点或者网络出现故障,我们要求整个系统还是要继续使用的


(不继续使用,相当于只有一个分区,那么也就没有后续的一致性和可用性了)


可用性:


一直可以正常的做读写操作。简单而言就是客户端一直可以正常访问并得到系统的正常响应。用户角度来看就是不会出现系统操作失败或者访问超时等问题。


一致性:


在分布式系统完成某写操作后任何读操作,都应该获取到该写操作写入的那个最新的值。相当于要求分布式系统中的各节点时时刻刻保持数据的一致性。


kafka做的处理是,为了保证数据的一致性,数据的读写永远找分区里的leader,但是就会造成系统的可用性降低了,而kafka为了提升可用性,如果分区内其它副本挂了,其它的还可能成为leader,但是一致性可能降低,(因为有可能做不到完全同步)总之这三个特性很难完全的去满足。


Kafka 作为一个商业级消息中间件,数据可靠性和可用性是优先考虑的重点,兼顾数据一致性;


分区副本机制


kafka 从 0.8.0版本开始引入了分区版本,也就是说每个分区可以认为的配置几个副本(创建主题的时候指定replication-factor,也可以在broker级别进行配置 default.replication.factor);


在众多的分区副本里面有一个副本是Leader,其余的副本是follower,所有的读写操作都是经过Leader进行的,同时follower会定期地去leader复制数据(通过心跳机制去请求同步数据)。当Leader挂了的时候,其中一个follower会重新成为新的Leader。通过分区服务,引入了数据冗余,同时也提供了kafka的数据可靠性。


Kafka的分区多副本架构是Kafka可靠性保证的核心,把消息写入多个副本可以使Kafka在发生崩溃时仍能保证消息的持久性。


分区副本的数据一致性困难


kafka 让分区多副本同步的基本手段是: follower 副本定期向 leader 请求数据同步!


既然是定期同步,则 leader 和 follower 之间必然存在各种数据不一致的情景!


问题 1:分区副本间动态不一致



问题 2:消费者所见不一致


如果此时 leader 宕机,follower1 或 follower2 被选为新的 leader,则 leader 换届前后,消费者所能读取到的数据发生了不一致;


问题 3:分区副本间最终不一致



一致性问题解决方案(HW)


动态过程中的副本数据不一致,是很难解决的;


kafka 先尝试着解决上述“消费者所见不一致”及“副本间数据最终不一致”的问题;


解决方案的核心思想


在动态不一致的过程中,维护一条步进式的“临时一致线”(既所谓的 High Watermark)


高水位线 HW = ISR 副本中最小 LEO(最大结束偏移量 + 1)


底层逻辑就是:offset < HW 的message,是各副本间一致的且安全的!

(如上图所示:offset < hw : 3 的message,是所有副本都已经备份好的数据)


高水位涉及到了数据的一致性。

leader中会记录 remoteLEO

等到在请求的时候,携带的LEO就会变了

此时leader这边的hw将会变成 6 ,再来请求的时候发给的就是6了


等到follow再次同步的时候,才能知道hw是多少


探究Kafka原理-6.CAP理论实践(下):https://developer.aliyun.com/article/1413726

目录
相关文章
|
3月前
|
消息中间件 存储 负载均衡
kafka核心原理,藏在这 16 张图里
kafka核心原理,藏在这 16 张图里
25 0
|
1月前
|
消息中间件 存储 负载均衡
Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
【2月更文挑战第21天】Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
168 4
|
3月前
|
消息中间件 存储 设计模式
Kafka原理篇:图解kakfa架构原理
Kafka原理篇:图解kakfa架构原理
74 1
|
19天前
|
消息中间件 监控 Kafka
【Kafka】Kafka 数据一致性原理
【4月更文挑战第7天】【Kafka】Kafka 数据一致性原理
|
29天前
|
消息中间件 存储 Kafka
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
48 1
|
1月前
|
消息中间件 Kafka Linux
Kafka【付诸实践 03】Offset Explorer Kafka 的终极 UI 工具安装+简单上手+关键特性测试(一篇学会使用 Offset Explorer)
【2月更文挑战第21天】Kafka【付诸实践 03】Offset Explorer Kafka 的终极 UI 工具安装+简单上手+关键特性测试(一篇学会使用 Offset Explorer)
190 2
|
1月前
|
消息中间件 网络协议 Kafka
Kafka【付诸实践 02】消费者和消费者群组+创建消费者实例+提交偏移量(自动、手动)+监听分区再平衡+独立的消费者+消费者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka消费者】
【2月更文挑战第21天】Kafka【付诸实践 02】消费者和消费者群组+创建消费者实例+提交偏移量(自动、手动)+监听分区再平衡+独立的消费者+消费者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka消费者】
81 3
|
1月前
|
消息中间件 Java Kafka
【Spring底层原理高级进阶】Spring Kafka:实时数据流处理,让业务风起云涌!️
【Spring底层原理高级进阶】Spring Kafka:实时数据流处理,让业务风起云涌!️
|
2月前
|
消息中间件 安全 Kafka
2024年了,如何更好的搭建Kafka集群?
我们基于Kraft模式和Docker Compose同时采用最新版Kafka v3.6.1来搭建集群。
438 2
2024年了,如何更好的搭建Kafka集群?
|
3月前
|
消息中间件 存储 数据可视化
kafka高可用集群搭建
kafka高可用集群搭建
44 0

热门文章

最新文章