创建共享对象
Redis 非常注重内存消耗,对于一些常用的对象,采用引用计数的方式进行复用。
/* Excerpt: the call to createSharedObjects() followed by part of what
 * appears to be that function's body.
 * NOTE(review): as pasted, these statements are not inside a function and
 * would not compile on their own.
 * Each shared object is a string object wrapping a fixed, preformatted
 * reply, so the same reply object can be reused instead of being rebuilt
 * for every client. */
createSharedObjects();
/* Shared command responses */
shared.crlf = createObject(OBJ_STRING,sdsnew("\r\n"));
shared.ok = createObject(OBJ_STRING,sdsnew("+OK\r\n"));
shared.emptybulk = createObject(OBJ_STRING,sdsnew("$0\r\n\r\n"));
shared.czero = createObject(OBJ_STRING,sdsnew(":0\r\n"));
shared.cone = createObject(OBJ_STRING,sdsnew(":1\r\n"));
shared.emptyarray = createObject(OBJ_STRING,sdsnew("*0\r\n"));
shared.pong = createObject(OBJ_STRING,sdsnew("+PONG\r\n"));
shared.queued = createObject(OBJ_STRING,sdsnew("+QUEUED\r\n"));
/* A two-element array: the cursor "0" followed by an empty array.
 * Presumably the reply for a SCAN that is finished and found nothing. */
shared.emptyscan = createObject(OBJ_STRING,sdsnew("*2\r\n$1\r\n0\r\n*0\r\n"));
/* Single-character fragments, not complete replies. */
shared.space = createObject(OBJ_STRING,sdsnew(" "));
shared.colon = createObject(OBJ_STRING,sdsnew(":"));
shared.plus = createObject(OBJ_STRING,sdsnew("+"));
根据系统限制调整可打开的文件数上限
/* Adjust the process's open-files limit according to system limits.
 * NOTE(review): the implementation is not shown in this excerpt. */
adjustOpenFilesLimit();
网络模型 reactor 初始化
Redis 支持 TCP 和 Unix 域套接字两种监听方式(另外还可以开启 TLS 端口)。当服务端和客户端位于同一台机器时,Unix 域套接字更快,因为数据不经过 TCP/IP 协议栈,省去了协议头的封装与解析等处理。
/* Excerpt from server initialization. It creates the event loop, allocates
 * the database array, and opens the TCP, TLS and Unix-domain listening
 * sockets. Every failure in this section is logged and followed by
 * exit(1). */
const char *clk_msg = monotonicInit();
serverLog(LL_NOTICE, "monotonic clock: %s", clk_msg);
/* The event loop is sized for maxclients plus CONFIG_FDSET_INCR extra
 * slots. Presumably the extra slots are headroom for non-client
 * descriptors (listeners, pipes); confirm. */
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
if (server.el == NULL) {
    serverLog(LL_WARNING,
        "Failed creating the event loop. Error message: '%s'",
        strerror(errno));
    exit(1);
}
/* One redisDb per configured database (server.dbnum). */
server.db = zmalloc(sizeof(redisDb)*server.dbnum);

/* Open the TCP listening socket for the user commands.
 * A port of 0 means this listener is not opened. */
if (server.port != 0 &&
    listenToPort(server.port,&server.ipfd) == C_ERR) {
    serverLog(LL_WARNING, "Failed listening on port %u (TCP), aborting.", server.port);
    exit(1);
}
/* Same for the TLS listener: only opened when tls_port is non-zero. */
if (server.tls_port != 0 &&
    listenToPort(server.tls_port,&server.tlsfd) == C_ERR) {
    serverLog(LL_WARNING, "Failed listening on port %u (TLS), aborting.", server.tls_port);
    exit(1);
}

/* Open the listening Unix domain socket. */
if (server.unixsocket != NULL) {
    /* Remove whatever already exists at the socket path, presumably a stale
     * socket file from a previous run (binding to an existing path would
     * fail). */
    unlink(server.unixsocket); /* don't care if this fails */
    server.sofd = anetUnixServer(server.neterr,server.unixsocket,
        server.unixsocketperm, server.tcp_backlog);
    if (server.sofd == ANET_ERR) {
        serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
        exit(1);
    }
    /* Make the listening socket non-blocking and close-on-exec. */
    anetNonBlock(NULL,server.sofd);
    anetCloexec(server.sofd);
}
LRU 等淘汰策略使用的淘汰池(eviction pool)初始化
/* Create a new eviction pool. */ void evictionPoolAlloc(void) { struct evictionPoolEntry *ep; int j; ep = zmalloc(sizeof(*ep)*EVPOOL_SIZE); for (j = 0; j < EVPOOL_SIZE; j++) { ep[j].idle = 0; ep[j].key = NULL; ep[j].cached = sdsnewlen(NULL,EVPOOL_CACHED_SDS_SIZE); ep[j].dbid = 0; } EvictionPoolLRU = ep; }
初始化 rdb 和 aof 信息
/* Excerpt from server initialization. It resets the RDB child / RDB pipe
 * state and the child-info pipe, resets the AOF buffers, and records the
 * startup save timestamps. */
