【Redis源码分析专题】(1)从本质分析你写入Redis中的数据为什么不见了?

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 【Redis源码分析专题】(1)从本质分析你写入Redis中的数据为什么不见了?

Redis数据库介绍


Redis作为一个成熟的数据存储中间件,它提供了完善的数据管理功能,比如之前我们提到过的数据过期和今天我们要讲的数据淘汰(evict)策略。



数据的局部性原理


贯穿计算机学科的原理局部性原理,这里可以明确告诉你,局部性原理在缓存场景有这样两种现象,


  1. 最新的数据下次被访问的概率越高。


  1. 被访问次数越多的数据下次被访问的概率越高。 这里我们可以简单认为被访问的概率越高价值越大。


基于上述两种现象,我们可以指定出两种策略


  1. 淘汰掉最早未被访问的数据,LRU(Least Recently Used)。
  2. 淘汰掉访被访问频次最低的数据,LFU(Least Frequently Used)。


除了LRU和LFU之外,还可以随机淘汰。这就是将数据一视同仁,随机选取一部分淘汰。实际上Redis实现了以上3中策略,你使用时可以根据具体的数据配置某个淘汰策略。


除了上述三种策略外,Redis还为由过期时间的数据提供了按TTL淘汰的策略,其实就是淘汰剩余TTL中最小的数据。另外需要注意的是Redis的淘汰策略可以配置在全局或者是有过期时间的数据上。




Redis的问题背景


我们有时候会遇到这样的事情,当想redis写入一些数据后,再次查询发现数据不见了,这是怎么回事呢?数据明明过期了,怎么还占用着内存?


  • 知道Redis主要是基于内存来进行高性能、高并发的读写操作的。
  • 然而内存是有限的,比如 redis 就只能用 10G,你要是往里面写了 20G 的数据,会咋办?当然会干掉 10G 的数据,然后就保留 10G 的数据了。那干掉哪些数据?保留哪些数据?这就要根据设置的redis的淘汰机制来选择了。数据明明过期了,竟然还占用这内存,这些都是由 redis 的过期策略来决定。



Redis的过期策略


Redis的过期策略包括两种,分别是定期删除和惰性删除:


  • 定期删除:指的是Redis默认是每隔100ms就随机抽取一些设置了过期时间的key,检查其是否过期,如果过期就删除。(不能完全删除)
  • 惰性删除:直接查询数据的时候,redis会先查看一些这个数据是否已经过期,如果过期,就进行删除。(不能完全删除数据)



Redis的删除内存策略


定期删除和惰性删除都存在着一些问题,如果定期删除漏掉了很多过期key,然后你也没及时去查,也就没走惰性删除,此时有可能导致大量过期 key 堆积在内存里,导致redis 内存块耗尽了


Redis内存淘汰机制


Redis中数据淘汰实际上是指的在内存空间不足时,清理掉某些数据以节省内存空间。 虽然Redis已经有了过期的策略,它可以清理掉有效期外的数据。


如果过期的数据被清理之后存储空间还是不够怎么办?是不是还可以再删除掉一部分数据? 在缓存这种场景下 这个问题的答案是可以,因为这个数据即便在Redis中找不到,也可以从被缓存的数据源中找到。


所以在某些特定的业务场景下,我们是可以丢弃掉Redis中部分旧数据来给新数据腾出空间。



内存淘汰机制包括以下几种方式:


众所周知,redis是一个内存数据库,所有的键值对都是存储在内存中。当数据变多之后,由于内存有限就要淘汰一些键值对,使得内存有足够的空间来保存新的键值对。在redis中,通过设置server.maxmemory来限定内存的使用(server.maxmemory为0,不限制内存),到达server.maxmemory就会触发淘汰机制。


  • noeviction: 当内存不足以容纳新写入数据时,新写入操作会报错,一般不用。
  • Allkeys-lru: 当内存不足以容纳新写入数据时,在键空间中,移除最近最少使用的 key(最常用)。
  • allkeys-random:当内存不足以容纳新写入数据时,在键空间中,随机移除某个 key(很少用)。
  • volatile-lru:当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,移除最近最少使用的 key(很少用)。
  • volatile-random:当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,随机移除某个key。
  • volatile-ttl:当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,有更早过期时间的key优先移除。



设置内存淘汰机制的方式:


在redis.conf中:


  • maxmemory 100mb 最大内存设置,如果0代表无限 ;
  • maxmemory-policy: Allkeys-lru


