从代码实现看Redis分布式锁的原子性保证
分布式锁是 Redis 在实际业务场景中的一个重要应用。当有多个客户端并发访问某个共享资源时,比如要修改数据库中的某条记录,为了避免记录修改冲突,我们可以让所有客户端从 Redis 上获取分布式锁,只有拿到锁的客户端才能操作共享资源。
那么,对于分布式锁来说,它实现的关键就是要保证加锁和解锁两个操作是原子操作,这样才能保证多客户端访问时锁的正确性。而通过前面的学习,你知道 Redis 能通过事件驱动框架同时捕获多个客户端的可读事件,也就是命令请求。此外,在 Redis 6.0 版本中,多个 IO 线程会被用于并发地读取或写回数据。
而既然如此,你就可以来思考一个问题:分布式锁的原子性还能得到保证吗?
今天这篇文章呢,我就带你来了解下一条命令在 Redis server 中的执行过程,然后结合分布式锁的要求,来带你看下命令执行的原子性是如何保证的。同时,我们再来看看在有 IO 多路复用和多 IO 线程的情况下,分布式锁的原子性是否会受到影响。
这样一来,你就既可以掌握客户端的一条命令是如何完成执行的,其原子性是如何得到保证的,而且还可以把之前学习到的知识点串接应用起来。要知道,了解客户端命令的执行过程,对于日常排查 Redis 问题也是非常有帮助的,你可以在命令执行的过程中加入检测点,以便分析和排查运行问题。
好,那么接下来,我们就先来了解下分布式锁的实现方法,这样就能知道分布式锁对应的实现命令,以便进行进一步分析。
分布式锁的实现方法
这里,我再来简要介绍下分布式锁的加锁和解锁实现的命令。
首先,对于分布式锁的加锁操作来说,我们可以使用 Redis 的 SET 命令。Redis SET 命令提供了 NX 和 EX 选项,这两个选项的含义分别是:
- NX,表示当操作的 key 不存在时,Redis 会直接创建;当操作的 key 已经存在了,则返回 NULL 值,Redis 对 key 不做任何修改。
- EX,表示设置 key 的过期时间。
因此,我们可以让客户端发送以下命令来进行加锁。其中,lockKey 是锁的名称,uid 是客户端可以用来唯一标记自己的 ID,expireTime 是这个 key 所代表的锁的过期时间,当这个过期时间到了之后,这个 key 会被删除,相当于锁被释放了,这样就避免了锁一直无法释放的问题。
SET lockKey uid EX expireTime NX
而如果还没有客户端创建过锁,那么,假设客户端 A 发送了这个 SET 命令给 Redis,如下所示:
SET lockKey 1033 EX 30 NX
这样,Redis 就会创建对应的 key 为 lockKey,而键值对的 value 就是这个客户端的 ID 1033。此时,假设有另一个客户端 B 也发送了 SET 命令,如下所示,表示要把 key 为 lockKey的键值对值,改为客户端 B 的 ID 2033,也就是要加锁
SET lockKey 2033 EX 30 NX
由于使用了 NX 选项,如果 lockKey的 key 已经存在了,客户端 B 就无法对其进行修改了,也就无法获得锁了,这样就实现了加锁的效果。
而对于解锁来说,我们可以使用如下的 Lua 脚本来完成,而 Lua 脚本会以 EVAL 命令的形式在 Redis server 中执行。客户端会使用 GET 命令读取锁对应 key 的 value,并判断 value 是否等于客户端自身的 ID。如果等于,就表明当前客户端正拿着锁,此时可以执行 DEL 命令删除 key,也就是释放锁;如果 value 不等于客户端自身 ID,那么该脚本会直接返回。
if redis.call("get",lockKey) == uid then return redis.call("del",lockKey) else return 0 end
这样一来,客户端就不会误删除别的客户端获得的锁了,从而保证了锁的安全性。
好,现在我们就了解了分布式锁的实现命令。那么在这里,我们需要搞明白的问题就是:无论是加锁的 SET 命令,还是解锁的 Lua 脚本和 EVAL 命令,在有 IO 多路复用时,会被同时执行吗?或者当我们使用了多 IO 线程后,会被多个线程同时执行吗?
这就和 Redis 中命令的执行过程有关了。下面,我们就来了解下,一条命令在 Redis 是如何完成执行的。同时,我们还会学习到,IO 多路复用引入的多个并发客户端,以及多 IO 线程是否会破坏命令的原子性。
一条命令的处理过程
现在我们知道,Redis server 一旦和一个客户端建立连接后,就会在事件驱动框架中注册可读事件,这就对应了客户端的命令请求。而对于整个命令处理的过程来说,我认为主要可以分成四个阶段,它们分别对应了 Redis 源码中的不同函数。这里,我把它们对应的入口函数,也就是它们是从哪个函数开始进行执行的,罗列如下:
- 命令读取,对应 readQueryFromClient 函数;
- 命令解析,对应 processInputBufferAndReplicate 函数;
- 命令执行,对应 processCommand 函数;
- 结果返回,对应 addReply 函数;
那么下面,我们就来分别看下这四个入口函数的基本流程,以及为了完成命令执行,它们内部的主要调用关系都是怎样的。
命令读取阶段:readQueryFromClient 函数
首先,我们来了解下 readQueryFromClient 函数的基本流程。
readQueryFromClient 函数会从客户端连接的 socket 中,读取最大为 readlen 长度的数据,readlen 值大小是宏定义 PROTO_IOBUF_LEN。该宏定义是在server.h文件中定义的,默认值为 16KB。
紧接着,readQueryFromClient 函数会根据读取数据的情况,进行一些异常处理,比如数据读取失败或是客户端连接关闭等。此外,如果当前客户端是主从复制中的主节点,readQueryFromClient 函数还会把读取的数据,追加到用于主从节点命令同步的缓冲区中。
最后,readQueryFromClient 函数会调用 processInputBuffer函数,这就进入到了命令处理的下一个阶段,也就是命令解析阶段。
整个函数如下:
void readQueryFromClient(connection *conn) { //从连接数据结构中获取客户端 client *c = connGetPrivateData(conn); int nread, readlen; size_t qblen; /* Check if we want to read from the client later when exiting from * the event loop. This is the case if threaded I/O is enabled. */ // 判断是否推迟从客户端读取数据 if (postponeClientRead(c)) return; /* Update total number of reads on server */ // 更新服务器上的读取总数 atomicIncr(server.stat_total_reads_processed, 1); // 从客户端socket中读取的数据长度,默认为16KB readlen = PROTO_IOBUF_LEN; /* If this is a multi bulk request, and we are processing a bulk reply * that is large enough, try to maximize the probability that the query * buffer contains exactly the SDS string representing the object, even * at the risk of requiring more read(2) calls. This way the function * processMultiBulkBuffer() can avoid copying buffers to create the * Redis Object representing the argument. */ // 如果这是一个多批量请求,并且我们正在处理一个足够大的批量回复,请尝试最大化查询缓冲区恰好包含表示对象的 SDS 字符串的概率 // 即使可能需要更多的 read(2) 调用.这样,函数 processMultiBulkBuffer() 可以避免复制缓冲区来创建表示参数的 Redis 对象 if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG) { ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf); /* Note that the 'remaining' variable may be zero in some edge case, * for example once we resume a blocked client after CLIENT PAUSE. */ if (remaining > 0 && remaining < readlen) readlen = remaining; } qblen = sdslen(c->querybuf); if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; // 给缓冲区分配空间 c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); // 调用read从描述符为fd的客户端socket中读取数据 nread = connRead(c->conn, c->querybuf+qblen, readlen); // 进行连接的情况处理 if (nread == -1) { if (connGetState(conn) == CONN_STATE_CONNECTED) { return; } else { // 客户端连接错误 serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn)); freeClientAsync(c); return; } } else if (nread == 0) { // 关闭客户端连接 serverLog(LL_VERBOSE, "Client closed connection"); freeClientAsync(c); return; } else if (c->flags & CLIENT_MASTER) { /* Append the query buffer to the pending (not applied) buffer * of the master. We'll use this buffer later in order to have a * copy of the string applied by the last command executed. */ // 将查询缓冲区附加到主设备的挂起(未应用)缓冲区。稍后我们将使用此缓冲区,以便获得由执行的最后一个命令应用的字符串的副本。 c->pending_querybuf = sdscatlen(c->pending_querybuf, c->querybuf+qblen,nread); } sdsIncrLen(c->querybuf,nread); c->lastinteraction = server.unixtime; // 当前客户端是主从复制中的主节点,还会把读取的数据,追加到用于主从节点命令同步的缓冲区中 if (c->flags & CLIENT_MASTER) c->read_reploff += nread; atomicIncr(server.stat_net_input_bytes, nread); if (sdslen(c->querybuf) > server.client_max_querybuf_len) { sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); bytes = sdscatrepr(bytes,c->querybuf,64); serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); sdsfree(ci); sdsfree(bytes); freeClientAsync(c); return; } /* There is more data in the client input buffer, continue parsing it * in case to check if there is a full command to execute. */ // 客户端输入缓冲区中有更多数据,继续解析以检查是否有完整的命令要执行 processInputBuffer(c); }
命令解析阶段:processInputBuffer 函数
好了,我们刚才了解了,命令解析实际是在 processInputBuffer 函数中执行的,所以下面,我们还需要清楚这个函数的基本流程是什么样的。首先,processInputBuffer 函数会执行一个 while 循环,不断地从客户端的输入缓冲区中读取数据。然后,它会判断读取到的命令格式,是否以“*”开头。
- 如果命令是以“*”开头,那就表明这个命令是 PROTO_REQ_MULTIBULK 类型的命令请求,也就是符合 RESP 协议(Redis 客户端与服务器端的标准通信协议)的请求。那么,processInputBuffer 函数就会进一步调用 processMultibulkBuffer(在 networking.c 文件中)函数,来解析读取到的命令。
- 而如果命令不是以“*”开头,那则表明这个命令是 PROTO_REQ_INLINE 类型的命令请求,并不是 RESP 协议请求。这类命令也被称为管道命令,命令和命令之间是使用换行符“\r\n”分隔开来的。比如,我们使用 Telnet 发送给 Redis 的命令,就是属于 PROTO_REQ_INLINE 类型的命令。在这种情况下,processInputBuffer 函数会调用 processInlineBuffer(在 networking.c 文件中)函数,来实际解析命令。
这样,等命令解析完成后,processInputBuffer 函数就会调用 processCommand 函数,开始进入命令处理的第三个阶段,也就是命令执行阶段。下面的代码展示了 processInputBuffer 函数解析命令时的主要流程,你可以看下。
/* This function is called every time, in the client structure 'c', there is * more query buffer to process, because we read more data from the socket * or because a client was blocked and later reactivated, so there could be * pending query buffer, already representing a full command, to process. */ // 每次调用这个函数,在客户端structure 'c'中,有更多的查询缓冲区要处理,因为我们从套接字读取了更多的数据,或者因为客户端被阻塞并稍后重新激活,所以可能已经有待处理的查询缓冲区代表一个完整的命令去处理 void processInputBuffer(client *c) { /* Keep processing while there is something in the input buffer */ while(c->qb_pos < sdslen(c->querybuf)) { /* Immediately abort if the client is in the middle of something. */ if (c->flags & CLIENT_BLOCKED) break; /* Don't process more buffers from clients that have already pending * commands to execute in c->argv. */ if (c->flags & CLIENT_PENDING_COMMAND) break; /* Don't process input from the master while there is a busy script * condition on the slave. We want just to accumulate the replication * stream (instead of replying -BUSY like we do with other clients) and * later resume the processing. */ if (server.lua_timedout && c->flags & CLIENT_MASTER) break; /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is * written to the client. Make sure to not let the reply grow after * this flag has been set (i.e. don't process more commands). * * The same applies for clients we want to terminate ASAP. */ if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break; /* Determine request type when unknown. */ // 根据客户端输入缓冲区的命令开头字符判断命令类型 if (!c->reqtype) { if (c->querybuf[c->qb_pos] == '*') { // 符合RESP协议的命令 c->reqtype = PROTO_REQ_MULTIBULK; } else { // 管道类型命令 c->reqtype = PROTO_REQ_INLINE; } } //对于管道类型命令,调用processInlineBuffer函数解析 if (c->reqtype == PROTO_REQ_INLINE) { if (processInlineBuffer(c) != C_OK) break; /* If the Gopher mode and we got zero or one argument, process * the request in Gopher mode. To avoid data race, Redis won't * support Gopher if enable io threads to read queries. */ if (server.gopher_enabled && !server.io_threads_do_reads && ((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') || c->argc == 0)) { processGopherRequest(c); resetClient(c); c->flags |= CLIENT_CLOSE_AFTER_REPLY; break; } } else if (c->reqtype == PROTO_REQ_MULTIBULK) { // 对于RESP协议命令,调用processMultibulkBuffer函数解析 if (processMultibulkBuffer(c) != C_OK) break; } else { // 否则为未知的请求类型 serverPanic("Unknown request type"); } /* Multibulk processing could see a <= 0 length. */ if (c->argc == 0) { resetClient(c); } else { /* If we are in the context of an I/O thread, we can't really * execute the command here. All we can do is to flag the client * as one that needs to process the command. */ // 如果我们在一个 IO 线程的上下文中,我们不能真正执行这里的命令。我们所能做的就是将客户端标记为需要处理命令的客户端。 if (c->flags & CLIENT_PENDING_READ) { c->flags |= CLIENT_PENDING_COMMAND; break; } /* We are finally ready to execute the command. */ // 终于准备好执行命令了 if (processCommandAndResetClient(c) == C_ERR) { /* If the client is no longer valid, we avoid exiting this * loop and trimming the client buffer later. So we return * ASAP in that case. */ // 如果客户端不再有效,我们避免退出此循环并稍后调整客户端缓冲区。所以我们在这种情况下尽快返回 return; } } } /* Trim to pos */ if (c->qb_pos) { sdsrange(c->querybuf,c->qb_pos,-1); c->qb_pos = 0; } }
下图展示了 processInputBuffer 函数的基本执行流程,你可以再回顾下。
好,那么下面,我们接着来看第三个阶段,也就是命令执行阶段的 processCommand 函数的基本处理流程。