【Redis源码】集群之分布式cluster原理(十四)

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 集群之分布式cluster原理(十四)

前言:

接着上一章开始讲,上一章我们讲到如果配置一个cluster,以及cluster的基础命令。以及初始化一个cluster的流程。本章继续讲一些cluster原理。

(一)消息结构

1.1 数据包类型

以下是数据包的类型:

#define CLUSTERMSG_TYPE_PING 0          /* Ping包类型 */
#define CLUSTERMSG_TYPE_PONG 1          /* Pong包类型 (Ping的返回信息) */
#define CLUSTERMSG_TYPE_MEET 2          /* meet包类型 */
#define CLUSTERMSG_TYPE_FAIL 3          /* fail包类型 */
#define CLUSTERMSG_TYPE_PUBLISH 4       /* 发布订阅消息包类型 */
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 5 /* failover授权请求包 */
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 6     /* failover授权确认包 */
#define CLUSTERMSG_TYPE_UPDATE 7        /* update包,用于更新配置使用 */
#define CLUSTERMSG_TYPE_MFSTART 8       /* 手动failover包 */
#define CLUSTERMSG_TYPE_COUNT 9         /* 消息类型总数,用于计算边界. */

数据包类型在redis4.0中分为9总,redis5.0会多一种。其中CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST、CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK、CLUSTERMSG_TYPE_MFSTART三种包是没有包体结构的,也就是5,6,8类型。

1.2 clusterMsg消息结构:

typedefstruct {
   char sig[4];        /* Siganture "RCmb" (Redis Cluster message bus). */
   uint32_t totlen;    /* 消息总长度 */
   uint16_t ver;       /* 协议版本,当前设置为1。*/
   uint16_t port;      /* TCP端口号. */
   uint16_t type;      /* 消息类型 */
   uint16_t count;     /* data中的gossip session个数。(只在发送MEET、PING和PONG这三种消息时使用) */
   uint64_t currentEpoch;  /* 消息发送者纪元 */
   uint64_t configEpoch;   /* 如果消息发送者是一个主节点,那么该项为消息发送者配置纪元。
                             如果消息发送者是一个从节点,那么该项为发送者正在复制的主节点纪元。*/

   uint64_t offset;                        /* 复制偏移量. */
   char sender[CLUSTER_NAMELEN];           /* 发送节点名称 */
   unsignedchar myslots[CLUSTER_SLOTS/8]; /*消息发送者目前的槽指派信息*/
   char slaveof[CLUSTER_NAMELEN];          /*发送方如果是从,对应主的名称。*/
   char myip[NET_IP_STR_LEN];              /* 发送人IP地址,如果不是全部为0. */
   char notused1[34];                      /* 34 字节保留字节 */
   uint16_t cport;                         /* 发送方集群TCP 端口 */
   uint16_t flags;                         /* 发送人节点标志 */
   unsignedchar state;                    /* 消息发送者所在集群的状态 */
   unsignedchar mflags[3];                /* 消息标志: CLUSTERMSG_FLAG[012]_... */
   union clusterMsgData data;              /* 消息包内容 */
} clusterMsg;

clusterMsg结构中data对应包的内容,对应clusterMsgData结构这个是一个union结构。包的类型会根据type的不同映射对应的结构。

1.3 clusterMsgData结构:

union clusterMsgData {
   /* PING, MEET和 PONG包内容, 包是一个clusterMsgDataGossip结构数组。*/
   struct {
     
       clusterMsgDataGossip gossip[1];
   } ping;

   /* FAIL包内容 */
   struct {
       clusterMsgDataFail about;
   } fail;

   /* PUBLISH包内容 */
   struct {
       clusterMsgDataPublish msg;
   } publish;

   /* UPDATE包内容 */
   struct {
       clusterMsgDataUpdate nodecfg;
   } update;
};

clusterMsgData 结构体包含ping、fail、publish、update四种结构。其中ping结构提供给三种类型包使用,分别是ping、meet和pong。

1.4 clusterMsgDataGossip结构:

typedefstruct {
   char nodename[CLUSTER_NAMELEN];  //节点名称
   uint32_t ping_sent;              //发送ping时间
   uint32_t pong_received;          //返回pong时间
   char ip[NET_IP_STR_LEN];    /* 节点ip地址 */
   uint16_t port;              /* 节点端口 */
   uint16_t cport;             /* 节点监听集群端口 */
   uint16_t flags;             /* 节点状态 node->flags copy */
   uint32_t notused1;         //预留
} clusterMsgDataGossip;

clusterMsgDataGossip结构结构涵盖三种包格式:

1)ping包格式

ping包是一个心跳包,是redis集群中每个节点通过心跳包可以知道其他节点的当前状态并且保存到本节点状态中。

2)pong包格式

pong包是接收到ping包或者是meet包之后作为回复包类型。当进行主从切换之后,新的主节点会向集群中的所有节点直接发送一个pong包,通知从切换后节点角色的转换。

3)meet包格式

当执行cluster meet 命令之后,执行端会向ip:port指定的地址发送meet包。

