引言
集群的实际环境模拟可以参考我之前的文章 单机模拟集群(三主两从)
一、集群的工作原理
集群中的节点只能使用0号数据库,而单机数据库没有这个限制。集群中的节点本质上就是一个运行在集群模式下的Redis服务器,Redis服务器在启动的时候会根据redis.conf配置文件中的cluster-enabled 是否为yes 来决定是否开启服务器的集群模式。
1.1 槽指派
Redis集群通过分片的方式来保存数据库中的键值对:集群的整个数据库被分为16384个槽(slot),数据库中的每个键都属于这16384个槽中的一个,集群中的每个节点可以处理0~16384个槽。
当数据库中的16384个槽都有节点在处理时,集群处于上线状态;相反的,如果有任何一个槽没有得到处理,那么集群处于下线状态。
每当往数据库中添加新的键值对时,Redis都会对其key计算CRC16的值,再对16384取余,计算出槽位。如果是对节点数量进行取余,在发生动态扩缩容时就会发生错乱,所以,Redis采用了固定算法: CRC16(key)% 16384。
使用CLUSTER KEYSLOT key 命令,可以查看key属于哪个槽位。
127.0.0.1:7000> CLUSTER KEYSLOT name 2022
每个节点都会在自己的clusterNode.slots 数组中保存自己处理的槽信息,这个信息会通过消息的方式发送给集群中的其他节点。如果想知道某个槽i是否已经被指派,或者指派给了哪个节点,程序需要遍历clusterState.nodes字典中的所有clusterNode,检查他们的slots数组,直到找到处理槽i的节点位置,时间复杂度为O(n),n为nodes字典保存的节点数量。
为了优化这个查询,Redis在clusterState的slots数组中,为每个槽i(0 <= i <= 16383)保存处理自身的clusterNode节点信息,如果没有则为NULL,这是如果程序想知道槽i被谁处理的时间复杂度就是O(1)。
1.2 如何判断某个槽是不是当前节点处理
当我们在客户端的命令行输入命令时,redis会先计算该键的槽位,当计算出键所属的槽位i之后,节点就会检查自己 在clusterState.slots中的数组中的第i项,如果指向自己,说明该槽位i是由当前节点负责,可以执行客户端发送的命令;如果clusterState.slots[i] 不等于 clusterState.myself,说明槽位i是由其他节点负责,当前节点会根据clusterState.slots[i] 的指向找到对应的clusterNode结构所记录的ip和port,向客户端返回MOVED错误,指引客户端转向正确的节点。
1.3 MOVE错误
当节点发现键所在的槽并非由自己负责处理的时候,节点就会向客户端返回一个MOVED错误,指引客户端转向正在负责槽的节点。
MOVED slot ip:port
1.4 重新分片
Redis集群的重新分片操作主要用来动态扩缩容。重新分片操作可以将任意数量已经指派给某个节点(源节点)的槽修改为指派给另一个节点(目标节点),并且相关槽所属的键值对也会从源节点迁移到目标节点。
重新分片操作可以在线进行,在重新分片的过程中,集群不需要下线。源节点可目标节点都能处理命令请求。
1.5 ASK错误
在进行重新分片期间,槽位中的数据从源节点向目的节点迁移的过程中,可能会出现一部分数据在源节点中,一部分数据在目标节点。当客户端向源节点发送一个与数据库键有关的命令,并且要处理的数据库键属于正在被迁移的槽位时:
- 源节点会先在自己的数据库中找,如果找到就执行客户端的命令
- 如果没找到,源节点会检查自己的clusterState.migrating_slots_to[i],找到正在迁移的目标节点, 向客户端返回一个ASK错误,指引客户端转向正在迁移的目标节点,并再次发送要执行的命令。
ASK错误和MOVED错误的区别:
相同点:都会导致客户端转向;
不同点:MOVED错误表示槽i的复制权已经转移到另一个节点了,而ASK错误只是两个节点在迁移槽的过程中使用的临时措施。
1.6 复制与故障转移
设置从节点,让当前节点去复制node节点:
CLUSTER REPLICATE <node_id>
收到该命令的节点会执行以下步骤:
- 在自己的clusterState.nodes字典中找到node_id对应的clusterNode结构,并将自己的clusterState.myself.slaveof指针指向要复制的目标节点
- 修改自己clusterState.myself.flags 标识,打开 REDIS_NODE_SLVAE标识,表示这个节点已经由原来的主节点变成了从节点
- 最后,节点会调用复制代码,根据 clusterState.myself.slaveof 指向的 clusterNode 结构中的所保存的ip和port,对主节点进行复制
一个节点成为从节点,并开始复制某个主节点这一信息会通过消息的方式发送给集群中的其他节点,最终集群中的所有节点都会知道该节点正在复制某个主节点。
比如,现在要给7000主服务添加两个从服务器,分别为节点:7004 和 7005。节点 7004 和 7005 各自clusterState结构和clusterNode结构,如下图:
7004 和 7005 称为节点7000 从节点后,集群中的各个节点为节点7000创建的clusterNode结构的样子。
1.7 故障检测
集群中的每个节点都会定期向集群中的其他节点发送PING消息,以此来检测对方是否在线,如果收到PING的节点没有在规定的时间内回复PONG消息,那么发送PING消息的节点就会将该节点标记为疑似下线。具体就是在自己的clusterState.nodes字典中,找到疑似下线的clusterNode结构,把结构中flags属性打开REDIS_NODE_PFAIL标识,以此表示目标节点进入疑似下线。
1.8 故障转移
当一个节点发现自己正在复制的主节点进入了已下线状态,从节点将开始对已下线的主节点进行故障转移,故障转移步骤如下:
- 从复制下线主节点的从节点中选一个出来,执行SLAVEOF no one 命令,成为新的主节点
- 新的主节点会撤销所有对已下线主节点的槽指派,将这些槽指派全部指派给自己
- 新的主节点向集群广播一条PONG消息,告诉集群中其他节点,我是新的主节点并且已经接管了原来的槽位
- 新的主节点开始出来与自己负责处理的槽位相关的命令请求,故障转移完成。
二、集群创建的两种方式
1.1 手动创建集群
# 节点会面 cluster meet ip port # 分配槽位 cluster addslots slot # 分配主从 cluster replicate node-id
举例:创建一个集群,包含三个节点,分别为: 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 ,让节点7000去认识7001和7002
127.0.0.1:7000> CLUSTER MEET 127.0.0.1 7001 OK 127.0.0.1:7000> CLUSTER MEET 127.0.0.1 7002 OK 127.0.0.1:7000> CLUSTER ADDSLOTS 0 1 2 3 ... 5000 OK
为 7001 节点添加槽位 5001 ~ 10000
127.0.0.1:7001> CLUSTER ADDSLOTS 5001 5002 ... 10000 OK
为 7002 节点添加槽位 10001 ~ 16383
127.0.0.1:7002> CLUSTER ADDSLOTS 10001 10002 ... 16383 OK
1.2 智能创建集群
redis-cli --cluster create host1:port1 ... hostN:portN --cluster-replicas <arg>
上面的命令表示,创建一个包含N个节点的集群,每个主服务器的从服务器的数量为 arg。
三、CLUSTER MEET命令实现
假设我们在A客户端的终端中输入CLUSTER ,A在收到命令后会与节点B进行握手,握手流程如下:
- 节点A会为节点B创建clusterNode结构,并将该结构加入到自己的clusterState.nodes字典里,节点A根据给定的B_ip和B_port 向节点B发送一条MEET消息
- 节点B收到A的MEET消息,为节点A创建clusterNode结构,并将该结构加入到自己的clusterState.nodes字典里,为节点A回复一条PONG消息
- 节点A正常收到节点B的PONG,就知道节点B已经成功接收了自己的MEET消息,再向节点B发送一条PING消息
- 节点B正常收到节点A的PING消息,就知道A成功收到了自己的PONG,握手完成。
- 节点A会将节点B的消息通过Gossip协议传播给集群中的其他节点,让其他节点也与B握手,最终一段时间后,节点B就会被集群中的所有节点认识。
三个节点的flags都是REDIS_CLUSTER_MASTER,说明三个节点都是主节点。节点7001 和 7002 也会创建类似下面的节点,不同的是myself的指向。
四、源码分析
clusterNode 这个结构保存集群中的一个节点的状态,节点的名字、创建时间、节点的ip和port等,定义如下:
typedef struct clusterNode { // 创建节点的时间 mstime_t ctime; /* Node object creation time. */ // 节点的名字,40位十六进制字符组成 char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */ // 节点标识(主、从节点 以及 节点的状态 在线还是下线) int flags; /* CLUSTER_NODE_... */ // 当前配置纪元,用于故障转移 uint64_t configEpoch; /* Last configEpoch observed for this node */ // 节点处理的槽位 // 是个二进制数组,长度为2048个字节,共包含 16384个二进制位 // 索引范围:0~16383 通过查看对应bit位是否为1 如果索引i对应的bit为1,表示节点处理槽i unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */ sds slots_info; /* Slots info represented by string. */ // 节点处理的槽位的个数,也就是slots数组中1的个数 int numslots; /* Number of slots handled by this node */ // 如果是主节点,从节点的个数 int numslaves; /* Number of slave nodes, if this is a master */ // 从节点的集合 struct clusterNode **slaves; /* pointers to slave nodes */ // 如果是从服务器,指向复制的主服务器 struct clusterNode *slaveof; /* pointer to the master node. Note that it may be NULL even if the node is a slave if we don't have the master node in our tables. */ // 最后一次发送ping的时间 mstime_t ping_sent; /* Unix time we sent latest ping */ mstime_t pong_received; /* Unix time we received the pong */ mstime_t data_received; /* Unix time we received any data */ mstime_t fail_time; /* Unix time when FAIL flag was set */ mstime_t voted_time; /* Last time we voted for a slave of this master */ mstime_t repl_offset_time; /* Unix time we received offset for this node */ mstime_t orphaned_time; /* Starting time of orphaned master condition */ // 节点的复制偏移量 long long repl_offset; /* Last known repl offset for this node. */ // 节点的IP地址和port char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */ int port; /* Latest known clients port (TLS or plain). */ int pport; /* Latest known clients plaintext port. Only used if the main clients port is for TLS. */ int cport; /* Latest known cluster port of this node. */ // 保存连接节点需要的相关信息 clusterLink *link; /* TCP/IP link with this node */ // 故障检测中使用,记录了其他所有的节点,对该节点的下线报告 list *fail_reports; /* List of nodes signaling this as failing */ } clusterNode;
clusterLink 该结构保存了连接节点所需要的信息,如连接对象,收发缓冲区等,定义如下:
/* clusterLink encapsulates everything needed to talk with a remote node. */ typedef struct clusterLink { // 连接创建时间 mstime_t ctime; /* Link creation time */ // 连接对象 connection *conn; /* Connection to remote node */ // 发送缓冲区 sds sndbuf; /* Packet send buffer */ // 接收缓冲区 char *rcvbuf; /* Packet reception buffer */ // 接收缓冲区使用的大小 size_t rcvbuf_len; /* Used size of rcvbuf */ // 接收缓冲区实际分配的大小 size_t rcvbuf_alloc; /* Allocated size of rcvbuf */ // 与这个节点相关联的节点,没有就为NULL struct clusterNode *node; /* Node related to this link if any, or NULL */ } clusterLink;
clusterState,这个结构记录了在当前节点的视角下,集群目前的状态,如:上线还是下线,有多少个节点,集群当前的配置纪元是多少等,定义如下:
typedef struct clusterState { // 指向当前节点的指针 clusterNode *myself; /* This node */ // 集群当前的配置纪元,用于实现故障转移 uint64_t currentEpoch; // 集群当前状态 在线还是下线 int state; /* CLUSTER_OK, CLUSTER_FAIL, ... */ // 至少处理一个槽位的主节点的数量 int size; /* Num of master nodes with at least one slot */ // 集群节点名单,包含所有的节点 name --> clusterNode dict *nodes; /* Hash table of name -> clusterNode structures */ dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */ // 用来实现重新分片 // 记录当前节点正在迁移到其他节点的槽 clusterNode *migrating_slots_to[CLUSTER_SLOTS]; // 记录当前节点正在从其他节点导入的槽 // 假设importing_slots_from[i]的值不为NULL,指向clusterNode结构,表示当前节点正在从clusterNode指向的节点导入槽i clusterNode *importing_slots_from[CLUSTER_SLOTS]; // slots包含16384个项,每个项都是一个clusterNode的指针 // 如果slots[i] 指向一个clusterNode结构,说明槽i已经指派给clusterNode表示的节点了 // 如果为NULL,说明还没指派给任何节点 clusterNode *slots[CLUSTER_SLOTS]; uint64_t slots_keys_count[CLUSTER_SLOTS]; rax *slots_to_keys; /* The following fields are used to take the slave state on elections. */ mstime_t failover_auth_time; /* Time of previous or next election. */ int failover_auth_count; /* Number of votes received so far. */ int failover_auth_sent; /* True if we already asked for votes. */ int failover_auth_rank; /* This slave rank for current auth request. */ uint64_t failover_auth_epoch; /* Epoch of the current election. */ int cant_failover_reason; /* Why a slave is currently not able to failover. See the CANT_FAILOVER_* macros. */ /* Manual failover state in common. */ mstime_t mf_end; /* Manual failover time limit (ms unixtime). It is zero if there is no MF in progress. */ /* Manual failover state of master. */ clusterNode *mf_slave; /* Slave performing the manual failover. */ /* Manual failover state of slave. */ long long mf_master_offset; /* Master offset the slave needs to start MF or -1 if still not received. */ int mf_can_start; /* If non-zero signal that the manual failover can start requesting masters vote. */ // 最后一次投票的纪元 /* The following fields are used by masters to take state on elections. */ uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */ int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */ /* Messages received and sent by type. */ long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT]; long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT]; // 疑似下线节点的个数 long long stats_pfail_nodes; /* Number of nodes in PFAIL status, excluding nodes without address. */ } clusterState;