【Redis源码】集群之分布式cluster建立集群关系(十三)

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 【Redis源码】集群之分布式cluster建立集群关系(十三)

前言:

redis在redis3.0版本之后推出redis cluster模式集群,redis cluster是官方提供的分布式解决方案。当一个redis节点挂了可以快速切到另一个节点中。当遇到单机内存、并发瓶颈时可以考虑使用redis cluster。

集群的内容会比较长,这一章会分为两篇作为描述:

《集群之分布式cluster建立集群关系》与《集群之分布式cluster原理》

(一)cluster基础知识

1.1 了解cluser

一个redis集群通常由多个节点组成,起初每个阶段都是独立的个体。它们都在自己的集群当中。如果要构建一个真正的集群,我们必须将各个独立的节点连接在一起,构成一个包含多节点的集群。

如图中所示:

节点
第一组节点 127.0.0.1:6379 127.0.0.1:6389
第二组节点 127.0.0.1:6380 127.0.0.1:6390
第三组节点 127.0.0.1:6381 127.0.0.1:6391

我们要构成这些节点,可以通过命令或者配置构成。

1.2 配置cluster

1)配置信息

port 6389
#开启cluster模式
cluster-enabled yes

#配置节点之间超时时间
cluster-node-timeout 15000

#这个配置很重要,cluster开启必须重命名指定cluster-config-file
#不能与别的节点相同,否则会启动失败,最好按主机+端口命名
#其次,该文件保存了本节点与其他节点的信息及关系
cluster-config-file nodes-6389.conf

2)创建redis cluster

redis-cli --cluster create 127.0.0.1:6379 127.0.0.1:6380  127.0.0.1:6381 127.0.0.1:6389 127.0.0.1:6390 127.0.0.1:6391 --cluster-replicas 1

该条命令时redis5.0客户端才支持的。除了这种方式还可以使用redis-trib.rb

在创建节点过程中如果时遇到[ERR] Not all 16384 slots are covered by nodes.错误,

或者使用过程中遇到(error) CLUSTERDOWN The cluster is down。

可以执行如下命令:

redis-cli --cluster fix 127.0.0.1:6379

修复过程中,我们可以看到我们集群中有16384个槽可以分别指派给集群中的各个节点。

1.3 基础命令

#查看当前节点
CLUSTER NODES

#将 ip 和 port 所指定的节点添加到集群中
CLUSTER MEET   .

#从集群中移除 node_id 指定的节点
CLUSTER FORGET

#将当前节点设置为 node_id 指定的节点的从节点
CLUSTER REPLICATE

#将节点的配置文件保存到硬盘里面
CLUSTER SAVECONFIG

#将一个或多个槽(slot)指派(assign)给当前节点
CLUSTER ADDSLOTS  [slot ...]

#移除一个或多个槽对当前节点的指派
CLUSTER DELSLOTS  [slot ...]

#移除当前节点所有槽
CLUSTER FLUSHSLOTS

#将槽 slot 指派给 node_id 指定的节点,如果槽已经指派给另一个节点,
#那么先让另一个节点删除该槽,然后再进行指派
CLUSTER SETSLOT  NODE

#将本节点的槽 slot 迁移到 node_id 指定的节点中
CLUSTER SETSLOT  MIGRATING

#从 node_id 指定的节点中导入槽 slot 到本节点
CLUSTER SETSLOT  IMPORTING

#取消对槽 slot 的导入(import)或者迁移(migrate)
CLUSTER SETSLOT  STABLE

#计算键 key 应该被放置在哪个槽上
CLUSTER KEYSLOT

#返回槽 slot 目前包含的键值对数量
CLUSTER COUNTKEYSINSLOT

#返回 count 个 slot 槽中的键
CLUSTER GETKEYSINSLOT  

(二)源码分析

2.1基础结构

#define CLUSTER_SLOTS 16384  //对应卡槽最大数量
typedefstructclusterNode {
   mstime_t ctime;              /* 创建节点时间. */
   char name[CLUSTER_NAMELEN];  /* 节点名称 hex 字节串, 40个字节 */
   int flags;                   /* 节点标识,CLUSTER_NODE_... */
   uint64_t configEpoch;        /* 节点当前的配置纪元,用于实现故障转移 */
   unsignedchar slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
   int numslots;                         /* 此节点处理的插槽数 */
   int numslaves;                        /* 如果这是主节点,则从节点的数量 */
   structclusterNode **slaves;         /*  节点从节点指针 */
   structclusterNode *slaveof;         /* 指向主节点的指针。*/
   mstime_t ping_sent;      /* 最新一个 ping 时间 */
   mstime_t pong_received;  /* 最新一个回复 pong 时间*/
   mstime_t fail_time;      /* 设置失败标志的Unix时间 */
   mstime_t voted_time;     /* 上一次投票时间 */
   mstime_t repl_offset_time;  /* 我们收到此节点偏移量的Unix时间 */
   mstime_t orphaned_time;     /* 孤立主条件开始的时间 */
   longlong repl_offset;      /* 此节点的最后一个已知复制偏移量. */
   char ip[NET_IP_STR_LEN];  /* 节点IP地址 */
   int port;                   /* 节点端口 */
   int cport;                  /* 此节点的最新已知群集端口. */
   clusterLink *link;          /* 节点 TCP/IP 连接信息  */
   list *fail_reports;         /* 失败节点列表 */
} clusterNode;

