【Redis源码】集群之主从复制replication(十)

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: 【Redis源码】集群之主从复制replication(十)

前言:

说到主从大家应该都不陌生,也应该都清楚主从解决服务的哪些问题。单台服务器的支撑能力是有限的,为了提高我们的QPS或者说数据的容灾。主从服务则起到了相应的作用。

不过主从复制也会有一些缺点,比如说“高可用问题”,“单服务器资源有限问题”。针对高可用问题我们后续解析redis集群的哨兵,针对单服务器问题,我们会解析redis Cluster分布式应用。

服务器:端口 备注
127.0.0.1:6379 主1
127.0.0.1:6380 从1 ,主服务器为主1
127.0.0.1:6381 从2,主服务器为从1

(一) 主从相关操作

1.1如何建立主从关系

建立主从关系可以大致可以分为三种形式:

1)通过配置redis.conf

slaveof 127.0.0.1 6379

2)通过启动slaveof参数

#redis-server --port  6380 --slaveof 127.0.0.1 6379

3)通过命令

>127.0.0.1:6381> slaveof 127.0.0.1 6380

说明:当我们使用slaveof时比如说127.0.0.1:6380 服务调用 slaveof 127.0.0.1 6379,其实这时候就是将127.0.0.1:6379 当作是127.0.0.1:6380 的主服务,127.0.0.1:6380 则是127.0.0.1:6379 的从服务。

1.2如何查看主从

info replication

主从信息介绍:

参数 说明
role slave 当前角色slave代表从,master代表主
master_host 127.0.0.1 主服务器地址
master_port 6380 主服务器端口
master_link_status up 当前主从同步状态up代表正常,down代表断开
master_last_io_seconds_ago 5 主库与从库交互时间,上次交互的时间,用于超时
master_sync_in_progress 0 是否与主服务器进行同步
slave_repl_offset 112 slave复制偏移量
slave_priority 100 slave优先级
slave_read_only 1 从库是否设置只读
connected_slaves 0 当前从服务器连接个数
master_replid c34fe3d100c141e99f9fc9b0c55e47955a93e632 我当前的复制ID
master_replid2 0000000000000000000000000000000000000000 从master继承的replid
master_repl_offset 112 主节点的复制偏移量
second_repl_offset -1 为replid2接受最多此偏移量
repl_backlog_active 1 是否开启积压复制缓冲区
repl_backlog_size 1048576 积压复制缓冲区大小
repl_backlog_first_byte_offset 113 复制缓冲区里偏移量的大小
repl_backlog_histlen 0 此值等于 master_repl_offset - repl_backlog_first_byte_offset,该值不会超过repl_backlog_size的大小

1.3相关参数设置

配置 说明
slave-read-only 默认情况下从为了保证节点一致,从是不能写入的。需要设置本参数
repl-disable-tcp-nodelay 主节点默认是立即将写命令同步到从节点,当网络较差时可开启 repl-disable-tcp-nodelay 参数,这样会合并tcp包,从而减少带宽消耗
requirepass 为了主从复制安全,设置密码

server.c

intprocessCommand(client *c) {
   //...省略
     
   /* 默认从不支持写入,需修改配置。
   server.repl_slave_ro参数为replica-read-only设置,默认情况下是不支持写入 */

   if (server.masterhost && server.repl_slave_ro &&
       !(c->flags & CLIENT_MASTER) &&
       c->cmd->flags & CMD_WRITE)
   {
       addReply(c, shared.roslaveerr);
       return C_OK;
   }
   //...省略
}

(二) 主从复制原理及源码分析

2.1 slaveof 建立主从过程

(1)保存主节点信息

当主从建立时会保存主信息到server.masterhost(连接地址)和server.masterport(端口)中,server为redisServer结构体。

(2) 建立socket连接

当server.repl_state 设置REPL_STATE_CONNECT宏时,则serverCron中调用replicationCron的函数中会调用connectWithMaster建立与主服务器的socket连接,并且server.repl_state参数设置为REPL_STATE_CONNECTING状态。

(3) 心跳检测PING

当调用connectWithMaster建立连接时,会创建事件调用syncWithMaster,建立连接成功后server.repl_state的状态为REPL_STATE_CONNECTING会发起一个PING检测心跳。并且server.repl_state状态会更改为REPL_STATE_RECEIVE_PONG,接收到两端有效回复后 一个肯定的+PONG回复或验证。此时server.repl_state 状态变为REPL_STATE_SEND_AUTH;

(4) 验证授权

验证授权会有两种情况,一种是没有账号密码直接server.repl_state变为REPL_STATE_SEND_PORT状态。另外一种是登录服务端授权后server.repl_state变为REPL_STATE_SEND_PORT状态。

(5) 信息同步

