Redis源码剖析之数据过期(expire)

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: Redis源码剖析之数据过期(expire)

我之前统计过我们线上某redis数据被访问的时间分布,大概90%的请求只会访问最新15分钟的数据,99%的请求访问最新1小时的数据,只有不到千分之一的请求会访问超过1天的数据。我们之前这份数据存了两天(近500g内存数据),如果算上主备的话用掉了120多个Redis实例(一个实例8g内存),光把过期时间从2天改成1天就能省下60多个redis实例,而且对原业务也没有啥太大影响。

当然Redis已经实现了数据过期的自动清理机制,我所要做的只是改下数据写入时的过期时间而已。假设Redis没有数据过期的机制,我们要怎么办? 大概一想就知道很麻烦,仔细想的话还得考虑很多的细节。所以我觉得过期数据在缓存系统中是不起眼但非常重要的功能,除了省事外,它也能帮我们节省很多成本。接下来我们看下Redis中是如何实现数据过期的。

实时清理

众所周知,Redis核心流程是单线程执行的,它基本上是处理完一条请求再出处理另外一条请求,处理请求的过程并不仅仅是响应用户发起的请求,Redis也会做好多其他的工作,当前其中就包括数据的过期。

Redis在读写某个key的时候,它就会调用expireIfNeeded()先判断这个key是否已经过期了,如果已过期,就会执行删除。

复制

int expireIfNeeded(redisDb *db, robj *key) {
    if (!keyIsExpired(db,key)) return 0;
    /* 如果是在slave上下文中运行,直接返回1,因为slave的key过期是由master控制的,
     * master会给slave发送数据删除命令。 
     * 
     * 如果返回0表示数据不需要清理,返回1表示数据这次标记为过期 */
    if (server.masterhost != NULL) return 1;
    if (checkClientPauseTimeoutAndReturnIfPaused()) return 1;
    /* 删除key */
    server.stat_expiredkeys++;
    propagateExpire(db,key,server.lazyfree_lazy_expire);
    notifyKeyspaceEvent(NOTIFY_EXPIRED,
        "expired",key,db->id);
    int retval = server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
                                               dbSyncDelete(db,key);
    if (retval) signalModifiedKey(NULL,db,key);
    return retval;
}

判断是否过期也很简单,Redis在dictEntry中存储了上次更新的时间戳,只需要判断当前时间戳和上次更新时间戳之间的gap是否超过设定的过期时间即可。

我们重点来关注下这行代码。

复制

int retval = server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
                                               dbSyncDelete(db,key);

lazyfree_lazy_expire 是Redis的配置项之一,它的作用是是否开启惰性删除(默认不开启),很显然如果开启就会执行异步删除,接下来我们详细说下Redis的惰性删除。

惰性删除

何为惰性删除,从本质上讲惰性删除就是新开一个线程异步处理数据删除的任务。为什么要有惰性删除?众所周知,Redis核心流程是单线程执行,如果某个一步执行特别耗时,会直接影响到Redis的性能,比如删除一个几个G的hash key,那这个实例不直接原地升天。。 针对这种情况,需要新开启一个线程去异步删除,防止阻塞出Redis的主线程,当然Redis实际实现的时候dbAsyncDelete()并不完全是异步,我们直接看代码。

复制

