消息队列
🐓 消息队列的作用与使用场景
消息队列在实际应用中常用的使用场景。异步处理,应用解耦,流量削锋和消息通讯
1.异步处理
场景说明:用户注册后,需要发注册邮件和注册短信。
(1)串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端
(2)并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间
假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。
因为CPU在单位时间内处理的请求数是一定的,假设CPU1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150)。并行方式处理的请求量是10次(1000/100)
小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?
引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:
按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍
2.应用解耦
场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。
传统模式的缺点:
假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存系统耦合
如何解决以上问题呢?引入应用消息队列后的方案
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作
假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦
3.流量削锋
应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。
用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面
秒杀业务根据消息队列中的请求信息,再做后续处理
4.日志处理
日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。
(1)Kafka:接收用户日志的消息队列
(2)Logstash:做日志解析,统一成JSON输出给Elasticsearch。
(3)Elasticsearch:实时日志分析服务的核心技术,一个schemaless,实时的数据存储服务,通过index组织数据,兼具强大的搜索和统计功能
(4)Kibana:基于Elasticsearch的数据可视化组件,超强的数据可视化能力是众多公司选择ELK stack的重要原因
5.消息通讯
消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室。
(1)点对点通讯:
客户端A和客户端B使用同一队列,进行消息通讯。
(2)聊天室通讯:
客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。实现类似聊天室效果。
🐓 RabbitMQ如何保证消息的顺序性
消息队列中的若干消息如果是对同一个数据进行操作,这些操作具有前后的关系,必须要按前后的顺序执行,否则就会造成数据异常。举例:
比如通过mysql binlog进行两个数据库的数据同步,由于对数据库的数据操作是具有顺序性的,如果操作顺序搞反,就会造成不可估量的错误。比如数据库对一条数据依次进行了 插入->更新->删除操作,这个顺序必须是这样,如果在同步过程中,消息的顺序变成了 删除->插入->更新,那么原本应该被删除的数据,就没有被删除,造成数据的不一致问题。
举例场景:
RabbitMQ:①一个queue,有多个consumer去消费,这样就会造成顺序的错误,consumer从MQ里面读取数据是有序的,但是每个consumer的执行时间是不固定的,无法保证先读到消息的consumer一定先完成操作,这样就会出现消息并没有按照顺序执行,造成数据顺序错误。
②一个queue对应一个consumer,但是consumer里面进行了多线程消费,这样也会造成消息消费顺序错误。
解决方案:
①拆分多个queue,每个queue一个consumer,就是多一些queue而已,确实是麻烦点;这样也会造成吞吐量下降,可以在消费者内部采用多线程的方式取消费。
②一个queue对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理
一个queue对应一个consumer,采用多线程
🐓 RabbitMQ如何确保消息可靠性
消息可靠性一般来说由3方面来保证:
1.生产者
RabbitMQ提供transaction事务和confirm模式来确保生产者不丢消息;
ransaction事务机制就是说:发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit()),然而,这种方式有个缺点:吞吐量下降。
confirm模式用的居多:一旦channel进入confirm模式,所有在该信道上发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后;
rabbitMQ就会发送一个ACK给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;
如果rabbitMQ没能处理该消息,则会发送一个Nack消息给你,可以进行重试操作。
2. 消息队列本身
可以进行消息持久化, 即使rabbitMQ挂了,重启后也能恢复数据
如果要进行消息持久化,那么需要对以下3种实体均配置持久化
(1)Exchange
声明exchange时设置持久化(durable = true)并且不自动删除(autoDelete = false)
(2) Queue
声明queue时设置持久化(durable = true)并且不自动删除(autoDelete = false)
(3) message
发送消息时通过设置deliveryMode=2持久化消息
3. 消费者
消费者丢数据一般是因为采用了自动确认消息模式,消费者在收到消息之后,处理消息之前,会自动回复RabbitMQ已收到消息;如果这时处理消息失败,就会丢失该消息;改为手动确认消息即可!手动确认模式下消费失败时,不将其重新放入队列(确认重试也不会成功的情形),打印错误信息后,通知相关人员,人工介入处理。
🐓 如何防止RabbitMQ消息重复消费
保证消息幂等性。幂等性概念:一个请求,不管重复来多少次,结果是不会改变的。
RabbitMQ、RocketMQ、Kafka等任何队列不保证消息不重复,如果业务需要消息不重复消费,则需要消费端处理业务消息要保持幂等性
方式一:Redis的setNX()
做消息id去重 java版本目前不支持设置过期时间
//Redis中操作,判断是否已经操作过 TODO boolean flag = jedis.setNX(key); if(flag){ //消费 }else{ //忽略,重复消费 }
方式二:redis的 Incr 原子操作
key自增,大于0 返回值大于0则说明消费过,(key可以是消息的md5取值, 或者如果消息id设计合理直接用id做key)
int num = jedis.incr(key); if(num == 1){ //消费 }else{ //忽略,重复消费 }
方式三:数据库去重表
设计一个去重表,某个字段使用Message的key做唯一索引,因为存在唯一索引,所以重复消费会失败
CREATE TABLE `message_record` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `key` varchar(128) DEFAULT NULL, `create_time` datetime DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `key` (`key`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
🐓 Kafka都有哪些常用的参数?
Kafka的参数主要包括生产者和消费者的配置参数
生产者的常用参数
1.acks:表示消息的确认机制,用于确定消息是否成功发送。有0、1、-1三种模式,0表示Producer请求立即返回,不需要等待Leader的任何确认;1表示Leader副本必须应答此Producer请求并写入消息到本地日志,之后Producer请求被认为成功;-1表示分区Leader必须等待消息被成功写入到所有的ISR副本中才认为Producer请求成功。
2.buffer.memory:用于指定Producer端用于缓存消息的缓冲区大小。
3.compression.type:用于设置消息的压缩方式,目前支持none、gzip、snappy和lz4。
4.retries:Producer发送消息失败重试的次数。
5.batch.size:Producer按照batch进行发送,当batch满了后,Producer会把消息发送出去。
6.linger.ms:表示Producer发送消息时的等待时间,为了减少网络IO,提升整体的性能。
7.topic.metadata.refresh.interval.ms:定期的获取元数据的时间。
8.message.send.max.retries:消息发送最大尝试次数。
9.request.timeout.ms:确认超时时间。
10.partitioner.class:根据 Key 提供一个分区策略。
消费者常用参数
1.bootstrap.servers:这是一个必填的参数,用于指定消费者的Kafka集群地址。
2.group.id:消费者组ID,用于将消费者组成一个消费者组,共同消费一个topic下的消息。
3.key.deserializer 和 value.deserializer:这两个参数用于指定消息的键值对的反序列化类。
4.auto.offset.reset:这个参数用于设置消费者在启动时如何处理未初始化的偏移量。如果消费者的偏移量不存在,可以选择抛出异常、从最新消息开始读取、从最早消息开始读取等。
5.enable.auto.commit:这个参数用于控制消费者是否自动提交消费的偏移量。如果设置为true,则会自动提交;如果设置为false,则需要手动提交。
6.session.timeout.ms:消费者会话超时时间,如果在此时间内没有向broker发送心跳,broker会认为消费者已经宕机,会触发新的rebalance。
7.max.poll.records:一次拉取请求中拉取的最大消息数,默认值为500条。
8.max.poll.interval.ms:两次连续的poll请求的最小时间间隔,防止消费者过于频繁的请求broker,默认值为300000ms。
9.fetch.min.bytes:消费者从broker读取消息时最小的数据量,如果数据量小于这个阈值,broker会等待直到有足够的数据再返回给消费者。
10.fetch.max.wait.ms:消费者读取时最长等待时间,从而避免长时间阻塞。
🐓 kafka的消费组是干什么的
消费者组的主要特点包括:
1.负载均衡:消费者组能够将消息负载均衡地分配给组内的各个消费者,使得每个消费者处理相等的负载量。这种负载均衡机制可以根据消费者的处理能力自动调整分配给每个消费者的消息量,从而提高整体的处理效率。
2.容错性:在消费者组中,每个消费者都独立消费分配给该消费者组的消息。在消费过程中,消费者之间不会相互干扰,不会重复消费同一条消息,也不会漏掉任何一条消息。这种机制确保了消息处理的可靠性和一致性,当某个消费者出现故障时,其他消费者可以继续处理消息,保证了系统的容错性。
3.扩展性:通过将消费者组织成组,可以实现动态的扩展。当需要增加或减少处理能力时,只需添加或删除消费者实例,而不需要对整个系统进行重新设计或配置。
4.灵活性:消费者组允许单个消费者订阅主题的部分分区(Partition),而不是必须订阅主题的所有分区。这种灵活性使得可以根据实际需求调整消费者的订阅模式,从而更好地利用集群资源。
5.有序性:在消费者组中,每个分区只能由同一个消费者组中的一个消费者实例来消费。这种有序性确保了消息处理的顺序一致性,有助于维护数据的一致性和完整性。
🐓 Kafka是怎么处理消息的?比如一个topic只能有一个消息吗?
处理消息步骤
1.生产者发送消息:Kafka的生产者将消息发送到Kafka集群中的一个或多个主题(Topic)中。这些消息可以是任何类型的数据,例如文本、图像、音频或视频等。
2.分区与存储:Kafka主题由一个或多个分区(Partition)组成,每个分区都是一个有序的、不可变的消息序列。当生产者发送消息时,Kafka根据消息的Key值将其路由到一个特定的分区中。Kafka仅保证在一个分区内部,消息是有序的,对于多个分区,无法保证消息的顺序。
3.副本与容错:Kafka的每个分区都有多个副本,这些副本分布在不同的Broker上,以提高系统的可用性和容错性。如果某个Broker发生故障,其他Broker上的副本可以继续提供服务。
4.消费者消费消息:Kafka的消费者从消费者组中获取订阅的主题的消息,并处理这些消息。消费者使用Kafka提供的API与Kafka集群进行交互,从指定的主题和分区中拉取消息进行消费。
主题只能包含一个消息的问题,实际上每个主题可以有多个分区,每个分区包含一系列有序的、不可变的消息。因此,一个主题可以包含多个分区,每个分区包含多条消息。消费者可以订阅主题的一个或多个分区来消费其中的消息。
🐓 Kafka如何保证消息不丢失
1.分区持久化存储:Kafka的分区是持久化的,这意味着它们保存在磁盘上,并且在Broker重启后仍然可用。这种持久化存储机制确保了即使在系统故障的情况下,消息也不会丢失。
2.副本机制:Kafka提供了副本机制来确保数据的安全性。每个分区都有多个副本,分布在不同的Broker上。如果某个Broker发生故障,其他Broker上的副本可以继续提供服务,确保消息不会丢失。
3.确认机制:Kafka提供了生产者和消费者的确认机制。生产者在发送消息时可以选择是否等待Broker的确认,而消费者可以在消费消息后提交偏移量。这种确认机制可以确保消息被成功发送和消费,避免消息丢失。
4.事务:Kafka支持事务,这意味着在消息被写入磁盘之前,生产者或消费者可以提交或回滚事务。这种机制可以确保消息的原子性操作,避免因故障导致的数据不一致问题。
5.消息压缩:Kafka支持消息压缩,可以有效地减少存储空间的使用,同时还可以在消费时自动解压缩。这种机制可以在保证消息不丢失的同时节省存储空间。
🐓 Kafka如何保证消费的幂等性
Kafka保证消费的幂等性主要依赖于以下几个机制:
1.唯一偏移量:Kafka为每个分区维护了一个唯一的偏移量,表示消费者在该分区中的消费进度。消费者在消费消息时,会根据偏移量来确定从哪个位置开始读取消息。如果消费者在消费过程中发生故障,再次启动时可以从上次提交的偏移量继续消费,确保不会重复消费已处理过的消息。
2.幂等性协议:Kafka消费者使用了幂等性协议来处理消息。这个协议确保了无论消费者处理消息的顺序如何,最终的结果都是一致的。这意味着即使有多个消费者实例同时消费同一个分区的消息,每个消息只会被处理一次,不会出现重复处理的情况。
3.事务:Kafka支持事务,确保了生产者发送消息和消费者消费消息的操作是原子的。这意味着在一个事务中发送或消费的消息要么全部成功,要么全部失败,不会出现部分成功的情况。这种原子性操作有助于保证消息处理的幂等性。
4.去重机制:Kafka引入了去重机制来避免重复处理消息。消费者在消费消息时,可以根据唯一标识符来判断是否已经处理过该消息,从而避免重复处理。这种去重机制可以有效地防止因网络故障或消费者故障导致的数据重复问题。
🐓 Kafka慢的时候是什么原因
导致Kafka速度慢的原因有
1.硬件限制:如果Kafka集群的硬件资源不足,例如CPU、内存、磁盘或网络带宽等,可能会导致性能瓶颈,影响消息的写入和读取速度。
2.生产者发送速度:如果生产者发送消息的速度过快,超过了Kafka集群的处理能力,会导致消息堆积,进而影响消费者消费速度。
3.消费者处理速度:如果消费者的处理速度较慢,无法跟上Kafka集群的处理速度,也会导致消息堆积,进而影响消费速度。
4.网络延迟:如果生产者和消费者之间的网络延迟较大,或者网络带宽有限,会导致消息传输速度变慢,进而影响消费速度。
5.Kafka配置:Kafka的配置参数对性能也有很大影响。例如,如果分区数量配置不当,或者日志文件大小设置不合理,都可能导致性能问题。
6.数据压缩:Kafka支持数据压缩,但如果压缩率设置得过高,会导致CPU占用率增加,进而影响性能。
7.磁盘I/O限制:如果Kafka的数据存储在HDD上,磁盘I/O可能会成为性能瓶颈。如果使用SSD,可以显著提高I/O性能。
🐓 Kafka和Rabbitmq区别
Kafka和RabbitMQ是两种广泛使用的消息队列系统,它们在设计和使用上有一些显著的区别。
1.语言与平台:Kafka主要由Scala语言开发,主要用于处理活跃的流式数据和大数据量的数据处理。而RabbitMQ则由Erlang语言开发,主要应用于实时、对可靠性要求较高的消息传递场景。
2.吞吐量与延迟:Kafka在吞吐量和延迟方面表现优秀。它内部采用消息的批量处理,数据的存储和获取是本地磁盘顺序批量操作,消息处理的效率高,吞吐量大。而RabbitMQ在吞吐量方面可能不如Kafka,且对于某些低延迟场景可能不太适合。
3.可靠性:RabbitMQ通过消息持久化、镜像队列和消息确认机制来保证消息的可靠性。而Kafka也提供了消息持久化和复制机制来保证可靠性,但它的设计重点更多在于吞吐量和数据流处理。
4.使用与维护:RabbitMQ使用起来相对复杂一些,它需要额外的配置和安装一些插件才能提供完整的功能。而Kafka则相对简单,一般只需要简单的配置就可以使用。
5.社区与支持:RabbitMQ有庞大的用户群和活跃的社区,对于遇到的问题可以快速得到解决。虽然Kafka的社区也很大,但在某些行业和领域中,RabbitMQ的使用者可能比Kafka更多。
6.功能与特性:RabbitMQ提供了更多的特性和功能,如消息确认机制、死信队列、优先级队列等。而Kafka则主要专注于数据流处理和高吞吐量,提供了简单而高效的发布-订阅模型。
🐓 如果让你写一个消息队列,该如何进行架构设计
面试官心理分析
1.你有没有对某一个消息队列做过较为深入的原理的了解,或者从整体了解把握住一个mq的架构原理
2.看看你的设计能力,给你一个常见的系统,就是消息队列系统,看看你能不能从全局把握一下整体架构设计,给出一些关键点出来
类似问题
如果让你来设计一个spring框架你会怎么做?如果让你来设计一个dubbo框架你会怎么做?如果让你来设计一个mybatis框架你会怎么做?
回答思路
1.首先这个mq得支持可伸缩性吧,就是需要的时候快速扩容,就可以增加吞吐量和容量,那怎么搞?设计个分布式的系统
2.其次你得考虑一下这个mq的数据要不要落地磁盘吧?那肯定要了,落磁盘,才能保证别进程挂了数据就丢了。那落磁盘的时候怎么落啊?顺序写,这样就没有磁盘随机读写的寻址开销,磁盘顺序读写的性能是很高的。
3.其次你考虑一下你的mq的可用性啊?
4.能不能支持数据0丢失啊?
面试官问你这个问题,其实是个开放题,他就是看看你有没有从架构角度整体构思和设计的思维以及能力。确实这个问题可以刷掉一大批人,因为大部分人平时不思考这些东西。
ElasticSearch
🐓 ElasticSearch 怎么解决中文分词
Elasticsearch是一个基于Lucene的搜索引擎。它提供了具有HTTP Web界面和无架构JSON文档的分布式,多租户能力的全文搜索引擎。Elasticsearch是用Java开发的,根据Apache许可条款作为开源发布。
采用IK中文分词器插件进行中文分词处理。
🐓 Elasticsearch中的倒排索引是什么
倒排索引:
根据字面意思可以知道他和正序索引是反的。在搜索引擎中每个文件都对应一个文件ID,文件内容被表示为一系列关键词的集合(文档要除去一些无用的词,比如’的’这些,剩下的词就是关键词,每个关键词都有自己的ID)。例如“文档1”经过分词,提取了3个关键词,每个关键词都会记录它所在在文档中的出现频率及出现位置。
那么上面的文档及内容构建的倒排索引结果会如下图(注:这个图里没有记录展示该词在出现在哪个文档的具体位置)
如何来查询呢?
比如我们要查询‘搜索引擎’这个关键词在哪些文档中出现过。首先我们通过倒排索引可以查询到该关键词出现的文档位置是在1和3中;然后再通过正排索引查询到文档1和3的内容并返回结果。
Minio
🐓 Minio和传统存储的区别
Minio是一款高性能、分布式的对象存储系统,而传统存储通常采用集中式的架构,将所有的数据存储在一个中心化的存储设备中。以下是Minio和传统存储的主要区别:
1.设计目标:Minio一开始就针对性能要求更高的私有云标准进行软件架构设计,它采用了更易用的方式进行设计,能实现对象存储所需要的全部功能,在性能上也更加强劲,不会为了更多的业务功能而妥协,失去Minio的易用性、高效性。而传统的存储系统则更多是为了满足数据存储和读取的需求。
2.可扩展性:Minio采用分布式的架构,将数据分散存储在多个节点上,这种分布式的设计使得Minio具有更高的可扩展性和可靠性,可以轻松处理大规模的数据集。传统存储系统在面对大规模数据时,可能会遇到扩展性的限制。
3.应用场景:Minio最适合存储非结构化数据,如照片、视频、日志文件、备份和容器/VM映像。同时,它在机器学习、大数据、私有云、混合云等方面的存储技术上也独树一帜。而传统存储系统在处理这些非结构化数据时可能没有Minio表现得那么出色。
4.灵活性:Minio更简单的实现具有弹性伸缩能力的原生对象存储服务。它既可以在X86等低成本机器上运行,也可以在更高级的硬件上运行,表现出良好的灵活性。相比之下,传统存储系统的运行环境可能更加受限。
🐓 Minio的优点
Minio是一个高性能、分布式的对象存储系统,具有以下优点:
1.高性能:Minio在标准硬件上实现了高吞吐量和低延迟的对象存储服务。它采用了分布式的架构设计,将数据分散存储在多个节点上,从而实现数据的并行处理和高速传输。这种设计使得Minio具有出色的读写性能和低延迟,能够满足对大规模数据处理和高并发访问的需求。
2.可扩展性:Minio的分布式架构使得它可以轻松地扩展存储容量和处理能力。通过添加新的节点,用户可以线性地扩展Minio集群的存储容量和吞吐量。这种可扩展性使得Minio适用于处理大规模数据集和应对不断增长的存储需求。
3.数据保护:Minio提供了多种数据保护机制,包括数据冗余、故障转移和数据校验等。它使用分布式的冗余机制将数据复制到多个节点上,以防止数据丢失。同时,Minio还支持数据校验和恢复功能,可以检测和修复存储中的数据错误。这些保护机制确保了数据的可靠性和完整性。
4.兼容性:Minio使用S3兼容的API,这是亚马逊S3对象存储服务使用的标准接口。这意味着用户可以直接使用现有的S3工具和应用程序与Minio进行集成。S3接口提供了简单、灵活的数据访问方式,可以通过HTTP协议直接上传、下载和管理对象。这种兼容性使得Minio成为与云原生应用程序和工具集成的理想选择。
5.简单易用:Minio的安装部署非常简单,只需要几行命令即可完成。同时,Minio提供了丰富的文档和社区支持,使得用户可以快速上手并解决遇到的问题。Minio的UI界面也很直观,提供了友好的用户界面来进行管理和监控。
6.容器化支持:Minio符合一切原生云计算的架构和构建过程,并且包含最新的云计算的全新的技术和概念。其中包括支持Kubernetes 、Docker、微服和多租户的的容器技术。
7.可定制化:Minio提供了多种配置选项,可以根据实际需求进行定制化配置,以满足不同的使用场景。例如,可以调整副本数量、磁盘空间大小等参数来优化存储性能和成本。
8.社区支持:Minio有一个活跃的社区,社区成员来自世界各地。这个社区提供了丰富的资源和支持,包括文档、教程、案例、讨论等,可以帮助用户解决遇到的问题并获取帮助。
Zookeeper
🐓 ZooKeeper是什么
ZooKeeper是一个开放源码的分布式协调服务,它是集群的管理者,监视着集群中各个节点的状态根据节点提交的反馈进行下一步合理操作。最终,将简单易用的接口和性能高效、功能稳定的系统提供给用户。
分布式应用程序可以基于Zookeeper实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master选举、分布式锁和分布式队列等功能。
Zookeeper保证了如下分布式一致性特性:
1.顺序一致性
2.原子性
3.单一视图
4.可靠性
5.实时性(最终一致性)
客户端的读请求可以被集群中的任意一台机器处理,如果读请求在节点上注册了监听器,这个监听器也是由所连接的zookeeper机器来处理。对于写请求,这些请求会同时发给其他zookeeper机器并且达成一致后,请求才会返回成功。因此,随着zookeeper的集群机器增多,读请求的吞吐会提高但是写请求的吞吐会下降。
有序性是zookeeper中非常重要的一个特性,所有的更新都是全局有序的,每个更新都有一个唯一的时间戳,这个时间戳称为zxid(Zookeeper Transaction Id)。而读请求只会相对于更新有序,也就是读请求的返回结果中会带有这个zookeeper最新的zxid。
🐓 Zookeeper Watcher 机制
Zookeeper允许客户端向服务端的某个Znode注册一个Watcher监听,当服务端的一些指定事件触发了这个Watcher,服务端会向指定客户端发送一个事件通知来实现分布式的通知功能,然后客户端根据Watcher通知状态和事件类型做出业务上的改变。
工作机制:
1.客户端注册watcher
2.服务端处理watcher
3.客户端回调watcher
Watcher特性总结:
1.一次性
无论是服务端还是客户端,一旦一个Watcher被触发,Zookeeper都会将其从相应的存储中移除。这样的设计有效的减轻了服务端的压力,不然对于更新非常频繁的节点,服务端会不断的向客户端发送事件通知,无论对于网络还是服务端的压力都非常大。
2.客户端串行执行
客户端Watcher回调的过程是一个串行同步的过程。
3.轻量
Watcher通知非常简单,只会告诉客户端发生了事件,而不会说明事件的具体内容。
客户端向服务端注册Watcher的时候,并不会把客户端真实的Watcher对象实体传递到服务端,仅仅是在客户端请求中使用boolean类型属性进行了标记。
watcher event异步发送watcher的通知事件从server发送到client是异步的,这就存在一个问题,不同的客户端和服务器之间通过socket进行通信,由于网络延迟或其他因素导致客户端在不通的时刻监听到事件,由于Zookeeper本身提供了ordering guarantee,即客户端监听事件后,才会感知它所监视znode发生了变化。所以我们使用Zookeeper不能期望能够监控到节点每次的变化。Zookeeper只能保证最终的一致性,而无法保证强一致性。
注册watcher getData、exists、getChildren
触发watcher create、delete、setData
当一个客户端连接到一个新的服务器上时,watch将会被以任意会话事件触发。当与一个服务器失去连接的时候,是无法接收到watch的。而当client重新连接时,如果需要的话,所有先前注册过的watch,都会被重新注册。通常这是完全透明的。只有在一个特殊情况下,watch可能会丢失:对于一个未创建的znode的exist watch,如果在客户端断开连接期间被创建了,并且随后在客户端连接上之前又删除了,这种情况下,这个watch事件可能会被丢失。
🐓 ZooKeeper分布式锁的实现原理
使用zookeeper创建临时序列节点来实现分布式锁,适用于顺序执行的程序,大体思路就是创建临时序列节点,找出最小的序列节点,获取分布式锁,程序执行完成之后此序列节点消失,通过watch来监控节点的变化,从剩下的节点的找到最小的序列节点,获取分布式锁,执行相应处理。