Redis Server启动过程
今天,我们先来学习下 Redis server 的启动过程。
我们知道,main 函数是 Redis 整个运行程序的入口,并且 Redis 实例在运行时,也会从这个 main 函数开始执行。同时,由于 Redis 是典型的 Client-Server 架构,一旦 Redis 实例开始运行,Redis server 也就会启动,而 main 函数其实也会负责 Redis server 的启动运行。
其中,Redis 运行的基本控制逻辑是在server.c文件中完成的,而 main 函数就是在 server.c 中。
你可以掌握 Redis 针对以下三个问题的实现思路:
- Redis server 启动后具体会做哪些初始化操作?
- Redis server 初始化时有哪些关键配置项?
- Redis server 如何开始处理客户端请求?
好了,接下来,我们先从 main 函数开始,来了解下它在 Redis server 中的设计实现思路。
main 函数:Redis server 的入口一般来说,一个使用 C 开发的系统软件启动运行的代码逻辑,都是实现在了 main 函数当中,所以在正式了解 Redis 中 main 函数的实现之前,我想先给你分享一个小 Tips,就是你在阅读学习一个系统的代码时,可以先找下 main 函数,看看它的执行过程。那么,对于 Redis 的 main 函数来说,我把它执行的工作分成了五个阶段。
阶段一:基本初始化
在这个阶段,main 函数主要是完成一些基本的初始化工作,包括设置 server 运行的时区、设置哈希函数的随机种子等。这部分工作的主要调用函数如下所示:
// 设置时区 setlocale(LC_COLLATE,""); tzset(); /* Populates 'timezone' global. */ zmalloc_set_oom_handler(redisOutOfMemoryHandler); srand(time(NULL)^getpid()); srandom(time(NULL)^getpid()); gettimeofday(&tv,NULL); init_genrand64(((long long) tv.tv_sec * 1000000 + tv.tv_usec) ^ getpid()); crc64_init(); /* Store umask value. Because umask(2) only offers a set-and-get API we have * to reset it and restore it back. We do this early to avoid a potential * race condition with threads that could be creating files or directories. */ umask(server.umask = umask(0777)); // 设置随机种子 uint8_t hashseed[16]; getRandomBytes(hashseed,sizeof(hashseed)); dictSetHashFunctionSeed(hashseed);
这里,你需要注意的是,在 main 函数的开始部分,有一段宏定义覆盖的代码。这部分代码的作用是,如果定义了 REDIS_TEST 宏定义,并且 Redis server 启动时的参数符合测试参数,那么 main 函数就会执行相应的测试程序。
这段宏定义的代码如以下所示,其中的示例代码就是调用 ziplist 的测试函数 ziplistTest:
#ifdef REDIS_TEST // 如果启动参数有test和ziplist,那么就调用ziplistTest函数进行ziplist的测试 if (argc >= 3 && !strcasecmp(argv[1], "test")) { int accurate = 0; for (j = 3; j < argc; j++) { if (!strcasecmp(argv[j], "--accurate")) { accurate = 1; } } if (!strcasecmp(argv[2], "all")) { int numtests = sizeof(redisTests)/sizeof(struct redisTest); for (j = 0; j < numtests; j++) { redisTests[j].failed = (redisTests[j].proc(argc,argv,accurate) != 0); } /* Report tests result */ int failed_num = 0; for (j = 0; j < numtests; j++) { if (redisTests[j].failed) { failed_num++; printf("[failed] Test - %s\n", redisTests[j].name); } else { printf("[ok] Test - %s\n", redisTests[j].name); } } printf("%d tests, %d passed, %d failed\n", numtests, numtests-failed_num, failed_num); return failed_num == 0 ? 0 : 1; } else { redisTestProc *proc = getTestProcByName(argv[2]); if (!proc) return -1; /* test not found */ return proc(argc,argv,accurate); } return 0; } #endif
阶段二:检查哨兵模式,并检查是否要执行 RDB 检测或 AOF 检测
Redis server 启动后,可能是以哨兵模式运行的,而哨兵模式运行的 server 在参数初始化、参数设置,以及 server 启动过程中要执行的操作等方面,与普通模式 server 有所差别。所以,main 函数在执行过程中需要根据 Redis 配置的参数,检查是否设置了哨兵模式。如果有设置哨兵模式的话,main 函数会调用 initSentinelConfig 函数,对哨兵模式的参数进行初始化设置,以及调用 initSentinel 函数,初始化设置哨兵模式运行的 server。有关哨兵模式运行的 Redis server 相关机制,我会在后面详细介绍。下面的代码展示了 main 函数中对哨兵模式的检查,以及对哨兵模式的初始化,你可以看下:
/* We need to init sentinel right now as parsing the configuration file * in sentinel mode will have the effect of populating the sentinel * data structures with master nodes to monitor. */ //判断server是否设置为哨兵模式 if (server.sentinel_mode) { //初始化哨兵的配置 initSentinelConfig(); //初始化哨兵模式 initSentinel(); }
除了检查哨兵模式以外,main 函数还会检查是否要执行 RDB 检测或 AOF 检查,这对应了实际运行的程序是 redis-check-rdb 或 redis-check-aof。在这种情况下,main 函数会调用 redis_check_rdb_main 函数或 redis_check_aof_main 函数,检测 RDB 文件或 AOF 文件。你可以看看下面的代码,其中就展示了 main 函数对这部分内容的检查和调用:
/* Check if we need to start in redis-check-rdb/aof mode. We just execute * the program main. However the program is part of the Redis executable * so that we can easily execute an RDB check on loading errors. */ // 如果运行的是redis-check-rdb程序,调用redis_check_rdb_main函数检测RDB文件 if (strstr(argv[0],"redis-check-rdb") != NULL) redis_check_rdb_main(argc,argv,NULL); //如果运行的是redis-check-aof程序,调用redis_check_aof_main函数检测AOF文件 else if (strstr(argv[0],"redis-check-aof") != NULL) redis_check_aof_main(argc,argv);
阶段三:运行参数解析
在这一阶段,main 函数会对命令行传入的参数进行解析,并且调用 loadServerConfig 函数,对命令行参数和配置文件中的参数进行合并处理,然后为 Redis 各功能模块的关键参数设置合适的取值,以便 server 能高效地运行。
/* Load the server configuration from the specified filename. * The function appends the additional configuration directives stored * in the 'options' string to the config file before loading. * * Both filename and options can be NULL, in such a case are considered * empty. This way loadServerConfig can be used to just load a file or * just load a string. */ void loadServerConfig(char *filename, char config_from_stdin, char *options) { sds config = sdsempty(); char buf[CONFIG_MAX_LINE+1]; FILE *fp; /* Load the file content */ // 加载文件内容 if (filename) { if ((fp = fopen(filename,"r")) == NULL) { serverLog(LL_WARNING,"Fatal error, can't open config file '%s': %s", filename, strerror(errno)); exit(1); } while(fgets(buf,CONFIG_MAX_LINE+1,fp) != NULL) config = sdscat(config,buf); fclose(fp); } // 追加从命令行输入的参数 /* Append content from stdin */ if (config_from_stdin) { serverLog(LL_WARNING,"Reading config from stdin"); fp = stdin; while(fgets(buf,CONFIG_MAX_LINE+1,fp) != NULL) config = sdscat(config,buf); } /* Append the additional options */ // 追加额外的命令 if (options) { config = sdscat(config,"\n"); config = sdscat(config,options); } loadServerConfigFromString(config); sdsfree(config); }
阶段四:初始化 server
在完成对运行参数的解析和设置后,main 函数会调用 initServer 函数,对 server 运行时的各种资源进行初始化工作。这主要包括了 server 资源管理所需的数据结构初始化、键值对数据库初始化、server 网络框架初始化等。
void initServer(void) { int j; signal(SIGHUP, SIG_IGN); signal(SIGPIPE, SIG_IGN); setupSignalHandlers(); makeThreadKillable(); if (server.syslog_enabled) { openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT, server.syslog_facility); } // 从配置系统设置默认值后初始化 /* Initialization after setting defaults from the config system. */ server.aof_state = server.aof_enabled ? AOF_ON : AOF_OFF; server.hz = server.config_hz; server.pid = getpid(); server.in_fork_child = CHILD_TYPE_NONE; server.main_thread_id = pthread_self(); server.current_client = NULL; server.errors = raxNew(); server.fixed_time_expire = 0; server.clients = listCreate(); server.clients_index = raxNew(); server.clients_to_close = listCreate(); server.slaves = listCreate(); server.monitors = listCreate(); server.clients_pending_write = listCreate(); server.clients_pending_read = listCreate(); server.clients_timeout_table = raxNew(); server.replication_allowed = 1; server.slaveseldb = -1; /* Force to emit the first SELECT command. */ server.unblocked_clients = listCreate(); server.ready_keys = listCreate(); server.clients_waiting_acks = listCreate(); server.get_ack_from_slaves = 0; server.client_pause_type = 0; server.paused_clients = listCreate(); server.events_processed_while_blocked = 0; server.system_memory_size = zmalloc_get_memory_size(); server.blocked_last_cron = 0; server.blocking_op_nesting = 0; if ((server.tls_port || server.tls_replication || server.tls_cluster) && tlsConfigure(&server.tls_ctx_config) == C_ERR) { serverLog(LL_WARNING, "Failed to configure TLS. Check logs for more info."); exit(1); } createSharedObjects(); adjustOpenFilesLimit(); const char *clk_msg = monotonicInit(); serverLog(LL_NOTICE, "monotonic clock: %s", clk_msg); server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); if (server.el == NULL) { serverLog(LL_WARNING, "Failed creating the event loop. Error message: '%s'", strerror(errno)); exit(1); } server.db = zmalloc(sizeof(redisDb)*server.dbnum); // 打开用户命令的TCP监听套接字 /* Open the TCP listening socket for the user commands. */ if (server.port != 0 && listenToPort(server.port,&server.ipfd) == C_ERR) { serverLog(LL_WARNING, "Failed listening on port %u (TCP), aborting.", server.port); exit(1); } if (server.tls_port != 0 && listenToPort(server.tls_port,&server.tlsfd) == C_ERR) { serverLog(LL_WARNING, "Failed listening on port %u (TLS), aborting.", server.tls_port); exit(1); } // 打开监听Unix的套接字 /* Open the listening Unix domain socket. */ if (server.unixsocket != NULL) { unlink(server.unixsocket); /* don't care if this fails */ server.sofd = anetUnixServer(server.neterr,server.unixsocket, server.unixsocketperm, server.tcp_backlog); if (server.sofd == ANET_ERR) { serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr); exit(1); } anetNonBlock(NULL,server.sofd); anetCloexec(server.sofd); } // 如果没有监听的socket那就放弃退出 /* Abort if there are no listening sockets at all. */ if (server.ipfd.count == 0 && server.tlsfd.count == 0 && server.sofd < 0) { serverLog(LL_WARNING, "Configured to not listen anywhere, exiting."); exit(1); } // 创建Redis数据库,并初始化数据库的内部状态 /* Create the Redis databases, and initialize other internal state. */ for (j = 0; j < server.dbnum; j++) { server.db[j].dict = dictCreate(&dbDictType,NULL); server.db[j].expires = dictCreate(&dbExpiresDictType,NULL); server.db[j].expires_cursor = 0; server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL); server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); server.db[j].watched_keys = dictCreate(&keylistDictType,NULL); server.db[j].id = j; server.db[j].avg_ttl = 0; server.db[j].defrag_later = listCreate(); listSetFreeMethod(server.db[j].defrag_later,(void (*)(void*))sdsfree); } // 初始化LRU键的pool evictionPoolAlloc(); /* Initialize the LRU keys pool. */ server.pubsub_channels = dictCreate(&keylistDictType,NULL); server.pubsub_patterns = dictCreate(&keylistDictType,NULL); server.cronloops = 0; server.in_eval = 0; server.in_exec = 0; server.propagate_in_transaction = 0; server.client_pause_in_transaction = 0; server.child_pid = -1; server.child_type = CHILD_TYPE_NONE; server.rdb_child_type = RDB_CHILD_TYPE_NONE; server.rdb_pipe_conns = NULL; server.rdb_pipe_numconns = 0; server.rdb_pipe_numconns_writing = 0; server.rdb_pipe_buff = NULL; server.rdb_pipe_bufflen = 0; server.rdb_bgsave_scheduled = 0; server.child_info_pipe[0] = -1; server.child_info_pipe[1] = -1; server.child_info_nread = 0; aofRewriteBufferReset(); server.aof_buf = sdsempty(); server.lastsave = time(NULL); /* At startup we consider the DB saved. */ server.lastbgsave_try = 0; /* At startup we never tried to BGSAVE. */ server.rdb_save_time_last = -1; server.rdb_save_time_start = -1; server.dirty = 0; resetServerStats(); /* A few stats we don't want to reset: server startup time, and peak mem. */ server.stat_starttime = time(NULL); server.stat_peak_memory = 0; server.stat_current_cow_bytes = 0; server.stat_current_cow_updated = 0; server.stat_current_save_keys_processed = 0; server.stat_current_save_keys_total = 0; server.stat_rdb_cow_bytes = 0; server.stat_aof_cow_bytes = 0; server.stat_module_cow_bytes = 0; server.stat_module_progress = 0; for (int j = 0; j < CLIENT_TYPE_COUNT; j++) server.stat_clients_type_memory[j] = 0; server.cron_malloc_stats.zmalloc_used = 0; server.cron_malloc_stats.process_rss = 0; server.cron_malloc_stats.allocator_allocated = 0; server.cron_malloc_stats.allocator_active = 0; server.cron_malloc_stats.allocator_resident = 0; server.lastbgsave_status = C_OK; server.aof_last_write_status = C_OK; server.aof_last_write_errno = 0; server.repl_good_slaves_count = 0; /* Create the timer callback, this is our way to process many background * operations incrementally, like clients timeout, eviction of unaccessed * expired keys and so forth. */ if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { serverPanic("Can't create event loop timers."); exit(1); } /* Create an event handler for accepting new connections in TCP and Unix * domain sockets. */ if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) { serverPanic("Unrecoverable error creating TCP socket accept handler."); } if (createSocketAcceptHandler(&server.tlsfd, acceptTLSHandler) != C_OK) { serverPanic("Unrecoverable error creating TLS socket accept handler."); } if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event."); /* Register a readable event for the pipe used to awake the event loop * when a blocked client in a module needs attention. */ if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE, moduleBlockedClientPipeReadable,NULL) == AE_ERR) { serverPanic( "Error registering the readable event for the module " "blocked clients subsystem."); } /* Register before and after sleep handlers (note this needs to be done * before loading persistence since it is used by processEventsWhileBlocked. */ aeSetBeforeSleepProc(server.el,beforeSleep); aeSetAfterSleepProc(server.el,afterSleep); /* Open the AOF file if needed. */ if (server.aof_state == AOF_ON) { server.aof_fd = open(server.aof_filename, O_WRONLY|O_APPEND|O_CREAT,0644); if (server.aof_fd == -1) { serverLog(LL_WARNING, "Can't open the append-only file: %s", strerror(errno)); exit(1); } } /* 32 bit instances are limited to 4GB of address space, so if there is * no explicit limit in the user provided configuration we set a limit * at 3 GB using maxmemory with 'noeviction' policy'. This avoids * useless crashes of the Redis instance for out of memory. */ if (server.arch_bits == 32 && server.maxmemory == 0) { serverLog(LL_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now."); server.maxmemory = 3072LL*(1024*1024); /* 3 GB */ server.maxmemory_policy = MAXMEMORY_NO_EVICTION; } if (server.cluster_enabled) clusterInit(); replicationScriptCacheInit(); scriptingInit(1); slowlogInit(); latencyMonitorInit(); /* Initialize ACL default password if it exists */ ACLUpdateDefaultUserPassword(server.requirepass); }
而在调用完 initServer 后,main 函数还会再次判断当前 server 是否为哨兵模式。如果是哨兵模式,main 函数会调用 sentinelIsRunning 函数,设置启动哨兵模式。
/* This function gets called when the server is in Sentinel mode, started, * loaded the configuration, and is ready for normal operations. */ // 当服务器处于Sentinel模式时调用此函数,已加载配置,并已准备好进行正常操作。 void sentinelIsRunning(void) { int j; /* If this Sentinel has yet no ID set in the configuration file, we * pick a random one and persist the config on disk. From now on this * will be this Sentinel ID across restarts. */ // 如果这个哨兵还没有在配置文件中设置ID,我们随机选取一个并将配置持久保存在磁盘上。从现在开始,将在重新启动时使用此哨兵ID。 for (j = 0; j < CONFIG_RUN_ID_SIZE; j++) if (sentinel.myid[j] != 0) break; if (j == CONFIG_RUN_ID_SIZE) { /* Pick ID and persist the config. */ getRandomHexChars(sentinel.myid,CONFIG_RUN_ID_SIZE); sentinelFlushConfig(); } /* Log its ID to make debugging of issues simpler. */ serverLog(LL_WARNING,"Sentinel ID is %s", sentinel.myid); /* We want to generate a +monitor event for every configured master * at startup. */ // 我们希望在启动时为每个配置的主机生成一个监视器事件 sentinelGenerateInitialMonitorEvents(); }
否则的话,main 函数会调用 loadDataFromDisk 函数,从磁盘上加载 AOF 或者是 RDB 文件,以便恢复之前的数据。
/* Function called at startup to load RDB or AOF file in memory. */ void loadDataFromDisk(void) { long long start = ustime(); // 如果AOF配置开启的话 if (server.aof_state == AOF_ON) { // 加载AOF配置 if (loadAppendOnlyFile(server.aof_filename) == C_OK) serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); } // 否则没有开启AOF,加载RDB文件 else { rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; errno = 0; /* Prevent a stale value from affecting error checking */ if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_NONE) == C_OK) { serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds", (float)(ustime()-start)/1000000); /* Restore the replication ID / offset from the RDB file. */ // 从RDB文件还原复制 ID/偏移量 if ((server.masterhost || (server.cluster_enabled && nodeIsSlave(server.cluster->myself))) && rsi.repl_id_is_set && rsi.repl_offset != -1 && /* Note that older implementations may save a repl_stream_db * of -1 inside the RDB file in a wrong way, see more * information in function rdbPopulateSaveInfo. */ rsi.repl_stream_db != -1) { memcpy(server.replid,rsi.repl_id,sizeof(server.replid)); server.master_repl_offset = rsi.repl_offset; /* If we are a slave, create a cached master from this * information, in order to allow partial resynchronizations * with masters. */ // 如果我们是一个从机,那么从这些信息创建一个缓存的主机,以便允许部分从机重新同步主机。 replicationCacheMasterUsingMyself(); selectDb(server.cached_master,rsi.repl_stream_db); } } else if (errno != ENOENT) { serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno)); exit(1); } } }
阶段五:执行事件驱动框架
为了能高效处理高并发的客户端连接请求,Redis 采用了事件驱动框架,来并发处理不同客户端的连接和读写请求。所以,main 函数执行到最后时,会调用 aeMain 函数进入事件驱动框架,开始循环处理各种触发的事件。我把刚才介绍的五个阶段涉及到的关键操作,画在了下面的图中,你可以再回顾下。
那么,在这五个阶段当中,阶段三、四和五其实就包括了 Redis server 启动过程中的关键操作。所以接下来,我们就来依次学习下这三个阶段中的主要工作。
Redis 运行参数解析与设置
我们知道,Redis 提供了丰富的功能,既支持多种键值对数据类型的读写访问,还支持数据持久化保存、主从复制、切片集群等。而这些功能的高效运行,其实都离不开相关功能模块的关键参数配置。
举例来说,Redis 为了节省内存,设计了内存紧凑型的数据结构来保存 Hash、Sorted Set 等键值对类型。但是在使用了内存紧凑型的数据结构之后,如果往数据结构存入的元素个数过多或元素过大的话,键值对的访问性能反而会受到影响。因此,为了平衡内存使用量和系统访问性能,我们就可以通过参数,来设置和调节内存紧凑型数据结构的使用条件。
也就是说,掌握这些关键参数的设置,可以帮助我们提升 Redis 实例的运行效率。
不过,Redis 的参数有很多,我们无法在一节课中掌握所有的参数设置。所以下面,我们可以先来学习下 Redis 的主要参数类型,这样就能对各种参数形成一个全面的了解。同时,我也会给你介绍一些和 server 运行关系密切的参数及其设置方法,以便你可以配置好这些参数,让 server 高效运行起来。
Redis 的主要参数类型
首先,Redis 运行所需的各种参数,都统一定义在了server.h文件的 redisServer 结构体中。根据参数作用的范围,我把各种参数划分为了七大类型,包括通用参数、数据结构参数、网络参数、持久化参数、主从复制参数、切片集群参数、性能优化参数。具体你可以参考下面表格中的内容。
这样,如果你能按照上面的划分方法给 Redis 参数进行归类,那么你就可以发现,这些参数实际和 Redis 的主要功能机制是相对应的。所以,如果你要深入掌握这些参数的典型配置值,你就需要对相应功能机制的工作原理有所了解。
好,现在我们就了解了 Redis 的七大参数类型,以及它们基本的作用范围,那么下面我们就接着来学习下,Redis 是如何针对这些参数进行设置的。