Redis源码分析

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/feilengcui008/article/details/51534406 这一篇或者说一个系列用来记录Redis相关的一些源码分析,不定时更新。
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/feilengcui008/article/details/51534406

这一篇或者说一个系列用来记录Redis相关的一些源码分析,不定时更新。

目前已添加的内容:

  • Redis之eventloop
  • Redis数据结构之dict

Redis之eventloop

简介

Redis的eventloop实现也是比较平常的,主要关注文件描述符和timer相关事件,而且timer只是简单用一个单链表(O(n)遍历寻找最近触发的时间)实现。

流程

  • 主要在initServer(server.c)中初始化整个eventloop相关的数据结构与回调
// 注册系统timer事件
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
  serverPanic("Can't create event loop timers.");
  exit(1);
}

// 注册poll fd的接收客户端连接的读事件
for (j = 0; j < server.ipfd_count; j++) {
  if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
        acceptTcpHandler,NULL) == AE_ERR)
  {
    serverPanic(
        "Unrecoverable error creating server.ipfd file event.");
  }
}
// 同上
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
      acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
  • acceptTcpHandler处理客户端请求,分配client结构,注册事件
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
acceptCommonHandler(cfd,0,cip);

  • createClient,创建客户端
// receieved a client, alloc client structure 
// and register it into eventpoll
client *createClient(int fd) {
client *c = zmalloc(sizeof(client));
if (fd != -1) {
  anetNonBlock(NULL,fd);
  anetEnableTcpNoDelay(NULL,fd);
  if (server.tcpkeepalive)
    anetKeepAlive(NULL,fd,server.tcpkeepalive);
  // register read event for client connection
  // the callback handler is readQueryFromClient
  // read into client data buffer
  if (aeCreateFileEvent(server.el,fd,AE_READABLE,
        readQueryFromClient, c) == AE_ERR)
  {
    close(fd);
    zfree(c);
    return NULL;
  }
}
  • client读事件触发,读到buffer,解析client命令
dQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) 
--> processInputBuffer 

// handle query buffer
// in processInputBuffer(c);
if (c->reqtype == PROTO_REQ_INLINE) {
    if (processInlineBuffer(c) != C_OK) break;
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
    if (processMultibulkBuffer(c) != C_OK) break;
} else {
    serverPanic("Unknown request type");
}

/* Multibulk processing could see a <= 0 length. */
if (c->argc == 0) {
    resetClient(c);
} else {
    /* Only reset the client when the command was executed. */
    // handle the client command 
    if (processCommand(c) == C_OK)
        resetClient(c);
    /* freeMemoryIfNeeded may flush slave output buffers. This may result
     * into a slave, that may be the active client, to be freed. */
    if (server.current_client == NULL) break;
}
  • 处理客户端命令
// in processCommand 
/* Exec the command */
if (c->flags & CLIENT_MULTI &&
    c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
    c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
    queueMultiCommand(c);
    addReply(c,shared.queued);
} else {
    // call the cmd 
    // 进入具体数据结构的命令处理
    call(c,CMD_CALL_FULL);
    c->woff = server.master_repl_offset;
    if (listLength(server.ready_keys))
        handleClientsBlockedOnLists();
}

其他注意点

  • 关于timer的实现没有采用优先级队列(O(logn))等其他数据结构,而是直接采用O(n)遍历的单链表,是因为一般来说timer会较少?

Redis数据结构之dict

主要特点

Redis的hashtable实现叫dict,其实现和平常没有太大的区别,唯一比较特殊的地方是每个dict结构内部有两个实际的hashtable结构dictht,是为了实现增量哈希,故名思义,即当第一个dictht到一定负载因子后会触发rehash,分配新的dictht结构的动作和真正的rehash的动作是分离的,并且rehash被均摊到各个具体的操作中去了,这样就不会长时间阻塞线程,因为Redis是单线程。另外,增量hash可以按多步或者持续一定时间做。

主要数据结构

  • dictEntry => hashtable的bucket
  • dictType => 规定操作hashtable的接口
  • dictht => hashtable
  • dict => 对外呈现的”hashtable”
  • dictIterator => 迭代器,方便遍历
// dict.h
// hash table entry 
typedef struct dictEntry {
    void *key;  // key 
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;  // value
    struct dictEntry *next;  // linked list 
} dictEntry;

// operations(APIS) of some type of hashtable
typedef struct dictType {
    // hash function 
    unsigned int (*hashFunction)(const void *key);
    // copy key 
    void *(*keyDup)(void *privdata, const void *key);
    // copy value 
    void *(*valDup)(void *privdata, const void *obj);
    // key comparison 
    int (*keyCompare)(void *privdata, const void *key1, const void *key2);
    // dtor for key 
    void (*keyDestructor)(void *privdata, void *key);
    // dtor for value 
    void (*valDestructor)(void *privdata, void *obj);
} dictType;

/* This is our hash table structure. Every dictionary has two of this as we
 * implement incremental rehashing, for the old to the new table. */
// a hashtable 
typedef struct dictht {
    dictEntry **table;  // entries 
    unsigned long size;  // max size 
    unsigned long sizemask;  // mask 
    unsigned long used;  // current used 
} dictht;

typedef struct dict {
    dictType *type;  // type operations 
    void *privdata;  // for extension 
    dictht ht[2];    // two hashtables 
    // rehashing flag
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
    // users number 
    unsigned long iterators; /* number of iterators currently running */
} dict;

