【Redis源码】集群之主从复制replication(十)

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: 【Redis源码】集群之主从复制replication(十)

前言:

说到主从大家应该都不陌生,也应该都清楚主从解决服务的哪些问题。单台服务器的支撑能力是有限的,为了提高我们的QPS或者说数据的容灾。主从服务则起到了相应的作用。

不过主从复制也会有一些缺点,比如说“高可用问题”,“单服务器资源有限问题”。针对高可用问题我们后续解析redis集群的哨兵,针对单服务器问题,我们会解析redis Cluster分布式应用。

服务器:端口 备注
127.0.0.1:6379 主1
127.0.0.1:6380 从1 ,主服务器为主1
127.0.0.1:6381 从2,主服务器为从1

(一) 主从相关操作

1.1如何建立主从关系

建立主从关系可以大致可以分为三种形式:

1)通过配置redis.conf

slaveof 127.0.0.1 6379

2)通过启动slaveof参数

#redis-server --port  6380 --slaveof 127.0.0.1 6379

3)通过命令

>127.0.0.1:6381> slaveof 127.0.0.1 6380

说明:当我们使用slaveof时比如说127.0.0.1:6380 服务调用 slaveof 127.0.0.1 6379,其实这时候就是将127.0.0.1:6379 当作是127.0.0.1:6380 的主服务,127.0.0.1:6380 则是127.0.0.1:6379 的从服务。

1.2如何查看主从

info replication

主从信息介绍:

参数 说明
role slave 当前角色slave代表从,master代表主
master_host 127.0.0.1 主服务器地址
master_port 6380 主服务器端口
master_link_status up 当前主从同步状态up代表正常,down代表断开
master_last_io_seconds_ago 5 主库与从库交互时间,上次交互的时间,用于超时
master_sync_in_progress 0 是否与主服务器进行同步
slave_repl_offset 112 slave复制偏移量
slave_priority 100 slave优先级
slave_read_only 1 从库是否设置只读
connected_slaves 0 当前从服务器连接个数
master_replid c34fe3d100c141e99f9fc9b0c55e47955a93e632 我当前的复制ID
master_replid2 0000000000000000000000000000000000000000 从master继承的replid
master_repl_offset 112 主节点的复制偏移量
second_repl_offset -1 为replid2接受最多此偏移量
repl_backlog_active 1 是否开启积压复制缓冲区
repl_backlog_size 1048576 积压复制缓冲区大小
repl_backlog_first_byte_offset 113 复制缓冲区里偏移量的大小
repl_backlog_histlen 0 此值等于 master_repl_offset - repl_backlog_first_byte_offset,该值不会超过repl_backlog_size的大小

1.3相关参数设置

配置 说明
slave-read-only 默认情况下从为了保证节点一致,从是不能写入的。需要设置本参数
repl-disable-tcp-nodelay 主节点默认是立即将写命令同步到从节点,当网络较差时可开启 repl-disable-tcp-nodelay 参数,这样会合并tcp包,从而减少带宽消耗
requirepass 为了主从复制安全,设置密码

server.c

intprocessCommand(client *c) {
   //...省略
     
   /* 默认从不支持写入,需修改配置。
   server.repl_slave_ro参数为replica-read-only设置,默认情况下是不支持写入 */

   if (server.masterhost && server.repl_slave_ro &&
       !(c->flags & CLIENT_MASTER) &&
       c->cmd->flags & CMD_WRITE)
   {
       addReply(c, shared.roslaveerr);
       return C_OK;
   }
   //...省略
}

(二) 主从复制原理及源码分析

2.1 slaveof 建立主从过程

(1)保存主节点信息

当主从建立时会保存主信息到server.masterhost(连接地址)和server.masterport(端口)中,server为redisServer结构体。

(2) 建立socket连接

当server.repl_state 设置REPL_STATE_CONNECT宏时,则serverCron中调用replicationCron的函数中会调用connectWithMaster建立与主服务器的socket连接,并且server.repl_state参数设置为REPL_STATE_CONNECTING状态。

(3) 心跳检测PING

当调用connectWithMaster建立连接时,会创建事件调用syncWithMaster,建立连接成功后server.repl_state的状态为REPL_STATE_CONNECTING会发起一个PING检测心跳。并且server.repl_state状态会更改为REPL_STATE_RECEIVE_PONG,接收到两端有效回复后 一个肯定的+PONG回复或验证。此时server.repl_state 状态变为REPL_STATE_SEND_AUTH;

(4) 验证授权

验证授权会有两种情况,一种是没有账号密码直接server.repl_state变为REPL_STATE_SEND_PORT状态。另外一种是登录服务端授权后server.repl_state变为REPL_STATE_SEND_PORT状态。

(5) 信息同步

