从源码阅读AOF重写-下篇(二)

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
日志服务 SLS,月写入数据量 50GB 1个月
简介: 从源码阅读AOF重写-下篇

其实,刚才我介绍的写事件回调函数 aofChildWriteDiffData,它的主要作用是从 aof_rewrite_buf_blocks 列表中逐个取出数据块,然后通过 aof_pipe_write_data_to_child 管道描述符,将数据块中的命令操作通过管道发给重写子进程,这个过程如下所示:

/* Event handler used to send data to the child process doing the AOF
 * rewrite. We send pieces of our AOF differences buffer so that the final
 * write when the child finishes the rewrite will be small. */
// 事件handler 会发数据给子进程去让子进程重写AOF。我们发送我们的 AOF 差异缓冲区的片段,以便子进程完成重写时的最终写入会很小。
void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
    listNode *ln;
    aofrwblock *block;
    ssize_t nwritten;
    UNUSED(el);
    UNUSED(fd);
    UNUSED(privdata);
    UNUSED(mask);
    while(1) {
        // 从aof_rewrite_buf_blocks列表中取出数据块
        ln = listFirst(server.aof_rewrite_buf_blocks);
        block = ln ? ln->value : NULL;
        if (server.aof_stop_sending_diff || !block) {
            aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
                              AE_WRITABLE);
            return;
        }
        if (block->used > 0) {
            // 调用write将数据块写入主进程和重写子进程间的管道
            nwritten = write(server.aof_pipe_write_data_to_child,
                             block->buf,block->used);
            if (nwritten <= 0) return;
            memmove(block->buf,block->buf+nwritten,block->used-nwritten);
            block->used -= nwritten;
            block->free += nwritten;
        }
        if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln);
    }
}

好了,这样一来,你就了解了主进程其实是在正常记录 AOF 日志时,将收到的命令操作写入 aof_rewrite_buf_blocks 列表中的数据块,然后再通过 aofChildWriteDiffData 函数将记录的命令操作通过主进程和重写子进程间的管道发给子进程。

下图也展示了这个过程,你可以再来回顾下。

然后,我们接着来看下重写子进程,是如何从管道中读取父进程发送的命令操作的。

这实际上是由 aofReadDiffFromParent 函数(在 aof.c 文件中)来完成的。这个函数会使用一个 64KB 大小的缓冲区,然后调用 read 函数,读取父进程和重写子进程间的操作命令传输管道中的数据。以下代码也展示了 aofReadDiffFromParent 函数的基本执行流程,你可以看下。

/* This function is called by the child rewriting the AOF file to read
 * the difference accumulated from the parent into a buffer, that is
 * concatenated at the end of the rewrite. */
// 子进程调用此函数,重写 AOF 文件,将父进程累积的差异读取到缓冲区中,在重写结束时将其连接起来
ssize_t aofReadDiffFromParent(void) {
    //管道默认的缓冲区大小
    char buf[65536]; /* Default pipe buffer size on most Linux systems. */
    ssize_t nread, total = 0;
    //调用read函数从aof_pipe_read_data_from_parent中读取数据
    while ((nread =
            read(server.aof_pipe_read_data_from_parent,buf,sizeof(buf))) > 0) {
        server.aof_child_diff = sdscatlen(server.aof_child_diff,buf,nread);
        total += nread;
    }
    return total;
}

那么,从代码中,你可以看到 aofReadDiffFromParent 函数会通过 aof_pipe_read_data_from_parent 描述符读取数据。然后,它会将读取的操作命令追加到全局变量 server 的 aof_child_diff 字符串中。而在 AOF 重写函数 rewriteAppendOnlyFile 的执行过程最后,aof_child_diff 字符串会被写入 AOF 重写日志文件,以便我们在使用 AOF 重写日志时,能尽可能地恢复重写期间收到的操作。

这个 aof_child_diff 字符串写入重写日志文件的过程,你可以参考下面给出的代码:

int rewriteAppendOnlyFile(char *filename) {
...
//将aof_child_diff中累积的操作命令写入AOF重写日志文件
if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
        goto werr;
...
}

所以也就是说,aofReadDiffFromParent 函数实现了重写子进程向主进程读取操作命令。那么在这里,我们还需要搞清楚的问题是:aofReadDiffFromParent 函数会在哪里被调用,也就是重写子进程会在什么时候从管道中读取主进程收到的操作。