1.5 clusterMsgDataFail结构:

typedefstruct {
   char nodename[CLUSTER_NAMELEN];  //节点名称
} clusterMsgDataFail;

clusterMsgDataFail用于fail包,fail包用来通知集群中某个节点处于故障状态。

1.6 clusterMsgDataPublish结构:

typedefstruct {
   uint32_t channel_len;        //渠道名称长度
   uint32_t message_len;        //消息长度
   unsignedchar bulk_data[8];  //渠道和消息内容
} clusterMsgDataPublish;

clusterMsgDataPublish结构用于发布/订阅包使用。当向集群中任意一个节点发送publish信息后,该节点会向集群中所有节点广播一条publish包。

1.7 clusterMsgDataUpdate结构:

typedefstruct {
   uint64_t configEpoch;                 /* 配置纪元. */
   char nodename[CLUSTER_NAMELEN];       /* 节点名称. */
   unsignedchar slots[CLUSTER_SLOTS/8]; /* 服务的slots */
} clusterMsgDataUpdate;

clusterMsgDataUpdate结构主要用于update包,update包用于更新集群节点中的配置。

(二)数据迁移

2.1 操作数据迁移命令

#将本节点的槽 slot 迁移到 node_id 指定的节点中
CLUSTER SETSLOT  MIGRATING

#从 node_id 指定的节点中导入槽 slot 到本节点
CLUSTER SETSLOT  IMPORTING

2.2数据迁移原理

如果A节点操作CLUSTER SETSLOT命令迁移10000槽到B节点,此时A、B两个节点都会存在10000槽。

typedefstructclusterState {
   //...省略
   clusterNode *migrating_slots_to[CLUSTER_SLOTS];   //迁移槽列表
   clusterNode *importing_slots_from[CLUSTER_SLOTS]; //导入槽列表
   clusterNode *slots[CLUSTER_SLOTS];                //当前槽列表
   //...省略
}

clusterState中分别存放三种槽的列表:迁移槽列表、导入槽列表、当前槽列表。

2.3数据迁移源码分析

voidclusterCommand(client *c) {
   //。。。省略
   int slot;
   clusterNode *n;

   if (nodeIsSlave(myself)) { //判断是否为从
       addReplyError(c,"Please use SETSLOT only with masters.");
       return;
   }
   
   if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return; //获得槽,没有则返回
   
   if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
       if (server.cluster->slots[slot] != myself) { //判读槽必须是当前节点
           addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
           return;
       }
       if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) { //获取槽
           addReplyErrorFormat(c,"I don't know about node %s",
               (char*)c->argv[4]->ptr);
           return;
       }
       server.cluster->migrating_slots_to[slot] = n;  //设置迁移槽
   } elseif (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
       if (server.cluster->slots[slot] == myself) { //判读槽不能是当前节点
           addReplyErrorFormat(c,
               "I'm already the owner of hash slot %u",slot);
           return;
       }
       if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
           addReplyErrorFormat(c,"I don't know about node %s",
               (char*)c->argv[3]->ptr);
           return;
       }
       server.cluster->importing_slots_from[slot] = n;  //设置倒入槽
   }
}

migrating:满足条件

1)必须是master;

2)必须槽为0 ~ 16383数字;

3)必须槽是本节点的;

importing:满足条件

1)必须是master;

2)必须槽为0 ~ 16383数字;

3)必须槽不是本节点的;

总结:

1.redis4.0中有9种消息包类型,redis5.0种有10种消息包类型。其中涉及故障转移的三种包状态是没有包结构的。

2.消息包类型是通过clusterMsg结构中的type去映射不同的data包结构体。由于data对应的clusterMsgData 结构体是一个union结构。

3. migrating迁移槽时,需满足几个条件。必须是master,必须槽是本节点的。

