其实,刚才我介绍的写事件回调函数 aofChildWriteDiffData,它的主要作用是从 aof_rewrite_buf_blocks 列表中逐个取出数据块,然后通过 aof_pipe_write_data_to_child 管道描述符,将数据块中的命令操作通过管道发给重写子进程,这个过程如下所示:
/* Event handler used to send data to the child process doing the AOF * rewrite. We send pieces of our AOF differences buffer so that the final * write when the child finishes the rewrite will be small. */ // 事件handler 会发数据给子进程去让子进程重写AOF。我们发送我们的 AOF 差异缓冲区的片段,以便子进程完成重写时的最终写入会很小。 void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) { listNode *ln; aofrwblock *block; ssize_t nwritten; UNUSED(el); UNUSED(fd); UNUSED(privdata); UNUSED(mask); while(1) { // 从aof_rewrite_buf_blocks列表中取出数据块 ln = listFirst(server.aof_rewrite_buf_blocks); block = ln ? ln->value : NULL; if (server.aof_stop_sending_diff || !block) { aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child, AE_WRITABLE); return; } if (block->used > 0) { // 调用write将数据块写入主进程和重写子进程间的管道 nwritten = write(server.aof_pipe_write_data_to_child, block->buf,block->used); if (nwritten <= 0) return; memmove(block->buf,block->buf+nwritten,block->used-nwritten); block->used -= nwritten; block->free += nwritten; } if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln); } }
好了,这样一来,你就了解了主进程其实是在正常记录 AOF 日志时,将收到的命令操作写入 aof_rewrite_buf_blocks 列表中的数据块,然后再通过 aofChildWriteDiffData 函数将记录的命令操作通过主进程和重写子进程间的管道发给子进程。
下图也展示了这个过程,你可以再来回顾下。
然后,我们接着来看下重写子进程,是如何从管道中读取父进程发送的命令操作的。
这实际上是由 aofReadDiffFromParent 函数(在 aof.c 文件中)来完成的。这个函数会使用一个 64KB 大小的缓冲区,然后调用 read 函数,读取父进程和重写子进程间的操作命令传输管道中的数据。以下代码也展示了 aofReadDiffFromParent 函数的基本执行流程,你可以看下。
/* This function is called by the child rewriting the AOF file to read * the difference accumulated from the parent into a buffer, that is * concatenated at the end of the rewrite. */ // 子进程调用此函数,重写 AOF 文件,将父进程累积的差异读取到缓冲区中,在重写结束时将其连接起来 ssize_t aofReadDiffFromParent(void) { //管道默认的缓冲区大小 char buf[65536]; /* Default pipe buffer size on most Linux systems. */ ssize_t nread, total = 0; //调用read函数从aof_pipe_read_data_from_parent中读取数据 while ((nread = read(server.aof_pipe_read_data_from_parent,buf,sizeof(buf))) > 0) { server.aof_child_diff = sdscatlen(server.aof_child_diff,buf,nread); total += nread; } return total; }
那么,从代码中,你可以看到 aofReadDiffFromParent 函数会通过 aof_pipe_read_data_from_parent 描述符读取数据。然后,它会将读取的操作命令追加到全局变量 server 的 aof_child_diff 字符串中。而在 AOF 重写函数 rewriteAppendOnlyFile 的执行过程最后,aof_child_diff 字符串会被写入 AOF 重写日志文件,以便我们在使用 AOF 重写日志时,能尽可能地恢复重写期间收到的操作。
这个 aof_child_diff 字符串写入重写日志文件的过程,你可以参考下面给出的代码:
int rewriteAppendOnlyFile(char *filename) { ... //将aof_child_diff中累积的操作命令写入AOF重写日志文件 if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0) goto werr; ... }
所以也就是说,aofReadDiffFromParent 函数实现了重写子进程向主进程读取操作命令。那么在这里,我们还需要搞清楚的问题是:aofReadDiffFromParent 函数会在哪里被调用,也就是重写子进程会在什么时候从管道中读取主进程收到的操作。
其实,aofReadDiffFromParent 函数一共会被以下三个函数调用。
- rewriteAppendOnlyFileRio 函数:这个函数是由重写子进程执行的,它负责遍历 Redis 每个数据库,生成 AOF 重写日志,在这个过程中,它会不时地调用 aofReadDiffFromParent 函数。
- rewriteAppendOnlyFile 函数:这个函数是重写日志的主体函数,也是由重写子进程执行的,它本身会调用 rewriteAppendOnlyFileRio 函数。此外,它在调用完 rewriteAppendOnlyFileRio 函数后,还会多次调用 aofReadDiffFromParent 函数,以尽可能多地读取主进程在重写日志期间收到的操作命令。
- rdbSaveRio 函数:这个函数是创建 RDB 文件的主体函数。当我们使用 AOF 和 RDB 混合持久化机制时,这个函数也会调用 aofReadDiffFromParent 函数。
从这里,我们可以看到,Redis 源码在实现 AOF 重写过程中,其实会多次让重写子进程向主进程读取新收到的操作命令,这也是为了让重写日志尽可能多地记录最新的操作,提供更加完整的操作记录。
最后,我们再来看下重写子进程和主进程间用来传递 ACK 信息的两个管道的使用。
ACK 管道的使用
刚才在介绍主进程调用 aofCreatePipes 函数创建管道时,你就了解到了,主进程会在 aof_pipe_read_ack_from_child 管道描述符上注册读事件。这个描述符对应了重写子进程向主进程发送 ACK 信息的管道。同时,这个描述符是一个读描述符,表示主进程从管道中读取 ACK 信息。
其实,重写子进程在执行 rewriteAppendOnlyFile 函数时,这个函数在完成日志重写,以及多次向父进程读取操作命令后,就会调用 write 函数,向 aof_pipe_write_ack_to_parent 描述符对应的管道中写入“!”,这就是重写子进程向主进程发送 ACK 信号,让主进程停止发送收到的新写操作。这个过程如下所示:
int rewriteAppendOnlyFile(char *filename) { ... if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr; ... }
一旦重写子进程向主进程发送 ACK 信息的管道中有了数据,aof_pipe_read_ack_from_child 管道描述符上注册的读事件就会被触发,也就是说,这个管道中有数据可以读取了。那么,aof_pipe_read_ack_from_child 管道描述符上,注册的回调函数 aofChildPipeReadable(在 aof.c 文件中)就会执行。
这个函数会判断从 aof_pipe_read_ack_from_child 管道描述符读取的数据是否是“!”,如果是的话,那它就会调用 write 函数,往 aof_pipe_write_ack_to_child 管道描述符上写入“!”,表示主进程已经收到重写子进程发送的 ACK 信息,同时它会给重写子进程回复一个 ACK 信息。这个过程如下所示:
/* This event handler is called when the AOF rewriting child sends us a * single '!' char to signal we should stop sending buffer diffs. The * parent sends a '!' as well to acknowledge. */ // 当 AOF 重写子进程向我们发送一个“!”字符时,将调用此事件处理程序表示我们应该停止发送缓冲区差异。父进程也发送一个“!”作为确认收到 void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) { char byte; UNUSED(el); UNUSED(privdata); UNUSED(mask); if (read(fd,&byte,1) == 1 && byte == '!') { serverLog(LL_NOTICE,"AOF rewrite child asks to stop sending diffs."); server.aof_stop_sending_diff = 1; if (write(server.aof_pipe_write_ack_to_child,"!",1) != 1) { /* If we can't send the ack, inform the user, but don't try again * since in the other side the children will use a timeout if the * kernel can't buffer our write, or, the children was * terminated. */ serverLog(LL_WARNING,"Can't send ACK to AOF child: %s", strerror(errno)); } } /* Remove the handler since this can be called only one time during a * rewrite. */ aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE); }
好了,到这里,我们就了解了,重写子进程在完成日志重写后,是先给主进程发送 ACK 信息。然后主进程在 aof_pipe_read_ack_from_child 描述符上监听读事件发生,并调用 aofChildPipeReadable 函数向子进程发送 ACK 信息。
最后,重写子进程执行的 rewriteAppendOnlyFile 函数,会调用 syncRead 函数,从 aof_pipe_read_ack_from_parent 管道描述符上,读取主进程发送给它的 ACK 信息,如下所示:
int rewriteAppendOnlyFile(char *filename) { ... if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 || byte != '!') goto werr ... }
下图也展示了 ACK 管道的使用过程,你可以再回顾下。
这样一来,重写子进程和主进程之间就通过两个 ACK 管道,相互确认重写过程结束了。
小结
今天这节课,我主要给你介绍了在 AOF 重写过程中,主进程和重写子进程间的管道通信。这里,你需要重点关注管道机制的使用,以及主进程和重写子进程使用管道通信的过程。
在这个过程中,AOF 重写子进程和主进程是使用了一个操作命令传输管道和两个 ACK 信息发送管道。
- 操作命令传输管道是用于主进程写入收到的新操作命令,以及用于重写子进程读取操作命令,而 ACK 信息发送管道是在重写结束时,重写子进程和主进程用来相互确认重写过程的结束。最后,重写子进程会进一步将收到的操作命令记录到重写日志文件中。
这样一来,AOF 重写过程中主进程收到的新写操作,就不会被遗漏了。因为一方面,这些新写操作会被记录在正常的 AOF 日志中,另一方面,主进程会将新写操作缓存在 aof_rewrite_buf_blocks 数据块列表中,并通过管道发送给重写子进程。这样,就能尽可能地保证重写日志具有最新、最完整的写操作了。
最后,我也再提醒你一下,今天这节课我们学习的管道其实属于匿名管道,是用在父子进程间进行通信的。如果你在实际开发中,要在非父子进程的两个进程间进行通信,那么你就需要用到命名管道了。而命名管道会以一个文件的形式保存在文件系统中,并会有相应的路径和文件名。这样,非父子进程的两个进程通过命名管道的路径和文件名,就可以打开管道进行通信了。
问题:这节课,我给你介绍了重写子进程和主进程间进行操作命令传输、ACK 信息传递用的三个管道。那么,你在 Redis 源码中还能找到其他使用管道的地方吗?
这道题目,是希望你能更多地了解下管道在 Redis 中的应用。有不少同学都找到了多个使用管道的地方,我在这里总结下。
- 首先,创建 RDB、AOF 重写和主从复制时会用到管道。
在 RDB 文件的创建函数 rdbSaveBackground、AOF 重写的函数 rewriteAppendOnlyFileBackground,以及把 RDB 通过 socket 传给从节点的函数 rdbSaveToSlavesSockets 中,它们都会调用 openChildInfoPipe 函数,创建一个管道 child_info_pipe,这个管道的描述符数组,保存在了全局变量 server 中。
当 RDB 创建结束或是 AOF 文件重写结束后,这两个函数会调用 sendChildInfo 函数,通过刚才创建的管道 child_info_pipe,把子进程写时复制的实际数据量发送给父进程。
下面的代码展示了 rdbSaveBackground、rewriteAppendOnlyFileBackground、rdbSaveToSlavesSockets 这三个函数使用管道的主要代码,你可以看下。
- rdbSaveBackground
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) { … openChildInfoPipe(); if ((childpid = fork()) == 0) { … server.child_info_data.cow_size = private_dirty; //记录实际的写时复制数据量 sendChildInfo(CHILD_INFO_TYPE_RDB); //将写时复制数据量发送给父进程 … } }
- rdbSaveToSlavesSockets
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { … openChildInfoPipe(); if ((childpid = fork()) == 0) { … server.child_info_data.cow_size = private_dirty; //记录实际的写时复制数据量 sendChildInfo(CHILD_INFO_TYPE_RDB); //将写时复制数据量发送给父进程 … } }
- rewriteAppendOnlyFileBackground
int rewriteAppendOnlyFileBackground(void) { … openChildInfoPipe(); //创建管道 … if ((childpid = fork()) == 0) { … if (rewriteAppendOnlyFile(tmpfile) == C_OK) { … server.child_info_data.cow_size = private_dirty; //记录实际写时复制的数据量 sendChildInfo(CHILD_INFO_TYPE_AOF); //将写时复制的数据量发送给父进程 … } }
此外,在刚才介绍的 rdbSaveToSlavesSockets 函数中,它还会创建一个管道。当子进程把数据传给从节点后,子进程会使用这个管道,向父进程发送成功接收到所有数据传输的从节点 ID,你可以看看下面的代码。
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { … if (pipe(pipefds) == -1) return C_ERR; server.rdb_pipe_read_result_from_child = pipefds[0]; //创建管道读端 server.rdb_pipe_write_result_to_parent = pipefds[1]; //创建管道写端 … if ((childpid = fork()) == 0) { … //数据传输完成后,通过管道向父进程传输从节点ID if (*len == 0 || write(server.rdb_pipe_write_result_to_parent,msg,msglen) != msglen) { … } }
- 其次,Redis module 运行时会用到管道。
在 module 的初始化函数 moduleInitModulesSystem 中,它会创建一个管道 module_blocked_pipe,这个管道会用来唤醒由于处理 module 命令而阻塞的客户端。下面的代码展示了管道在 Redis module 中的使用,你可以看下。
void moduleInitModulesSystem(void) { ... if (pipe(server.module_blocked_pipe) == -1) {...} //创建管道 ...} int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) { ... if (write(server.module_blocked_pipe[1],"A",1) != 1) {...} //向管道中写入“A”字符,表示唤醒被module阻塞的客户端 ...} void moduleHandleBlockedClients(void) { ... while (read(server.module_blocked_pipe[0],buf,1) == 1); //从管道中读取字符 }
- 最后,linuxMadvFreeForkBugCheck 函数会用到管道。
基于 arm64 架构的 Linux 内核有一个 Bug,这个 Bug 可能会导致数据损坏。而 Redis 源码就针对这个 Bug,打了一个补丁,这个补丁在 main 函数的执行过程中,会调用 linuxMadvFreeForkBugCheck 函数,这个函数会 fork 一个子进程来判断是否发现 Bug,而子进程会使用管道来和父进程交互检查结果。你也可以具体看下修复这个 Bug 的补丁。