void databasesCron(void) { /* Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. */ if (server.active_expire_enabled) { if (iAmMaster()) { activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); } else { expireSlaveKeys(); } } /* Defrag keys gradually. */ activeDefragCycle(); /* Perform hash tables rehashing if needed, but only if there are no * other processes saving the DB on disk. Otherwise rehashing is bad * as will cause a lot of copy-on-write of memory pages. */ if (!hasActiveChildProcess()) { /* We use global counters so if we stop the computation at a given * DB we'll be able to start from the successive in the next * cron loop iteration. */ static unsigned int resize_db = 0; static unsigned int rehash_db = 0; int dbs_per_call = CRON_DBS_PER_CALL; int j; /* Don't test more DBs than we have. */ if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum; /* Resize */ for (j = 0; j < dbs_per_call; j++) { tryResizeHashTables(resize_db % server.dbnum); resize_db++; } /* Rehash */ if (server.activerehashing) { for (j = 0; j < dbs_per_call; j++) { int work_done = incrementallyRehash(rehash_db); if (work_done) { /* If the function did some work, stop here, we'll do * more at the next cron loop. */ break; } else { /* If this db didn't need rehash, we'll try the next one. */ rehash_db++; rehash_db %= server.dbnum; } } } } }
这里需要注意的是,在定时任务中不能花费太长时间,防止其阻塞其他操作。
/* Spend up to one millisecond rehashing the hash tables of DB `dbid`.
 * The main key space is tried first, then the expires table; at most
 * one of them consumes the budget per call.
 *
 * Returns 1 if rehash work was performed (budget spent), 0 if neither
 * table was mid-rehash. */
int incrementallyRehash(int dbid) {
    /* Main keys dictionary. */
    if (dictIsRehashing(server.db[dbid].dict)) {
        dictRehashMilliseconds(server.db[dbid].dict, 1);
        /* Our millisecond for this loop is already used up. */
        return 1;
    }
    /* Expires dictionary. */
    if (dictIsRehashing(server.db[dbid].expires)) {
        dictRehashMilliseconds(server.db[dbid].expires, 1);
        /* Budget consumed here instead. */
        return 1;
    }
    return 0; /* Nothing to do for this DB. */
}
rehash 注意点
- rehash 过程中 dictAdd,只插入 ht_table[1] 中,确保 ht_table[0] 只减不增。
entry->next = d->ht_table[htidx][index]; d->ht_table[htidx][index] = entry; d->ht_used[htidx]++;
- rehash 过程 dictFind 和 dictDelete , 需要涉及到 0 和 1 两个表。
- 字典默认的 hash 算法是 siphash
/* The default hashing function uses SipHash implementation * in siphash.c. */ uint64_t siphash(const uint8_t *in, const size_t inlen, const uint8_t *k);
- Redis 在持久化时(存在子进程),服务器执行扩展操作所需要的负载因子会提高,默认为 5(平时为 1),以尽量避免 fork 期间的写时复制开销。
rehash 后数据结构如下所示:
迭代器
redis 的 dict 迭代器分为两种类型,安全迭代器 (safe = 1) 和非安全迭代器(safe = 0)。
安全模式下,支持边遍历边修改(如字典 key 的过期判断),支持字典的添加、删除、查找,但遍历期间会暂停 rehash 操作(避免元素被重复遍历)。
/* KEYS command: reply with every key in the current DB that matches
 * the glob-style pattern in argv[1]. A safe iterator is used because
 * expired keys are looked up (and may be lazily deleted) while we
 * walk the dictionary. */
void keysCommand(client *c) {
    sds pattern = c->argv[1]->ptr;
    int plen = sdslen(pattern);
    /* A lone "*" matches everything: skip per-key pattern matching. */
    int allkeys = (plen == 1 && pattern[0] == '*');
    unsigned long numkeys = 0;
    /* Reply length is unknown up front; reserve a deferred slot. */
    void *replylen = addReplyDeferredLen(c);

    dictIterator *di = dictGetSafeIterator(c->db->dict);
    dictEntry *de;
    while ((de = dictNext(di)) != NULL) {
        sds key = dictGetKey(de);

        if (!allkeys && !stringmatchlen(pattern, plen, key, sdslen(key), 0))
            continue;

        robj *keyobj = createStringObject(key, sdslen(key));
        /* Only report keys that are still alive. */
        if (!keyIsExpired(c->db, keyobj)) {
            addReplyBulk(c, keyobj);
            numkeys++;
        }
        decrRefCount(keyobj);
    }
    dictReleaseIterator(di);
    setDeferredArrayLen(c, replylen, numkeys);
}
非安全模式下,只支持只读操作,使用字典的删除,添加,查找等方法可能造成不可预期的问题,如重复遍历元素或者漏掉元素,但支持 rehash 操作。
if (iter->safe) iter->d->iterators++; else iter->fingerprint = dictFingerprint(iter->d); static void _dictRehashStep(dict *d) { if (d->pauserehash == 0) dictRehash(d,1); }
迭代器选择
1、为了避免元素的重复遍历,必须使用迭代器的安全模式,如 bgrewriteaof 以及 bgsave 操作。
2、遍历过程需要处理元素时,也必须使用安全迭代器,比如 keys 命令。
3、允许遍历过程中出现个别元素重复时,选择非安全迭代器,比如 scan。