其实,aofReadDiffFromParent 函数一共会被以下三个函数调用。

  • rewriteAppendOnlyFileRio 函数:这个函数是由重写子进程执行的,它负责遍历 Redis 每个数据库,生成 AOF 重写日志,在这个过程中,它会不时地调用 aofReadDiffFromParent 函数。
  • rewriteAppendOnlyFile 函数:这个函数是重写日志的主体函数,也是由重写子进程执行的,它本身会调用 rewriteAppendOnlyFileRio 函数。此外,它在调用完 rewriteAppendOnlyFileRio 函数后,还会多次调用 aofReadDiffFromParent 函数,以尽可能多地读取主进程在重写日志期间收到的操作命令。
  • rdbSaveRio 函数:这个函数是创建 RDB 文件的主体函数。当我们使用 AOF 和 RDB 混合持久化机制时,这个函数也会调用 aofReadDiffFromParent 函数。

从这里,我们可以看到,Redis 源码在实现 AOF 重写过程中,其实会多次让重写子进程向主进程读取新收到的操作命令,这也是为了让重写日志尽可能多地记录最新的操作,提供更加完整的操作记录。

最后,我们再来看下重写子进程和主进程间用来传递 ACK 信息的两个管道的使用。

ACK 管道的使用

刚才在介绍主进程调用 aofCreatePipes 函数创建管道时,你就了解到了,主进程会在 aof_pipe_read_ack_from_child 管道描述符上注册读事件。这个描述符对应了重写子进程向主进程发送 ACK 信息的管道。同时,这个描述符是一个读描述符,表示主进程从管道中读取 ACK 信息。

其实,重写子进程在执行 rewriteAppendOnlyFile 函数时,这个函数在完成日志重写,以及多次向父进程读取操作命令后,就会调用 write 函数,向 aof_pipe_write_ack_to_parent 描述符对应的管道中写入“!”,这就是重写子进程向主进程发送 ACK 信号,让主进程停止发送收到的新写操作。这个过程如下所示:

int rewriteAppendOnlyFile(char *filename) {
    ...
    if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
    ...
}

一旦重写子进程向主进程发送 ACK 信息的管道中有了数据,aof_pipe_read_ack_from_child 管道描述符上注册的读事件就会被触发,也就是说,这个管道中有数据可以读取了。那么,aof_pipe_read_ack_from_child 管道描述符上,注册的回调函数 aofChildPipeReadable(在 aof.c 文件中)就会执行。

这个函数会判断从 aof_pipe_read_ack_from_child 管道描述符读取的数据是否是“!”,如果是的话,那它就会调用 write 函数,往 aof_pipe_write_ack_to_child 管道描述符上写入“!”,表示主进程已经收到重写子进程发送的 ACK 信息,同时它会给重写子进程回复一个 ACK 信息。这个过程如下所示:

/* This event handler is called when the AOF rewriting child sends us a
 * single '!' char to signal we should stop sending buffer diffs. The
 * parent sends a '!' as well to acknowledge. */
// 当 AOF 重写子进程向我们发送一个“!”字符时,将调用此事件处理程序表示我们应该停止发送缓冲区差异。父进程也发送一个“!”作为确认收到
void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
    char byte;
    UNUSED(el);
    UNUSED(privdata);
    UNUSED(mask);
    if (read(fd,&byte,1) == 1 && byte == '!') {
        serverLog(LL_NOTICE,"AOF rewrite child asks to stop sending diffs.");
        server.aof_stop_sending_diff = 1;
        if (write(server.aof_pipe_write_ack_to_child,"!",1) != 1) {
            /* If we can't send the ack, inform the user, but don't try again
             * since in the other side the children will use a timeout if the
             * kernel can't buffer our write, or, the children was
             * terminated. */
            serverLog(LL_WARNING,"Can't send ACK to AOF child: %s",
                strerror(errno));
        }
    }
    /* Remove the handler since this can be called only one time during a
     * rewrite. */
    aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE);
}

好了,到这里,我们就了解了,重写子进程在完成日志重写后,是先给主进程发送 ACK 信息。然后主进程在 aof_pipe_read_ack_from_child 描述符上监听读事件发生,并调用 aofChildPipeReadable 函数向子进程发送 ACK 信息。