CLUSTER_SLOTS宏是我们cluster卡槽数量,link保存了连接节点所需的有关信息, 比如套接字描述符, 输入缓冲区和输出缓冲区:
typedefstructclusterLink {
   mstime_t ctime;             /* Link 创建时间 */
   int fd;                     /* TCP socket 描述符 */
   sds sndbuf;                 /* 输出缓冲区 */
   sds rcvbuf;                 /* 输入缓冲区 */
   structclusterNode *node;   /* 与这个连接相关联的节点,如果没有的话就为 NULL */
} clusterLink;

每个连接都会都会维护一个clusterState状态。
typedefstructclusterState {
   clusterNode *myself;   /* 指向当前节点指针 */
   uint64_t currentEpoch; /* 集群当前的配置纪元,用于故障恢复 */
   int state;            /* CLUSTER_OK, CLUSTER_FAIL, ... */
   int size;             /* 集群中至少处理着一个槽的节点的数量 */
   dict *nodes;          /* 集群节点名单 对应 clusterNode 结构体 */
   //...省略
} clusterState;

2.2 初始化cluster

cluster命令方法:

voidclusterCommand(client *c) {
   if (server.cluster_enabled == 0) { //判断是否开启cluster
       addReplyError(c,"This instance has cluster support disabled");
       return;
   }
   //判断参数必须是4个或者5个,且第二个参数等于meet
   if (!strcasecmp(c->argv[1]->ptr,"meet") && (c->argc == 4 || c->argc == 5)) {
       /* CLUSTER MEET   [cport] */
       longlong port, cport;

       if (getLongLongFromObject(c->argv[3], &port) != C_OK) { //获得端口参数
           addReplyErrorFormat(c,"Invalid TCP base port specified: %s",
                               (char*)c->argv[3]->ptr);
           return;
       }

       if (c->argc == 5) { //5个参数时
           if (getLongLongFromObject(c->argv[4], &cport) != C_OK) { // 获得cport
               addReplyErrorFormat(c,"Invalid TCP bus port specified: %s",
                                   (char*)c->argv[4]->ptr);
               return;
           }
       } else {
           cport = port + CLUSTER_PORT_INCR; //默认清楚port + 10000
       }

       if (clusterStartHandshake(c->argv[2]->ptr,port,cport) == 0 && .  //握手
           errno == EINVAL)
       {
           addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
                           (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
       } else {
           addReply(c,shared.ok);
       }
   }
   //。。。省略
}

握手函数源码

intclusterStartHandshake(char *ip, int port, int cport) {
   clusterNode *n;
   char norm_ip[NET_IP_STR_LEN];
   structsockaddr_storagesa;

   /* IP健全性检查 */
   if (inet_pton(AF_INET,ip,
           &(((struct sockaddr_in *)&sa)->sin_addr)))
   {
       sa.ss_family = AF_INET;
   } elseif (inet_pton(AF_INET6,ip,
           &(((struct sockaddr_in6 *)&sa)->sin6_addr)))
   {
       sa.ss_family = AF_INET6;
   } else {
       errno = EINVAL;
       return0;
   }

   /* 端口健全性检测 */
   if (port <= 0 || port > 65535 || cport <= 0 || cport > 65535) {
       errno = EINVAL;
       return0;
   }

   /* 网络ip地址转寒*/
   memset(norm_ip,0,NET_IP_STR_LEN);
   if (sa.ss_family == AF_INET)
       inet_ntop(AF_INET,
           (void*)&(((struct sockaddr_in *)&sa)->sin_addr),
           norm_ip,NET_IP_STR_LEN);
   else
       inet_ntop(AF_INET6,
           (void*)&(((struct sockaddr_in6 *)&sa)->sin6_addr),
           norm_ip,NET_IP_STR_LEN);
   //判断是否正在握手中,防止重复握手
   if (clusterHandshakeInProgress(norm_ip,port,cport)) {
       errno = EAGAIN;
       return0;
   }

   /* 创建node节点结构*/
   n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
   memcpy(n->ip,norm_ip,sizeof(n->ip));
   n->port = port;
   n->cport = cport;
   //添加nodes节点到server.cluster->nodes中
   clusterAddNode(n);
   return1;
}

加入node到server.cluster->nodes后,serverCron中会调用clusterCron。该函数中会判断处于握手状态的节点是否握手超时,如果是。则调用clusterDelNode函数删除节点。如果节点的节点 TCP/IP 连接信息 等于空时(link == NULL),会发起tcp/ip连接,且将fd信息保存到节点link->fd中。已经ping信息。(该部分下一章详细讲解)

2.3键值设置流程

1)第一步通过6379端口登录