信息同步时会同步端口、ip地址等信息.

(6) 接收rdb载入

(7)连接建立完毕

2.2 建立过程源码分析

server.h 主从同步到状态

#define REPL_STATE_NONE 0       /*  未开启主从同步情况 */
#define REPL_STATE_CONNECT 1    /* 待发起连接主服务器 */
#define REPL_STATE_CONNECTING 2  /* 主服务器连接成功 */
/* --- Handshake states, must be ordered --- */
#define REPL_STATE_RECEIVE_PONG 3 /* 已经发起PING操作,等待接收主服务器PONG回复 */
#define REPL_STATE_SEND_AUTH 4    /*待发起主服务器密码验证 */
#define REPL_STATE_RECEIVE_AUTH 5 /* 已经发起主服务器认证“auth 密码”操作,等待主服务器回复 */
#define REPL_STATE_SEND_PORT 6    /* 待发送端口号 REPLCONF listening-port */
#define REPL_STATE_RECEIVE_PORT 7 /* 已发起端口号,等待主服务器回复 REPLCONF reply */
#define REPL_STATE_SEND_IP 8      /* 待发送ip地址, REPLCONF ip-address */
#define REPL_STATE_RECEIVE_IP 9   /* 已发送ip地址,等待主服务器回复 REPLCONF reply */
#define REPL_STATE_SEND_CAPA 10    /* 主从复制进行优化升级 REPLCONF capa */
#define REPL_STATE_RECEIVE_CAPA 11 /*等待主服务器回复 REPLCONF reply */
#define REPL_STATE_SEND_PSYNC 12    /* 待发送 PSYNC命令 */
#define REPL_STATE_RECEIVE_PSYNC 13 /* 等待 PSYNC命令回复 */
/* --- End of handshake states --- */
#define REPL_STATE_TRANSFER 14      /* 正在接收rdb文件 */
#define REPL_STATE_CONNECTED 15     /* 数据载入成功,主从复制建立完毕 */

replication.c 第一步 保存信息

voidreplicaofCommand(client *c) {
   //..省略
   if (!strcasecmp(c->argv[1]->ptr,"no") &&
       !strcasecmp(c->argv[2]->ptr,"one")) {   //slaveof on one 取消主从
       if (server.masterhost) {
           replicationUnsetMaster();
           sds client = catClientInfoString(sdsempty(),c);
           serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
               client);
           sdsfree(client);
       }
   } else {
       //...省略
       replicationSetMaster(c->argv[1]->ptr, port);  //设置主从信息
       //...省略
   }
   addReply(c,shared.ok);
}

voidreplicationSetMaster(char *ip, int port) {
   int was_master = server.masterhost == NULL;

   sdsfree(server.masterhost);
   server.masterhost = sdsnew(ip);   //保存master host信息
   server.masterport = port;         //保存master 端口信息
   //...省略
   server.repl_state = REPL_STATE_CONNECT; //设置等待连接状态
   server.repl_down_since = 0;
}

保存信息到server.masterhost和server.masterport中,并且设置server.repl_state状态未等待连接状态。

replication.c 第二步创建socket连接

voidreplicationCron(void) {
   staticlonglong replication_cron_loops = 0;
   //。。。省略

   /* 如果是REPL_STATE_CONNECT状态,连接到主服务器 */
   if (server.repl_state == REPL_STATE_CONNECT) {
       serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
           server.masterhost, server.masterport);
       if (connectWithMaster() == C_OK) { //创建socket连接
           serverLog(LL_NOTICE,"MASTER <-> SLAVE sync started");
       }
   }
   // ... 省略
}

intconnectWithMaster(void) {
   int fd;

   fd = anetTcpNonBlockBestEffortBindConnect(NULL,
       server.masterhost,server.masterport,NET_FIRST_BIND_ADDR); //建立master socket连接
   if (fd == -1) {
       serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
           strerror(errno));
       return C_ERR;
   }
   //创建事件syncWithMaster方法
   if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
           AE_ERR)
   {
       close(fd);
       serverLog(LL_WARNING,"Can't create readable event for SYNC");
       return C_ERR;
   }

   server.repl_transfer_lastio = server.unixtime;
   server.repl_transfer_s = fd;
   server.repl_state = REPL_STATE_CONNECTING;  //设置连接中状态
   return C_OK;
}

replication.c 第三步到第七步

voidsyncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
   char tmpfile[256], *err = NULL;
   int dfd = -1, maxtries = 5;
   int sockerr = 0, psync_result;
   socklen_t errlen = sizeof(sockerr);
   UNUSED(el);
   UNUSED(privdata);
   UNUSED(mask);

   /* 未开启主从同步情况 */
   if (server.repl_state == REPL_STATE_NONE) {
       close(fd);
       return;
   }


   /* 第三步:发起ping 到master */
   if (server.repl_state == REPL_STATE_CONNECTING) {
       serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event.");
       /* 删除可写事件以使可读事件保持不变已经注册 */
       aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
       server.repl_state = REPL_STATE_RECEIVE_PONG; //设置等待PONG回复
   
       err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL); //发起ping到master
       if (err) goto write_error;
       return;
   }

   /* 等待PONG回复 */
   if (server.repl_state == REPL_STATE_RECEIVE_PONG) {
       err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
       //..省略
       server.repl_state = REPL_STATE_SEND_AUTH; //设置等待授权状态
   }

   /* 第四步:发起auth命令到master授权 */
   if (server.repl_state == REPL_STATE_SEND_AUTH) {
       if (server.masterauth) { //设置密码情况
           err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH",server.masterauth,NULL);
           if (err) goto write_error;
           server.repl_state = REPL_STATE_RECEIVE_AUTH;
           return;
       } else {  //未设置密码情况
           server.repl_state = REPL_STATE_SEND_PORT;   //设置待发送端口号
       }
   }

   /* 等待auth授权结果 */
   if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {
       err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
       //。。。省略
       server.repl_state = REPL_STATE_SEND_PORT; //设置待发送端口号
   }

   /* 第五步:信息同步 ,同步端口 */
   if (server.repl_state == REPL_STATE_SEND_PORT) {
       sds port = sdsfromlonglong(server.slave_announce_port ?
           server.slave_announce_port : server.port);
       err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
               "listening-port",port, NULL);  //同步端口
       sdsfree(port);
       if (err) goto write_error;
       sdsfree(err);
       server.repl_state = REPL_STATE_RECEIVE_PORT;
       return;
   }

   /* 第五步:信息同步 ,回复同步端口 */
   if (server.repl_state == REPL_STATE_RECEIVE_PORT) {
       err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
       //。。。省略
       sdsfree(err);
       server.repl_state = REPL_STATE_SEND_IP;
   }

   /* 第五步:信息同步 ,同步端口 */
   if (server.repl_state == REPL_STATE_SEND_IP &&
       server.slave_announce_ip == NULL)
   {
           server.repl_state = REPL_STATE_SEND_CAPA;
   }

   /* 同步ip地址 */
   if (server.repl_state == REPL_STATE_SEND_IP) {
       err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
               "ip-address",server.slave_announce_ip, NULL);
       if (err) goto write_error;
       sdsfree(err);
       server.repl_state = REPL_STATE_RECEIVE_IP;
       return;
   }

   /* 回复同步ip地址. */
   if (server.repl_state == REPL_STATE_RECEIVE_IP) {
       err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
       //。。。省略
       server.repl_state = REPL_STATE_SEND_CAPA;
   }

   /*  告诉master 当前 (slave) 的支持能力。
    *  REPLCONF capa eof capa  psync2
    *
    * EOF:支持EOF风格的RDB传输,用于无盘复制。
    * PSYNC2:支持PSYNC v2, 服务端返回标示 +CONTINUE    
    *
    * master 会忽略它不支持的能力. */

   if (server.repl_state == REPL_STATE_SEND_CAPA) {
       err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
               "capa","eof","capa","psync2",NULL);
       if (err) goto write_error;
       sdsfree(err);
       server.repl_state = REPL_STATE_RECEIVE_CAPA;
       return;
   }

   /* 回复支持能力. */
   if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {
       err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
       //。。。省略
       server.repl_state = REPL_STATE_SEND_PSYNC;
   }

   /* slaveTryPartialResynchronization() 函数中启动有两个作用
    * 1.获取主读取主运行ID和偏移量,并发起 PSYNC  {replid} {offset}命令复制.。
    * 2.读取PSYNC命令状态,判断是部分同步还是完整同步。
    */

   if (server.repl_state == REPL_STATE_SEND_PSYNC) {
       if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) {
           err = sdsnew("Write error sending the PSYNC command.");
           goto write_error;
       }
       server.repl_state = REPL_STATE_RECEIVE_PSYNC;
       return;
   }
   
   // ...省略
   
   // 读取状态
   psync_result = slaveTryPartialResynchronization(fd,1);
   if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */


   /* 为批量传输准备合适的临时文件 */
   while(maxtries--) {
       snprintf(tmpfile,256,
           "temp-%d.%ld.rdb",(int)server.unixtime,(longint)getpid());
       dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
       if (dfd != -1) break;
       sleep(1);
   }
   if (dfd == -1) {
       serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno));
       goto error;
   }

   /* 创建事件调用readSyncBulkPayload函数,该函数为接收和加载rdb文件,加载完成后会更新状态为REPL_STATE_CONNECTED */
   if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
           == AE_ERR)
   {
       serverLog(LL_WARNING,
           "Can't create readable event for SYNC: %s (fd=%d)",
           strerror(errno),fd);
       goto error;
   }
   //..省略
}

