前言:
接着上一章开始讲,上一章我们讲到如果配置一个cluster,以及cluster的基础命令。以及初始化一个cluster的流程。本章继续讲一些cluster原理。
(一)消息结构
1.1 数据包类型
以下是数据包的类型:
#define CLUSTERMSG_TYPE_PING 0 /* Ping包类型 */
#define CLUSTERMSG_TYPE_PONG 1 /* Pong包类型 (Ping的返回信息) */
#define CLUSTERMSG_TYPE_MEET 2 /* meet包类型 */
#define CLUSTERMSG_TYPE_FAIL 3 /* fail包类型 */
#define CLUSTERMSG_TYPE_PUBLISH 4 /* 发布订阅消息包类型 */
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 5 /* failover授权请求包 */
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 6 /* failover授权确认包 */
#define CLUSTERMSG_TYPE_UPDATE 7 /* update包,用于更新配置使用 */
#define CLUSTERMSG_TYPE_MFSTART 8 /* 手动failover包 */
#define CLUSTERMSG_TYPE_COUNT 9 /* 消息类型总数,用于计算边界. */
数据包类型在redis4.0中分为9总,redis5.0会多一种。其中CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST、CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK、CLUSTERMSG_TYPE_MFSTART三种包是没有包体结构的,也就是5,6,8类型。
1.2 clusterMsg消息结构:
typedefstruct {
char sig[4]; /* Siganture "RCmb" (Redis Cluster message bus). */
uint32_t totlen; /* 消息总长度 */
uint16_t ver; /* 协议版本,当前设置为1。*/
uint16_t port; /* TCP端口号. */
uint16_t type; /* 消息类型 */
uint16_t count; /* data中的gossip session个数。(只在发送MEET、PING和PONG这三种消息时使用) */
uint64_t currentEpoch; /* 消息发送者纪元 */
uint64_t configEpoch; /* 如果消息发送者是一个主节点,那么该项为消息发送者配置纪元。
如果消息发送者是一个从节点,那么该项为发送者正在复制的主节点纪元。*/
uint64_t offset; /* 复制偏移量. */
char sender[CLUSTER_NAMELEN]; /* 发送节点名称 */
unsignedchar myslots[CLUSTER_SLOTS/8]; /*消息发送者目前的槽指派信息*/
char slaveof[CLUSTER_NAMELEN]; /*发送方如果是从,对应主的名称。*/
char myip[NET_IP_STR_LEN]; /* 发送人IP地址,如果不是全部为0. */
char notused1[34]; /* 34 字节保留字节 */
uint16_t cport; /* 发送方集群TCP 端口 */
uint16_t flags; /* 发送人节点标志 */
unsignedchar state; /* 消息发送者所在集群的状态 */
unsignedchar mflags[3]; /* 消息标志: CLUSTERMSG_FLAG[012]_... */
union clusterMsgData data; /* 消息包内容 */
} clusterMsg;
clusterMsg结构中data对应包的内容,对应clusterMsgData结构这个是一个union结构。包的类型会根据type的不同映射对应的结构。
1.3 clusterMsgData结构:
union clusterMsgData {
/* PING, MEET和 PONG包内容, 包是一个clusterMsgDataGossip结构数组。*/
struct {
clusterMsgDataGossip gossip[1];
} ping;
/* FAIL包内容 */
struct {
clusterMsgDataFail about;
} fail;
/* PUBLISH包内容 */
struct {
clusterMsgDataPublish msg;
} publish;
/* UPDATE包内容 */
struct {
clusterMsgDataUpdate nodecfg;
} update;
};
clusterMsgData 结构体包含ping、fail、publish、update四种结构。其中ping结构提供给三种类型包使用,分别是ping、meet和pong。
1.4 clusterMsgDataGossip结构:
typedefstruct {
char nodename[CLUSTER_NAMELEN]; //节点名称
uint32_t ping_sent; //发送ping时间
uint32_t pong_received; //返回pong时间
char ip[NET_IP_STR_LEN]; /* 节点ip地址 */
uint16_t port; /* 节点端口 */
uint16_t cport; /* 节点监听集群端口 */
uint16_t flags; /* 节点状态 node->flags copy */
uint32_t notused1; //预留
} clusterMsgDataGossip;
clusterMsgDataGossip结构结构涵盖三种包格式:
1)ping包格式
ping包是一个心跳包,是redis集群中每个节点通过心跳包可以知道其他节点的当前状态并且保存到本节点状态中。
2)pong包格式
pong包是接收到ping包或者是meet包之后作为回复包类型。当进行主从切换之后,新的主节点会向集群中的所有节点直接发送一个pong包,通知从切换后节点角色的转换。
3)meet包格式
当执行cluster meet 命令之后,执行端会向ip:port指定的地址发送meet包。
1.5 clusterMsgDataFail结构:
typedefstruct {
char nodename[CLUSTER_NAMELEN]; //节点名称
} clusterMsgDataFail;
clusterMsgDataFail用于fail包,fail包用来通知集群中某个节点处于故障状态。
1.6 clusterMsgDataPublish结构:
typedefstruct {
uint32_t channel_len; //渠道名称长度
uint32_t message_len; //消息长度
unsignedchar bulk_data[8]; //渠道和消息内容
} clusterMsgDataPublish;
clusterMsgDataPublish结构用于发布/订阅包使用。当向集群中任意一个节点发送publish信息后,该节点会向集群中所有节点广播一条publish包。
1.7 clusterMsgDataUpdate结构:
typedefstruct {
uint64_t configEpoch; /* 配置纪元. */
char nodename[CLUSTER_NAMELEN]; /* 节点名称. */
unsignedchar slots[CLUSTER_SLOTS/8]; /* 服务的slots */
} clusterMsgDataUpdate;
clusterMsgDataUpdate结构主要用于update包,update包用于更新集群节点中的配置。
(二)数据迁移
2.1 操作数据迁移命令
#将本节点的槽 slot 迁移到 node_id 指定的节点中
CLUSTER SETSLOT MIGRATING
#从 node_id 指定的节点中导入槽 slot 到本节点
CLUSTER SETSLOT IMPORTING
2.2数据迁移原理
如果A节点操作CLUSTER SETSLOT命令迁移10000槽到B节点,此时A、B两个节点都会存在10000槽。
typedefstructclusterState {
//...省略
clusterNode *migrating_slots_to[CLUSTER_SLOTS]; //迁移槽列表
clusterNode *importing_slots_from[CLUSTER_SLOTS]; //导入槽列表
clusterNode *slots[CLUSTER_SLOTS]; //当前槽列表
//...省略
}
clusterState中分别存放三种槽的列表:迁移槽列表、导入槽列表、当前槽列表。
2.3数据迁移源码分析
voidclusterCommand(client *c) {
//。。。省略
int slot;
clusterNode *n;
if (nodeIsSlave(myself)) { //判断是否为从
addReplyError(c,"Please use SETSLOT only with masters.");
return;
}
if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return; //获得槽,没有则返回
if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
if (server.cluster->slots[slot] != myself) { //判读槽必须是当前节点
addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
return;
}
if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) { //获取槽
addReplyErrorFormat(c,"I don't know about node %s",
(char*)c->argv[4]->ptr);
return;
}
server.cluster->migrating_slots_to[slot] = n; //设置迁移槽
} elseif (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
if (server.cluster->slots[slot] == myself) { //判读槽不能是当前节点
addReplyErrorFormat(c,
"I'm already the owner of hash slot %u",slot);
return;
}
if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
addReplyErrorFormat(c,"I don't know about node %s",
(char*)c->argv[3]->ptr);
return;
}
server.cluster->importing_slots_from[slot] = n; //设置倒入槽
}
}
migrating:满足条件
1)必须是master;
2)必须槽为0 ~ 16383数字;
3)必须槽是本节点的;
importing:满足条件
1)必须是master;
2)必须槽为0 ~ 16383数字;
3)必须槽不是本节点的;
总结:
1.redis4.0中有9种消息包类型,redis5.0种有10种消息包类型。其中涉及故障转移的三种包状态是没有包结构的。
2.消息包类型是通过clusterMsg结构中的type去映射不同的data包结构体。由于data对应的clusterMsgData 结构体是一个union结构。
3. migrating迁移槽时,需满足几个条件。必须是master,必须槽是本节点的。
4. importing倒入槽时,需满足几个条件。必须是master,必须槽是其他节点的。