#redis-cli -p 6379 -c

2)下断点processCommand

intprocessCommand(client *c) {
    //。。。省略
   /* 如果启用群集,请在此处执行群集重定向。
   *但是,如果发生以下情况,则不执行重定向:
   *1)这个命令的发送者是我们的master。
   *2)命令没有键参数。*/

   if (server.cluster_enabled &&
       !(c->flags & CLIENT_MASTER) &&
       !(c->flags & CLIENT_LUA &&
         server.lua_caller->flags & CLIENT_MASTER) &&
       !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
         c->cmd->proc != execCommand))
   {
       int hashslot;
       int error_code;
       //获得key属于哪个slot,内部通过keyHashSlot函数计算slot
       clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
                                       &hashslot,&error_code);
       if (n == NULL || n != server.cluster->myself) { //判断node不是自己
           if (c->cmd->proc == execCommand) {
               discardTransaction(c);
           } else {
               flagTransaction(c);
           }
           clusterRedirectClient(c,n,hashslot,error_code); //跳转
           return C_OK;
       }
   }
   //。。。省略
}

计算slot分布函数,keyHashSlot

unsignedintkeyHashSlot(char *key, int keylen) {
   int s, e; /* start-end indexes of { and } */

   for (s = 0; s < keylen; s++)   //计算{的位置
       if (key[s] == '{') break;

   /* 没有{情况下直接 crc16(key) % 16384 = crc16(key) & 0x3FFF */
   if (s == keylen) return crc16(key,keylen) & 0x3FFF;

   /* 有'{'情况,计算}的位置  */
   for (e = s+1; e < keylen; e++)
       if (key[e] == '}') break;

   /* 没有 '}'或者不是{} 闭合 */
   if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;

   /* {key}闭合时取中间的key */
   return crc16(key+s+1,e-s-1) & 0x3FFF;
}

函数中计算slot的分布情况,以crc16(key) % 16384 。

上图为计算slots

上图为跳转函数

上图为跳转后展示

总结:

1.cluster中是采用hash槽,有16384个槽可以分别指派给集群中的各个节点。每个节点都会记录哪些槽分配给自己,哪些槽指派给其他节点。在node_xxx.conf中也可以体现。

2.redis5.0客户端通过redis-cli --cluster 可以创建cluster集群,通过redis-cli --cluster fix 可以修复集群重新分配槽。

3.调用命令时,会发起调用processCommand函数,函数中如果开启cluster,计算的slot后得到node节点不是自己。则客户端连接则会跳转。