#define LAZYFREE_THRESHOLD 64
int dbAsyncDelete(redisDb *db, robj *key) {
    /* 从db->expires中删除key,只是删除其指针而已,并没有删除实际值 */
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
    /* If the value is composed of a few allocations, to free in a lazy way
     * is actually just slower... So under a certain limit we just free
     * the object synchronously. */
    /*
    * 在字典中摘除这个key(没有真正删除,只是查不到而已),如果被摘除的dictEntry不为
    * 空就去执行下面的释放逻辑 
    */
    dictEntry *de = dictUnlink(db->dict,key->ptr);
    if (de) {
        robj *val = dictGetVal(de);
        /* Tells the module that the key has been unlinked from the database. */
        moduleNotifyKeyUnlink(key,val);
        /* lazy_free并不是完全异步的,而是先评估释放操作所需工作量,如果影响较小就直接在主线程中删除了 */
        size_t free_effort = lazyfreeGetFreeEffort(key,val);
        /* 如果释放这个对象需要做大量的工作,就把他放到异步线程里做
         * 但如果这个对象是共享对象(refcount > 1)就不能直接释放了,当然这很少发送,但有可能redis
         * 核心会调用incrRefCount来保护对象,然后调用dbDelete。这我只需要直接调用dictFreeUnlinkedEntry,
         * 等价于调用decrRefCount */
        if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
            atomicIncr(lazyfree_objects,1);
            bioCreateLazyFreeJob(lazyfreeFreeObject,1, val);
            dictSetVal(db->dict,de,NULL);
        }
    }
    /* 释放键值对所占用的内存,如果是lazyFree,val已经是null了,只需要释放key的内存即可 */
    if (de) {
        dictFreeUnlinkedEntry(db->dict,de);
        if (server.cluster_enabled) slotToKeyDel(key->ptr);
        return 1;
    } else {
        return 0;
    }
}
  1. 首先在db->expires中把这个key给删除掉,(db->expires保存了所有带有过期时间数据的key,方便做数据过期)
  2. 然后把这个数据节点从db中摘掉,数据实际还在内存里,只是查不到而已。
  3. 接下来就是要清理数据了,redis并不是直接把清理工作放到异步线程里做,而是调用lazyfreeGetFreeEffort()来评估清理工作对性能的影响,如果影响较小,就直接在主线程里做了。反之影响太大才会将删除的任务提交到异步线程里。
  4. 释放key和val占用的内存空间,如果是异步删除,val已经是null,这里只需要释放key占用的空间即可。

这里第三步中为什么异步删除不完全是异步? 我觉得还是得从异步任务提交bioCreateLazyFreeJob()中一窥端倪。

复制

void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) {
    va_list valist;
    /* Allocate memory for the job structure and all required
     * arguments */
    struct bio_job *job = zmalloc(sizeof(*job) + sizeof(void *) * (arg_count));
    job->free_fn = free_fn;
    va_start(valist, arg_count);
    for (int i = 0; i < arg_count; i++) {
        job->free_args[i] = va_arg(valist, void *);
    }
    va_end(valist);
    bioSubmitJob(BIO_LAZY_FREE, job);
} 
void bioSubmitJob(int type, struct bio_job *job) {
    job->time = time(NULL);
    // 多线程需要加锁,把待处理的job添加到队列末尾
    pthread_mutex_lock(&bio_mutex[type]);
    listAddNodeTail(bio_jobs[type],job);
    bio_pending[type]++;
    pthread_cond_signal(&bio_newjob_cond[type]);
    pthread_mutex_unlock(&bio_mutex[type]);
}

我理解,在异步删除的时候需要加锁将异步任务提交到队列里,如果加锁和任务提交所带来的性能影响大于直接删除的影响,那么异步删除还不如同步呢。

定期抽样删除

这里思考下另外一个问题,如果数据写入后就再也没有读写了,是不是实时清理的功能就无法触及到这些数据,然后这些数据就永远都会占用空间。针对这种情况,Redis也实现了定期删除的策略。众所周知,Redis核心流程是单线程执行,所以注定它不能长时间停下来去干某个特定的工作,所以Redis定期清理也是每次只做一点点。

复制