Redis在每次执行客户端的命令的时候都会检查使用内存是否超过server.maxmemory,如果超过就进行淘汰数据。

int processCommand(client *c) {
  ……//server.maxmemory为0,表示对内存没有限制
  if (server.maxmemory) {
  //判断内存,进行内存淘汰
        int retval = freeMemoryIfNeeded();
        ……
    }
    ……
}
复制代码


evict何时执行


在Redis每次处理命令的时候,都会检查内存空间,并尝试去执行evict,因为有些情况下不需要执行evict,这个可以从isSafeToPerformEvictions中可以看出端倪。

static int isSafeToPerformEvictions(void) {
    /* 没有lua脚本执行超时,也没有在做数据超时 */
    if (server.lua_timedout || server.loading) return 0;
    /* 只有master才需要做evict */
    if (server.masterhost && server.repl_slave_ignore_maxmemory) return 0;
    /* 当客户端暂停时,不需要evict,因为数据是不会变化的 */
    if (checkClientPauseTimeoutAndReturnIfPaused()) return 0;
    return 1;
}
复制代码



执行回收驱逐操作

int performEvictions(void) {
    if (!isSafeToPerformEvictions()) return EVICT_OK;
    int keys_freed = 0;
    size_t mem_reported, mem_tofree;
    long long mem_freed; /* May be negative */
    mstime_t latency, eviction_latency;
    long long delta;
    int slaves = listLength(server.slaves);
    int result = EVICT_FAIL;
    if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK)
        return EVICT_OK;
    if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
        return EVICT_FAIL;  /* We need to free memory, but policy forbids. */
    unsigned long eviction_time_limit_us = evictionTimeLimitUs();
    mem_freed = 0;
    latencyStartMonitor(latency);
    monotime evictionTimer;
    elapsedStart(&evictionTimer);
    while (mem_freed < (long long)mem_tofree) {
        int j, k, i;
        static unsigned int next_db = 0;
        sds bestkey = NULL;
        int bestdbid;
        redisDb *db;
        dict *dict;
        dictEntry *de;
        if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
            server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
        {
            struct evictionPoolEntry *pool = EvictionPoolLRU;
            while(bestkey == NULL) {
                unsigned long total_keys = 0, keys;
                /* We don't want to make local-db choices when expiring keys,
                 * so to start populate the eviction pool sampling keys from
                 * every DB. 
                 * 先从dict中采样key并放到pool中 */
                for (i = 0; i < server.dbnum; i++) {
                    db = server.db+i;
                    dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
                            db->dict : db->expires;
                    if ((keys = dictSize(dict)) != 0) {
                        evictionPoolPopulate(i, dict, db->dict, pool);
                        total_keys += keys;
                    }
                }
                if (!total_keys) break; /* No keys to evict. */
                /* 从pool中选择最适合淘汰的key. */
                for (k = EVPOOL_SIZE-1; k >= 0; k--) {
                    if (pool[k].key == NULL) continue;
                    bestdbid = pool[k].dbid;
                    if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
                        de = dictFind(server.db[pool[k].dbid].dict,
                            pool[k].key);
                    } else {
                        de = dictFind(server.db[pool[k].dbid].expires,
                            pool[k].key);
                    }
                    /* 从淘汰池中移除. */
                    if (pool[k].key != pool[k].cached)
                        sdsfree(pool[k].key);
                    pool[k].key = NULL;
                    pool[k].idle = 0;
                    /* If the key exists, is our pick. Otherwise it is
                     * a ghost and we need to try the next element. */
                    if (de) {
                        bestkey = dictGetKey(de);
                        break;
                    } else {
                        /* Ghost... Iterate again. */
                    }
                }
            }
        }
        /* volatile-random and allkeys-random 策略 */
        else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
                 server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
        {
            /* 当随机淘汰时,我们用静态变量next_db来存储当前执行到哪个db了*/
            for (i = 0; i < server.dbnum; i++) {
                j = (++next_db) % server.dbnum;
                db = server.db+j;
                dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
                        db->dict : db->expires;
                if (dictSize(dict) != 0) {
                    de = dictGetRandomKey(dict);
                    bestkey = dictGetKey(de);
                    bestdbid = j;
                    break;
                }
            }
        }
        /* 从dict中移除被选中的key. */
        if (bestkey) {
            db = server.db+bestdbid;
            robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
            propagateExpire(db,keyobj,server.lazyfree_lazy_eviction);
            /*我们单独计算db*Delete()释放的内存量。实际上,在AOF和副本传播所需的内存可能大于我们正在释放的内存(删除key)
            ,如果我们考虑这点的话会很绕。由signalModifiedKey生成的CSC失效消息也是这样。
            因为AOF和输出缓冲区内存最终会被释放,所以我们只需要关心key空间使用的内存即可。*/
            delta = (long long) zmalloc_used_memory();
            latencyStartMonitor(eviction_latency);
            if (server.lazyfree_lazy_eviction)
                dbAsyncDelete(db,keyobj);
            else
                dbSyncDelete(db,keyobj);
            latencyEndMonitor(eviction_latency);
            latencyAddSampleIfNeeded("eviction-del",eviction_latency);
            delta -= (long long) zmalloc_used_memory();
            mem_freed += delta;
            server.stat_evictedkeys++;
            signalModifiedKey(NULL,db,keyobj);
            notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
                keyobj, db->id);
            decrRefCount(keyobj);
            keys_freed++;
            if (keys_freed % 16 == 0) {
                /*当要释放的内存开始足够大时,我们可能会在这里花费太多时间,不可能足够快地将数据传送到副本,因此我们会在循环中强制传输。*/
                if (slaves) flushSlavesOutputBuffers();
                /*通常我们的停止条件是释放一个固定的,预先计算的内存量。但是,当我们*在另一个线程中删除对象时,
                最好不时*检查是否已经达到目标*内存,因为“mem\u freed”量只在dbAsyncDelete()调用中*计算,
                而线程可以*一直释放内存。*/
                if (server.lazyfree_lazy_eviction) {
                    if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
                        break;
                    }
                }
                /*一段时间后,尽早退出循环-即使尚未达到内存限制*。如果我们突然需要释放大量的内存,不要在这里花太多时间。*/
                if (elapsedUs(evictionTimer) > eviction_time_limit_us) {
                    // We still need to free memory - start eviction timer proc
                    if (!isEvictionProcRunning) {
                        isEvictionProcRunning = 1;
                        aeCreateTimeEvent(server.el, 0,
                                evictionTimeProc, NULL, NULL);
                    }
                    break;
                }
            }
        } else {
            goto cant_free; /* nothing to free... */
        }
    }
    /* at this point, the memory is OK, or we have reached the time limit */
    result = (isEvictionProcRunning) ? EVICT_RUNNING : EVICT_OK;