2.3 心跳包检测

主从节点在建立连接后,它们之间维护着长连接并彼此发送心跳命令:

(1) slave主发REPLCONF 进行ACK校验 【127.0.0.1:6380 往 127.0.0.1:6379】

当前往从发送REPLCONF ACK 19695 命令

(2)主往从发送PING 【127.0.0.1:6379 往 127.0.0.1:6380】

当前往主发送 PING 命令

replication.c 心跳包代码


voidreplicationCron(void) {  
   /* 不时地向主服务器发送ACK。
    * 请注意,我们不会定期向不支持PSYNC和复制偏移量的主机发送ack。
    */

   if (server.masterhost && server.master &&
       !(server.master->flags & CLIENT_PRE_PSYNC))
       replicationSendAck();

 
   /* master 每N秒钟给slave 一次ping */
   if ((replication_cron_loops % server.repl_ping_slave_period) == 0 &&
       listLength(server.slaves))
   {
       ping_argv[0] = createStringObject("PING",4);
       replicationFeedSlaves(server.slaves, server.slaveseldb,
           ping_argv, 1);
       decrRefCount(ping_argv[0]);
   }
}

server.repl_ping_slave_period参数为redis.conf中的repl-ping-replica-period参数,定义心跳(PING)间隔,默认为10秒。

总结:

1.建立主从关键有三种形式:redis启动,redis配置,redis命令。

2.从库默认情况是不支持写入操作,需要redis.conf配置slave-read-only参数。

3.主从之间是存在心跳响应,主会往从发PING,从会往主发ACK校验。

4.主从模式如果是数据可靠性服务,可以提高可靠性解决数据容灾问题。

5.info replication命令可以查看相关主从信息和复制偏移量,解决主从中遇到复制失败问题。

6.slaveof no one命令可以取消主从复制。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
3天前
|
存储 监控 负载均衡
redis 集群 (主从复制 哨兵模式 cluster)
redis 集群 (主从复制 哨兵模式 cluster)
|
19天前
|
负载均衡 监控 NoSQL
Redis的几种主要集群方案
【5月更文挑战第15天】Redis集群方案包括主从复制(基础,读写分离,手动故障恢复)、哨兵模式(自动高可用,自动故障转移)和Redis Cluster(官方分布式解决方案,自动分片、容错和扩展)。此外,还有Codis、Redisson和Twemproxy等工具用于代理分片和负载均衡。选择方案需考虑应用场景、数据量和并发需求,权衡可用性、性能和扩展性。
196 2
|
19天前
|
存储 监控 负载均衡
保证Redis的高可用性是一个涉及多个层面的任务,主要包括数据持久化、复制与故障转移、集群化部署等方面
【5月更文挑战第15天】保证Redis高可用性涉及数据持久化、复制与故障转移、集群化及优化策略。RDB和AOF是数据持久化方法,哨兵模式确保故障自动恢复。Redis Cluster实现分布式部署,提高负载均衡和容错性。其他措施包括身份认证、多线程、数据压缩和监控报警,以增强安全性和稳定性。通过综合配置与监控,可确保Redis服务的高效、可靠运行。
190 2
|
19天前
|
NoSQL 安全 Unix
Redis源码、面试指南(4)单机数据库、持久化、通知与订阅(中)
Redis源码、面试指南(4)单机数据库、持久化、通知与订阅
23 0
|
3天前
|
存储 负载均衡 监控
redis 集群模式(redis cluster)介绍
redis 集群模式(redis cluster)介绍
|
18天前
|
NoSQL 算法 Java
【redis源码学习】持久化机制,java程序员面试算法宝典pdf
【redis源码学习】持久化机制,java程序员面试算法宝典pdf
|
19天前
|
存储 NoSQL Redis
Redis源码、面试指南(5)多机数据库、复制、哨兵、集群(下)
Redis源码、面试指南(5)多机数据库、复制、哨兵、集群
233 1
|
19天前
|
监控 NoSQL Redis
Redis源码、面试指南(5)多机数据库、复制、哨兵、集群(上)
Redis源码、面试指南(5)多机数据库、复制、哨兵、集群
282 0
|
19天前
|
存储 NoSQL 调度
Redis源码、面试指南(4)单机数据库、持久化、通知与订阅(下)
Redis源码、面试指南(4)单机数据库、持久化、通知与订阅
18 0
|
19天前
|
存储 NoSQL API
Redis源码、面试指南(4)单机数据库、持久化、通知与订阅(上)
Redis源码、面试指南(4)单机数据库、持久化、通知与订阅
26 1