Redis事务相关源码探究

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: Redis事务相关源码探究

Redis事务相关源码探究


文章目录

Redis事务源码解读

源码地址:https://github.com/redis/redis/tree/7.0/src

从源码来简单分析下 Redis 中事务的实现过程

1、MULTI 声明事务

Redis 中使用 MULTI 命令来声明和开启一个事务

// https://github.com/redis/redis/blob/7.0/src/multi.c#L104
void multiCommand(client *c) {
  // 判断是否已经开启了事务
  // 不持之事务的嵌套
    if (c->flags & CLIENT_MULTI) {
        addReplyError(c,"MULTI calls can not be nested");
        return;
    }
  // 设置事务标识
    c->flags |= CLIENT_MULTI;
    addReply(c,shared.ok);
}

1、首先会判断当前客户端是是否已经开启了事务,Redis 中的事务不支持嵌套;

2、给 flags 设置事务标识 CLIENT_MULTI

2、命令入队

开始事务之后,后面所有的命令都会被添加到事务队列中

// https://github.com/redis/redis/blob/7.0/src/multi.c#L59
/* Add a new command into the MULTI commands queue */
void queueMultiCommand(client *c) {
    multiCmd *mc;
    // 这里有两种情况的判断  
    // 1、如果命令在入队是有问题就不入队了,CLIENT_DIRTY_EXEC 表示入队的时候,命令有语法的错误
    // 2、如果 watch 的键值有更改也不用入队了, CLIENT_DIRTY_CAS 表示该客户端监听的键值有变动
    if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC))
        return;
    // 在原commands后面配置空间以存放新命令
    c->mstate.commands = zrealloc(c->mstate.commands,
            sizeof(multiCmd)*(c->mstate.count+1));
    // 微信新配置的空间设置执行的命令和参数
    mc = c->mstate.commands+c->mstate.count;
    mc->cmd = c->cmd;
    mc->argc = c->argc;
    mc->argv = c->argv;
    mc->argv_len = c->argv_len;
    ...
}

入队的时候会做个判断:

1、如果命令在入队时有语法错误不入队了,CLIENT_DIRTY_EXEC 表示入队的时候,命令有语法的错误;

2、如果 watch 的键值有更改也不用入队了, CLIENT_DIRTY_CAS 表示该客户端监听的键值有变动;

3、client watch 的 key 有更新,当前客户端的 flags 就会被标记成 CLIENT_DIRTY_CASCLIENT_DIRTY_CAS 是在何时被标记,可继续看下文。

3、EXEC 执行事务

命令入队之后,再来看下事务的提交

