🍊 Redis与数据库的数据一致性
关于redis与数据库的数据一致性,业界使用最多的是数据同步问题(双删策略)
🎉 双删策略
先更新数据库,再更新缓存;
同时有请求A和请求B进行更新操作,那么会出现:
- 线程A更新了数据库;
- 线程B更新了数据库;
- 线程B更新了缓存;
- 线程A更新了缓存;
缺点
这就出现请求A更新缓存应该比请求B更新缓存早才对,但是因为网络等原因,B却比A更早更新了缓存。这就导致了脏数据,因此不考虑!
如果你是一个写数据库场景比较多,而读数据场景比较少的业务需求,采用这种方案就会导致,数据压根还没读到,缓存就被频繁的更新,浪费性能。
如果你写入数据库的值,并不是直接写入缓存的,而是要经过一系列复杂的计算再写入缓存。那么,每次写入数据库后,都再次计算写入缓存的值,无疑是浪费性能的。显然,删除缓存更为适合。
先删除缓存,再更新数据库;
同时有一个请求A进行更新操作,另一个请求B进行查询操作。那么会出现如下情形:
(1)请求A进行写操作,删除缓存;
(2)请求B查询发现缓存不存在;
(3)请求B去数据库查询得到旧值;
(4)请求B将旧值写入缓存;
(5)请求A将新值写入数据库;
导致数据不一致的情形出现,如果不采用给缓存设置过期时间策略,该数据永远都是脏数据。
🎉 延时双删策略
解决方案:延时双删策略
(1)先淘汰缓存;
(2)再写数据库(这两步和原来一样);
(3)休眠1秒,再次淘汰缓存;
这么做,可以将1秒内所造成的缓存脏数据,再次删除!这个一秒如何得出来的呢?评估自己的项目的读数据业务逻辑的耗时,在读数据业务逻辑的耗时基础上,加几百ms即可,确保读请求结束,写请求可以删除读请求造成的缓存脏数据。
MySQL的读写分离架构中
一个请求A进行更新操作,另一个请求B进行查询操作。
(1)请求A进行写操作,删除缓存;
(2)请求A将数据写入数据库了;
(3)请求B查询缓存发现,缓存没有值;
(4)请求B去从库查询,这时,还没有完成主从同步,因此查询到的是旧值;
(5)请求B将旧值写入缓存;
(6)数据库完成主从同步,从库变为新值; 导致数据不一致,解决方案使用双删延时策略。只是,睡眠时间修改为在主从同步的延时时间基础上,加几百ms。
采用这种同步淘汰策略,吞吐量降低怎么办? ok,那就将第二次删除作为异步的。自己起一个线程,异步删除。这样,写的请求就不用沉睡一段时间后了,再返回。这么做,加大吞吐量。
🎉 异步延时删除策略
先更新数据库,再删除缓存; 一个请求A做查询操作,一个请求B做更新操作,那么会有如下情形产生:
(1)缓存刚好失效;
(2)请求A查询数据库,得一个旧值;
(3)请求B将新值写入数据库;
(4)请求B删除缓存;
(5)请求A将查到的旧值写入缓存;
问题:会发生脏数据,但是几率不大,因为步骤(3)的写数据库操作比步骤(2)的读数据库操作耗时更短,才有可能使得步骤(4)先于步骤(5)。可是,大家想想,数据库的读操作的速度远快于写操作的(不然做读写分离干嘛,做读写分离的意义就是因为读操作比较快,耗资源少),因此步骤(3)耗时比步骤(2)更短,这一情形很难出现。
如何解决脏数据呢?给缓存设有效时间是一种方案。其次,采用策略2(先删除缓存,再更新数据库)里给出的异步延时删除策略,保证读请求完成以后,再进行删除操作。
第二次删除,如果删除失败怎么办? 这是个非常好的问题,因为第二次删除失败,就会出现如下情形。还是有两个请求,一个请求A进行更新操作,另一个请求B进行查询操作,为了方便,假设是单库: (1)请求A进行写操作,删除缓存; (2)请求B查询发现缓存不存在; (3)请求B去数据库查询得到旧值; (4)请求B将旧值写入缓存; (5)请求A将新值写入数据库; (6)请求A试图去删除请求B写入对缓存值,结果失败了;ok,这也就是说。如果第二次删除缓存失败,会再次出现缓存和数据库不一致的问题。
解决方案一
(1)更新数据库数据;
(2)缓存因为种种问题删除失败;
(3)将需要删除的key发送至消息队列;
(4)自己消费消息,获得需要删除的key;
(5)继续重试删除操作,直到成功; 缺点:对业务线代码造成大量的侵入
解决方案二: 启动一个订阅程序去订阅数据库的binlog,获得需要操作的数据。在应用程序中,另起一段程序,获得这个订阅程序传来的信息,进行删除缓存操作。
(1)更新数据库数据;
(2)数据库会将操作信息写入binlog日志当中;
(3)订阅程序提取出所需要的数据以及key;
(4)另起一段非业务代码,获得该信息;
(5)尝试删除缓存操作,发现删除失败;
(6)将这些信息发送至消息队列;
(7)重新从消息队列中获得该数据,重试操作;
订阅binlog程序在mysql中有现成的中间件叫canal,可以完成订阅binlog日志的功能。重试机制,采用的是消息队列的方式。如果对一致性要求不是很高,直接在程序中另起一个线程,每隔一段时间去重试。
🍊 Redis分布式锁底层实现
🎉 如何实现
redis使用setnx作为分布式锁,在多线程环境下面,只有一个线程会拿到这把锁,拿到锁的线程执行业务代码,执行业务代码需要一点时间,所以这段时间拒绝了很多等待获取锁的请求,直到有锁的线程最后释放掉锁,其他线程才能获取锁,这个就是redis的分布式锁的使用。
🎉 使用redis锁会有很多异常情况,如何处理这些异常呢
📝 1.redis服务挂掉了,抛出异常了,锁不会被释放掉,新的请求无法进来,出现死锁问题
添加try finally处理
📝 2.服务器果宕机了,导致锁不能被释放的现象
设置超时时间
📝 3.锁的过期时间比业务执行时间短,会存在多个线程拥有同一把锁的现象
如果有一个线程执行需要15s,过期时间只有10s,当执行到10s时第二个线程进来拿到这把锁,会出现多个线程拿到同一把锁执行。
续期超时时间,当一个线程执行5s后对超时时间续期10s,续期设置可以借助redission工具,加锁成功,后台新开一个线程,每隔10秒检查是否还持有锁,如果持有则延长锁的时间,如果加锁失败一直循环(自旋)加锁。
📝 4.锁的过期时间比业务执行时间短,锁永久失效
如果有一个线程执行需要15s,过期时间只有10s,当执行到10s时第二个线程进来拿到这把锁,会出现多个线程拿到同一把锁执行,在第一个线程执行完时会释放掉第二个线程的锁,以此类推,导致锁的永久失效。
给每个线程都设置一个唯一标识,避免出现程序执行的时间超过设置的过期时间,导致其他线程删除了自己的锁,只允许自己删除自己线程的锁
🍊 Redis热点数据缓存
热点数据缓存
当前key是一个热点key(例如一个热门的娱乐新闻),并发量非常大重建缓存不能在短时间完成,可能是一个复杂计算,例如复杂的SQL、多次IO、多个依赖等在缓存失效的瞬间,有大量线程来重建缓存,造成后端负载加大,甚至可能会让应用崩溃。
🎉 互斥锁(mutex)
解决方案一:互斥锁(mutex)
只允许一个线程重建缓存,其他线程等待重建缓存的线程执行完,重新从缓存获取数据。
1)从Redis获取数据,如果值不为空,则直接返回值;否则执行下面的2.1)和2.2)步骤 2.1)如果set(nx和ex)结果为true,说明此时没有其他线程重建缓存, 那么当前线程执行缓存构建逻辑 2.2)如果set(nx和ex)结果为false,说明此时已经有其他线程正在执 行构建缓存的工作,那么当前线程将休息指定时间(例如这里是50毫秒,取决于构建缓存的速度)后,重新执行函数,直到获取到数据。
优缺点:如果构建缓存过程出现问题或者时间较长,可能会存在死锁和线程池阻塞的风险,但是这种方法能够较好地降低后端存储负载,并在一致性上做得比较好。
🎉 永远不过期
解决方案二:永远不过期
从缓存层面来看,确实没有设置过期时间,所以不会出现热点key过期 后产生的问题,也就是“物理”不过期。从功能层面来看,为每个value设置一个逻辑过期时间,当发现超过逻 辑过期时间后,会使用单独的线程去构建缓存。
优缺点:由于没有设置真正的过期时间,实际上已经不存在热点key产生的一系列危害,但是会存在数据不一致的情况,同时代码复杂度会增大。
问题:怎么知道哪些数据是热点数据?因为本地缓存资源有限,不可能把所有的商品数据进行缓存,它只会缓存热点的数据。那怎么知道数据是热点数据呢?
利用redis4.x自身特性,LFU机制发现热点数据。实现很简单,只要把redis内存淘汰机制设置为allkeys-lfu或者volatile-lfu方式,再执行
./redis-cli --hotkeys
会返回访问频率高的key,并从高到底的排序,在设置key时,需要把商品id带上,这样就是知道是哪些商品了。
🍊 高并发
单机的 Redis,能够承载的 QPS大概就在上万到几万不等。对于缓存来说,一般都是用来支撑读高并发的。因此架构做成主从(master-slave)架构,一主多从,主负责写,并且将数据复制到其它的slave 节点,从节点负责读。所有的读请求全部走从节点。这样也可以很轻松实现水平扩容,支撑读高并发。
🍊 高可用
Redis哨兵集群实现高可用,哨兵是一个分布式系统,你可以在一个架构中运行多个哨兵进程,这些进程使用流言协议来接收关于主节点是否下线的信息,并使用投票协议来决定是否执行自动故障迁移,以及选择哪个备节点作为新的主节点。每个哨兵会向其它哨兵、主节点、备节点定时发送消息,以确认对方是否”活”着,如果发现对方在指定时间(可配置)内未回应,则暂时认为对方已挂.若“哨兵群”中的多数哨兵,都报告某一主节点没响应,系统才认为该主节点"彻底死亡",通过算法,从剩下的备节点中,选一台提升为主节点,然后自动修改相关配置。
🍊 哨兵机制
哨兵是一个分布式系统,你可以在一个架构中运行多个哨兵进程,这些进程使用流言协议来接收关于主节点是否下线的信息,并使用投票协议来决定是否执行自动故障迁移,以及选择哪个备节点作为新的主节点。每个哨兵会向其它哨兵、主节点、备节点定时发送消息,以确认对方是否”活”着,如果发现对方在指定时间(可配置)内未回应,则暂时认为对方已挂。
若“哨兵群”中的多数哨兵,都报告某一主节点没响应,系统才认为该主节点"彻底死亡",通过算法,从剩下的备节点中,选一台提升为主节点,然后自动修改相关配置。可以通过修改sentinel.conf配置文件,配置主节点名称,IP,端口号,选举次数,主服务器的密码,心跳检测毫秒数,做多少个节点等。
🎉 Redis 哨兵主备切换的数据丢失问题
📝 异步复制导致的数据丢失
master->slave 的复制是异步的,所以可能有部分数据还没复制到 slave,master 就宕机了,此时这部分数据就丢失了。 脑裂导致的数据丢失:某个 master 所在机器突然脱离了正常的网络,跟其他 slave 机器不能连接,但是实际上 master还运行着。此时哨兵可能就会认为 master 宕机了,然后开启选举,将其他 slave 切换成了 master。这个时候,集群里就会有两个master ,也就是所谓的脑裂。 此时虽然某个 slave 被切换成了 master,但是可能 client 还没来得及切换到新的master,还继续向旧 master 写数据。因此旧 master 再次恢复的时候,会被作为一个 slave 挂到新的 master上去,自己的数据会清空,重新从新的 master 复制数据。而新的 master 并没有后来 client写入的数据,因此,这部分数据也就丢失了
解决方案:
进行配置:min-slaves-to-write 1 min-slaves-max-lag 10
通过配置至少有 1 个 slave,数据复制和同步的延迟不能超过 10 秒,超过了master 就不会再接收任何请求了。
减少异步复制数据的丢失
一旦 slave 复制数据和 ack 延时太长,就认为可能 master 宕机后损失的数据太多了,那么就拒绝写请求,这样可以把 master宕机时由于部分数据未同步到 slave 导致的数据丢失降低的可控范围内。 减少脑裂的数据丢失:如果一个 master 出现了脑裂,跟其他slave 丢了连接,如果不能继续给指定数量的slave 发送数据,而且 slave 超过10 秒没有给自己ack消息,那么就直接拒绝客户端的写请求。因此在脑裂场景下,最多就丢失10 秒的数据。
🍊 集群模式
数据量很少的情况下,比如你的缓存一般就几个 G,单机就足够了,可以使用 replication,一个 master 多个 slaves,要几个 slave 跟你要求的读吞吐量有关,然后自己搭建一个 sentinel 集群去保证 Redis 主从架构的高可用性。
海量数据+高并发+高可用的场景的情况下,使用Redis cluster ,自动将数据进行分片,每个 master 上放一部分数据,它支撑 N个 Redis master node,每个 master node 都可以挂载多个 slave node。 这样整个 Redis就可以横向扩容了,如果你要支撑更大数据量的缓存,那就横向扩容更多的 master 节点,每个 master节点就能存放更多的数据了。而且部分 master 不可用时,还是可以继续工作的。
在 Redis cluster 架构下,使用cluster bus 进行节点间通信,用来进行故障检测、配置更新、故障转移授权。cluster bus 用了一种二进制的协议, gossip 协议,用于节点间进行高效的数据交换,占用更少的网络带宽和处理时间。
🎉 集群协议
集群元数据的维护:集中式、Gossip 协议
📝 集中式
集中式是将集群元数据(节点信息、故障等等)几种存储在某个节点上。集中式元数据集中存储的一个典型代表,就是大数据领域的 storm。它是分布式的大数据实时计算引擎,是集中式的元数据存储的结构,底层基于zookeeper对所有元数据进行存储维护。集中式的好处在于,元数据的读取和更新,时效性非常好,一旦元数据出现了变更,就立即更新到集中式的存储中,其它节点读取的时候就可以感知到;不好在于,所有的元数据的更新压力全部集中在一个地方,可能会导致元数据的存储有压力。
📝 gossip 协议
gossip 协议,所有节点都持有一份元数据,不同的节点如果出现了元数据的变更,就不断将元数据发送给其它的节点,让其它节点也进行元数据的变更。gossip好处在于,元数据的更新比较分散,不是集中在一个地方,更新请求会陆陆续续打到所有节点上去更新,降低了压力;不好在于,元数据的更新有延时,可能导致集群中的一些操作会有一些滞后。
在 Redis cluster 架构下,每个节点都有一个专门用于节点间通信的端口,就是自己提供服务的端口号+10000,每个 Redis 要放开两个端口号,比如 7001,那么用于节点间通信的就是 17001 端口,17001端口号是用来进行节点间通信的,也就是 cluster bus 的东西。每个节点每隔一段时间都会往另外几个节点发送 ping 消息,同时其它几个节点接收到 ping 之后返回 pong 。
🍊 多级缓存架构
🍊 并发竞争
Redis 的并发竞争问题是什么?如何解决这个问题?了解 Redis 事务的 CAS 方案吗?
多客户端同时并发写一个 key,可能本来应该先到的数据后到了,导致数据版本错了;或者是多客户端同时获取一个 key,修改值之后再写回去,只要顺序错了,数据就错了。
CAS 类的乐观锁方案:某个时刻,多个系统实例都去更新某个 key。可以基于 zookeeper 实现分布式锁。每个系统通过 zookeeper 获取分布式锁,确保同一时间,只能有一个系统实例在操作某个 key,别人都不允许读和写。
你要写入缓存的数据,都是从 mysql 里查出来的,都得写入 mysql 中,写入 mysql 中的时候必须保存一个时间戳,从 mysql 查出来的时候,时间戳也查出来。每次要写之前,先判断一下当前这个 value 的时间戳是否比缓存里的 value 的时间戳要新。如果是的话,那么可以写,否则,就不能用旧的数据覆盖新的数据。
🍊 Redis cluster 的高可用与主备切换原理
如果一个节点认为另外一个节点宕机,这是属于主观宕机。如果多个节点都认为另外一个节点宕机了,那么就是客观宕机,跟哨兵的原理几乎一样,sdown,odown。流程为:如果一个节点认为某个节点pfail 了,那么会在 gossip ping 消息中, ping 给其他节点,如果超过半数的节点都认为 pfail 了,那么就会变成fail 。 每个从节点,都根据自己对 master 复制数据的 offset,来设置一个选举时间,offset越大(复制数据越多)的从节点,选举时间越靠前,优先进行选举。所有的 master node 开始 slave 选举投票,给要进行选举的slave 进行投票,如果大部分 master node (N/2 + 1) 都投票给了某个从节点,那么选举通过,那个从节点可以切换成master。从节点执行主备切换,从节点切换为主节点。
🍊 主从架构下的数据同步
🎉 主从复制/数据同步
master会启动一个后台线程,开始生成一份RDB快照文件,同时还会将从客户端收到的所有写命令缓存在内存中。RDB文件生成完毕之后,master会将这个RDB发送给slave,slave会先写入本地磁盘,然后再从本地磁盘加载到内存中。然后master会将内存中缓存的写命令发送给slave,slave也会同步这些数据。
🎉 主从架构下的数据部分复制(断点续传)
当redis是主从架构时,主节点同步数据到从节点进行持久化,这个过程可能会因为网络/IO等原因,导致连接中断,当主节点和从节点断开重连后,一般都会对整份数据进行复制,这个过程是比较浪费性能的。从redis2.8版本开始,redis改用可以支持部分数据复制的命令去主节点同步数据,主节点会在内存中创建一个复制数据用的缓存队列,缓存最近一段时间的数据,主节点和它所有的从节点都维护复制的数据下标和主节点的进程id,当网络连接断开后,从节点会请求主节点继续进行数据同步,从记录数据的下标开始同步数据。如果主节点进程id变化了,或者从节点数据下标太旧,不在主节点的缓存队列里,会进行一次全量数据的复制。
🎉 数据丢失发生的场景以及解决方案
- 异步复制导致的数据丢失:主节点到从节点的复制是异步的,主节点有部分数据还没复制到从节点,主节点就宕机了。
- 脑裂导致的数据丢失:脑裂导致的数据丢失:某个 主节点 所在机器突然脱离了正常的网络,跟其他从节点机器不能连接,但是实际上 主节点还运行着,这个时候哨兵可能就会认为 主节点 宕机了,然后开启选举,将其他从节点切换成了 主节点,集群里就会有两个主节点 ,也就是所谓的脑裂。虽然某个从节点被切换成了 主节点,但是可能 client 还没来得及切换到新的主节点,还继续向旧的主节点写数据,当旧的主节点再次恢复的时候,会被作为一个从节点挂到新的 主节点上去,自己的数据会清空,从新的主节点复制数据,新的主节点并没有后来 client写入的数据,这部分数据也就丢失了。
解决方案:
- 针对异步复制导致的数据丢失,可以通过控制复制数据的时长和ack的时间来控制,一旦从节点复制数据和 ack 延时太长,就认为可能主节点宕机后损失的数据太多了,那么就拒绝写请求,这样可以把主节点宕机时由于部分数据未同步到从节点导致的数据丢失降低的可控范围内。
- 针对脑裂导致的数据丢失:如果一个主节点出现了脑裂,跟其他从节点断了连接,如果不能继续给从节点发送数据,而且从节点超过10 秒没有给自己ack消息,那么就直接拒绝客户端的写请求,这样即便在脑裂场景下,最多就丢失10 秒的数据。在redis的配置文件里面有二个参数,min-slaves-to-write 3表示连接到master的最少slave数量,min-slaves-max-lag 10表示slave连接到master的最大延迟时间,通过这二个参数可以把数据丢失控制在承受范围以内。
🎉 主从/哨兵/集群区别
📝 主从架构
主数据库可以进行读写操作,当写操作导致数据变化的时候,会自动将数据同步给从数据库,从数据库一般是只读的,接受主数据库同步过来的数据。
📝 哨兵
当主数据库遇到异常中断服务后,需要通过手动的方式选择一个从数据库来升格为主数据库,让系统能够继续提供服务,难以实现自动化。 Redis 2.8中提供了哨兵工具来实现自动化的系统监控和故障恢复功能,哨兵的作用就是监控redis主、从数据库是否正常运行,主数据库出现故障,自动将从数据库转换为主数据库。
📝 集群
即使使用哨兵,redis每个实例也是全量存储,每个redis存储的内容都是完整的数据,浪费内存,有木桶效应。为了最大化利用内存,可以采用集群,就是分布式存储,每台redis存储不同的内容,Redis集群共有16384个槽,每个redis分得一些槽,客户端请求的key,根据公式,计算出映射到哪个分片上。
🎉 高可用/哨兵集群/主备切换
Redis哨兵集群实现高可用,哨兵是一个分布式系统,可以在一个架构中运行多个哨兵进程,这些进程使用流言协议来接收关于主节点是否下线的信息,并使用投票协议来决定是否进行自动故障迁移,选择哪个备节点作为新的主节点。每个哨兵会向其它哨兵、主节点、备节点定时发送消息,以确认对方是否”活”着,如果发现对方在指定时间内未回应,则暂时认为对方已挂。若“哨兵群”中的多数哨兵,都报告某一主节点没响应,系统才认为该主节点"彻底死亡",通过算法,从剩下的备节点中,选一台提升为主节点,然后自动修改相关配置,比如主节点名称,IP,端口号,选举次数,主服务器的密码,心跳检测毫秒数,做多少个节点等。
🌟 MQ知识点(RabbitMQ/RocketMQ/Kafka)
🍊 三种mq对比
使用消息队列有解耦,扩展性,削峰,异步等功能,市面上主流的几款mq,rabbitmq,rocketmq,kafka有各自的应用场景。kafka,有出色的吞吐量,比较强悍的性能,而且集群可以实现高可用,就是会丢数据,所以一般被用于日志分析和大数据采集。rabbitmq,消息可靠性比较高,支持六种工作模式,功能比较全面,但是由于吞吐量比较低,消息累积还会影响性能,加上erlang语言不好定制,所以一般使用于小规模的场景,大多数是中小企业用的比较多。rocketmq,高可用,高性能,高吞吐量,支持多种消息类型,比如同步,异步,顺序,广播,延迟,批量,过滤,事务等等消息,功能比较全面,只不过开源版本比不上商业版本的,加上开发这个中间件的大佬写的文档不多,文档不太全,这也是它的一个缺点,不过这个中间件可以作用于几乎全场景。
🍊 消息丢失
消息丢失,生产者往消息队列发送消息,消息队列往消费者发送消息,会有丢消息的可能,消息队列也有可能丢消息,通常MQ存盘时都会先写入操作系统的缓存页中,然后再由操作系统异步的将消息写入硬盘,这个中间有个时间差,就可能会造成消息丢失,如果服务挂了,缓存中还没有来得及写入硬盘的消息就会发生消息丢失。
不同的消息中间件对于消息丢失也有不同的解决方案,先说说最容易丢失消息的kafka吧。生产者发消息给Kafka Broker:消息写入Leader后,Follower是主动与Leader进行同步,然后发ack告诉生产者收到消息了,这个过程kafka提供了一个参数,request.required.acks属性来确认消息的生产,0表示不进行消息接收是否成功的确认,发生网络抖动消息丢了,生产者不校验ACK自然就不知道丢了。1表示当Leader接收成功时确认,只要Leader存活就可以保证不丢失,保证了吞吐量,但是如果leader挂了,恰好选了一个没有ACK的follower,那也丢了。-1或者all表示Leader和Follower都接收成功时确认,可以最大限度保证消息不丢失,但是吞吐量低,降低了kafka的性能。一般在不涉及金额的情况下,均衡考虑可以使用1,保证消息的发送和性能的一个平衡。Kafka Broker 消息同步和持久化:Kafka通过多分区多副本机制,可以最大限度保证数据不会丢失,如果数据已经写入系统缓存中,但是还没来得及刷入磁盘,这个时候机器宕机,或者没电了,那就丢消息了,当然这种情况很极端。Kafka Broker 将消息传递给消费者:如果消费这边配置的是自动提交,万一消费到数据还没处理完,就自动提交offset了,但是此时消费者直接宕机了,未处理完的数据丢失了,下次也消费不到了。所以为了避免这种情况,需要将配置改为,先消费处理数据,然后手动提交,这样消息处理失败,也不会提交成功,没有丢消息。
rabbitmq整个消息投递的路径是producer—>rabbitmq broker—>exchange—>queue—>consumer。
生产者将消息投递到Broker时产生confirm状态,会出现二种情况,ack:表示已经被Broker签收。nack:表示表示已经被Broker拒收,原因可能有队列满了,限流,IO异常等。生产者将消息投递到Broker,被Broker签收,但是没有对应的队列进行投递,将消息回退给生产者会产生return状态。这二种状态是rabbitmq提供的消息可靠投递机制,生产者开启确认模式和退回模式。使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。消费者在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认。none自动确认模式很危险,当生产者发送多条消息,消费者接收到一条信息时,会自动认为当前发送的消息已经签收了,这个时候消费者进行业务处理时出现了异常情况,也会认为消息已经正常签收处理了,而队列里面显示都被消费掉了。所以真实开发都会改为手动签收,可以防止消息丢失。消费者如果在消费端没有出现异常,则调用channel.basicAck方法确认签收消息。消费者如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。通过一系列的操作,可以保证消息的可靠投递以及防止消息丢失的情况。
然后说一下rocketmq,生产者使用事务消息机制保证消息零丢失,第一步就是确保Producer发送消息到了Broker这个过程不会丢消息。发送half消息给rocketmq,这个half消息是在生产者操作前发送的,对下游服务的消费者是不可见的。这个消息主要是确认RocketMQ的服务是否正常,通知RocketMQ,马上要发一个消息了,做好准备。half消息如果写入失败就认为MQ的服务是有问题的,这个时候就不能通知下游服务了,给生产者的操作加上一个状态标记,然后等待MQ服务正常后再进行补偿操作,等MQ服务正常后重新下单通知下游服务。然后执行本地事务,比如说下了个订单,把下单数据写入到mysql,返回本地事务状态给rocketmq,在这个过程中,如果写入数据库失败,可能是数据库崩了,需要等一段时间才能恢复,这个时候把订单一直标记为"新下单"的状态,订单的消息先缓存起来,比如Redis、文本或者其他方式,然后给RocketMQ返回一个未知状态,未知状态的事务状态回查是由RocketMQ的Broker主动发起的,RocketMQ过一段时间来回查事务状态,在回查事务状态的时候,再尝试把数据写入数据库,如果数据库这时候已经恢复了,继续后面的业务。而且即便这个时候half消息写入成功后RocketMQ挂了,只要存储的消息没有丢失,等RocketMQ恢复后,RocketMQ就会再次继续状态回查的流程。第二步就是确保Broker接收到的消息不会丢失,因为RocketMQ为了减少磁盘的IO,会先将消息写入到os缓存中,不是直接写入到磁盘里面,消费者从os缓存中获取消息,类似于从内存中获取消息,速度更快,过一段时间会由os线程异步的将消息刷入磁盘中,此时才算真正完成了消息的持久化。在这个过程中,如果消息还没有完成异步刷盘,RocketMQ中的Broker宕机的话,就会导致消息丢失。所以第二步,消息支持持久化到Commitlog里面,即使宕机后重启,未消费的消息也是可以加载出来的。把RocketMQ的刷盘方式 flushDiskType配置成同步刷盘,一旦同步刷盘返回成功,可以保证接收到的消息一定存储在本地的内存中。采用主从机构,集群部署,Leader中的数据在多个Follower中都存有备份,防止单点故障,同步复制可以保证即使Master 磁盘崩溃,消息仍然不会丢失。但是这里还会有一个问题,主从结构是只做数据备份,没有容灾功能的。也就是说当一个master节点挂了后,slave节点是无法切换成master节点继续提供服务的。所以在RocketMQ4.5以后的版本支持Dledge,DLedger是基于Raft协议选举Leader Broker的,当master节点挂了后,Dledger会接管Broker的CommitLog消息存储 ,在Raft协议中进行多台机器的Leader选举,发起一轮一轮的投票,通过多台机器互相投票选出来一个Leader,完成master节点往slave节点的消息同步。数据同步会通过两个阶段,一个是uncommitted阶段,一个是commited阶段。Leader Broker上的Dledger收到一条数据后,会标记为uncommitted状态,然后他通过自己的DledgerServer组件把这个uncommitted数据发给Follower Broker的DledgerServer组件。接着Follower Broker的DledgerServer收到uncommitted消息之后,必须返回一个ack给Leader Broker的Dledger。然后如果Leader Broker收到超过半数的Follower Broker返回的ack之后,就会把消息标记为committed状态。再接下来, Leader Broker上的DledgerServer就会发送committed消息给Follower Broker上的DledgerServer,让他们把消息也标记为committed状态。这样,就基于Raft协议完成了两阶段的数据同步。第三步,Cunmser确保拉取到的消息被成功消费,就需要消费者不要使用异步消费,有可能造成消息状态返回后消费者本地业务逻辑处理失败造成消息丢失的可能。用同步消费方式,消费者端先处理本地事务,然后再给MQ一个ACK响应,这时MQ就会修改Offset,将消息标记为已消费,不再往其他消费者推送消息,在Broker的这种重新推送机制下,消息是不会在传输过程中丢失的。
🍊 消息重复消费
消息重复消费的问题
第一种情况是发送时消息重复,当一条消息已被成功发送到服务端并完成持久化,此时出现了网络抖动或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
第二种情况是投递时消息重复,消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,tMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
第三种情况是负载均衡时消息重复,比如网络抖动、Broker 重启以及订阅方应用重启,当MQ的Broker或客户端重启、扩容或缩容时,会触发Rebalance,此时消费者可能会收到重复消息。
那么怎么解决消息重复消费的问题呢?就是对消息进行幂等性处理。
在MQ中,是无法保证每个消息只被投递一次的,因为网络抖动或者客户端宕机等其他因素,基本都会配置重试机制,所以要在消费者端的业务上做消费幂等处理,MQ的每条消息都有一个唯一的MessageId,这个参数在多次投递的过程中是不会改变的,业务上可以用这个MessageId加上业务的唯一标识来作为判断幂等的关键依据,例如订单ID。而这个业务标识可以使用Message的Key来进行传递。消费者获取到消息后先根据id去查询redis/db是否存在该消息,如果不存在,则正常消费,消费完后写入redis/db。如果存在,则证明消息被消费过,直接丢弃。
🍊 消息顺序
消息顺序的问题,如果发送端配置了重试机制,mq不会等之前那条消息完全发送成功,才去发送下一条消息,这样可能会出现发送了1,2,3条消息,但是第1条超时了,后面两条发送成功,再重试发送第1条消息,这时消息在broker端的顺序就是2,3,1了。RocketMQ消息有序要保证最终消费到的消息是有序的,需要从Producer、Broker、Consumer三个步骤都保证消息有序才行。在发送者端:在默认情况下,消息发送者会采取Round Robin轮询方式把消息发送到不同的分区队列,而消费者消费的时候也从多个MessageQueue上拉取消息,这种情况下消息是不能保证顺序的。而只有当一组有序的消息发送到同一个MessageQueue上时,才能利用MessageQueue先进先出的特性保证这一组消息有序。而Broker中一个队列内的消息是可以保证有序的。在消费者端:消费者会从多个消息队列上去拿消息。这时虽然每个消息队列上的消息是有序的,但是多个队列之间的消息仍然是乱序的。消费者端要保证消息有序,就需要按队列一个一个来取消息,即取完一个队列的消息后,再去取下一个队列的消息。而给consumer注入的MessageListenerOrderly对象,在RocketMQ内部就会通过锁队列的方式保证消息是一个一个队列来取的。MessageListenerConcurrently这个消息监听器则不会锁队列,每次都是从多个Message中取一批数据,默认不超过32条。因此也无法保证消息有序。RocketMQ 在默认情况下不保证顺序,要保证全局顺序,需要把 Topic 的读写队列数设置为 1,然后生产者和消费者的并发设置也是 1,不能使用多线程。所以这样的话高并发,高吞吐量的功能完全用不上。全局有序就是无论发的是不是同一个分区,我都可以按照你生产的顺序来消费。分区有序就只针对发到同一个分区的消息可以顺序消费。kafka保证全链路消息顺序消费,需要从发送端开始,将所有有序消息发送到同一个分区,然后用一个消费者去消费,但是这种性能比较低,可以在消费者端接收到消息后将需要保证顺序消费的几条消费发到内存队列(可以搞多个),一个内存队列开启一个线程顺序处理消息。RabbitMq没有属性设置消息的顺序性,不过我们可以通过拆分为多个queue,每个queue由一个consumer消费。或者一个queue对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理,保证消息的顺序性。
🍊 消息积压
线上有时因为发送方发送消息速度过快,或者消费方处理消息过慢,可能会导致broker积压大量未消费消息。消息数据格式变动或消费者程序有bug,导致消费者一直消费不成功,也可能导致broker积压大量未消费消息。解决方案可以修改消费端程序,让其将收到的消息快速转发到其他主题,可以设置很多分区,然后再启动多个消费者同时消费新主题的不同分区。可以将这些消费不成功的消息转发到其它队列里去,类似死信队列,后面再慢慢分析死信队列里的消息处理问题。另外在RocketMQ官网中,还分析了一个特殊情况,如果RocketMQ原本是采用的普通方式搭建主从架构,而现在想要中途改为使用Dledger高可用集群,这时候如果不想历史消息丢失,就需要先将消息进行对齐,也就是要消费者把所有的消息都消费完,再来切换主从架构。因为Dledger集群会接管RocketMQ原有的CommitLog日志,所以切换主从架构时,如果有消息没有消费完,这些消息是存在旧的CommitLog中的,就无法再进行消费了。这个场景下也是需要尽快的处理掉积压的消息。
🍊 延迟队列
消息被发送以后,并不想让消费者立刻获取,而是等待特定的时间后,消费者才能获取这个消息进行消费。例如10分钟,内完成订单支付,支付完成后才会通知下游服务进行进一步的营销补偿。往MQ发一个延迟1分钟的消息,消费到这个消息后去检查订单的支付状态,如果订单已经支付,就往下游发送下单的通知。而如果没有支付,就再发一个延迟1分钟的消息。最终在第10个消息时把订单回收,就不用对全部的订单表进行扫描,而只需要每次处理一个单独的订单消息。这个就是延迟对列的应用场景。rabbittmq,rocketmq都可以通过设置ttl来设置延迟时间,kafka则是可以在发送延时消息的时候,先把消息按照不同的延迟时间段发送到指定的队列中,比如topic_1s,topic_5s,topic_10s,topic_2h,然后通过定时器进行轮训消费这些topic,查看消息是否到期,如果到期就把这个消息发送到具体业务处理的topic中,队列中消息越靠前的到期时间越早,具体来说就是定时器在一次消费过程中,对消息的发送时间做判断,看下是否延迟到对应时间了,如果到了就转发,如果还没到这一次定时任务就可以提前结束了。
mq设置过期时间,就会有消息失效的情况,如果消息在队列里积压超过指定的过期时间,就会被mq给清理掉,这个时候数据就没了。解决方案也有手动写程序,将丢失的那批数据,一点点地查出来,然后重新插入到 mq 里面去。
🍊 消息队列高可用
对于RocketMQ来说可以使用Dledger主从架构来保证消息队列的高可用,这个在上面也有提到过。然后在说说rabbitmq,它提供了一种叫镜像集群模式,在镜像集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。RabbitMQ 有很好的管理控制台,可以在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。只不过消息需要同步到所有机器上,导致网络带宽压力和消耗很重。最后再说说kafka,它是天然的分布式消息队列,在Kafka 0.8 以后,提供了副本机制,一个 topic要求指定partition数量,每个 partition的数据都会同步到其它机器上,形成自己的多个 replica 副本,所有 replica 会选举一个 leader 出来,其他 replica 就是 follower。写的时候,leader 会负责把数据同步到所有 follower 上去。如果某个 broker 宕机了,没事儿,那个 broker上面的 partition 在其他机器上都有副本的,如果这上面有某个 partition 的 leader,那么此时会从 follower 中重新选举一个新的 leader 出来。
🍊 RabbitMQ的工作模式
RabbitMQ 提供了 6 种工作模式,简单模式、work queues、Publish/Subscribe
发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算消息队列)
🎉 简单模式
一个生产者生产消息发送到队列里面,一个消费者从队列里面拿消息,进行消费消息。一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)
说明:类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
🎉 Work queues 工作队列模式
一个生产者生产消息发送到队列里面,一个或者多个消费者从队列里面拿消息,进行消费消息。一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)
说明:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。应用场景:过年过节12306抢票,发短信给用户,可以接入多个短信服务进行发送,提供任务的处理速度。
🎉 Pub/Sub 订阅模式
一个生产者生产消息发送到交换机里面,由交换机处理消息,队列与交换机的任意绑定,将消息指派给某个队列,一个或者多个消费者从队列里面拿消息,进行消费消息。需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列。
说明:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!
🎉 Routing 路由模式
一个生产者生产消息发送到交换机里面,并且指定一个路由key,队列与交换机的绑定是通过路由key进行绑定的,消费者在消费的时候需要根据路由key从交换机里面拿消息,进行消费消息。需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。
说明:Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。
🎉 Topics 通配符模式
一个生产者生产消息发送到交换机里面,并且使用通配符的形式(类似mysql里面的模糊查询,比如想获取一批带有item前缀的数据),队列与交换机的绑定是通过通配符进行绑定的,消费者在消费的时候需要根据根据通配符从交换机里面拿消息,进行消费消息。需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列
说明:通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词。例如:Lazy.# 能够匹配 Lazy.insert.content或者 Lazy.insert,Lazy.* 只能匹配Lazy.insert。