最后,重写子进程执行的 rewriteAppendOnlyFile 函数,会调用 syncRead 函数,从 aof_pipe_read_ack_from_parent 管道描述符上,读取主进程发送给它的 ACK 信息,如下所示:

int rewriteAppendOnlyFile(char *filename) {
...
if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1  || byte != '!') goto werr
...
}

下图也展示了 ACK 管道的使用过程,你可以再回顾下。

这样一来,重写子进程和主进程之间就通过两个 ACK 管道,相互确认重写过程结束了。

小结

今天这节课,我主要给你介绍了在 AOF 重写过程中,主进程和重写子进程间的管道通信。这里,你需要重点关注管道机制的使用,以及主进程和重写子进程使用管道通信的过程。

在这个过程中,AOF 重写子进程和主进程是使用了一个操作命令传输管道和两个 ACK 信息发送管道

  • 操作命令传输管道是用于主进程写入收到的新操作命令,以及用于重写子进程读取操作命令,而 ACK 信息发送管道是在重写结束时,重写子进程和主进程用来相互确认重写过程的结束。最后,重写子进程会进一步将收到的操作命令记录到重写日志文件中。

这样一来,AOF 重写过程中主进程收到的新写操作,就不会被遗漏了。因为一方面,这些新写操作会被记录在正常的 AOF 日志中,另一方面,主进程会将新写操作缓存在 aof_rewrite_buf_blocks 数据块列表中,并通过管道发送给重写子进程。这样,就能尽可能地保证重写日志具有最新、最完整的写操作了。

最后,我也再提醒你一下,今天这节课我们学习的管道其实属于匿名管道,是用在父子进程间进行通信的。如果你在实际开发中,要在非父子进程的两个进程间进行通信,那么你就需要用到命名管道了。而命名管道会以一个文件的形式保存在文件系统中,并会有相应的路径和文件名。这样,非父子进程的两个进程通过命名管道的路径和文件名,就可以打开管道进行通信了。

问题:这节课,我给你介绍了重写子进程和主进程间进行操作命令传输、ACK 信息传递用的三个管道。那么,你在 Redis 源码中还能找到其他使用管道的地方吗?

这道题目,是希望你能更多地了解下管道在 Redis 中的应用。有不少同学都找到了多个使用管道的地方,我在这里总结下。

  • 首先,创建 RDB、AOF 重写和主从复制时会用到管道。

在 RDB 文件的创建函数 rdbSaveBackground、AOF 重写的函数 rewriteAppendOnlyFileBackground,以及把 RDB 通过 socket 传给从节点的函数 rdbSaveToSlavesSockets 中,它们都会调用 openChildInfoPipe 函数,创建一个管道 child_info_pipe,这个管道的描述符数组,保存在了全局变量 server 中。

当 RDB 创建结束或是 AOF 文件重写结束后,这两个函数会调用 sendChildInfo 函数,通过刚才创建的管道 child_info_pipe,把子进程写时复制的实际数据量发送给父进程。

下面的代码展示了 rdbSaveBackgroundrewriteAppendOnlyFileBackgroundrdbSaveToSlavesSockets 这三个函数使用管道的主要代码,你可以看下。

  • rdbSaveBackground
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
  openChildInfoPipe();
  if ((childpid = fork()) == 0) {
  server.child_info_data.cow_size = private_dirty; //记录实际的写时复制数据量
  sendChildInfo(CHILD_INFO_TYPE_RDB); //将写时复制数据量发送给父进程
  }
}
  • rdbSaveToSlavesSockets
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
    openChildInfoPipe();
    if ((childpid = fork()) == 0) {
        server.child_info_data.cow_size = private_dirty; //记录实际的写时复制数据量
        sendChildInfo(CHILD_INFO_TYPE_RDB); //将写时复制数据量发送给父进程
    }
}
  • rewriteAppendOnlyFileBackground
int rewriteAppendOnlyFileBackground(void) {
    openChildInfoPipe();  //创建管道
    if ((childpid = fork()) == 0) {
    if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
        server.child_info_data.cow_size = private_dirty; //记录实际写时复制的数据量
        sendChildInfo(CHILD_INFO_TYPE_AOF); //将写时复制的数据量发送给父进程
    } 
}

此外,在刚才介绍的 rdbSaveToSlavesSockets 函数中,它还会创建一个管道。当子进程把数据传给从节点后,子进程会使用这个管道,向父进程发送成功接收到所有数据传输的从节点 ID,你可以看看下面的代码。