// https://github.com/redis/redis/blob/7.0/src/multi.c#L140
void execCommand(client *c) {
    ...
    // 判断下是否开启了事务
    if (!(c->flags & CLIENT_MULTI)) {
        addReplyError(c,"EXEC without MULTI");
        return;
    }
    // 事务中不能 watch 有过期时间的键值
    if (isWatchedKeyExpired(c)) {
        c->flags |= (CLIENT_DIRTY_CAS);
    }
     // 检查是否需要中退出事务,有下面两种情况  
     // 1、 watch 的 key 有变化了
     // 2、命令入队的时候,有语法错误  
    if (c->flags & (CLIENT_DIRTY_CAS | CLIENT_DIRTY_EXEC)) {
        if (c->flags & CLIENT_DIRTY_EXEC) {
            addReplyErrorObject(c, shared.execaborterr);
        } else {
            addReply(c, shared.nullarray[c->resp]);
        }
        // 取消事务
        discardTransaction(c);
        return;
    }
    uint64_t old_flags = c->flags;
    /* we do not want to allow blocking commands inside multi */
    // 事务中不允许出现阻塞命令
    c->flags |= CLIENT_DENY_BLOCKING;
    /* Exec all the queued commands */
    unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
    server.in_exec = 1;
    orig_argv = c->argv;
    orig_argv_len = c->argv_len;
    orig_argc = c->argc;
    orig_cmd = c->cmd;
    addReplyArrayLen(c,c->mstate.count);
    // 循环处理执行事务队列中的命令
    for (j = 0; j < c->mstate.count; j++) {
        c->argc = c->mstate.commands[j].argc;
        c->argv = c->mstate.commands[j].argv;
        c->argv_len = c->mstate.commands[j].argv_len;
        c->cmd = c->realcmd = c->mstate.commands[j].cmd;
        // 权限检查
        int acl_errpos;
        int acl_retval = ACLCheckAllPerm(c,&acl_errpos);
        if (acl_retval != ACL_OK) {
          ...
        } else {
            // 执行命令
            if (c->id == CLIENT_ID_AOF)
                call(c,CMD_CALL_NONE);
            else
                call(c,CMD_CALL_FULL);
            serverAssert((c->flags & CLIENT_BLOCKED) == 0);
        }
        // 命令执行后可能会被修改,需要更新操作
        c->mstate.commands[j].argc = c->argc;
        c->mstate.commands[j].argv = c->argv;
        c->mstate.commands[j].cmd = c->cmd;
    }
    // restore old DENY_BLOCKING value
    if (!(old_flags & CLIENT_DENY_BLOCKING))
        c->flags &= ~CLIENT_DENY_BLOCKING;
    // 恢复原命令
    c->argv = orig_argv;
    c->argv_len = orig_argv_len;
    c->argc = orig_argc;
    c->cmd = c->realcmd = orig_cmd;
    // 清除事务
    discardTransaction(c);
    server.in_exec = 0;
}

事务提交的时候,命令的执行逻辑还是比较简单的

1、首先会进行一些检查;

  • 检查事务有没有嵌套;
  • watch 监听的键值是否有变动;
  • 事务中命令入队列的时候,是否有语法错误;

2、循环执行,事务队列中的命令。

通过源码可以看到语法错误的时候事务才会结束执行,如果命令操作的类型不对,事务是不会停止的,还是会把正确的命令执行

4、WATCH 监听变量

WATCH 命令用于在事务开始之前监视任意数量的键: 当调用 EXEC 命令执行事务时, 如果任意一个被监视的键已经被其他客户端修改了, 那么整个事务不再执行, 直接返回失败。

看下 watch 的键值对是如何和客户端进行映射的

// https://github.com/redis/redis/blob/7.0/src/server.h#L918
typedef struct redisDb {
    ...
    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
    ...
} redisDb;
// https://github.com/redis/redis/blob/7.0/src/server.h#L1083
typedef struct client {
    ...
    list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
    ...
} client;
// https://github.com/redis/redis/blob/7.0/src/multi.c#L262
// 服务端中每一个db 中都有一个 hash table 来记录客户端和 watching key 的映射,当这些 key 修改,可以标识监听这些 key 的客户端。   
//
// 每个客户端中也有一个被监听的键值对的列表,当客户端被释放或者 un-watch 被调用,可以取消监听这些 key .
typedef struct watchedKey {
    // 键值
    robj *key;
    // 键值所在的db
    redisDb *db;
    // 客户端
    client *client;
    // 正在监听过期key 的标识
    unsigned expired:1; /* Flag that we're watching an already expired key. */
} watchedKey;

变量映射关系吐下所示

分析完数据结构,看下 watch 的代码实现