4. importing倒入槽时,需满足几个条件。必须是master,必须槽是其他节点的。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
打赏
0
0
0
0
1
分享
相关文章
分布式爬虫框架Scrapy-Redis实战指南
本文介绍如何使用Scrapy-Redis构建分布式爬虫系统,采集携程平台上热门城市的酒店价格与评价信息。通过代理IP、Cookie和User-Agent设置规避反爬策略,实现高效数据抓取。结合价格动态趋势分析,助力酒店业优化市场策略、提升服务质量。技术架构涵盖Scrapy-Redis核心调度、代理中间件及数据解析存储,提供完整的技术路线图与代码示例。
分布式爬虫框架Scrapy-Redis实战指南
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
本文介绍了从单机锁到分布式锁的演变,重点探讨了使用Redis实现分布式锁的方法。分布式锁用于控制分布式系统中多个实例对共享资源的同步访问,需满足互斥性、可重入性、锁超时防死锁和锁释放正确防误删等特性。文章通过具体示例展示了如何利用Redis的`setnx`命令实现加锁,并分析了简化版分布式锁存在的问题,如锁超时和误删。为了解决这些问题,文中提出了设置锁过期时间和在解锁前验证持有锁的线程身份的优化方案。最后指出,尽管当前设计已解决部分问题,但仍存在进一步优化的空间,将在后续章节继续探讨。
486 131
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
|
1月前
|
Springboot使用Redis实现分布式锁
通过这些步骤和示例,您可以系统地了解如何在Spring Boot中使用Redis实现分布式锁,并在实际项目中应用。希望这些内容对您的学习和工作有所帮助。
182 83
|
13天前
|
【📕分布式锁通关指南 07】源码剖析redisson利用看门狗机制异步维持客户端锁
Redisson 的看门狗机制是解决分布式锁续期问题的核心功能。当通过 `lock()` 方法加锁且未指定租约时间时,默认启用 30 秒的看门狗超时时间。其原理是在获取锁后创建一个定时任务,每隔 1/3 超时时间(默认 10 秒)通过 Lua 脚本检查锁状态并延长过期时间。续期操作异步执行,确保业务线程不被阻塞,同时仅当前持有锁的线程可成功续期。锁释放时自动清理看门狗任务,避免资源浪费。学习源码后需注意:避免使用带超时参数的加锁方法、控制业务执行时间、及时释放锁以优化性能。相比手动循环续期,Redisson 的定时任务方式更高效且安全。
62 24
【📕分布式锁通关指南 07】源码剖析redisson利用看门狗机制异步维持客户端锁
|
9天前
【📕分布式锁通关指南 08】源码剖析redisson可重入锁之释放及阻塞与非阻塞获取
本文深入剖析了Redisson中可重入锁的释放锁Lua脚本实现及其获取锁的两种方式(阻塞与非阻塞)。释放锁流程包括前置检查、重入计数处理、锁删除及消息发布等步骤。非阻塞获取锁(tryLock)通过有限时间等待返回布尔值,适合需快速反馈的场景;阻塞获取锁(lock)则无限等待直至成功,适用于必须获取锁的场景。两者在等待策略、返回值和中断处理上存在显著差异。本文为理解分布式锁实现提供了详实参考。
45 11
【📕分布式锁通关指南 08】源码剖析redisson可重入锁之释放及阻塞与非阻塞获取
融合AMD与NVIDIA GPU集群的MLOps:异构计算环境中的分布式训练架构实践
本文探讨了如何通过技术手段混合使用AMD与NVIDIA GPU集群以支持PyTorch分布式训练。面对CUDA与ROCm框架互操作性不足的问题,文章提出利用UCC和UCX等统一通信框架实现高效数据传输,并在异构Kubernetes集群中部署任务。通过解决轻度与强度异构环境下的挑战,如计算能力不平衡、内存容量差异及通信性能优化,文章展示了如何无需重构代码即可充分利用异构硬件资源。尽管存在RDMA验证不足、通信性能次优等局限性,但该方案为最大化GPU资源利用率、降低供应商锁定提供了可行路径。源代码已公开,供读者参考实践。
30 3
融合AMD与NVIDIA GPU集群的MLOps:异构计算环境中的分布式训练架构实践
Redis原理—5.性能和使用总结
本文详细探讨了Redis的阻塞原因、性能优化、缓存相关问题及数据库与缓存的一致性问题。同时还列举了不同缓存操作方案下的并发情况,帮助读者理解并选择合适的缓存管理策略。最终得出结论,在实际应用中应尽量采用“先更新数据库再删除缓存”的方案,并结合异步重试机制来保证数据的一致性和系统的高性能。
Redis原理—5.性能和使用总结
Redis分布式锁如何实现 ?
Redis分布式锁主要依靠一个SETNX指令实现的 , 这条命令的含义就是“SET if Not Exists”,即不存在的时候才会设置值。 只有在key不存在的情况下,将键key的值设置为value。如果key已经存在,则SETNX命令不做任何操作。 这个命令的返回值如下。 ● 命令在设置成功时返回1。 ● 命令在设置失败时返回0。 假设此时有线程A和线程B同时访问临界区代码,假设线程A首先执行了SETNX命令,并返回结果1,继续向下执行。而此时线程B再次执行SETNX命令时,返回的结果为0,则线程B不能继续向下执行。只有当线程A执行DELETE命令将设置的锁状态删除时,线程B才会成功执行S
【📕分布式锁通关指南 03】通过Lua脚本保证redis操作的原子性
本文介绍了如何通过Lua脚本在Redis中实现分布式锁的原子性操作,避免并发问题。首先讲解了Lua脚本的基本概念及其在Redis中的使用方法,包括通过`eval`指令执行Lua脚本和通过`script load`指令缓存脚本。接着详细展示了如何用Lua脚本实现加锁、解锁及可重入锁的功能,确保同一线程可以多次获取锁而不发生死锁。最后,通过代码示例演示了如何在实际业务中调用这些Lua脚本,确保锁操作的原子性和安全性。
74 6
【📕分布式锁通关指南 03】通过Lua脚本保证redis操作的原子性
Redis原理—2.单机数据库的实现
本文概述了Redis数据库的核心结构和操作机制。
Redis原理—2.单机数据库的实现