Redis事务相关源码探究
文章目录
Redis事务源码解读
源码地址:https://github.com/redis/redis/tree/7.0/src
从源码来简单分析下 Redis 中事务的实现过程
1、MULTI 声明事务
Redis 中使用 MULTI 命令来声明和开启一个事务
// https://github.com/redis/redis/blob/7.0/src/multi.c#L104 void multiCommand(client *c) { // 判断是否已经开启了事务 // 不持之事务的嵌套 if (c->flags & CLIENT_MULTI) { addReplyError(c,"MULTI calls can not be nested"); return; } // 设置事务标识 c->flags |= CLIENT_MULTI; addReply(c,shared.ok); }
1、首先会判断当前客户端是是否已经开启了事务,Redis 中的事务不支持嵌套;
2、给 flags 设置事务标识 CLIENT_MULTI
。
2、命令入队
开始事务之后,后面所有的命令都会被添加到事务队列中
// https://github.com/redis/redis/blob/7.0/src/multi.c#L59 /* Add a new command into the MULTI commands queue */ void queueMultiCommand(client *c) { multiCmd *mc; // 这里有两种情况的判断 // 1、如果命令在入队是有问题就不入队了,CLIENT_DIRTY_EXEC 表示入队的时候,命令有语法的错误 // 2、如果 watch 的键值有更改也不用入队了, CLIENT_DIRTY_CAS 表示该客户端监听的键值有变动 if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) return; // 在原commands后面配置空间以存放新命令 c->mstate.commands = zrealloc(c->mstate.commands, sizeof(multiCmd)*(c->mstate.count+1)); // 微信新配置的空间设置执行的命令和参数 mc = c->mstate.commands+c->mstate.count; mc->cmd = c->cmd; mc->argc = c->argc; mc->argv = c->argv; mc->argv_len = c->argv_len; ... }
入队的时候会做个判断:
1、如果命令在入队时有语法错误不入队了,CLIENT_DIRTY_EXEC
表示入队的时候,命令有语法的错误;
2、如果 watch 的键值有更改也不用入队了, CLIENT_DIRTY_CAS
表示该客户端监听的键值有变动;
3、client watch 的 key 有更新,当前客户端的 flags 就会被标记成 CLIENT_DIRTY_CAS
,CLIENT_DIRTY_CAS
是在何时被标记,可继续看下文。
3、EXEC 执行事务
命令入队之后,再来看下事务的提交
// https://github.com/redis/redis/blob/7.0/src/multi.c#L140 void execCommand(client *c) { ... // 判断下是否开启了事务 if (!(c->flags & CLIENT_MULTI)) { addReplyError(c,"EXEC without MULTI"); return; } // 事务中不能 watch 有过期时间的键值 if (isWatchedKeyExpired(c)) { c->flags |= (CLIENT_DIRTY_CAS); } // 检查是否需要中退出事务,有下面两种情况 // 1、 watch 的 key 有变化了 // 2、命令入队的时候,有语法错误 if (c->flags & (CLIENT_DIRTY_CAS | CLIENT_DIRTY_EXEC)) { if (c->flags & CLIENT_DIRTY_EXEC) { addReplyErrorObject(c, shared.execaborterr); } else { addReply(c, shared.nullarray[c->resp]); } // 取消事务 discardTransaction(c); return; } uint64_t old_flags = c->flags; /* we do not want to allow blocking commands inside multi */ // 事务中不允许出现阻塞命令 c->flags |= CLIENT_DENY_BLOCKING; /* Exec all the queued commands */ unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */ server.in_exec = 1; orig_argv = c->argv; orig_argv_len = c->argv_len; orig_argc = c->argc; orig_cmd = c->cmd; addReplyArrayLen(c,c->mstate.count); // 循环处理执行事务队列中的命令 for (j = 0; j < c->mstate.count; j++) { c->argc = c->mstate.commands[j].argc; c->argv = c->mstate.commands[j].argv; c->argv_len = c->mstate.commands[j].argv_len; c->cmd = c->realcmd = c->mstate.commands[j].cmd; // 权限检查 int acl_errpos; int acl_retval = ACLCheckAllPerm(c,&acl_errpos); if (acl_retval != ACL_OK) { ... } else { // 执行命令 if (c->id == CLIENT_ID_AOF) call(c,CMD_CALL_NONE); else call(c,CMD_CALL_FULL); serverAssert((c->flags & CLIENT_BLOCKED) == 0); } // 命令执行后可能会被修改,需要更新操作 c->mstate.commands[j].argc = c->argc; c->mstate.commands[j].argv = c->argv; c->mstate.commands[j].cmd = c->cmd; } // restore old DENY_BLOCKING value if (!(old_flags & CLIENT_DENY_BLOCKING)) c->flags &= ~CLIENT_DENY_BLOCKING; // 恢复原命令 c->argv = orig_argv; c->argv_len = orig_argv_len; c->argc = orig_argc; c->cmd = c->realcmd = orig_cmd; // 清除事务 discardTransaction(c); server.in_exec = 0; }
事务提交的时候,命令的执行逻辑还是比较简单的
1、首先会进行一些检查;
- 检查事务有没有嵌套;
- watch 监听的键值是否有变动;
- 事务中命令入队列的时候,是否有语法错误;
2、循环执行,事务队列中的命令。
通过源码可以看到语法错误的时候事务才会结束执行,如果命令操作的类型不对,事务是不会停止的,还是会把正确的命令执行。
4、WATCH 监听变量
WATCH 命令用于在事务开始之前监视任意数量的键: 当调用 EXEC 命令执行事务时, 如果任意一个被监视的键已经被其他客户端修改了, 那么整个事务不再执行, 直接返回失败。
看下 watch 的键值对是如何和客户端进行映射的
// https://github.com/redis/redis/blob/7.0/src/server.h#L918 typedef struct redisDb { ... dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ ... } redisDb; // https://github.com/redis/redis/blob/7.0/src/server.h#L1083 typedef struct client { ... list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ ... } client; // https://github.com/redis/redis/blob/7.0/src/multi.c#L262 // 服务端中每一个db 中都有一个 hash table 来记录客户端和 watching key 的映射,当这些 key 修改,可以标识监听这些 key 的客户端。 // // 每个客户端中也有一个被监听的键值对的列表,当客户端被释放或者 un-watch 被调用,可以取消监听这些 key . typedef struct watchedKey { // 键值 robj *key; // 键值所在的db redisDb *db; // 客户端 client *client; // 正在监听过期key 的标识 unsigned expired:1; /* Flag that we're watching an already expired key. */ } watchedKey;
变量映射关系吐下所示
分析完数据结构,看下 watch 的代码实现
// https://github.com/redis/redis/blob/7.0/src/multi.c#L441 void watchCommand(client *c) { int j; if (c->flags & CLIENT_MULTI) { addReplyError(c,"WATCH inside MULTI is not allowed"); return; } /* No point in watching if the client is already dirty. */ if (c->flags & CLIENT_DIRTY_CAS) { addReply(c,shared.ok); return; } for (j = 1; j < c->argc; j++) watchForKey(c,c->argv[j]); addReply(c,shared.ok); } // https://github.com/redis/redis/blob/7.0/src/multi.c#L270 /* Watch for the specified key */ void watchForKey(client *c, robj *key) { list *clients = NULL; listIter li; listNode *ln; watchedKey *wk; // 检查是否正在 watch 传入的 key listRewind(c->watched_keys,&li); while((ln = listNext(&li))) { wk = listNodeValue(ln); if (wk->db == c->db && equalStringObjects(key,wk->key)) return; /* Key already watched */ } // 没有监听,添加监听的 key 到 db 中的 watched_keys 中 clients = dictFetchValue(c->db->**watched_keys**,key); if (!clients) { clients = listCreate(); dictAdd(c->db->watched_keys,key,clients); incrRefCount(key); } // 添加 key 到 client 中的 watched_keys 中 wk = zmalloc(sizeof(*wk)); wk->key = key; wk->client = c; wk->db = c->db; wk->expired = keyIsExpired(c->db, key); incrRefCount(key); listAddNodeTail(c->watched_keys,wk); listAddNodeTail(clients,wk); }
1、服务端中每一个db 中都有一个 hash table 来记录客户端和 watching key 的映射,当这些 key 修改,可以标识监听这些 key 的客户端;
2、每个客户端中也有一个被监听的键值对的列表,当客户端被释放或者 un-watch 被调用,可以取消监听这些 key ;
3、当用 watch 命令的时候,过期键会被分别添加到 redisDb 中的 watched_keys 中,和 client 中的 watched_keys 中。
上面事务的执行的时候,客户端有一个 flags, CLIENT_DIRTY_CAS
标识当前客户端 watch 的键值对有更新,那么 CLIENT_DIRTY_CAS
是在何时被标记的呢?
// https://github.com/redis/redis/blob/7.0/src/db.c#L535 /*----------------------------------------------------------------------------- * Hooks for key space changes. * * Every time a key in the database is modified the function * signalModifiedKey() is called. * * Every time a DB is flushed the function signalFlushDb() is called. *----------------------------------------------------------------------------*/ // 每次修改数据库中的一个键时,都会调用函数signalModifiedKey()。 // 每次DB被刷新时,函数signalFlushDb()被调用。 /* Note that the 'c' argument may be NULL if the key was modified out of * a context of a client. */ // 当 键值对有变动的时候,会调用 touchWatchedKey 标识对应的客户端状态为 CLIENT_DIRTY_CAS void signalModifiedKey(client *c, redisDb *db, robj *key) { touchWatchedKey(db,key); trackingInvalidateKey(c,key,1); } // https://github.com/redis/redis/blob/7.0/src/multi.c#L348 /* "Touch" a key, so that if this key is being WATCHed by some client the * next EXEC will fail. */ // 修改 key 对应的客户端状态为 CLIENT_DIRTY_CAS,当前客户端 watch 的 key 已经发生了更新 void touchWatchedKey(redisDb *db, robj *key) { list *clients; listIter li; listNode *ln; // 如果 redisDb 中的 watched_keys 为空,直接返回 if (dictSize(db->watched_keys) == 0) return; // 通过传入的 key 在 redisDb 的 watched_keys 中找到监听该 key 的客户端信息 clients = dictFetchValue(db->watched_keys, key); if (!clients) return; /* Mark all the clients watching this key as CLIENT_DIRTY_CAS */ /* Check if we are already watching for this key */ // 将监听该 key 的所有客户端信息标识成 CLIENT_DIRTY_CAS 状态 listRewind(clients,&li); while((ln = listNext(&li))) { watchedKey *wk = listNodeValue(ln); client *c = wk->client; if (wk->expired) { /* The key was already expired when WATCH was called. */ if (db == wk->db && equalStringObjects(key, wk->key) && dictFind(db->dict, key->ptr) == NULL) { /* Already expired key is deleted, so logically no change. Clear * the flag. Deleted keys are not flagged as expired. */ wk->expired = 0; goto skip_client; } break; } c->flags |= CLIENT_DIRTY_CAS; /* As the client is marked as dirty, there is no point in getting here * again in case that key (or others) are modified again (or keep the * memory overhead till EXEC). */ // 这个客户端应该被表示成 dirty,这个客户端就不需要在判断监听了,取消这个客户端监听的 key unwatchAllKeys(c); skip_client: continue; } }
Redis 中 redisClient 的 flags 设置被设置成 REDIS_DIRTY_CAS
位,有下面两种情况:
1、每次修改数据库中的一个键值时;
2、每次DB被 flush 时,整个 Redis 的键值被清空;
上面的这两种情况发生,redis 就会修改 watch 对应的 key 的客户端 flags 为 CLIENT_DIRTY_CAS 表示该客户端 watch 有更新,事务处理就能通过这个状态来进行判断。
几乎所有对 key 进行操作的函数都会调用 signalModifiedKey 函数,比如 setKey、delCommand、hsetCommand
等。也就所有修改 key 的值的函数,都会去调用 signalModifiedKey 来检查是否修改了被 watch 的 key,只要是修改了被 watch 的 key,就会对 redisClient 的 flags 设置 REDIS_DIRTY_CAS 位。