// https://github.com/redis/redis/blob/7.0/src/multi.c#L441
void watchCommand(client *c) {
    int j;
    if (c->flags & CLIENT_MULTI) {
        addReplyError(c,"WATCH inside MULTI is not allowed");
        return;
    }
    /* No point in watching if the client is already dirty. */
    if (c->flags & CLIENT_DIRTY_CAS) {
        addReply(c,shared.ok);
        return;
    }
    for (j = 1; j < c->argc; j++)
        watchForKey(c,c->argv[j]);
    addReply(c,shared.ok);
}
// https://github.com/redis/redis/blob/7.0/src/multi.c#L270
/* Watch for the specified key */
void watchForKey(client *c, robj *key) {
    list *clients = NULL;
    listIter li;
    listNode *ln;
    watchedKey *wk;
    // 检查是否正在 watch 传入的 key 
    listRewind(c->watched_keys,&li);
    while((ln = listNext(&li))) {
        wk = listNodeValue(ln);
        if (wk->db == c->db && equalStringObjects(key,wk->key))
            return; /* Key already watched */
    }
    // 没有监听,添加监听的 key 到 db 中的 watched_keys 中
    clients = dictFetchValue(c->db->**watched_keys**,key);
    if (!clients) {
        clients = listCreate();
        dictAdd(c->db->watched_keys,key,clients);
        incrRefCount(key);
    }
    // 添加 key 到 client 中的  watched_keys 中
    wk = zmalloc(sizeof(*wk));
    wk->key = key;
    wk->client = c;
    wk->db = c->db;
    wk->expired = keyIsExpired(c->db, key);
    incrRefCount(key);
    listAddNodeTail(c->watched_keys,wk);
    listAddNodeTail(clients,wk);
}

1、服务端中每一个db 中都有一个 hash table 来记录客户端和 watching key 的映射,当这些 key 修改,可以标识监听这些 key 的客户端;

2、每个客户端中也有一个被监听的键值对的列表,当客户端被释放或者 un-watch 被调用,可以取消监听这些 key ;

3、当用 watch 命令的时候,过期键会被分别添加到 redisDb 中的 watched_keys 中,和 client 中的 watched_keys 中。

上面事务的执行的时候,客户端有一个 flags, CLIENT_DIRTY_CAS 标识当前客户端 watch 的键值对有更新,那么 CLIENT_DIRTY_CAS 是在何时被标记的呢?

// https://github.com/redis/redis/blob/7.0/src/db.c#L535
/*-----------------------------------------------------------------------------
 * Hooks for key space changes.
 *
 * Every time a key in the database is modified the function
 * signalModifiedKey() is called.
 *
 * Every time a DB is flushed the function signalFlushDb() is called.
 *----------------------------------------------------------------------------*/
// 每次修改数据库中的一个键时,都会调用函数signalModifiedKey()。
// 每次DB被刷新时,函数signalFlushDb()被调用。
/* Note that the 'c' argument may be NULL if the key was modified out of
 * a context of a client. */
// 当 键值对有变动的时候,会调用 touchWatchedKey 标识对应的客户端状态为 CLIENT_DIRTY_CAS
void signalModifiedKey(client *c, redisDb *db, robj *key) {
    touchWatchedKey(db,key);
    trackingInvalidateKey(c,key,1);
}
// https://github.com/redis/redis/blob/7.0/src/multi.c#L348
/* "Touch" a key, so that if this key is being WATCHed by some client the
 * next EXEC will fail. */
// 修改 key 对应的客户端状态为 CLIENT_DIRTY_CAS,当前客户端 watch 的 key 已经发生了更新
void touchWatchedKey(redisDb *db, robj *key) {
    list *clients;
    listIter li;
    listNode *ln;
    // 如果 redisDb 中的 watched_keys 为空,直接返回
    if (dictSize(db->watched_keys) == 0) return;
    // 通过传入的 key 在 redisDb 的 watched_keys 中找到监听该 key 的客户端信息
    clients = dictFetchValue(db->watched_keys, key);
    if (!clients) return;
    /* Mark all the clients watching this key as CLIENT_DIRTY_CAS */
    /* Check if we are already watching for this key */
    // 将监听该 key 的所有客户端信息标识成 CLIENT_DIRTY_CAS 状态  
    listRewind(clients,&li);
    while((ln = listNext(&li))) {
        watchedKey *wk = listNodeValue(ln);
        client *c = wk->client;
        if (wk->expired) {
            /* The key was already expired when WATCH was called. */
            if (db == wk->db &&
                equalStringObjects(key, wk->key) &&
                dictFind(db->dict, key->ptr) == NULL)
            {
                /* Already expired key is deleted, so logically no change. Clear
                 * the flag. Deleted keys are not flagged as expired. */
                wk->expired = 0;
                goto skip_client;
            }
            break;
        }
        c->flags |= CLIENT_DIRTY_CAS;
        /* As the client is marked as dirty, there is no point in getting here
         * again in case that key (or others) are modified again (or keep the
         * memory overhead till EXEC). */
         // 这个客户端应该被表示成 dirty,这个客户端就不需要在判断监听了,取消这个客户端监听的 key
        unwatchAllKeys(c);
    skip_client:
        continue;
    }
}