/* 有两种清理模式,快速清理和慢速清理 */
void activeExpireCycle(int type) {
    /* Adjust the running parameters according to the configured expire
     * effort. The default effort is 1, and the maximum configurable effort
     * is 10. */
    unsigned long
    effort = server.active_expire_effort-1, /* Rescale from 0 to 9. */
    config_keys_per_loop = ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP +
                           ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP/4*effort,  // 每次抽样的数据量大小 
    config_cycle_fast_duration = ACTIVE_EXPIRE_CYCLE_FAST_DURATION +
                                 ACTIVE_EXPIRE_CYCLE_FAST_DURATION/4*effort, // 每次清理的持续时间
    config_cycle_slow_time_perc = ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC +
                                  2*effort,  // 最大CPU周期使用率 
    config_cycle_acceptable_stale = ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE-
                                    effort; // 可接受的过期数据占比 
    /* This function has some global state in order to continue the work
     * incrementally across calls. */
    static unsigned int current_db = 0; /* Last DB tested. */
    static int timelimit_exit = 0;      /* Time limit hit in previous call? */
    static long long last_fast_cycle = 0; /* When last fast cycle ran. */
    int j, iteration = 0;
    int dbs_per_call = CRON_DBS_PER_CALL;
    long long start = ustime(), timelimit, elapsed;
    /* When clients are paused the dataset should be static not just from the
     * POV of clients not being able to write, but also from the POV of
     * expires and evictions of keys not being performed. */
    if (checkClientPauseTimeoutAndReturnIfPaused()) return;
    // 快速清理 
    if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
        /* Don't start a fast cycle if the previous cycle did not exit
         * for time limit, unless the percentage of estimated stale keys is
         * too high. Also never repeat a fast cycle for the same period
         * as the fast cycle total duration itself. */
        // 如果上次执行没有触发timelimit_exit, 跳过执行
        if (!timelimit_exit &&
            server.stat_expired_stale_perc < config_cycle_acceptable_stale)
            return;
        // 两个快速清理周期内不执行快速清理
        if (start < last_fast_cycle + (long long)config_cycle_fast_duration*2)
            return;
        last_fast_cycle = start;
    }
    /* We usually should test CRON_DBS_PER_CALL per iteration, with
     * two exceptions:
     *
     * 1) Don't test more DBs than we have.
     * 2) If last time we hit the time limit, we want to scan all DBs
     * in this iteration, as there is work to do in some DB and we don't want
     * expired keys to use memory for too much time. */
    if (dbs_per_call > server.dbnum || timelimit_exit)
        dbs_per_call = server.dbnum;
    /* We can use at max 'config_cycle_slow_time_perc' percentage of CPU
     * time per iteration. Since this function gets called with a frequency of
     * server.hz times per second, the following is the max amount of
     * microseconds we can spend in this function. 
     * config_cycle_slow_time_perc是清理所能占用的CPU周期数配置,这里将周期数转化为具体的时间 */
    timelimit = config_cycle_slow_time_perc*1000000/server.hz/100;
    timelimit_exit = 0;
    if (timelimit <= 0) timelimit = 1;
    if (type == ACTIVE_EXPIRE_CYCLE_FAST)
        timelimit = config_cycle_fast_duration; /* in microseconds. */
    /* Accumulate some global stats as we expire keys, to have some idea
     * about the number of keys that are already logically expired, but still
     * existing inside the database. */
    long total_sampled = 0;
    long total_expired = 0;
    for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
        /* Expired and checked in a single loop. */
        unsigned long expired, sampled;
        redisDb *db = server.db+(current_db % server.dbnum);
        /* Increment the DB now so we are sure if we run out of time
         * in the current DB we'll restart from the next. This allows to
         * distribute the time evenly across DBs. */
        current_db++;
        /* Continue to expire if at the end of the cycle there are still
         * a big percentage of keys to expire, compared to the number of keys
         * we scanned. The percentage, stored in config_cycle_acceptable_stale
         * is not fixed, but depends on the Redis configured "expire effort". */
        do {
            unsigned long num, slots;
            long long now, ttl_sum;
            int ttl_samples;
            iteration++;
            /* 如果没有可清理的,直接结束 */
            if ((num = dictSize(db->expires)) == 0) {
                db->avg_ttl = 0;
                break;
            }
            slots = dictSlots(db->expires);
            now = mstime();
            /* 如果slot的填充率小于1%,采样的成本太高,跳过执行,等待下次合适的机会。*/
            if (num && slots > DICT_HT_INITIAL_SIZE &&
                (num*100/slots < 1)) break;
            /* 记录本次采样的数据和其中过期的数量 */
            expired = 0;
            sampled = 0;
            ttl_sum = 0;
            ttl_samples = 0;
            // 每次最多抽样num个 
            if (num > config_keys_per_loop)
                num = config_keys_per_loop;
            /* 这里因为性能考量,我们访问了hashcode的的底层实现,代码和dict.c有些类型,
             * 但十几年内很难改变。 
             * 
             * 注意:hashtable很多特定的地方是空的,所以我们的终止条件需要考虑到已扫描的bucket
             * 数量。 但实际上扫描空bucket是很快的,因为都是在cpu 缓存行里线性扫描,所以可以多
             * 扫一些bucket */
            long max_buckets = num*20;
            long checked_buckets = 0;
            // 这里有采样数据和bucket数量的限制。 
            while (sampled < num && checked_buckets < max_buckets) {
                for (int table = 0; table < 2; table++) {
                    if (table == 1 && !dictIsRehashing(db->expires)) break;
                    unsigned long idx = db->expires_cursor;
                    idx &= db->expires->ht[table].sizemask;
                    dictEntry *de = db->expires->ht[table].table[idx];
                    long long ttl;
                    /* 遍历当前bucket中的所有entry*/
                    checked_buckets++;
                    while(de) {
                        /* Get the next entry now since this entry may get
                         * deleted. */
                        dictEntry *e = de;
                        de = de->next;
                        ttl = dictGetSignedIntegerVal(e)-now;
                        if (activeExpireCycleTryExpire(db,e,now)) expired++;
                        if (ttl > 0) {
                            /* We want the average TTL of keys yet
                             * not expired. */
                            ttl_sum += ttl;
                            ttl_samples++;
                        }
                        sampled++;
                    }
                }
                db->expires_cursor++;
            }
            total_expired += expired;
            total_sampled += sampled;
            /* 更新ttl统计信息 */
            if (ttl_samples) {
                long long avg_ttl = ttl_sum/ttl_samples;
                /* Do a simple running average with a few samples.
                 * We just use the current estimate with a weight of 2%
                 * and the previous estimate with a weight of 98%. */
                if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
                db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
            }
            /* We can't block forever here even if there are many keys to
             * expire. So after a given amount of milliseconds return to the
             * caller waiting for the other active expire cycle. 
             * 不能一直阻塞在这里做清理工作,如果超时了要结束清理循环*/
            if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
                elapsed = ustime()-start;
                if (elapsed > timelimit) {
                    timelimit_exit = 1;
                    server.stat_expired_time_cap_reached_count++;
                    break;
                }
            }
            /*
             * 如果过期key数量超过采样数的10%+effort,说明过期测数量较多,要多清理下,所以
             * 继续循环做一次采样清理。     
             */
        } while (sampled == 0 ||
                 (expired*100/sampled) > config_cycle_acceptable_stale);
    }
    elapsed = ustime()-start;
    server.stat_expire_cycle_time_used += elapsed;
    latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);
    /* Update our estimate of keys existing but yet to be expired.
     * Running average with this sample accounting for 5%. */
    double current_perc;
    if (total_sampled) {
        current_perc = (double)total_expired/total_sampled;
    } else
        current_perc = 0;
    server.stat_expired_stale_perc = (current_perc*0.05)+
                                     (server.stat_expired_stale_perc*0.95);
}

