Event types
The I/O multiplexer can monitor multiple sockets for ae.h/AE_READABLE events and ae.h/AE_WRITABLE events. The two event types map to socket operations as follows:
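- A socket produces an AE_READABLE event when it becomes readable (the client performs a write on the socket, or closes it), or when a new acceptable socket appears (the client performs a connect on the server's listening socket).
- A socket produces an AE_WRITABLE event when it becomes writable (the client performs a read on the socket).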
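By default, if a socket is both readable and writable, the server reads from it first and writes to it afterwards, so a reply can often be sent in the same loop iteration as the request that produced it. Newer Redis versions also accept an AE_BARRIER flag on the writable event, which inverts this order and runs the write handler first.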
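As a rough illustration of this mapping and of the default read-then-write order, the sketch below uses poll(2) as the multiplexer instead of Redis's own backends (such as ae_epoll.c). The names SKETCH_READABLE, SKETCH_WRITABLE, poll_mask, dispatch and the demo handlers are hypothetical. Folding POLLHUP/POLLERR into "readable" is an assumption, made so that a closed peer still reaches the read handler, which then sees EOF.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical masks mirroring the values of ae.h's AE_READABLE / AE_WRITABLE. */
#define SKETCH_NONE     0
#define SKETCH_READABLE 1
#define SKETCH_WRITABLE 2

/* Ask the kernel, without blocking, which events are pending on fd. */
static int poll_mask(int fd) {
    struct pollfd p = { .fd = fd, .events = POLLIN | POLLOUT, .revents = 0 };
    int mask = SKETCH_NONE;

    if (poll(&p, 1, 0) <= 0) return mask;   /* nothing ready, or poll failed */
    /* Incoming data, EOF and pending connections all surface as POLLIN;
     * hang-up and error are folded into "readable" (an assumption). */
    if (p.revents & (POLLIN | POLLHUP | POLLERR)) mask |= SKETCH_READABLE;
    if (p.revents & POLLOUT) mask |= SKETCH_WRITABLE;
    return mask;
}

typedef void (*handler_fn)(int fd, void *ctx);

/* Default ordering: the read handler runs before the write handler. */
static void dispatch(int fd, int mask, handler_fn rfn, handler_fn wfn, void *ctx) {
    if ((mask & SKETCH_READABLE) && rfn) rfn(fd, ctx);
    if ((mask & SKETCH_WRITABLE) && wfn) wfn(fd, ctx);
}

/* Demo handlers that only record the order in which they were called. */
struct trace { char log[8]; int n; };

static void on_read(int fd, void *ctx) {
    struct trace *t = ctx;
    (void)fd;
    assert(t->n < 7);                       /* keep log NUL-terminated */
    t->log[t->n++] = 'r';
}

static void on_write(int fd, void *ctx) {
    struct trace *t = ctx;
    (void)fd;
    assert(t->n < 7);
    t->log[t->n++] = 'w';
}
```

With a connected socketpair, one end starts out only writable; after the peer writes a byte it is both readable and writable, and dispatch calls on_read before on_write.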
File event handlers
1. Connection accept handler
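The networking.c/acceptTcpHandler function is Redis's connection accept handler. It responds to clients connecting to the server's listening socket and is implemented as a wrapper around sys/socket.h/accept (through anetTcpAccept). Each call accepts at most MAX_ACCEPTS_PER_CALL connections and returns early once the accept fails with EWOULDBLOCK, i.e. when no more connections are pending.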
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        anetCloexec(cfd);
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}
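The accept loop above can be approximated with plain POSIX calls. In this sketch accept_batch, make_listener and wait_readable are hypothetical helpers, not Redis functions. The listening socket is assumed to be non-blocking (as Redis's listeners are), so an empty backlog shows up as EAGAIN/EWOULDBLOCK instead of blocking the event loop.

```c
#define _POSIX_C_SOURCE 200809L
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <poll.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical stand-in for acceptTcpHandler's loop: accept at most `max`
 * pending connections from a NON-BLOCKING listening socket and store their
 * fds in out[]. Returns how many were accepted, or -1 on a real error. */
static int accept_batch(int listen_fd, int max, int *out) {
    int n = 0;
    while (n < max) {                        /* like MAX_ACCEPTS_PER_CALL */
        int cfd = accept(listen_fd, NULL, NULL);
        if (cfd == -1) {
            if (errno == EINTR) continue;    /* interrupted: just retry */
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                break;                       /* backlog drained: not an error */
            return -1;                       /* Redis logs a warning here */
        }
        out[n++] = cfd;
    }
    return n;
}

/* Non-blocking listener on 127.0.0.1; the kernel picks the port,
 * which is written back into *addr. */
static int make_listener(struct sockaddr_in *addr) {
    socklen_t len = sizeof(*addr);
    int flags;
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1) return -1;
    memset(addr, 0, sizeof(*addr));
    addr->sin_family = AF_INET;
    addr->sin_port = 0;                      /* 0 = any free port */
    if (inet_pton(AF_INET, "127.0.0.1", &addr->sin_addr) != 1 ||
        bind(fd, (struct sockaddr *)addr, sizeof(*addr)) == -1 ||
        listen(fd, 16) == -1 ||
        getsockname(fd, (struct sockaddr *)addr, &len) == -1 ||
        (flags = fcntl(fd, F_GETFL)) == -1 ||
        fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
        close(fd);
        return -1;
    }
    return fd;
}

/* The event loop only invokes the accept handler once the listening
 * socket is readable; this waits for that condition. */
static int wait_readable(int fd, int timeout_ms) {
    struct pollfd p = { .fd = fd, .events = POLLIN, .revents = 0 };
    return poll(&p, 1, timeout_ms) == 1 && (p.revents & POLLIN);
}
```

Capping the batch size keeps one burst of connections from monopolising a single event-loop iteration. Whatever remains in the backlog keeps the listening socket readable, so a level-triggered multiplexer reports it again on the next iteration.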
2. Command request handler
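The networking.c/readQueryFromClient function is Redis's command request handler. It reads the command request sent by the client from the socket into the client's query buffer (c->querybuf) and is implemented as a wrapper around unistd.h/read; in the version shown, the read goes through the connRead connection abstraction. If the read returns 0, the client has closed the connection and is freed; if the query buffer grows beyond client_max_querybuf_len, the client is closed; otherwise processInputBuffer parses whatever complete commands the buffer now holds.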
void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, readlen;
    size_t qblen;

    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
    if (postponeClientRead(c)) return;

    /* Update total number of reads on server */
    atomicIncr(server.stat_total_reads_processed, 1);

    readlen = PROTO_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);

        /* Note that the 'remaining' variable may be zero in some edge case,
         * for example once we resume a blocked client after CLIENT PAUSE. */
        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }

    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = connRead(c->conn, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (connGetState(conn) == CONN_STATE_CONNECTED) {
            return;
        } else {
            serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
            freeClientAsync(c);
            return;
        }
    } else if (nread == 0) {
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClientAsync(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
        /* Append the query buffer to the pending (not applied) buffer
         * of the master. We'll use this buffer later in order to have a
         * copy of the string applied by the last command executed. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }

    sdsIncrLen(c->querybuf,nread);
    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    atomicIncr(server.stat_net_input_bytes, nread);
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClientAsync(c);
        return;
    }

    /* There is more data in the client input buffer, continue parsing it
     * in case to check if there is a full command to execute. */
    processInputBuffer(c);
}
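Stripped of replication, threaded I/O and the large-argument optimisation, the control flow of readQueryFromClient comes down to a handful of outcomes. The sketch below is a hypothetical simplification: struct qbuf stands in for the sds query buffer, read_query and set_nonblocking are made-up names, and the buffer grows to exactly the size needed instead of following sdsMakeRoomFor's growth policy.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical, much simplified stand-in for a client's querybuf. */
struct qbuf { char *buf; size_t len, cap; };

enum read_status { RD_OK, RD_AGAIN, RD_CLOSED, RD_ERROR, RD_TOO_BIG };

static int set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL);
    return flags == -1 ? -1 : fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/* Read at most `readlen` bytes from fd and append them to q, taking the
 * same branches as readQueryFromClient. */
static enum read_status read_query(int fd, struct qbuf *q, size_t readlen,
                                   size_t max_len) {
    assert(readlen > 0);
    if (q->cap - q->len < readlen) {           /* role of sdsMakeRoomFor */
        size_t newcap = q->len + readlen;
        char *p = realloc(q->buf, newcap);
        if (p == NULL) return RD_ERROR;
        q->buf = p;
        q->cap = newcap;
    }
    ssize_t nread = read(fd, q->buf + q->len, readlen);
    if (nread == -1)                           /* nothing yet: wait for next AE_READABLE */
        return (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
                   ? RD_AGAIN : RD_ERROR;
    if (nread == 0) return RD_CLOSED;          /* peer closed: Redis frees the client */
    q->len += (size_t)nread;                   /* role of sdsIncrLen */
    if (q->len > max_len) return RD_TOO_BIG;   /* client_max_querybuf_len check */
    return RD_OK;                              /* caller would now parse the buffer */
}
```

RD_AGAIN corresponds to the nread == -1 branch where the connection is still CONN_STATE_CONNECTED, RD_CLOSED to nread == 0, and RD_TOO_BIG to the client_max_querybuf_len check.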
3. Command reply handler
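The networking.c/sendReplyToClient function is Redis's command reply handler. It sends the reply produced by executing a command back to the client through the socket and is implemented as a wrapper around unistd.h/write. In the version shown, it simply delegates to writeToClient, which performs the actual writes.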
/* Write event handler. Just send data to the client. */
void sendReplyToClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    writeToClient(c,1);
}
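The second argument of writeToClient(c,1) tells it that a write handler is installed, so writeToClient can remove that handler once the client's output buffers are empty. The sketch below shows only the underlying partial-write pattern on a non-blocking socket. struct reply, write_reply and set_nonblocking are hypothetical; the real writeToClient also walks the reply list, treats replicas specially and caps the bytes written per event (NET_MAX_WRITES_PER_EVENT), all of which this sketch omits.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical pending reply: buf[0..len) must be sent; sentlen bytes already were. */
struct reply { const char *buf; size_t len, sentlen; };

static int set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL);
    return flags == -1 ? -1 : fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/* Write as much of the reply as the socket accepts right now.
 * Returns 1 when everything is sent (the caller would then uninstall its
 * AE_WRITABLE handler), 0 when the socket buffer is full (keep the handler
 * and resume from sentlen on the next writable event), -1 on error. */
static int write_reply(int fd, struct reply *r) {
    while (r->sentlen < r->len) {
        ssize_t n = write(fd, r->buf + r->sentlen, r->len - r->sentlen);
        if (n == -1) {
            if (errno == EINTR) continue;
            if (errno == EAGAIN || errno == EWOULDBLOCK) return 0;
            return -1;
        }
        r->sentlen += (size_t)n;
    }
    return 1;
}
```

Tracking sentlen is what makes resumption possible: a full socket buffer is not an error, it only means the rest goes out on a later AE_WRITABLE event.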
Time events
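In practice, Redis time events are periodic: after an event's handler runs, the event is not deleted unless the handler returns AE_NOMORE. Otherwise the handler's return value is the number of milliseconds until the next run, and the event's due time is updated in place (te->when = now + retval * 1000) while the event stays where it is in the list.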
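Time events are kept in an unsorted doubly linked list starting at eventLoop->timeEventHead, and every new time event is inserted at the head. For example, during initialization (initServer in server.c) Redis registers serverCron as a time event that first fires after 1 ms: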
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
    serverPanic("Can't create event loop timers.");
    exit(1);
}
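The list management described above can be sketched as follows. struct tevent, struct tloop and tevent_create are hypothetical simplifications of aeTimeEvent, aeEventLoop and aeCreateTimeEvent. Passing the current time in explicitly is an assumption made to keep the example deterministic; Redis reads its monotonic clock instead.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical, simplified counterpart of aeTimeEvent (ae.h). */
struct tevent {
    long long id;
    long long when_us;                       /* absolute due time in microseconds */
    int (*proc)(long long id, void *data);   /* returns next interval in ms */
    void *data;
    struct tevent *prev, *next;
};

/* Hypothetical counterpart of the timer fields of aeEventLoop. */
struct tloop {
    struct tevent *head;                     /* like timeEventHead */
    long long next_id;                       /* like timeEventNextId */
};

/* Assign the next id, turn the millisecond delay into an absolute due
 * time and push the node onto the list head. Returns the id, or -1. */
static long long tevent_create(struct tloop *l, long long now_us, long long ms,
                               int (*proc)(long long, void *), void *data) {
    struct tevent *te = malloc(sizeof *te);
    if (te == NULL) return -1;
    te->id = l->next_id++;
    te->when_us = now_us + ms * 1000;
    te->proc = proc;
    te->data = data;
    te->prev = NULL;
    te->next = l->head;                      /* O(1) insert; list stays unsorted */
    if (te->next) te->next->prev = te;
    l->head = te;
    return te->id;
}

static void tevent_free_all(struct tloop *l) {
    while (l->head) {
        struct tevent *next = l->head->next;
        free(l->head);
        l->head = next;
    }
}
```

Head insertion is O(1), but because the list is unsorted, finding the nearest timer or the due timers requires an O(N) scan. That trade-off is reasonable as long as only a few time events are registered.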
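processTimeEvents walks the entire list on every call. It frees events marked for deletion, skips events created during the current pass, and runs every event whose due time has arrived: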
/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;

    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    monotime now = getMonotonicUs();
    // Walk the whole list: free events marked as deleted, run the ones that are due
    while(te) {
        long long id;

        /* Remove events scheduled for deletion. */
        // Events marked as deleted in an earlier pass are freed here
        if (te->id == AE_DELETED_EVENT_ID) {
            aeTimeEvent *next = te->next;
            /* If a reference exists for this timer event,
             * don't free it. This is currently incremented
             * for recursive timerProc calls */
            if (te->refcount) {
                te = next;
                continue;
            }
            if (te->prev)
                te->prev->next = te->next;
            else
                eventLoop->timeEventHead = te->next;
            if (te->next)
                te->next->prev = te->prev;
            if (te->finalizerProc) {
                te->finalizerProc(eventLoop, te->clientData);
                now = getMonotonicUs();
            }
            zfree(te);
            te = next;
            continue;
        }

        /* Make sure we don't process time events created by time events in
         * this iteration. Note that this check is currently useless: we always
         * add new timers on the head, however if we change the implementation
         * detail, this check may be useful again: we keep it here for future
         * defense. */
        if (te->id > maxId) {
            te = te->next;
            continue;
        }

        if (te->when <= now) {
            int retval;

            id = te->id;
            te->refcount++;
            // retval is the number of milliseconds until this event should run again
            retval = te->timeProc(eventLoop, id, te->clientData);
            te->refcount--;
            processed++;
            now = getMonotonicUs();
            if (retval != AE_NOMORE) {
                // Periodic event: reschedule it in place
                te->when = now + retval * 1000;
            } else {
                // Handler returned AE_NOMORE: mark for deletion on the next pass
                te->id = AE_DELETED_EVENT_ID;
            }
        }
        te = te->next;
    }
    return processed;
}
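The processing loop can likewise be reduced to a small, self-contained sketch. It keeps three behaviours of processTimeEvents shown above: lazy freeing of deleted events, the maxId guard, and in-place rescheduling driven by the handler's return value. It drops refcount and finalizerProc; all names are hypothetical, and the caller supplies the current time in microseconds (an assumption for determinism).

```c
#include <assert.h>
#include <stdlib.h>

#define SK_NOMORE  (-1)   /* like AE_NOMORE: "do not run me again" */
#define SK_DELETED (-1)   /* like AE_DELETED_EVENT_ID */

/* Hypothetical, simplified time event and timer list. */
struct tevent {
    long long id, when_us;
    int (*proc)(long long id, void *data);
    void *data;
    struct tevent *prev, *next;
};
struct tloop { struct tevent *head; long long next_id; };

/* Push an event due at absolute time when_us onto the list head. */
static long long tevent_push(struct tloop *l, long long when_us,
                             int (*proc)(long long, void *), void *data) {
    struct tevent *te = malloc(sizeof *te);
    if (te == NULL) return -1;
    te->id = l->next_id++;
    te->when_us = when_us;
    te->proc = proc;
    te->data = data;
    te->prev = NULL;
    te->next = l->head;
    if (te->next) te->next->prev = te;
    l->head = te;
    return te->id;
}

/* One pass over the list, modelled on processTimeEvents. */
static int tevents_process(struct tloop *l, long long now_us) {
    int processed = 0;
    long long max_id = l->next_id - 1;       /* ignore events added during this pass */
    struct tevent *te = l->head;

    while (te) {
        struct tevent *next = te->next;
        if (te->id == SK_DELETED) {          /* marked earlier: unlink and free */
            if (te->prev) te->prev->next = te->next;
            else l->head = te->next;
            if (te->next) te->next->prev = te->prev;
            free(te);
        } else if (te->id <= max_id && te->when_us <= now_us) {
            int ms = te->proc(te->id, te->data);
            processed++;
            if (ms != SK_NOMORE)
                te->when_us = now_us + (long long)ms * 1000;  /* reschedule in place */
            else
                te->id = SK_DELETED;         /* freed on the next pass */
        }
        te = next;
    }
    return processed;
}

/* Demo handlers: count invocations in *data. */
static int every_10ms(long long id, void *data) { (void)id; ++*(int *)data; return 10; }
static int run_once(long long id, void *data) { (void)id; ++*(int *)data; return SK_NOMORE; }

static void tevent_free_all(struct tloop *l) {
    while (l->head) {
        struct tevent *next = l->head->next;
        free(l->head);
        l->head = next;
    }
}
```

Marking first and unlinking on a later pass mirrors the original: an event can be asked to stop (by its own return value, or via aeDeleteTimeEvent in Redis) while the list is being walked, so the node is only flagged and freed the next time the list is traversed.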
References
- Scalable IO in Java, Doug Lea
- Redis Design and Implementation (《Redis 设计与实现》), Huang Jianhong (黄健宏)