信息同步时会同步端口、ip地址等信息.

(6) 接收rdb载入

(7)连接建立完毕

2.2 建立过程源码分析

server.h 主从同步到状态

#define REPL_STATE_NONE 0       /*  未开启主从同步情况 */
#define REPL_STATE_CONNECT 1    /* 待发起连接主服务器 */
#define REPL_STATE_CONNECTING 2  /* 主服务器连接成功 */
/* --- Handshake states, must be ordered --- */
#define REPL_STATE_RECEIVE_PONG 3 /* 已经发起PING操作,等待接收主服务器PONG回复 */
#define REPL_STATE_SEND_AUTH 4    /*待发起主服务器密码验证 */
#define REPL_STATE_RECEIVE_AUTH 5 /* 已经发起主服务器认证“auth 密码”操作,等待主服务器回复 */
#define REPL_STATE_SEND_PORT 6    /* 待发送端口号 REPLCONF listening-port */
#define REPL_STATE_RECEIVE_PORT 7 /* 已发起端口号,等待主服务器回复 REPLCONF reply */
#define REPL_STATE_SEND_IP 8      /* 待发送ip地址, REPLCONF ip-address */
#define REPL_STATE_RECEIVE_IP 9   /* 已发送ip地址,等待主服务器回复 REPLCONF reply */
#define REPL_STATE_SEND_CAPA 10    /* 主从复制进行优化升级 REPLCONF capa */
#define REPL_STATE_RECEIVE_CAPA 11 /*等待主服务器回复 REPLCONF reply */
#define REPL_STATE_SEND_PSYNC 12    /* 待发送 PSYNC命令 */
#define REPL_STATE_RECEIVE_PSYNC 13 /* 等待 PSYNC命令回复 */
/* --- End of handshake states --- */
#define REPL_STATE_TRANSFER 14      /* 正在接收rdb文件 */
#define REPL_STATE_CONNECTED 15     /* 数据载入成功,主从复制建立完毕 */

replication.c 第一步 保存信息

voidreplicaofCommand(client *c) {
   //..省略
   if (!strcasecmp(c->argv[1]->ptr,"no") &&
       !strcasecmp(c->argv[2]->ptr,"one")) {   //slaveof on one 取消主从
       if (server.masterhost) {
           replicationUnsetMaster();
           sds client = catClientInfoString(sdsempty(),c);
           serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
               client);
           sdsfree(client);
       }
   } else {
       //...省略
       replicationSetMaster(c->argv[1]->ptr, port);  //设置主从信息
       //...省略
   }
   addReply(c,shared.ok);
}

voidreplicationSetMaster(char *ip, int port) {
   int was_master = server.masterhost == NULL;

   sdsfree(server.masterhost);
   server.masterhost = sdsnew(ip);   //保存master host信息
   server.masterport = port;         //保存master 端口信息
   //...省略
   server.repl_state = REPL_STATE_CONNECT; //设置等待连接状态
   server.repl_down_since = 0;
}

保存信息到server.masterhost和server.masterport中,并且设置server.repl_state状态未等待连接状态。

replication.c 第二步创建socket连接

voidreplicationCron(void) {
   staticlonglong replication_cron_loops = 0;
   //。。。省略

   /* 如果是REPL_STATE_CONNECT状态,连接到主服务器 */
   if (server.repl_state == REPL_STATE_CONNECT) {
       serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
           server.masterhost, server.masterport);
       if (connectWithMaster() == C_OK) { //创建socket连接
           serverLog(LL_NOTICE,"MASTER <-> SLAVE sync started");
       }
   }
   // ... 省略
}

intconnectWithMaster(void) {
   int fd;

   fd = anetTcpNonBlockBestEffortBindConnect(NULL,
       server.masterhost,server.masterport,NET_FIRST_BIND_ADDR); //建立master socket连接
   if (fd == -1) {
       serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
           strerror(errno));
       return C_ERR;
   }
   //创建事件syncWithMaster方法
   if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
           AE_ERR)
   {
       close(fd);
       serverLog(LL_WARNING,"Can't create readable event for SYNC");
       return C_ERR;
   }

   server.repl_transfer_lastio = server.unixtime;
   server.repl_transfer_s = fd;
   server.repl_state = REPL_STATE_CONNECTING;  //设置连接中状态
   return C_OK;
}

replication.c 第三步到第七步

voidsyncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
   char tmpfile[256], *err = NULL;
   int dfd = -1, maxtries = 5;
   int sockerr = 0, psync_result;
   socklen_t errlen = sizeof(sockerr);
   UNUSED(el);
   UNUSED(privdata);
   UNUSED(mask);

   /* 未开启主从同步情况 */
   if (server.repl_state == REPL_STATE_NONE) {
       close(fd);
       return;
   }


   /* 第三步:发起ping 到master */
   if (server.repl_state == REPL_STATE_CONNECTING) {
       serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event.");
       /* 删除可写事件以使可读事件保持不变已经注册 */
       aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
       server.repl_state = REPL_STATE_RECEIVE_PONG; //设置等待PONG回复
   
       err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL); //发起ping到master
       if (err) goto write_error;
       return;
   }

   /* 等待PONG回复 */
   if (server.repl_state == REPL_STATE_RECEIVE_PONG) {
       err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
       //..省略
       server.repl_state = REPL_STATE_SEND_AUTH; //设置等待授权状态
   }

   /* 第四步:发起auth命令到master授权 */
   if (server.repl_state == REPL_STATE_SEND_AUTH) {
       if (server.masterauth) { //设置密码情况
           err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH",server.masterauth,NULL);
           if (err) goto write_error;
           server.repl_state = REPL_STATE_RECEIVE_AUTH;
           return;
       } else {  //未设置密码情况
           server.repl_state = REPL_STATE_SEND_PORT;   //设置待发送端口号
       }
   }

   /* 等待auth授权结果 */
   if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {
       err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
       //。。。省略
       server.repl_state = REPL_STATE_SEND_PORT; //设置待发送端口号
   }

   /* 第五步:信息同步 ,同步端口 */
   if (server.repl_state == REPL_STATE_SEND_PORT) {
       sds port = sdsfromlonglong(server.slave_announce_port ?
           server.slave_announce_port : server.port);
       err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
               "listening-port",port, NULL);  //同步端口
       sdsfree(port);
       if (err) goto write_error;
       sdsfree(err);
       server.repl_state = REPL_STATE_RECEIVE_PORT;
       return;
   }

   /* 第五步:信息同步 ,回复同步端口 */
   if (server.repl_state == REPL_STATE_RECEIVE_PORT) {
       err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
       //。。。省略
       sdsfree(err);
       server.repl_state = REPL_STATE_SEND_IP;
   }

   /* 第五步:信息同步 ,同步端口 */
   if (server.repl_state == REPL_STATE_SEND_IP &&
       server.slave_announce_ip == NULL)
   {
           server.repl_state = REPL_STATE_SEND_CAPA;
   }

   /* 同步ip地址 */
   if (server.repl_state == REPL_STATE_SEND_IP) {
       err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
               "ip-address",server.slave_announce_ip, NULL);
       if (err) goto write_error;
       sdsfree(err);
       server.repl_state = REPL_STATE_RECEIVE_IP;
       return;
   }

   /* 回复同步ip地址. */
   if (server.repl_state == REPL_STATE_RECEIVE_IP) {
       err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
       //。。。省略
       server.repl_state = REPL_STATE_SEND_CAPA;
   }

   /*  告诉master 当前 (slave) 的支持能力。
    *  REPLCONF capa eof capa  psync2
    *
    * EOF:支持EOF风格的RDB传输,用于无盘复制。
    * PSYNC2:支持PSYNC v2, 服务端返回标示 +CONTINUE    
    *
    * master 会忽略它不支持的能力. */

   if (server.repl_state == REPL_STATE_SEND_CAPA) {
       err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
               "capa","eof","capa","psync2",NULL);
       if (err) goto write_error;
       sdsfree(err);
       server.repl_state = REPL_STATE_RECEIVE_CAPA;
       return;
   }

   /* 回复支持能力. */
   if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {
       err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
       //。。。省略
       server.repl_state = REPL_STATE_SEND_PSYNC;
   }

   /* slaveTryPartialResynchronization() 函数中启动有两个作用
    * 1.获取主读取主运行ID和偏移量,并发起 PSYNC  {replid} {offset}命令复制.。
    * 2.读取PSYNC命令状态,判断是部分同步还是完整同步。
    */

   if (server.repl_state == REPL_STATE_SEND_PSYNC) {
       if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) {
           err = sdsnew("Write error sending the PSYNC command.");
           goto write_error;
       }
       server.repl_state = REPL_STATE_RECEIVE_PSYNC;
       return;
   }
   
   // ...省略
   
   // 读取状态
   psync_result = slaveTryPartialResynchronization(fd,1);
   if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */


   /* 为批量传输准备合适的临时文件 */
   while(maxtries--) {
       snprintf(tmpfile,256,
           "temp-%d.%ld.rdb",(int)server.unixtime,(longint)getpid());
       dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
       if (dfd != -1) break;
       sleep(1);
   }
   if (dfd == -1) {
       serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno));
       goto error;
   }

   /* 创建事件调用readSyncBulkPayload函数,该函数为接收和加载rdb文件,加载完成后会更新状态为REPL_STATE_CONNECTED */
   if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
           == AE_ERR)
   {
       serverLog(LL_WARNING,
           "Can't create readable event for SYNC: %s (fd=%d)",
           strerror(errno),fd);
       goto error;
   }
   //..省略
}