4.CLUSTER MEET命令可以加入cluster,加入时发起握手相关操作,检测服务正常后,加入节点信息到server.cluster->nodes中。


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
打赏
0
0
0
0
1
分享
相关文章
登顶TPC-C|云原生数据库PolarDB技术揭秘:Limitless集群和分布式扩展篇
阿里云PolarDB云原生数据库在TPC-C基准测试中以20.55亿tpmC的成绩刷新世界纪录,展现卓越性能与性价比。其轻量版满足国产化需求,兼具高性能与低成本,适用于多种场景,推动数据库技术革新与发展。
分布式爬虫框架Scrapy-Redis实战指南
本文介绍如何使用Scrapy-Redis构建分布式爬虫系统,采集携程平台上热门城市的酒店价格与评价信息。通过代理IP、Cookie和User-Agent设置规避反爬策略,实现高效数据抓取。结合价格动态趋势分析,助力酒店业优化市场策略、提升服务质量。技术架构涵盖Scrapy-Redis核心调度、代理中间件及数据解析存储,提供完整的技术路线图与代码示例。
分布式爬虫框架Scrapy-Redis实战指南
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
本文介绍了从单机锁到分布式锁的演变,重点探讨了使用Redis实现分布式锁的方法。分布式锁用于控制分布式系统中多个实例对共享资源的同步访问,需满足互斥性、可重入性、锁超时防死锁和锁释放正确防误删等特性。文章通过具体示例展示了如何利用Redis的`setnx`命令实现加锁,并分析了简化版分布式锁存在的问题,如锁超时和误删。为了解决这些问题,文中提出了设置锁过期时间和在解锁前验证持有锁的线程身份的优化方案。最后指出,尽管当前设计已解决部分问题,但仍存在进一步优化的空间,将在后续章节继续探讨。
489 131
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
|
1月前
|
Springboot使用Redis实现分布式锁
通过这些步骤和示例,您可以系统地了解如何在Spring Boot中使用Redis实现分布式锁,并在实际项目中应用。希望这些内容对您的学习和工作有所帮助。
189 83
融合AMD与NVIDIA GPU集群的MLOps:异构计算环境中的分布式训练架构实践
本文探讨了如何通过技术手段混合使用AMD与NVIDIA GPU集群以支持PyTorch分布式训练。面对CUDA与ROCm框架互操作性不足的问题,文章提出利用UCC和UCX等统一通信框架实现高效数据传输,并在异构Kubernetes集群中部署任务。通过解决轻度与强度异构环境下的挑战,如计算能力不平衡、内存容量差异及通信性能优化,文章展示了如何无需重构代码即可充分利用异构硬件资源。尽管存在RDMA验证不足、通信性能次优等局限性,但该方案为最大化GPU资源利用率、降低供应商锁定提供了可行路径。源代码已公开,供读者参考实践。
40 3
融合AMD与NVIDIA GPU集群的MLOps:异构计算环境中的分布式训练架构实践
Redis分片集群中数据是怎么存储和读取的 ?
Redis集群采用的算法是哈希槽分区算法。Redis集群中有16384个哈希槽(槽的范围是 0 -16383,哈希槽),将不同的哈希槽分布在不同的Redis节点上面进行管理,也就是说每个Redis节点只负责一部分的哈希槽。在对数据进行操作的时候,集群会对使用CRC16算法对key进行计算并对16384取模(slot = CRC16(key)%16383),得到的结果就是 Key-Value 所放入的槽,通过这个值,去找到对应的槽所对应的Redis节点,然后直接到这个对应的节点上进行存取操作
Redis分布式锁如何实现 ?
Redis分布式锁主要依靠一个SETNX指令实现的 , 这条命令的含义就是“SET if Not Exists”,即不存在的时候才会设置值。 只有在key不存在的情况下,将键key的值设置为value。如果key已经存在,则SETNX命令不做任何操作。 这个命令的返回值如下。 ● 命令在设置成功时返回1。 ● 命令在设置失败时返回0。 假设此时有线程A和线程B同时访问临界区代码,假设线程A首先执行了SETNX命令,并返回结果1,继续向下执行。而此时线程B再次执行SETNX命令时,返回的结果为0,则线程B不能继续向下执行。只有当线程A执行DELETE命令将设置的锁状态删除时,线程B才会成功执行S
【📕分布式锁通关指南 03】通过Lua脚本保证redis操作的原子性
本文介绍了如何通过Lua脚本在Redis中实现分布式锁的原子性操作,避免并发问题。首先讲解了Lua脚本的基本概念及其在Redis中的使用方法,包括通过`eval`指令执行Lua脚本和通过`script load`指令缓存脚本。接着详细展示了如何用Lua脚本实现加锁、解锁及可重入锁的功能,确保同一线程可以多次获取锁而不发生死锁。最后,通过代码示例演示了如何在实际业务中调用这些Lua脚本,确保锁操作的原子性和安全性。
92 6
【📕分布式锁通关指南 03】通过Lua脚本保证redis操作的原子性
Redis,分布式缓存演化之路
本文介绍了基于Redis的分布式缓存演化,探讨了分布式锁和缓存一致性问题及其解决方案。首先分析了本地缓存和分布式缓存的区别与优劣,接着深入讲解了分布式远程缓存带来的并发、缓存失效(穿透、雪崩、击穿)等问题及应对策略。文章还详细描述了如何使用Redis实现分布式锁,确保高并发场景下的数据一致性和系统稳定性。最后,通过双写模式和失效模式讨论了缓存一致性问题,并提出了多种解决方案,如引入Canal中间件等。希望这些内容能为读者在设计分布式缓存系统时提供有价值的参考。感谢您的阅读!
138 6
Redis,分布式缓存演化之路
|
1月前
|
【📕分布式锁通关指南 04】redis分布式锁的细节问题以及RedLock算法原理
本文深入探讨了基于Redis实现分布式锁时遇到的细节问题及解决方案。首先,针对锁续期问题,提出了通过独立服务、获取锁进程自己续期和异步线程三种方式,并详细介绍了如何利用Lua脚本和守护线程实现自动续期。接着,解决了锁阻塞问题,引入了带超时时间的`tryLock`机制,确保在高并发场景下不会无限等待锁。最后,作为知识扩展,讲解了RedLock算法原理及其在实际业务中的局限性。文章强调,在并发量不高的场景中手写分布式锁可行,但推荐使用更成熟的Redisson框架来实现分布式锁,以保证系统的稳定性和可靠性。
51 0
【📕分布式锁通关指南 04】redis分布式锁的细节问题以及RedLock算法原理

热门文章

最新文章