Redis 中 redisClient 的 flags 设置被设置成 REDIS_DIRTY_CAS 位,有下面两种情况:

1、每次修改数据库中的一个键值时;

2、每次DB被 flush 时,整个 Redis 的键值被清空;

上面的这两种情况发生,redis 就会修改 watch 对应的 key 的客户端 flags 为 CLIENT_DIRTY_CAS 表示该客户端 watch 有更新,事务处理就能通过这个状态来进行判断。

几乎所有对 key 进行操作的函数都会调用 signalModifiedKey 函数,比如 setKey、delCommand、hsetCommand 等。也就所有修改 key 的值的函数,都会去调用 signalModifiedKey 来检查是否修改了被 watch 的 key,只要是修改了被 watch 的 key,就会对 redisClient 的 flags 设置 REDIS_DIRTY_CAS 位。


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
2月前
|
缓存 NoSQL Redis
Redis 事务
10月更文挑战第18天
31 1
|
4月前
|
负载均衡 NoSQL 算法
一天五道Java面试题----第十天(简述Redis事务实现--------->负载均衡算法、类型)
这篇文章是关于Java面试中Redis相关问题的笔记,包括Redis事务实现、集群方案、主从复制原理、CAP和BASE理论以及负载均衡算法和类型。
一天五道Java面试题----第十天(简述Redis事务实现--------->负载均衡算法、类型)
|
4月前
|
存储 NoSQL Redis
redis 6源码解析之 object
redis 6源码解析之 object
72 6
|
3天前
|
NoSQL Redis
Redis事务长什么样?一文带你全面了解
Redis事务是一组命令的有序队列,通过MULTI、EXEC、WATCH和DISCARD等命令实现原子性操作。事务中的命令在EXEC执行前不会实际运行,而是先进入队列,确保所有命令要么全部成功,要么全部失败。此外,Redis还支持Lua脚本实现类似事务的操作,通常更简单高效。事务适用于购物车结算、秒杀活动、排行榜更新等需要保证数据一致性的场景。
14 0
|
2月前
|
缓存 NoSQL Ubuntu
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
65 3
|
2月前
|
SQL 分布式计算 NoSQL
大数据-42 Redis 功能扩展 发布/订阅模式 事务相关的内容 Redis弱事务
大数据-42 Redis 功能扩展 发布/订阅模式 事务相关的内容 Redis弱事务
30 2
|
2月前
|
NoSQL 关系型数据库 MySQL
Redis 事务特性、原理、具体命令操作全方位诠释 —— 零基础可学习
本文全面阐述了Redis事务的特性、原理、具体命令操作,指出Redis事务具有原子性但不保证一致性、持久性和隔离性,并解释了Redis事务的适用场景和WATCH命令的乐观锁机制。
400 0
Redis 事务特性、原理、具体命令操作全方位诠释 —— 零基础可学习
|
7月前
|
NoSQL 安全 Unix
Redis源码、面试指南(4)单机数据库、持久化、通知与订阅(中)
Redis源码、面试指南(4)单机数据库、持久化、通知与订阅
54 0
|
4月前
|
NoSQL 关系型数据库 Redis
Redis6入门到实战------ 九、10. Redis_事务_锁机制_秒杀
这篇文章深入探讨了Redis事务的概念、命令使用、错误处理机制以及乐观锁和悲观锁的应用,并通过WATCH/UNWATCH命令展示了事务中的锁机制。
Redis6入门到实战------ 九、10. Redis_事务_锁机制_秒杀
|
3月前
|
监控 NoSQL 关系型数据库
9)Redis 居然也有事务
9)Redis 居然也有事务
42 0