server.rdb_child_type = RDB_CHILD_TYPE_NONE;
server.rdb_pipe_conns = NULL;
server.rdb_pipe_numconns = 0;
server.rdb_pipe_numconns_writing = 0;
server.rdb_pipe_buff = NULL;
server.rdb_pipe_bufflen = 0;
server.rdb_bgsave_scheduled = 0;
/* -1 presumably marks both ends of the child-info pipe as not open. */
server.child_info_pipe[0] = -1;
server.child_info_pipe[1] = -1;
server.child_info_nread = 0;
aofRewriteBufferReset();
server.aof_buf = sdsempty();
server.lastsave = time(NULL); /* At startup we consider the DB saved. */
server.lastbgsave_try = 0; /* At startup we never tried to BGSAVE. */
/* -1 presumably means "no RDB save timed yet". */
server.rdb_save_time_last = -1;
server.rdb_save_time_start = -1;
server.dirty = 0;
初始化状态信息
resetServerStats(); /* A few stats we don't want to reset: server startup time, and peak mem. */ server.stat_starttime = time(NULL); server.stat_peak_memory = 0; server.stat_current_cow_bytes = 0; server.stat_current_cow_updated = 0; server.stat_current_save_keys_processed = 0; server.stat_current_save_keys_total = 0; server.stat_rdb_cow_bytes = 0; server.stat_aof_cow_bytes = 0; server.stat_module_cow_bytes = 0; server.stat_module_progress = 0; for (int j = 0; j < CLIENT_TYPE_COUNT; j++) server.stat_clients_type_memory[j] = 0; server.cron_malloc_stats.zmalloc_used = 0; server.cron_malloc_stats.process_rss = 0; server.cron_malloc_stats.allocator_allocated = 0; server.cron_malloc_stats.allocator_active = 0; server.cron_malloc_stats.allocator_resident = 0; server.lastbgsave_status = C_OK; server.aof_last_write_status = C_OK; server.aof_last_write_errno = 0; server.repl_good_slaves_count = 0;
注册事件
网络编程中,通常包括三类事件:文件事件、定时事件和信号事件。
/* Excerpt from server initialization. It registers the timer event and the
 * file events that the event loop dispatches. Every registration failure
 * ends in serverPanic(). */

/* Create the timer callback, this is our way to process many background
 * operations incrementally, like clients timeout, eviction of unaccessed
 * expired keys and so forth. */
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
    serverPanic("Can't create event loop timers.");
    exit(1); /* NOTE(review): only reached if serverPanic() can return. */
}

/* Create an event handler for accepting new connections in TCP and Unix
 * domain sockets. */
if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {
    serverPanic("Unrecoverable error creating TCP socket accept handler.");
}
if (createSocketAcceptHandler(&server.tlsfd, acceptTLSHandler) != C_OK) {
    serverPanic("Unrecoverable error creating TLS socket accept handler.");
}
/* The Unix socket is only watched when sofd > 0.
 * NOTE(review): this assumes sofd is negative when no Unix socket was
 * opened; fd 0 would be skipped. Confirm against sofd's initial value. */
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
    acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");

/* Register a readable event for the pipe used to awake the event loop
 * when a blocked client in a module needs attention. */
if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
    moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
        serverPanic(
            "Error registering the readable event for the module "
            "blocked clients subsystem.");
}
这里采用回调的方式注册事件,并结合 IO 多路复用技术,实现了高效的网络模型。
- 复制初始化等等
- Lua 脚本环境(虚拟机)初始化
慢日志初始化
/* Initialize the slow log. This function should be called a single time
 * at server startup.
 *
 * Resets the entry ID counter to 0 and creates an empty list to hold slow
 * log entries. slowlogFreeEntry is registered as that list's free method,
 * presumably so the list itself releases each entry when it is removed. */