/* If safe is set to 1 this is a safe iterator, that means, you can call
 * dictAdd, dictFind, and other functions against the dictionary even while
 * iterating. Otherwise it is a non safe iterator, and only dictNext()
 * should be called while iterating. */
typedef struct dictIterator {
    dict *d;
    long index;
    int table, safe;
    dictEntry *entry, *nextEntry;
    /* unsafe iterator fingerprint for misuse detection. */
    long long fingerprint;
} dictIterator;

主要接口

// dict.h
// create
dict *dictCreate(dictType *type, void *privDataPtr);

// expand or initilize the just created dict, alloc second hashtable of dict for incremental rehashing
int dictExpand(dict *d, unsigned long size);

// add, if in rehashing, do 1 step of incremental rehashing
int dictAdd(dict *d, void *key, void *val);
dictEntry *dictAddRaw(dict *d, void *key);

// update, if in rehashing, do 1 step of incremental rehashing
// can we first find and return the entry no matter it is update or add, so 
// we can speed up the update process because no need to do twice find process?
int dictReplace(dict *d, void *key, void *val);
dictEntry *dictReplaceRaw(dict *d, void *key);

// delete if in rehashing, do 1 step of incremental rehashing
int dictDelete(dict *d, const void *key);  // free the memory 
int dictDeleteNoFree(dict *d, const void *key);  // not free the memory

// can we use a double linked list to free the hash table, so speed up?
void dictRelease(dict *d);

// find an entry
dictEntry * dictFind(dict *d, const void *key);
void *dictFetchValue(dict *d, const void *key);

// resize to eh pow of 2 number just >= the used number of slots
int dictResize(dict *d);

// alloc a new iterator
dictIterator *dictGetIterator(dict *d);
// alloc a safe iterator 
dictIterator *dictGetSafeIterator(dict *d);
// next entry 
dictEntry *dictNext(dictIterator *iter);
void dictReleaseIterator(dictIterator *iter);

// random sampling
dictEntry *dictGetRandomKey(dict *d);
unsigned int dictGetSomeKeys(dict *d, dictEntry **des, unsigned int count);

// get stats info
void dictGetStats(char *buf, size_t bufsize, dict *d);

// murmurhash 
unsigned int dictGenHashFunction(const void *key, int len);
unsigned int dictGenCaseHashFunction(const unsigned char *buf, int len);

// empty a dict 
void dictEmpty(dict *d, void(callback)(void*));

void dictEnableResize(void);
void dictDisableResize(void);

// do n steps rehashing
int dictRehash(dict *d, int n);
// do rehashing for a ms milliseconds
int dictRehashMilliseconds(dict *d, int ms);

// hash function seed 
void dictSetHashFunctionSeed(unsigned int initval);
unsigned int dictGetHashFunctionSeed(void);

// scan a dict
unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *privdata);

一些可能优化的地方

  • 在dictReplace中能否统一add和update的查找,无论是add还是update都返回一个entry,用标识表明是add还是update,而不用在update时做两次查找,从而提升update的性能

  • 在release整个dict时,是循环遍历所有头bucket,最坏情况接近O(n),能否用双向的空闲链表优化(当然这样会浪费指针所占空间)

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
4月前
|
存储 NoSQL Redis
redis 6源码解析之 object
redis 6源码解析之 object
72 6
|
2月前
|
缓存 NoSQL Ubuntu
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
65 3
|
7月前
|
NoSQL API Redis
Redis源码(1)基本数据结构(上)
Redis源码(1)基本数据结构
86 2
|
7月前
|
NoSQL 安全 Unix
Redis源码、面试指南(4)单机数据库、持久化、通知与订阅(中)
Redis源码、面试指南(4)单机数据库、持久化、通知与订阅
53 0
|
4月前
|
Web App开发 前端开发 关系型数据库
基于SpringBoot+Vue+Redis+Mybatis的商城购物系统 【系统实现+系统源码+答辩PPT】
这篇文章介绍了一个基于SpringBoot+Vue+Redis+Mybatis技术栈开发的商城购物系统,包括系统功能、页面展示、前后端项目结构和核心代码,以及如何获取系统源码和答辩PPT的方法。
|
7月前
|
存储 NoSQL 算法
【Redis技术进阶之路】「底层源码解析」揭秘高效存储模型与数据结构底层实现(字典)(二)
【Redis技术进阶之路】「底层源码解析」揭秘高效存储模型与数据结构底层实现(字典)
129 0
|
4月前
|
NoSQL Redis
redis 6源码解析之 ziplist
redis 6源码解析之 ziplist
36 5
|
5月前
|
消息中间件 存储 NoSQL
Redis数据结构—跳跃表 skiplist 实现源码分析
Redis 是一个内存中的数据结构服务器,使用跳跃表(skiplist)来实现有序集合。跳跃表是一种概率型数据结构,支持平均 O(logN) 查找复杂度,它通过多层链表加速查找,同时保持有序性。节点高度随机生成,最大为 32 层,以平衡查找速度和空间效率。跳跃表在 Redis 中用于插入、删除和按范围查询元素,其内部节点包含对象、分值、后退指针和多个前向指针。Redis 源码中的 `t_zset.c` 文件包含了跳跃表的具体实现细节。
|
7月前
|
NoSQL 安全 算法
Redis源码(1)基本数据结构(中)
Redis源码(1)基本数据结构
73 5
|
7月前
|
存储 NoSQL 算法
Redis源码、面试指南(2)内存编码数据结构(下)
Redis源码、面试指南(2)内存编码数据结构
65 4