代码有些长,大致总结下其执行流程,细节见代码注释。

  1. 首先通过配置或者默认值计算出几个参数,这几个参数直接或间接决定了这些执行的终止条件,分别如下。
  • config_keys_per_loop: 每次循环抽样的数据量
  • config_cycle_fast_duration: 快速清理模式下每次清理的持续时间
  • config_cycle_slow_time_perc: 慢速清理模式下每次清理最大消耗CPU周期数(cpu最大使用率)
  • config_cycle_acceptable_stale: 可接受的过期数据量占比,如果本次采样中过期数量小于这个阈值就结束本次清理。
  1. 根据上述参数计算出终止条件的具体值(最大采样数量和超时限制)。
  2. 遍历清理所有的db。
  3. 针对每个db在终止条件的限制下循环清理。

总结

Redis的数据过期策略比较简单,代码也不是特别多,但一如既然处处贯穿者性能的考虑。当然Redis只是提供了这样一个功能,如果想用好的话还得根据具体的业务需求和实际的数据调整过期时间的配置,就好比我在文章开头举的那个例子。

本文是Redis源码剖析系列博文,同时也有与之对应的Redis中文注释版,有想深入学习Redis的同学,欢迎star和关注。

Redis中文注解版仓库:https://github.com/xindoo/Redis

Redis源码剖析专栏:https://zxs.io/s/1h