cant_free:
    if (result == EVICT_FAIL) {
        /* At this point, we have run out of evictable items.  It's possible
         * that some items are being freed in the lazyfree thread.  Perform a
         * short wait here if such jobs exist, but don't wait long.  */
        if (bioPendingJobsOfType(BIO_LAZY_FREE)) {
            usleep(eviction_time_limit_us);
            if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
                result = EVICT_OK;
            }
        }
    }
    latencyEndMonitor(latency);
    latencyAddSampleIfNeeded("eviction-cycle",latency);
    return result;
}
复制代码



释放资源如果在需要时候
int freeMemoryIfNeeded(void) {
    //获取redis内存使用
    mem_reported = zmalloc_used_memory();
    if (mem_reported <= server.maxmemory) return C_OK; 
    mem_used = mem_reported;
    if (slaves) {
        listRewind(server.slaves,&li);
        //减去slaves的output缓冲区
        while((ln = listNext(&li))) {
            ……
        }
    }
     //aof的缓冲区的内存使用
    if (server.aof_state != AOF_OFF) {
        mem_used -= sdslen(server.aof_buf);
        mem_used -= aofRewriteBufferSize();
    }
    /* Check if we are still over the memory limit. */
    if (mem_used <= server.maxmemory) return C_OK;
    /* Compute how much memory we need to free. */
    mem_tofree = mem_used - server.maxmemory;
    mem_freed = 0;
    if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
        goto cant_free; /* 禁止驱逐数据 */
    //进行数据驱逐
    while (mem_freed < mem_tofree) {
      ……
      sds bestkey = NULL;
        if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
            server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
        {   //进行ttl或者lru淘汰机制
            struct evictionPoolEntry *pool = EvictionPoolLRU;
            while(bestkey == NULL) {
                unsigned long total_keys = 0, keys;
                for (i = 0; i < server.dbnum; i++) {
                    db = server.db+i;
                    dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
                            db->dict : db->expires;
                    if ((keys = dictSize(dict)) != 0) {
                        evictionPoolPopulate(i, dict, db->dict, pool);
                        //pool根据机制构建的evictionPool
                    }
                }/*在evictionPool中从后往前选择一个还在存在数据库中的键值进行驱逐*/
                for (k = EVPOOL_SIZE-1; k >= 0; k--) {
                    if (pool[k].key == NULL) continue;
                    bestdbid = pool[k].dbid;
                    if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
                        de = dictFind(server.db[pool[k].dbid].dict,
                            pool[k].key);
                    } else {
                        de = dictFind(server.db[pool[k].dbid].expires,
                            pool[k].key);
                    }
                    ……
                    if (de) {
                        bestkey = dictGetKey(de);
                        break;
                    } else {
                        /* Ghost... Iterate again. */
                    }
                }
            }
        }
        else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
                 server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
        {   /* 从db->dict或者db->expires随机选择一个键值对进行淘汰*/
            for (i = 0; i < server.dbnum; i++) {
                j = (++next_db) % server.dbnum;
                db = server.db+j;
                dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
                        db->dict : db->expires;
                if (dictSize(dict) != 0) {
                    de = dictGetRandomKey(dict);
                    bestkey = dictGetKey(de);
                    bestdbid = j;
                    break;
                }
            }
        }//驱逐选中的键值对
        if (bestkey) {
            db = server.db+bestdbid;
            robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
            propagateExpire(db,keyobj,server.lazyfree_lazy_eviction);
            delta = (long long) zmalloc_used_memory();
            if (server.lazyfree_lazy_eviction)
                dbAsyncDelete(db,keyobj);
            else
                dbSyncDelete(db,keyobj);
            delta -= (long long) zmalloc_used_memory();
            mem_freed += delta;
            server.stat_evictedkeys++;
            decrRefCount(keyobj);
            keys_freed++;
            if (slaves) flushSlavesOutputBuffers();
        }
    }
    return C_OK;