int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
    if (pipe(pipefds) == -1) return C_ERR;
    server.rdb_pipe_read_result_from_child = pipefds[0];  //创建管道读端
    server.rdb_pipe_write_result_to_parent = pipefds[1]; //创建管道写端
    if ((childpid = fork()) == 0) {
    //数据传输完成后,通过管道向父进程传输从节点ID
    if (*len == 0 || write(server.rdb_pipe_write_result_to_parent,msg,msglen) != msglen) {
    }
}
  • 其次,Redis module 运行时会用到管道。

在 module 的初始化函数 moduleInitModulesSystem 中,它会创建一个管道 module_blocked_pipe,这个管道会用来唤醒由于处理 module 命令而阻塞的客户端。下面的代码展示了管道在 Redis module 中的使用,你可以看下。

void moduleInitModulesSystem(void) {
    ...
    if (pipe(server.module_blocked_pipe) == -1) {...} //创建管道
    ...}
    int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
    ...
    if (write(server.module_blocked_pipe[1],"A",1) != 1) {...} //向管道中写入“A”字符,表示唤醒被module阻塞的客户端
    ...}
    void moduleHandleBlockedClients(void) {
    ...
    while (read(server.module_blocked_pipe[0],buf,1) == 1); //从管道中读取字符
}
  • 最后,linuxMadvFreeForkBugCheck 函数会用到管道。

基于 arm64 架构的 Linux 内核有一个 Bug,这个 Bug 可能会导致数据损坏。而 Redis 源码就针对这个 Bug,打了一个补丁,这个补丁在 main 函数的执行过程中,会调用 linuxMadvFreeForkBugCheck 函数,这个函数会 fork 一个子进程来判断是否发现 Bug,而子进程会使用管道来和父进程交互检查结果。你也可以具体看下修复这个 Bug 的补丁

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
7月前
|
存储 C++
【C++】多态(重写)的实现过程及其原理【核心知识点精讲】(22)
【C++】多态(重写)的实现过程及其原理【核心知识点精讲】(22)
|
测试技术 iOS开发 数据格式
WDA原理分析
1、什么是WDA WebDriverAgent是Facebook 在17年的 SeleniumConf 大会上推出了一款新的iOS移动测试框架。 下面摘录一段官方对于WebDriverAgent的介绍字段:(官方文档:https://github.com/facebook/WebDriverAgent) WebDriverAgent 在 iOS 端实现了一个 WebDriver server ,借助这个 server 我们可以远程控制 iOS 设备。
12097 0
|
7月前
|
Java 编译器
还没搞懂重写和重载吗?这篇文章可以帮助你
还没搞懂重写和重载吗?这篇文章可以帮助你
49 1
|
XML 缓存 Java
十一.Spring源码剖析-事务源码之@Transactionl解析
在上一章我们分析了Spring的AOP的源码,本篇文章是对事务的源码分析,我们都知道事务的管理是基于AOP实现的,所以有了上一篇的铺垫这一章会比较简单一点。 事务的源码我会分两章写,一张写Transcational的解析,一张写事务的执行流程。先上一个图,待会儿可以根据这个图来看源码
|
NoSQL Linux Redis
从源码阅读AOF重写-下篇(一)
从源码阅读AOF重写-下篇
164 0
|
NoSQL 调度 Redis
从源码阅读AOF重写-上篇
从源码阅读AOF重写-上篇
145 0
|
缓存 NoSQL Redis
redis灵魂拷问:聊一聊AOF日志重写
redis灵魂拷问:聊一聊AOF日志重写
258 0
redis灵魂拷问:聊一聊AOF日志重写
|
XML 缓存 Java
面试官:讲讲Spring框架Bean的加载过程
面试官:讲讲Spring框架Bean的加载过程
927 0
面试官:讲讲Spring框架Bean的加载过程
|
缓存 NoSQL Redis
持久化-AOF 重写工作原理|学习笔记
快速学习持久化-AOF 重写工作原理
持久化-AOF 重写工作原理|学习笔记
|
存储 设计模式 缓存
一直在使用JDK动态代理, 不明白原理如何实现?
一直在使用JDK动态代理, 不明白原理如何实现?
一直在使用JDK动态代理, 不明白原理如何实现?