如果觉得本文对你有用,欢迎一键三连

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
1月前
|
NoSQL Redis
Redis的数据淘汰策略有哪些 ?
Redis 提供了 8 种数据淘汰策略,分为淘汰易失数据和淘汰全库数据两大类。易失数据淘汰策略包括:volatile-lru、volatile-lfu、volatile-ttl 和 volatile-random;全库数据淘汰策略包括:allkeys-lru、allkeys-lfu 和 allkeys-random。此外,还有 no-eviction 策略,禁止驱逐数据,当内存不足时新写入操作会报错。
62 16
|
2月前
|
监控 NoSQL Java
场景题:百万数据插入Redis有哪些实现方案?
场景题:百万数据插入Redis有哪些实现方案?
44 1
场景题:百万数据插入Redis有哪些实现方案?
|
1月前
|
缓存 NoSQL 关系型数据库
Redis和Mysql如何保证数据⼀致?
在项目中,为了解决Redis与Mysql的数据一致性问题,我们采用了多种策略:对于低一致性要求的数据,不做特别处理;时效性数据通过设置缓存过期时间来减少不一致风险;高一致性但时效性要求不高的数据,利用MQ异步同步确保最终一致性;而对一致性和时效性都有高要求的数据,则采用分布式事务(如Seata TCC模式)来保障。
59 14
|
27天前
|
缓存 NoSQL PHP
Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出
本文深入探讨了Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出。文章还介绍了Redis在页面缓存、数据缓存和会话缓存等应用场景中的使用,并强调了缓存数据一致性、过期时间设置、容量控制和安全问题的重要性。
38 5
|
1月前
|
存储 NoSQL 算法
Redis分片集群中数据是怎么存储和读取的 ?
Redis集群采用哈希槽分区算法,共有16384个哈希槽,每个槽分配到不同的Redis节点上。数据操作时,通过CRC16算法对key计算并取模,确定其所属的槽和对应的节点,从而实现高效的数据存取。
49 13
|
1月前
|
存储 NoSQL Redis
Redis的数据过期策略有哪些 ?
Redis 采用两种过期键删除策略:惰性删除和定期删除。惰性删除在读取键时检查是否过期并删除,对 CPU 友好但可能积压大量过期键。定期删除则定时抽样检查并删除过期键,对内存更友好。默认每秒扫描 10 次,每次检查 20 个键,若超过 25% 过期则继续检查,单次最大执行时间 25ms。两者结合使用以平衡性能和资源占用。
45 11
|
1月前
|
监控 NoSQL 测试技术
【赵渝强老师】Redis的AOF数据持久化
Redis 是内存数据库,提供数据持久化功能,支持 RDB 和 AOF 两种方式。AOF 以日志形式记录每个写操作,支持定期重写以压缩文件。默认情况下,AOF 功能关闭,需在 `redis.conf` 中启用。通过 `info` 命令可监控 AOF 状态。AOF 重写功能可有效控制文件大小,避免性能下降。
|
1月前
|
存储 监控 NoSQL
【赵渝强老师】Redis的RDB数据持久化
Redis 是内存数据库,提供数据持久化功能以防止服务器进程退出导致数据丢失。Redis 支持 RDB 和 AOF 两种持久化方式,其中 RDB 是默认的持久化方式。RDB 通过在指定时间间隔内将内存中的数据快照写入磁盘,确保数据的安全性和恢复能力。RDB 持久化机制包括创建子进程、将数据写入临时文件并替换旧文件等步骤。优点包括适合大规模数据恢复和低数据完整性要求的场景,但也有数据完整性和一致性较低及备份时占用内存的缺点。
|
2月前
|
消息中间件 缓存 NoSQL
大数据-49 Redis 缓存问题中 穿透、雪崩、击穿、数据不一致、HotKey、BigKey
大数据-49 Redis 缓存问题中 穿透、雪崩、击穿、数据不一致、HotKey、BigKey
59 2
|
2月前
|
存储 数据采集 监控
将百万数据插入到 Redis,有哪些实现方案
【10月更文挑战第15天】将百万数据插入到 Redis 是一个具有挑战性的任务,但通过合理选择实现方案和进行性能优化,可以高效地完成任务。
123 0