cant_free://进行内存空间的惰性释放
    while(bioPendingJobsOfType(BIO_LAZY_FREE)) {
        if (((mem_reported - zmalloc_used_memory()) + mem_freed) >= mem_tofree)
            break;
        usleep(1000);
    }
    return C_ERR;
}
复制代码



根据淘汰机制从随机选取的键值对中选取键值对构建evictionPool


  • 1)LRU数据淘汰机制:在数据集中随机选取几个键值对,选择lru最大的一部分键值对构建evictionPool。


LRU的本质是淘汰最久没被访问的数据,有种实现方式是用链表的方式实现,如果数据被访问了就把它移到链表头部,那么链尾一定是最久未访问的数据,但是单链表的查询时间复杂度是O(n),所以一般会用hash表来加快查询数据,比如Java中LinkedHashMap就是这么实现的。但Redis并没有采用这种策略,Redis就是单纯记录了每个Key最近一次的访问时间戳,通过时间戳排序的方式来选找出最早的数据,当然如果把所有的数据都排序一遍,未免也太慢了,所以Redis是每次选一批数据,然后从这批数据执行淘汰策略。这样的好处就是性能高,坏处就是不一定是全局最优,只是达到局部最优。


在redisObject中有个24位的lru字段,这24位保存了数据访问的时间戳(秒),当然24位无法保存完整的unix时间戳,不到200天就会有一个轮回,当然这已经足够了。


robj *lookupKey(redisDb *db, robj *key, int flags) {
    dictEntry *de = dictFind(db->dict,key->ptr);
    if (de) {
        robj *val = dictGetVal(de);
        if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)){
            if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
                updateLFU(val);
            } else {
                val->lru = LRU_CLOCK();  // 这里更新LRU时间戳  
            }
        }
        return val;
    } else {
        return NULL;
    }
}
复制代码


  • 2)LFU数据淘汰机制:在数据集中随机选取几个键值对,选择lfu最小的一部分键值对构建evictionPool。


lru这个字段也会被lfu用到,所以你在上面lookupkey中可以看到在使用lfu策略是也会更新lru。Redis中lfu的出现稍晚一些,是在Redis 4.0才被引入的,所以这里复用了lru字段。 lru的实现思路只有一种,就是记录下key被访问的次数。但实现lru有个问题需要考虑到,虽然LFU是按访问频次来淘汰数据,但在Redis中随时可能有新数据就来,本身老数据可能有更多次的访问,新数据当前被访问次数少,并不意味着未来被访问的次数会少,如果不考虑到这点,新数据可能一就来就会被淘汰掉,这显然是不合理的。