void slowlogInit(void) {
    server.slowlog_entry_id = 0;
    server.slowlog = listCreate();
    listSetFreeMethod(server.slowlog,slowlogFreeEntry);
}
后台任务线程创建
这里后台线程主要包含三类
- 异步关闭文件描述符
- AOF 异步刷盘
- 惰性释放(lazy free):在后台线程中异步释放已删除键/对象的内存
/* Initialize the background system, spawning the thread. */ void bioInit(void) { pthread_attr_t attr; pthread_t thread; size_t stacksize; int j; /* Initialization of state vars and objects */ for (j = 0; j < BIO_NUM_OPS; j++) { pthread_mutex_init(&bio_mutex[j],NULL); pthread_cond_init(&bio_newjob_cond[j],NULL); pthread_cond_init(&bio_step_cond[j],NULL); bio_jobs[j] = listCreate(); bio_pending[j] = 0; } /* Set the stack size as by default it may be small in some system */ pthread_attr_init(&attr); pthread_attr_getstacksize(&attr,&stacksize); if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */ while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; pthread_attr_setstacksize(&attr, stacksize); /* Ready to spawn our threads. We use the single argument the thread * function accepts in order to pass the job ID the thread is * responsible of. */ for (j = 0; j < BIO_NUM_OPS; j++) { void *arg = (void*)(unsigned long) j; if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs."); exit(1); } bio_threads[j] = thread; } }
lua 脚本
其他
外部模块加载
/* Load every module queued in server.loadmodule_queue, passing each entry's
 * path and argv/argc to moduleLoad(). Failing to load any module is fatal:
 * the failure is logged and the server exits.
 * NOTE(review): this excerpt is truncated. The function's closing brace,
 * and any code that follows the loop, are not shown. */
void moduleLoadFromQueue(void) {
    listIter li;
    listNode *ln;

    listRewind(server.loadmodule_queue,&li);
    while((ln = listNext(&li))) {
        struct moduleLoadQueueEntry *loadmod = ln->value;
        if (moduleLoad(loadmod->path,(void **)loadmod->argv,loadmod->argc)
            == C_ERR)
        {
            serverLog(LL_WARNING,
                "Can't load module from %s: server aborting",
                loadmod->path);
            exit(1);
        }
    }
    /* ... excerpt ends here ... */
磁盘加载数据
/* Function called at startup to load RDB or AOF file in memory. */ void loadDataFromDisk(void) { long long start = ustime(); if (server.aof_state == AOF_ON) { if (loadAppendOnlyFile(server.aof_filename) == C_OK) serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); } else { rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; errno = 0; /* Prevent a stale value from affecting error checking */ if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_NONE) == C_OK) { serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds", (float)(ustime()-start)/1000000); /* Restore the replication ID / offset from the RDB file. */ if ((server.masterhost || (server.cluster_enabled && nodeIsSlave(server.cluster->myself))) && rsi.repl_id_is_set && rsi.repl_offset != -1 && /* Note that older implementations may save a repl_stream_db * of -1 inside the RDB file in a wrong way, see more * information in function rdbPopulateSaveInfo. */ rsi.repl_stream_db != -1) { memcpy(server.replid,rsi.repl_id,sizeof(server.replid)); server.master_repl_offset = rsi.repl_offset; /* If we are a slave, create a cached master from this * information, in order to allow partial resynchronizations * with masters. */ replicationCacheMasterUsingMyself(); selectDb(server.cached_master,rsi.repl_stream_db); } } else if (errno != ENOENT) { serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno)); exit(1); } } }
事件循环前准备
/* Install the event loop's sleep hooks: afterSleep and beforeSleep.
 * aeProcessEvents() invokes them when called with AE_CALL_AFTER_SLEEP /
 * AE_CALL_BEFORE_SLEEP. The two setters are independent of each other. */
aeSetAfterSleepProc(server.el,afterSleep);
aeSetBeforeSleepProc(server.el,beforeSleep);
beforeSleep
beforeSleep 包括以下几个步骤:
- 运行 fast cycle 模式,进行过期处理
- 向所有从节点发送 ack 请求
- 解除因 WAIT 命令(等待同步复制)而阻塞的客户端
- 处理阻塞的客户端
- 将 AOF 刷盘
- 处理有待写回数据的客户端(pending writes),并释放需要异步关闭的客户端
代码如下:
/* Hook run by the event loop before it goes to sleep waiting for events
 * (installed with aeSetBeforeSleepProc(server.el,beforeSleep)).
 *
 * Updates the peak-memory stat. When re-entered from
 * processEventsWhileBlocked(), it runs only a minimal subset of work and
 * counts the events processed. Otherwise it runs the full sequence below:
 * blocked-client timeouts, pending reads, TLS data, cluster work, a fast
 * expire cycle, WAIT/module/unblocked clients, replica ACK requests,
 * failover status, tracking invalidations, AOF flush, pending writes,
 * async client frees, key-blocked clients, and finally releasing the
 * module GIL. The order is significant and must be preserved. */
void beforeSleep(struct aeEventLoop *eventLoop) {
    UNUSED(eventLoop);

    /* Track the peak memory usage. */
    size_t zmalloc_used = zmalloc_used_memory();
    if (zmalloc_used > server.stat_peak_memory)
        server.stat_peak_memory = zmalloc_used;

    /* Just call a subset of vital functions in case we are re-entering
     * the event loop from processEventsWhileBlocked(). Note that in this
     * case we keep track of the number of events we are processing, since
     * processEventsWhileBlocked() wants to stop ASAP if there are no longer
     * events to handle. */
    if (ProcessingEventsWhileBlocked) {
        uint64_t processed = 0;
        processed += handleClientsWithPendingReadsUsingThreads();
        processed += tlsProcessPendingData();
        processed += handleClientsWithPendingWrites();
        processed += freeClientsInAsyncFreeQueue();
        server.events_processed_while_blocked += processed;
        return;
    }

    /* Handle precise timeouts of blocked clients. */
    handleBlockedClientsTimeout();

    /* We should handle pending reads clients ASAP after event loop. */
    handleClientsWithPendingReadsUsingThreads();

    /* Handle TLS pending data. (must be done before flushAppendOnlyFile) */
    tlsProcessPendingData();

    /* If tls still has pending unread data don't sleep at all. */
    aeSetDontWait(server.el, tlsHasPendingData());

    /* Call the Redis Cluster before sleep function. Note that this function
     * may change the state of Redis Cluster (from ok to fail or vice versa),
     * so it's a good idea to call it before serving the unblocked clients
     * later in this function. */
    if (server.cluster_enabled) clusterBeforeSleep();

    /* Run a fast expire cycle (the called function will return
     * ASAP if a fast cycle is not needed). Only done when we are not a
     * replica (masterhost == NULL). */
    if (server.active_expire_enabled && server.masterhost == NULL)
        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);

    /* Unblock all the clients blocked for synchronous replication
     * in WAIT. */
    if (listLength(server.clients_waiting_acks))
        processClientsWaitingReplicas();

    /* Check if there are clients unblocked by modules that implement
     * blocking commands. */
    if (moduleCount()) moduleHandleBlockedClients();

    /* Try to process pending commands for clients that were just unblocked. */
    if (listLength(server.unblocked_clients))
        processUnblockedClients();

    /* Send all the slaves an ACK request if at least one client blocked
     * during the previous event loop iteration. Note that we do this after
     * processUnblockedClients(), so if there are multiple pipelined WAITs
     * and the just unblocked WAIT gets blocked again, we don't have to wait
     * a server cron cycle in absence of other event loop events. See #6623.
     *
     * We also don't send the ACKs while clients are paused, since it can
     * increment the replication backlog, they'll be sent after the pause
     * if we are still the master. */
    if (server.get_ack_from_slaves && !checkClientPauseTimeoutAndReturnIfPaused()) {
        robj *argv[3];

        /* REPLCONF GETACK <unused> */
        argv[0] = shared.replconf;
        argv[1] = shared.getack;
        argv[2] = shared.special_asterick; /* Not used argument. */
        replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
        server.get_ack_from_slaves = 0;
    }

    /* We may have received updates from clients about their current offset. NOTE:
     * this can't be done where the ACK is received since failover will disconnect
     * our clients. */
    updateFailoverStatus();

    /* Send the invalidation messages to clients participating to the
     * client side caching protocol in broadcasting (BCAST) mode. */
    trackingBroadcastInvalidationMessages();

    /* Write the AOF buffer on disk */
    if (server.aof_state == AOF_ON)
        flushAppendOnlyFile(0);

    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWritesUsingThreads();

    /* Close clients that need to be closed asynchronously */
    freeClientsInAsyncFreeQueue();

    /* Try to process blocked clients every once in while. Example: A module
     * calls RM_SignalKeyAsReady from within a timer callback (So we don't
     * visit processCommand() at all). */
    handleClientsBlockedOnKeys();

    /* Before we are going to sleep, let the threads access the dataset by
     * releasing the GIL. Redis main thread will not touch anything at this
     * time. */
    if (moduleCount()) moduleReleaseGIL();
    /* Do NOT add anything below moduleReleaseGIL !!! */
}
进入事件主循环
/* Run the event loop until eventLoop->stop becomes non-zero. Each iteration
 * processes all event types (AE_ALL_EVENTS) and asks aeProcessEvents() to
 * invoke the before-sleep and after-sleep callbacks. */
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

/* NOTE(review): the lines below are not part of aeMain and, as pasted,
 * sit outside any function, so they would not compile here. They appear
 * to be an excerpt from aeProcessEvents(): when AE_CALL_AFTER_SLEEP is set
 * and an after-sleep callback is installed, it is invoked. */
/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
    eventLoop->aftersleep(eventLoop);
// write: sendReplyToClient
// read: readQueryFromClient
// NOTE(review): presumably the client write / read handlers; confirm.
aeProcessEvents
中包含网络(文件)事件以及定时事件,这两类事件通过 IO 多路复用很好地集成在一起。
请求整体流程
参考与引用
- 《Redis 设计与实现》 黄健宏
- 图片来源于网络