/* Show some info about non-empty databases */ run_with_period(5000) { for (j = 0; j < server.dbnum; j++) { long long size, used, vkeys; size = dictSlots(server.db[j].dict); used = dictSize(server.db[j].dict); vkeys = dictSize(server.db[j].expires); if (used || vkeys) { serverLog(LL_VERBOSE,“DB %d: %lld keys (%lld volatile) in %lld slots HT.”,j,used,vkeys,size); /* dictPrintStats(server.dict); */ } } } /* Show information about connected clients */ if (!server.sentinel_mode) { run_with_period(5000) { serverLog(LL_DEBUG, “%lu clients connected (%lu replicas), %zu bytes in use”, listLength(server.clients)-listLength(server.slaves), listLength(server.slaves), zmalloc_used_memory()); } } /* We need to do a few operations on clients asynchronously. */ clientsCron(); /* Handle background operations on Redis databases. */ databasesCron(); /* Start a scheduled AOF rewrite if this was requested by the user while • a BGSAVE was in progress. */ if (!hasActiveChildProcess() && server.aof_rewrite_scheduled) { rewriteAppendOnlyFileBackground(); } /* Check if a background saving or AOF rewrite in progress terminated. */ if (hasActiveChildProcess() || ldbPendingChildren()) { checkChildrenDone(); } else { /* If there is not a background saving/rewrite in progress check if • we have to save/rewrite now. */ for (j = 0; j < server.saveparamslen; j++) { struct saveparam *sp = server.saveparams+j; /* Save if we reached the given amount of changes, • the given amount of seconds, and if the latest bgsave was • successful or if, in case of an error, at least • CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */ if (server.dirty >= sp->changes && server.unixtime-server.lastsave > sp->seconds && (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY || server.lastbgsave_status == C_OK)) { serverLog(LL_NOTICE,“%d changes in %d seconds. Saving…”, sp->changes, (int)sp->seconds); rdbSaveInfo rsi, *rsiptr; rsiptr = rdbPopulateSaveInfo(&rsi); rdbSaveBackground(server.rdb_filename,rsiptr); break; } } /* Trigger an AOF rewrite if needed. */ if (server.aof_state == AOF_ON && !hasActiveChildProcess() && server.aof_rewrite_perc && server.aof_current_size > server.aof_rewrite_min_size) { long long base = server.aof_rewrite_base_size ? server.aof_rewrite_base_size : 1; long long growth = (server.aof_current_size*100/base) - 100; if (growth >= server.aof_rewrite_perc) { serverLog(LL_NOTICE,“Starting automatic rewriting of AOF on %lld%% growth”,growth); rewriteAppendOnlyFileBackground(); } } } /* AOF postponed flush: Try at every cron cycle if the slow fsync • completed. */ if (server.aof_flush_postponed_start) flushAppendOnlyFile(0); /* AOF write errors: in this case we have a buffer to flush as well and • clear the AOF error in case of success to make the DB writable again, • however to try every second is enough in case of ‘hz’ is set to • an higher frequency. */ run_with_period(1000) { if (server.aof_last_write_status == C_ERR) flushAppendOnlyFile(0); } /* Clear the paused clients flag if needed. */ clientsArePaused(); /* Don’t check return value, just use the side effect.*/ /* Replication cron function – used to reconnect to master, • detect transfer failures, start background RDB transfers and so forth. */ run_with_period(1000) replicationCron(); /* Run the Redis Cluster cron. */ run_with_period(100) { if (server.cluster_enabled) clusterCron(); } /* Run the Sentinel timer if we are in sentinel mode. */ if (server.sentinel_mode) sentinelTimer(); /* Cleanup expired MIGRATE cached sockets. */ run_with_period(1000) { migrateCloseTimedoutSockets(); } /* Stop the I/O threads if we don’t have enough pending work. */ stopThreadedIOIfNeeded(); /* Resize tracking keys table if needed. This is also done at every • command execution, but we want to be sure that if the last command • executed changes the value via CONFIG SET, the server will perform • the operation even if completely idle. */ if (server.tracking_clients) trackingLimitUsedSlots(); /* Start a scheduled BGSAVE if the corresponding flag is set. This is • useful when we are forced to postpone a BGSAVE because an AOF • rewrite is in progress. • • Note: this code must be after the replicationCron() call above so • make sure when refactoring this file to keep this order. This is useful • because we want to give priority to RDB savings for replication. */ if (!hasActiveChildProcess() && server.rdb_bgsave_scheduled && (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY || server.lastbgsave_status == C_OK)) { rdbSaveInfo rsi, *rsiptr; rsiptr = rdbPopulateSaveInfo(&rsi); if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK) server.rdb_bgsave_scheduled = 0; } /* Fire the cron loop modules event. */ RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,server.hz}; moduleFireServerEvent(REDISMODULE_EVENT_CRON_LOOP, 0, &ei); server.cronloops++; return 1000/server.hz; }
后台生成RDB文件
rdb 通过 rdbSaveBackground 函数负责在后台生成 RDB 文件(bigsave的底层也是这个),创建一个子进程(前面那个函数最终也会调用到这里),由子进程将数据快照保存到磁盘中,父进程继续该干嘛干嘛。
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) { pid_t childpid; if (hasActiveChildProcess()) return C_ERR; server.dirty_before_bgsave = server.dirty; server.lastbgsave_try = time(NULL); openChildInfoPipe(); if ((childpid = redisFork()) == 0) { int retval; /* Child */ redisSetProcTitle(“redis-rdb-bgsave”); redisSetCpuAffinity(server.bgsave_cpulist); retval = rdbSave(filename,rsi); if (retval == C_OK) { sendChildCOWInfo(CHILD_INFO_TYPE_RDB, “RDB”); } exitFromChild((retval == C_OK) ? 0 : 1); } else { /* Parent */ if (childpid == -1) { closeChildInfoPipe(); server.lastbgsave_status = C_ERR; serverLog(LL_WARNING,“Can’t save in background: fork: %s”, strerror(errno)); return C_ERR; } serverLog(LL_NOTICE,“Background saving started by pid %d”,childpid); server.rdb_save_time_start = time(NULL); server.rdb_child_pid = childpid; server.rdb_child_type = RDB_CHILD_TYPE_DISK; return C_OK; } return C_OK; /* unreached */ }
生成RDB文件
上面的函数最终会执行如下代码(这个代码属于save命令,后面再说关于这个命令):
/* Save the DB on disk. Return C_ERR on error, C_OK on success. */ int rdbSave(char *filename, rdbSaveInfo *rsi) { char tmpfile[256]; char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */ FILE *fp; rio rdb; int error = 0; snprintf(tmpfile,256,“temp-%d.rdb”, (int) getpid()); fp = fopen(tmpfile,“w”); if (!fp) { char *cwdp = getcwd(cwd,MAXPATHLEN); serverLog(LL_WARNING, "Failed opening the RDB file %s (in server root dir %s) " “for saving: %s”, filename, cwdp ? cwdp : “unknown”, strerror(errno)); return C_ERR; } rioInitWithFile(&rdb,fp); startSaving(RDBFLAGS_NONE); if (server.rdb_save_incremental_fsync) rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES); if (rdbSaveRio(&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) { errno = error; goto werr; } /* Make sure data will not remain on the OS’s output buffers */ if (fflush(fp) == EOF) goto werr; if (fsync(fileno(fp)) == -1) goto werr; if (fclose(fp) == EOF) goto werr; /* Use RENAME to make sure the DB file is changed atomically only • if the generate DB file is ok. */ if (rename(tmpfile,filename) == -1) { char *cwdp = getcwd(cwd,MAXPATHLEN); serverLog(LL_WARNING, "Error moving temp DB file %s on the final " “destination %s (in server root dir %s): %s”, tmpfile, filename, cwdp ? cwdp : “unknown”, strerror(errno)); unlink(tmpfile); stopSaving(0); return C_ERR; } serverLog(LL_NOTICE,“DB saved on disk”); server.dirty = 0; server.lastsave = time(NULL); server.lastbgsave_status = C_OK; stopSaving(1); return C_OK; werr: serverLog(LL_WARNING,“Write error saving DB on disk: %s”, strerror(errno)); fclose(fp); unlink(tmpfile); stopSaving(0); return C_ERR; }
如果在生产环节中直接使用save,会导致主进程长时间阻塞,所以不应在生产环节中使用该命令。
将redis数据写入RDB文件中
上面那个函数最终乎调用到这个函数(真实一环扣一环呀):
/* Produces a dump of the database in RDB format sending it to the specified