Redis为了解决上述问题,将24位被分成了两部分,高16位的时间戳(分钟级),低8位的计数器。每个新数据计数器初始有一定值,这样才能保证它能走出新手村,然后计数值会随着时间推移衰减,这样可以保证老的但当前不常用的数据才有机会被淘汰掉,我们来看下具体实现代码。


计数器只有8个二进制位,充其量数到255,怎么会够? 当然Redis使用的不是精确计数,而是近似计数。具体实现就是counter概率性增长,counter的值越大增长速度越慢,具体增长逻辑如下:

/* 更新lfu的counter,counter并不是一个准确的数值,而是概率增长,counter的数值越大其增长速度越慢
 * 只能反映出某个时间窗口的热度,无法反映出具体访问次数 */
uint8_t LFULogIncr(uint8_t counter) {
    if (counter == 255) return 255;
    double r = (double)rand()/RAND_MAX;
    double baseval = counter - LFU_INIT_VAL; // LFU_INIT_VAL为5
    if (baseval < 0) baseval = 0;
    double p = 1.0/(baseval*server.lfu_log_factor+1);  // server.lfu_log_factor可配置,默认是10 
    if (r < p) counter++;
    return counter;
}
复制代码


LFU计数器衰减:如果说counter一直增长,即便增长速度很慢也有一天会增长到最大值255,最终导致无法做数据的筛选,所以要给它加一个衰减策略,思路就是counter随时间增长衰减,具体代码如下:

/* lfu counter衰减逻辑, lfu_decay_time是指多久counter衰减1,比如lfu_decay_time == 10
 * 表示每10分钟counter衰减一,但lfu_decay_time为0时counter不衰减 */
unsigned long LFUDecrAndReturn(robj *o) {
    unsigned long ldt = o->lru >> 8;
    unsigned long counter = o->lru & 255;
    unsigned long num_periods = server.lfu_decay_time ? LFUTimeElapsed(ldt) / server.lfu_decay_time : 0;
    if (num_periods)
        counter = (num_periods > counter) ? 0 : counter - num_periods;
    return counter;
}
复制代码


  • 3)TTL数据淘汰机制:从设置过期时间的数据集中随机选取几个键值对,选择TTL最大的一部分键值对构建evictionPool。
void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
    int j, k, count;
    dictEntry *samples[server.maxmemory_samples];
    //从数据集sampledict随机选取键值对
    count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
    for (j = 0; j < count; j++) {
        de = samples[j];
        key = dictGetKey(de);
        if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) {
            if (sampledict != keydict) de = dictFind(keydict, key);
            o = dictGetVal(de);
        }
        if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) {
            idle = estimateObjectIdleTime(o);//LRU机制,计算lru值
        } else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
            idle = 255-LFUDecrAndReturn(o);//LFU机制,计算lfu值
        } else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
            idle = ULLONG_MAX - (long)dictGetVal(de);//TTL机制,计算ttl值
        }
        k = 0;
        //根据idle从小到大将键值对插入到pool(插入排序的机制),但只保留idle最大的EVPOOL_SIZE个
        while (k < EVPOOL_SIZE &&pool[k].key &&pool[k].idle < idle) 
          k++;
        if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) {
            continue;
        } else if (k < EVPOOL_SIZE && pool[k].key == NULL) {
            /* Inserting into empty position. No setup needed before insert. */
        } else {
            if (pool[EVPOOL_SIZE-1].key == NULL) {
                sds cached = pool[EVPOOL_SIZE-1].cached;
                memmove(pool+k+1,pool+k,sizeof(pool[0])*(EVPOOL_SIZE-k-1));
                pool[k].cached = cached;
            } else {
                k--;
                sds cached = pool[0].cached; /* Save SDS before overwriting. */
                if (pool[0].key != pool[0].cached) sdsfree(pool[0].key);
                memmove(pool,pool+1,sizeof(pool[0])*k);
                pool[k].cached = cached;
            }
        }
        int klen = sdslen(key);
        if (klen > EVPOOL_CACHED_SDS_SIZE) {
            pool[k].key = sdsdup(key);
        } else {
            memcpy(pool[k].cached,key,klen+1);
            sdssetlen(pool[k].cached,klen);
            pool[k].key = pool[k].cached;
        }
        pool[k].idle = idle;
        pool[k].dbid = dbid;
    }
}