2.3 心跳包检测

主从节点在建立连接后,它们之间维护着长连接并彼此发送心跳命令:

(1) slave主发REPLCONF 进行ACK校验 【127.0.0.1:6380 往 127.0.0.1:6379】

当前往从发送REPLCONF ACK 19695 命令

(2)主往从发送PING 【127.0.0.1:6379 往 127.0.0.1:6380】

当前往主发送 PING 命令

replication.c 心跳包代码


voidreplicationCron(void) {  
   /* 不时地向主服务器发送ACK。
    * 请注意,我们不会定期向不支持PSYNC和复制偏移量的主机发送ack。
    */

   if (server.masterhost && server.master &&
       !(server.master->flags & CLIENT_PRE_PSYNC))
       replicationSendAck();

 
   /* master 每N秒钟给slave 一次ping */
   if ((replication_cron_loops % server.repl_ping_slave_period) == 0 &&
       listLength(server.slaves))
   {
       ping_argv[0] = createStringObject("PING",4);
       replicationFeedSlaves(server.slaves, server.slaveseldb,
           ping_argv, 1);
       decrRefCount(ping_argv[0]);
   }
}

server.repl_ping_slave_period参数为redis.conf中的repl-ping-replica-period参数,定义心跳(PING)间隔,默认为10秒。

总结:

1.建立主从关键有三种形式:redis启动,redis配置,redis命令。

2.从库默认情况是不支持写入操作,需要redis.conf配置slave-read-only参数。

3.主从之间是存在心跳响应,主会往从发PING,从会往主发ACK校验。

4.主从模式如果是数据可靠性服务,可以提高可靠性解决数据容灾问题。

5.info replication命令可以查看相关主从信息和复制偏移量,解决主从中遇到复制失败问题。

6.slaveof no one命令可以取消主从复制。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
4月前
|
存储 缓存 NoSQL
Redis常见面试题(二):redis分布式锁、redisson、主从一致性、Redlock红锁;Redis集群、主从复制,哨兵模式,分片集群;Redis为什么这么快,I/O多路复用模型
redis分布式锁、redisson、可重入、主从一致性、WatchDog、Redlock红锁、zookeeper;Redis集群、主从复制,全量同步、增量同步;哨兵,分片集群,Redis为什么这么快,I/O多路复用模型——用户空间和内核空间、阻塞IO、非阻塞IO、IO多路复用,Redis网络模型
Redis常见面试题(二):redis分布式锁、redisson、主从一致性、Redlock红锁;Redis集群、主从复制,哨兵模式,分片集群;Redis为什么这么快,I/O多路复用模型
|
3月前
|
存储 NoSQL Redis
redis 6源码解析之 object
redis 6源码解析之 object
66 6
|
3月前
|
监控 NoSQL Redis
看完这篇就能弄懂Redis的集群的原理了
看完这篇就能弄懂Redis的集群的原理了
127 0
|
1月前
|
存储 NoSQL 大数据
大数据-51 Redis 高可用方案CAP-AP 主从复制 一主一从 全量和增量同步 哨兵模式 docker-compose测试
大数据-51 Redis 高可用方案CAP-AP 主从复制 一主一从 全量和增量同步 哨兵模式 docker-compose测试
33 3
|
1月前
|
缓存 NoSQL Ubuntu
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
55 3
|
2月前
|
NoSQL 网络协议 Redis
Redis的主从复制和哨兵模式
本文详细介绍了Redis的主从复制配置、原理(包括全量复制和增量复制)以及如何搭建一主二从的Redis集群,同时还探讨了Redis哨兵模式的概念、配置文件、以及如何配置一主二从三哨兵的Redis哨兵模式,以实现高可用性。
|
3月前
|
Web App开发 前端开发 关系型数据库
基于SpringBoot+Vue+Redis+Mybatis的商城购物系统 【系统实现+系统源码+答辩PPT】
这篇文章介绍了一个基于SpringBoot+Vue+Redis+Mybatis技术栈开发的商城购物系统,包括系统功能、页面展示、前后端项目结构和核心代码,以及如何获取系统源码和答辩PPT的方法。
|
3月前
|
消息中间件 存储 缓存
深入理解Redis集群主从复制原理
该文章主要探讨了Redis集群中的主从复制原理,包括为何需要主从复制、配置方法、复制流程以及一些高级特性。
深入理解Redis集群主从复制原理
|
3月前
|
NoSQL Redis
redis 6源码解析之 ziplist
redis 6源码解析之 ziplist
32 5