在上篇文章中,我给你介绍了 AOF 重写过程,其中我带你重点了解了 AOF 重写的触发时机,以及 AOF 重写的基本执行流程。现在你已经知道,AOF 重写是通过重写子进程来完成的。
但是在上述的最后,我也提到了在 AOF 重写时,主进程仍然在接收客户端写操作,那么这些新写操作会记录到 AOF 重写日志中吗?如果需要记录的话,重写子进程又是通过什么方式向主进程获取这些写操作的呢?
接下来,我就来带你了解下 AOF 重写过程中所使用的管道机制,以及主进程和重写子进程的交互过程。这样一方面,你就可以了解 AOF 重写日志包含的写操作的完整程度,当你要使用 AOF 日志恢复 Redis 数据库时,就知道 AOF 能恢复到的程度是怎样的。另一方面,因为 AOF 重写子进程就是通过操作系统提供的管道机制,来和 Redis 主进程交互的,所以学完这节课之后,你还可以掌握管道技术,从而用来实现进程间的通信。
好了,接下来,我们就先来了解下管道机制。
如何使用管道进行父子进程间通信?
首先我们要知道,当进程 A 通过调用 fork 函数创建一个子进程 B,然后进程 A 和 B 要进行通信时,我们通常都需要依赖操作系统提供的通信机制,而管道(pipe)就是一种用于父子进程间通信的常用机制。
具体来说,管道机制在操作系统内核中创建了一块缓冲区,父进程 A 可以打开管道,并往这块缓冲区中写入数据。同时,子进程 B 也可以打开管道,从这块缓冲区中读取数据。这里,你需要注意的是,进程每次往管道中写入数据时,只能追加写到缓冲区中当前数据所在的尾部,而进程每次从管道中读取数据时,只能从缓冲区的头部读取数据。
其实,管道创建的这块缓冲区就像一个先进先出的队列一样,写数据的进程写到队列尾部,而读数据的进程则从队列头读取。下图就展示了两个进程使用管道进行数据通信的过程,你可以看下。
好了,了解了管道的基本功能后,我们再来看下使用管道时需要注意的一个关键点。管道中的数据在一个时刻只能向一个方向流动,这也就是说,如果父进程 A 往管道中写入了数据,那么此时子进程 B 只能从管道中读取数据。类似的,如果子进程 B 往管道中写入了数据,那么此时父进程 A 只能从管道中读取数据。而如果父子进程间需要同时进行数据传输通信,我们就需要创建两个管道了。
下面,我们就来看下怎么用代码实现管道通信。这其实是和操作系统提供的管道的系统调用 pipe 有关,pipe 的函数原型如下所示:
int pipe(int pipefd[2]);
你可以看到,pipe 的参数是一个数组 pipefd,表示的是管道的文件描述符。这是因为进程在往管道中写入或读取数据时,其实是使用 write 或 read 函数的,而 write 和 read 函数需要通过文件描述符才能进行写数据和读数据操作。
数组 pipefd 有两个元素 pipefd[0]和 pipefd[1],分别对应了管道的读描述符和写描述符。这也就是说,当进程需要从管道中读数据时,就需要用到 pipefd[0],而往管道中写入数据时,就使用 pipefd[1]。
这里我写了一份示例代码,展示了父子进程如何使用管道通信,你可以看下。其中注意,这份代码需要到linux上才能运行,因为Linux平台间的进程通信才有pipe管道的说法。
#include <sys/types.h> #include <unistd.h> #include <iostream> #include <cstring> using namespace std; int main() { int fd[2], nr = 0, nw = 0; char buf[128]; pid_t pid; pipe(fd); pid = fork(); if(pid == 0) { //子进程调用read从fd[0]描述符中读取数据 printf("child process wait for message\n"); nr = read(fd[0], buf, sizeof(buf)); printf("child process receive %s\n", buf); }else{ //父进程调用write往fd[1]描述符中写入数据 printf("parent process send message\n"); strcpy(buf, "Hello from parent"); nw = write(fd[1], buf, sizeof(buf)); printf("parent process send %d bytes to child.\n", nw); } return 0; }
从代码中,你可以看到,在父子进程进行管道通信前,我们需要在代码中定义用于保存读写描述符的数组 fd,然后调用 pipe 系统创建管道,并把数组 fd 作为参数传给 pipe 函数。紧接着,在父进程的代码中,父进程会调用 write 函数往管道文件描述符 fd[1]中写入数据,另一方面,子进程调用 read 函数从管道文件描述符 fd[0]中读取数据。
这里,为了便于你理解,我也画了一张图,你可以参考。
好了,现在你就了解了如何使用管道来进行父子进程的通信了。那么下面,我们就来看下在 AOF 重写过程中,重写子进程是如何用管道和主进程(也就是它的父进程)进行通信的。
AOF 重写子进程如何使用管道和父进程交互?
我们先来看下在 AOF 重写过程中,都创建了几个管道。
这实际上是 AOF 重写函数 rewriteAppendOnlyFileBackground 在执行过程中,通过调用 aofCreatePipes 函数来完成的,如下所示:
int rewriteAppendOnlyFileBackground(void) { … if (aofCreatePipes() != C_OK) return C_ERR; … }
这个 aofCreatePipes 函数是在aof.c文件中实现的,它的逻辑比较简单,可以分成三步。这个函数代码如下所示,你可以看下。
/* Create the pipes used for parent - child process IPC during rewrite. * We have a data pipe used to send AOF incremental diffs to the child, * and two other pipes used by the children to signal it finished with * the rewrite so no more data should be written, and another for the * parent to acknowledge it understood this new condition. */ // 在重写期间创建用于父子进程 IPC 的管道。我们有一个数据管道用于向子节点发送 AOF 增量差异, // 子节点使用另外两个管道来表示重写完成,因此不应写入更多数据,另一个管道用于父节点确认它理解这个新条件. int aofCreatePipes(void) { int fds[6] = {-1, -1, -1, -1, -1, -1}; int j; if (pipe(fds) == -1) goto error; /* parent -> children data. */ if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */ if (pipe(fds+4) == -1) goto error; /* parent -> children ack. */ /* Parent -> children data is non blocking. */ if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error; if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error; if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error; server.aof_pipe_write_data_to_child = fds[1]; server.aof_pipe_read_data_from_parent = fds[0]; server.aof_pipe_write_ack_to_parent = fds[3]; server.aof_pipe_read_ack_from_child = fds[2]; server.aof_pipe_write_ack_to_child = fds[5]; server.aof_pipe_read_ack_from_parent = fds[4]; server.aof_stop_sending_diff = 0; return C_OK; error: serverLog(LL_WARNING,"Error opening /setting AOF rewrite IPC pipes: %s", strerror(errno)); for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]); return C_ERR; }
- 第一步,aofCreatePipes 函数创建了包含 6 个文件描述符元素的数组 fds。就像我刚才给你介绍的,每一个管道会对应两个文件描述符,所以,数组 fds 其实对应了 AOF 重写过程中要用到的三个管道。紧接着,aofCreatePipes 函数就调用 pipe 系统调用函数,分别创建三个管道。
int aofCreatePipes(void) { int fds[6] = {-1, -1, -1, -1, -1, -1}; int j; if (pipe(fds) == -1) goto error; /* parent -> children data. */ if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */ if (pipe(fds+4) == -1) goto error; ... }
- 第二步,aofCreatePipes 函数会调用 anetNonBlock 函数(在anet.c文件中),将 fds数组的第一和第二个描述符(fds[0]和 fds[1])对应的管道设置为非阻塞。然后,aofCreatePipes 函数会调用 aeCreateFileEvent 函数,在数组 fds 的第三个描述符 (fds[2]) 上注册了读事件的监听,对应的回调函数是 aofChildPipeReadable。aofChildPipeReadable 函数也是在 aof.c 文件中实现的,我稍后会给你详细介绍它。
/* Parent -> children data is non blocking. */ if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error; if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error; if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;
这样,在完成了管道创建、管道设置和读事件注册后,最后一步,aofCreatePipes 函数会将数组 fds 中的六个文件描述符,分别复制给 server 变量的成员变量,如下所示:
int aofCreatePipes(void) { … server.aof_pipe_write_data_to_child = fds[1]; server.aof_pipe_read_data_from_parent = fds[0]; server.aof_pipe_write_ack_to_parent = fds[3]; server.aof_pipe_read_ack_from_child = fds[2]; server.aof_pipe_write_ack_to_child = fds[5]; server.aof_pipe_read_ack_from_parent = fds[4]; … }
在这一步中,我们就可以从 server 变量的成员变量名中,看到 aofCreatePipes 函数创建的三个管道,以及它们各自的用途。
- fds[0]和 fds[1]:对应了主进程和重写子进程间用于传递操作命令的管道,它们分别对应读描述符和写描述符。
- fds[2]和 fds[3]:对应了重写子进程向父进程发送 ACK 信息的管道,它们分别对应读描述符和写描述符。
- fds[4]和 fds[5]:对应了父进程向重写子进程发送 ACK 信息的管道,它们分别对应读描述符和写描述符。
下图也展示了 aofCreatePipes 函数的基本执行流程,你可以再回顾下。
好了,了解了 AOF 重写过程中的管道个数和用途后,下面我们再来看下这些管道具体是如何使用的。
操作命令传输管道的使用
实际上,当 AOF 重写子进程在执行时,主进程还会继续接收和处理客户端写请求。这些写操作会被主进程正常写入 AOF 日志文件,这个过程是由 feedAppendOnlyFile 函数(在 aof.c 文件中)来完成。
feedAppendOnlyFile 函数在执行的最后一步,会判断当前是否有 AOF 重写子进程在运行。如果有的话,它就会调用 aofRewriteBufferAppend 函数(在 aof.c 文件中),如下所示:
/* Append to the AOF buffer. This will be flushed on disk just before * of re-entering the event loop, so before the client will get a * positive reply about the operation performed. */ // 追加到 AOF 缓冲区,这将会在重新进入事件循环之前刷新到磁盘上,因此在客户端在这之前将获得有关执行操作的肯定回复。 if (server.aof_state == AOF_ON) server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));
aofRewriteBufferAppend 函数的作用是将参数 buf,追加写到全局变量 server 的 aof_rewrite_buf_blocks 这个列表中。
这里,你需要注意的是,参数 buf 是一个字节数组,feedAppendOnlyFile 函数会将主进程收到的命令操作写入到 buf 中。而 aof_rewrite_buf_blocks 列表中的每个元素是 aofrwblock 结构体类型,这个结构体中包括了一个字节数组,大小是 AOF_RW_BUF_BLOCK_SIZE,默认值是 10MB。此外,aofrwblock 结构体还记录了字节数组已经使用的空间和剩余可用的空间。
以下代码展示了 aofrwblock 结构体的定义,你可以看下。
typedef struct aofrwblock { unsigned long used, free; //buf数组已用空间和剩余可用空间 char buf[AOF_RW_BUF_BLOCK_SIZE]; //宏定义AOF_RW_BUF_BLOCK_SIZE默认为10MB } aofrwblock;
这样一来,aofrwblock 结构体就相当于是一个 10MB 的数据块,记录了 AOF 重写期间主进程收到的命令,而 aof_rewrite_buf_blocks 列表负责将这些数据块连接起来。当 aofRewriteBufferAppend 函数执行时,它会从 aof_rewrite_buf_blocks 列表中取出一个 aofrwblock 类型的数据块,用来记录命令操作。
当然,如果当前数据块中的空间不够保存参数 buf 中记录的命令操作,那么 aofRewriteBufferAppend 函数就会再分配一个 aofrwblock 数据块。
好了,当 aofRewriteBufferAppend 函数将命令操作记录到 aof_rewrite_buf_blocks 列表中之后,它还会检查 aof_pipe_write_data_to_child 管道描述符上是否注册了写事件,这个管道描述符就对应了我刚才给你介绍的 fds[1]。
如果没有注册写事件,那么 aofRewriteBufferAppend 函数就会调用 aeCreateFileEvent 函数,注册一个写事件,这个写事件会监听 aof_pipe_write_data_to_child 这个管道描述符,也就是主进程和重写子进程间的操作命令传输管道。
当这个管道可以写入数据时,写事件对应的回调函数 aofChildWriteDiffData(在 aof.c 文件中)就会被调用执行。这个过程你可以参考下面的代码:
/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */ // 如果需要分配新块,那么会将数据追加到 AOF 重写缓冲区 void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { listNode *ln = listLast(server.aof_rewrite_buf_blocks); aofrwblock *block = ln ? ln->value : NULL; while(len) { /* If we already got at least an allocated block, try appending * at least some piece into it. */ // 如果我们已经获得了至少一个已分配的块,请尝试在其中至少附加一些块进去。 if (block) { unsigned long thislen = (block->free < len) ? block->free : len; if (thislen) { /* The current block is not already full. */ memcpy(block->buf+block->used, s, thislen); block->used += thislen; block->free -= thislen; s += thislen; len -= thislen; } } // 第一个要分配的块,或者需要另一个块 if (len) { /* First block to allocate, or need another block. */ int numblocks; block = zmalloc(sizeof(*block)); block->free = AOF_RW_BUF_BLOCK_SIZE; block->used = 0; listAddNodeTail(server.aof_rewrite_buf_blocks,block); /* Log every time we cross more 10 or 100 blocks, respectively * as a notice or warning. */ // 每次越过 10 或 100 个区块时记录,分别作为通知或警告。 numblocks = listLength(server.aof_rewrite_buf_blocks); if (((numblocks+1) % 10) == 0) { int level = ((numblocks+1) % 100) == 0 ? LL_WARNING : LL_NOTICE; serverLog(level,"Background AOF buffer size: %lu MB", aofRewriteBufferSize()/(1024*1024)); } } } /* Install a file event to send data to the rewrite child if there is * not one already. */ // 检查aof_pipe_write_data_to_child描述符上是否有事件 if (!server.aof_stop_sending_diff && aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) { // 如果没有注册事件,那么注册一个写事件,回调函数是aofChildWriteDiffData aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL); } }