相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
5天前
|
NoSQL Redis
Redis的数据淘汰策略有哪些 ?
Redis 提供了 8 种数据淘汰策略,分为淘汰易失数据和淘汰全库数据两大类。易失数据淘汰策略包括:volatile-lru、volatile-lfu、volatile-ttl 和 volatile-random;全库数据淘汰策略包括:allkeys-lru、allkeys-lfu 和 allkeys-random。此外,还有 no-eviction 策略,禁止驱逐数据,当内存不足时新写入操作会报错。
34 16
|
25天前
|
监控 NoSQL Java
场景题:百万数据插入Redis有哪些实现方案?
场景题:百万数据插入Redis有哪些实现方案?
35 1
场景题:百万数据插入Redis有哪些实现方案?
|
24天前
|
存储 NoSQL Redis
Redis 新版本引入多线程的利弊分析
【10月更文挑战第16天】Redis 新版本引入多线程是一个具有挑战性和机遇的改变。虽然多线程带来了一些潜在的问题和挑战,但也为 Redis 提供了进一步提升性能和扩展能力的可能性。在实际应用中,我们需要根据具体的需求和场景,综合评估多线程的利弊,谨慎地选择和使用 Redis 的新版本。同时,Redis 开发者也需要不断努力,优化和完善多线程机制,以提供更加稳定、高效和可靠的 Redis 服务。
29 1
|
5天前
|
缓存 NoSQL 关系型数据库
Redis和Mysql如何保证数据⼀致?
在项目中,为了解决Redis与Mysql的数据一致性问题,我们采用了多种策略:对于低一致性要求的数据,不做特别处理;时效性数据通过设置缓存过期时间来减少不一致风险;高一致性但时效性要求不高的数据,利用MQ异步同步确保最终一致性;而对一致性和时效性都有高要求的数据,则采用分布式事务(如Seata TCC模式)来保障。
33 14
|
5天前
|
存储 NoSQL 算法
Redis分片集群中数据是怎么存储和读取的 ?
Redis集群采用哈希槽分区算法,共有16384个哈希槽,每个槽分配到不同的Redis节点上。数据操作时,通过CRC16算法对key计算并取模,确定其所属的槽和对应的节点,从而实现高效的数据存取。
29 13
|
5天前
|
存储 NoSQL Redis
Redis的数据过期策略有哪些 ?
Redis 采用两种过期键删除策略:惰性删除和定期删除。惰性删除在读取键时检查是否过期并删除,对 CPU 友好但可能积压大量过期键。定期删除则定时抽样检查并删除过期键,对内存更友好。默认每秒扫描 10 次,每次检查 20 个键,若超过 25% 过期则继续检查,单次最大执行时间 25ms。两者结合使用以平衡性能和资源占用。
28 11
|
5天前
|
监控 NoSQL 测试技术
【赵渝强老师】Redis的AOF数据持久化
Redis 是内存数据库,提供数据持久化功能,支持 RDB 和 AOF 两种方式。AOF 以日志形式记录每个写操作,支持定期重写以压缩文件。默认情况下,AOF 功能关闭,需在 `redis.conf` 中启用。通过 `info` 命令可监控 AOF 状态。AOF 重写功能可有效控制文件大小,避免性能下降。
|
5天前
|
存储 监控 NoSQL
【赵渝强老师】Redis的RDB数据持久化
Redis 是内存数据库,提供数据持久化功能以防止服务器进程退出导致数据丢失。Redis 支持 RDB 和 AOF 两种持久化方式,其中 RDB 是默认的持久化方式。RDB 通过在指定时间间隔内将内存中的数据快照写入磁盘,确保数据的安全性和恢复能力。RDB 持久化机制包括创建子进程、将数据写入临时文件并替换旧文件等步骤。优点包括适合大规模数据恢复和低数据完整性要求的场景,但也有数据完整性和一致性较低及备份时占用内存的缺点。
|
17天前
|
缓存 监控 NoSQL
Redis 缓存穿透的检测方法与分析
【10月更文挑战第23天】通过以上对 Redis 缓存穿透检测方法的深入探讨,我们对如何及时发现和处理这一问题有了更全面的认识。在实际应用中,我们需要综合运用多种检测手段,并结合业务场景和实际情况进行分析,以确保能够准确、及时地检测到缓存穿透现象,并采取有效的措施加以解决。同时,要不断优化和改进检测方法,提高检测的准确性和效率,为系统的稳定运行提供有力保障。
46 5
|
1月前
|
消息中间件 缓存 NoSQL
大数据-49 Redis 缓存问题中 穿透、雪崩、击穿、数据不一致、HotKey、BigKey
大数据-49 Redis 缓存问题中 穿透、雪崩、击穿、数据不一致、HotKey、BigKey
50 2