从源码分析Redis分布式锁的原子性保证(一)

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: 从源码分析Redis分布式锁的原子性保证

从代码实现看Redis分布式锁的原子性保证

分布式锁是 Redis 在实际业务场景中的一个重要应用。当有多个客户端并发访问某个共享资源时,比如要修改数据库中的某条记录,为了避免记录修改冲突,我们可以让所有客户端从 Redis 上获取分布式锁,只有拿到锁的客户端才能操作共享资源。

那么,对于分布式锁来说,它实现的关键就是要保证加锁和解锁两个操作是原子操作,这样才能保证多客户端访问时锁的正确性。而通过前面的学习,你知道 Redis 能通过事件驱动框架同时捕获多个客户端的可读事件,也就是命令请求。此外,在 Redis 6.0 版本中,多个 IO 线程会被用于并发地读取或写回数据。

而既然如此,你就可以来思考一个问题:分布式锁的原子性还能得到保证吗?

今天这篇文章呢,我就带你来了解下一条命令在 Redis server 中的执行过程,然后结合分布式锁的要求,来带你看下命令执行的原子性是如何保证的。同时,我们再来看看在有 IO 多路复用和多 IO 线程的情况下,分布式锁的原子性是否会受到影响。

这样一来,你就既可以掌握客户端的一条命令是如何完成执行的,其原子性是如何得到保证的,而且还可以把之前学习到的知识点串接应用起来。要知道,了解客户端命令的执行过程,对于日常排查 Redis 问题也是非常有帮助的,你可以在命令执行的过程中加入检测点,以便分析和排查运行问题。

好,那么接下来,我们就先来了解下分布式锁的实现方法,这样就能知道分布式锁对应的实现命令,以便进行进一步分析。

分布式锁的实现方法

这里,我再来简要介绍下分布式锁的加锁和解锁实现的命令。

首先,对于分布式锁的加锁操作来说,我们可以使用 Redis 的 SET 命令。Redis SET 命令提供了 NX 和 EX 选项,这两个选项的含义分别是:

  • NX,表示当操作的 key 不存在时,Redis 会直接创建;当操作的 key 已经存在了,则返回 NULL 值,Redis 对 key 不做任何修改。
  • EX,表示设置 key 的过期时间。

因此,我们可以让客户端发送以下命令来进行加锁。其中,lockKey 是锁的名称,uid 是客户端可以用来唯一标记自己的 ID,expireTime 是这个 key 所代表的锁的过期时间,当这个过期时间到了之后,这个 key 会被删除,相当于锁被释放了,这样就避免了锁一直无法释放的问题。

SET lockKey uid EX expireTime NX

而如果还没有客户端创建过锁,那么,假设客户端 A 发送了这个 SET 命令给 Redis,如下所示:

SET lockKey 1033 EX 30 NX

这样,Redis 就会创建对应的 key 为 lockKey,而键值对的 value 就是这个客户端的 ID 1033。此时,假设有另一个客户端 B 也发送了 SET 命令,如下所示,表示要把 key 为 lockKey的键值对值,改为客户端 B 的 ID 2033,也就是要加锁

SET lockKey 2033 EX 30 NX

由于使用了 NX 选项,如果 lockKey的 key 已经存在了,客户端 B 就无法对其进行修改了,也就无法获得锁了,这样就实现了加锁的效果。

而对于解锁来说,我们可以使用如下的 Lua 脚本来完成,而 Lua 脚本会以 EVAL 命令的形式在 Redis server 中执行。客户端会使用 GET 命令读取锁对应 key 的 value,并判断 value 是否等于客户端自身的 ID。如果等于,就表明当前客户端正拿着锁,此时可以执行 DEL 命令删除 key,也就是释放锁;如果 value 不等于客户端自身 ID,那么该脚本会直接返回。

if redis.call("get",lockKey) == uid then
   return redis.call("del",lockKey)
else
   return 0
end

这样一来,客户端就不会误删除别的客户端获得的锁了,从而保证了锁的安全性。

好,现在我们就了解了分布式锁的实现命令。那么在这里,我们需要搞明白的问题就是:无论是加锁的 SET 命令,还是解锁的 Lua 脚本和 EVAL 命令,在有 IO 多路复用时,会被同时执行吗?或者当我们使用了多 IO 线程后,会被多个线程同时执行吗?

这就和 Redis 中命令的执行过程有关了。下面,我们就来了解下,一条命令在 Redis 是如何完成执行的。同时,我们还会学习到,IO 多路复用引入的多个并发客户端,以及多 IO 线程是否会破坏命令的原子性。

一条命令的处理过程

现在我们知道,Redis server 一旦和一个客户端建立连接后,就会在事件驱动框架中注册可读事件,这就对应了客户端的命令请求。而对于整个命令处理的过程来说,我认为主要可以分成四个阶段,它们分别对应了 Redis 源码中的不同函数。这里,我把它们对应的入口函数,也就是它们是从哪个函数开始进行执行的,罗列如下:

  • 命令读取,对应 readQueryFromClient 函数;
  • 命令解析,对应 processInputBufferAndReplicate 函数;
  • 命令执行,对应 processCommand 函数;
  • 结果返回,对应 addReply 函数;

那么下面,我们就来分别看下这四个入口函数的基本流程,以及为了完成命令执行,它们内部的主要调用关系都是怎样的。

命令读取阶段:readQueryFromClient 函数

首先,我们来了解下 readQueryFromClient 函数的基本流程。

readQueryFromClient 函数会从客户端连接的 socket 中,读取最大为 readlen 长度的数据,readlen 值大小是宏定义 PROTO_IOBUF_LEN。该宏定义是在server.h文件中定义的,默认值为 16KB。

紧接着,readQueryFromClient 函数会根据读取数据的情况,进行一些异常处理,比如数据读取失败或是客户端连接关闭等。此外,如果当前客户端是主从复制中的主节点,readQueryFromClient 函数还会把读取的数据,追加到用于主从节点命令同步的缓冲区中。

最后,readQueryFromClient 函数会调用 processInputBuffer函数,这就进入到了命令处理的下一个阶段,也就是命令解析阶段。

整个函数如下:

void readQueryFromClient(connection *conn) {
    //从连接数据结构中获取客户端
    client *c = connGetPrivateData(conn);
    int nread, readlen;
    size_t qblen;
    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
    // 判断是否推迟从客户端读取数据
    if (postponeClientRead(c)) return;
    /* Update total number of reads on server */
    // 更新服务器上的读取总数
    atomicIncr(server.stat_total_reads_processed, 1);
    // 从客户端socket中读取的数据长度,默认为16KB
    readlen = PROTO_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    // 如果这是一个多批量请求,并且我们正在处理一个足够大的批量回复,请尝试最大化查询缓冲区恰好包含表示对象的 SDS 字符串的概率
    // 即使可能需要更多的 read(2) 调用.这样,函数 processMultiBulkBuffer() 可以避免复制缓冲区来创建表示参数的 Redis 对象
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
        /* Note that the 'remaining' variable may be zero in some edge case,
         * for example once we resume a blocked client after CLIENT PAUSE. */
        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }
    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    // 给缓冲区分配空间
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    // 调用read从描述符为fd的客户端socket中读取数据
    nread = connRead(c->conn, c->querybuf+qblen, readlen);
    // 进行连接的情况处理
    if (nread == -1) {
        if (connGetState(conn) == CONN_STATE_CONNECTED) {
            return;
        } else {
            // 客户端连接错误
            serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
            freeClientAsync(c);
            return;
        }
    } else if (nread == 0) {
        // 关闭客户端连接
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClientAsync(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
        /* Append the query buffer to the pending (not applied) buffer
         * of the master. We'll use this buffer later in order to have a
         * copy of the string applied by the last command executed. */
        // 将查询缓冲区附加到主设备的挂起(未应用)缓冲区。稍后我们将使用此缓冲区,以便获得由执行的最后一个命令应用的字符串的副本。
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }
    sdsIncrLen(c->querybuf,nread);
    c->lastinteraction = server.unixtime;
    // 当前客户端是主从复制中的主节点,还会把读取的数据,追加到用于主从节点命令同步的缓冲区中
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    atomicIncr(server.stat_net_input_bytes, nread);
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClientAsync(c);
        return;
    }
    /* There is more data in the client input buffer, continue parsing it
     * in case to check if there is a full command to execute. */
    // 客户端输入缓冲区中有更多数据,继续解析以检查是否有完整的命令要执行
     processInputBuffer(c);
}

命令解析阶段:processInputBuffer 函数

好了,我们刚才了解了,命令解析实际是在 processInputBuffer 函数中执行的,所以下面,我们还需要清楚这个函数的基本流程是什么样的。首先,processInputBuffer 函数会执行一个 while 循环,不断地从客户端的输入缓冲区中读取数据。然后,它会判断读取到的命令格式,是否以“*”开头。

  • 如果命令是以“*”开头,那就表明这个命令是 PROTO_REQ_MULTIBULK 类型的命令请求,也就是符合 RESP 协议(Redis 客户端与服务器端的标准通信协议)的请求。那么,processInputBuffer 函数就会进一步调用 processMultibulkBuffer(在 networking.c 文件中)函数,来解析读取到的命令。
  • 而如果命令不是以“*”开头,那则表明这个命令是 PROTO_REQ_INLINE 类型的命令请求,并不是 RESP 协议请求。这类命令也被称为管道命令,命令和命令之间是使用换行符“\r\n”分隔开来的。比如,我们使用 Telnet 发送给 Redis 的命令,就是属于 PROTO_REQ_INLINE 类型的命令。在这种情况下,processInputBuffer 函数会调用 processInlineBuffer(在 networking.c 文件中)函数,来实际解析命令。

这样,等命令解析完成后,processInputBuffer 函数就会调用 processCommand 函数,开始进入命令处理的第三个阶段,也就是命令执行阶段。下面的代码展示了 processInputBuffer 函数解析命令时的主要流程,你可以看下。

/* This function is called every time, in the client structure 'c', there is
 * more query buffer to process, because we read more data from the socket
 * or because a client was blocked and later reactivated, so there could be
 * pending query buffer, already representing a full command, to process. */
// 每次调用这个函数,在客户端structure 'c'中,有更多的查询缓冲区要处理,因为我们从套接字读取了更多的数据,或者因为客户端被阻塞并稍后重新激活,所以可能已经有待处理的查询缓冲区代表一个完整的命令去处理
void processInputBuffer(client *c) {
    /* Keep processing while there is something in the input buffer */
    while(c->qb_pos < sdslen(c->querybuf)) {
        /* Immediately abort if the client is in the middle of something. */
        if (c->flags & CLIENT_BLOCKED) break;
        /* Don't process more buffers from clients that have already pending
         * commands to execute in c->argv. */
        if (c->flags & CLIENT_PENDING_COMMAND) break;
        /* Don't process input from the master while there is a busy script
         * condition on the slave. We want just to accumulate the replication
         * stream (instead of replying -BUSY like we do with other clients) and
         * later resume the processing. */
        if (server.lua_timedout && c->flags & CLIENT_MASTER) break;
        /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been set (i.e. don't process more commands).
         *
         * The same applies for clients we want to terminate ASAP. */
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
        /* Determine request type when unknown. */
        // 根据客户端输入缓冲区的命令开头字符判断命令类型
        if (!c->reqtype) {
            if (c->querybuf[c->qb_pos] == '*') {
                // 符合RESP协议的命令
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                // 管道类型命令
                c->reqtype = PROTO_REQ_INLINE;
            }
        }
        //对于管道类型命令,调用processInlineBuffer函数解析
        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
            /* If the Gopher mode and we got zero or one argument, process
             * the request in Gopher mode. To avoid data race, Redis won't
             * support Gopher if enable io threads to read queries. */
            if (server.gopher_enabled && !server.io_threads_do_reads &&
                ((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||
                  c->argc == 0))
            {
                processGopherRequest(c);
                resetClient(c);
                c->flags |= CLIENT_CLOSE_AFTER_REPLY;
                break;
            }
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            // 对于RESP协议命令,调用processMultibulkBuffer函数解析
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            // 否则为未知的请求类型
            serverPanic("Unknown request type");
        }
        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* If we are in the context of an I/O thread, we can't really
             * execute the command here. All we can do is to flag the client
             * as one that needs to process the command. */
            // 如果我们在一个 IO 线程的上下文中,我们不能真正执行这里的命令。我们所能做的就是将客户端标记为需要处理命令的客户端。
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }
            /* We are finally ready to execute the command. */
            // 终于准备好执行命令了
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid exiting this
                 * loop and trimming the client buffer later. So we return
                 * ASAP in that case. */
                // 如果客户端不再有效,我们避免退出此循环并稍后调整客户端缓冲区。所以我们在这种情况下尽快返回
                return;
            }
        }
    }
    /* Trim to pos */
    if (c->qb_pos) {
        sdsrange(c->querybuf,c->qb_pos,-1);
        c->qb_pos = 0;
    }
}

下图展示了 processInputBuffer 函数的基本执行流程,你可以再回顾下。

好,那么下面,我们接着来看第三个阶段,也就是命令执行阶段的 processCommand 函数的基本处理流程。


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
5天前
|
NoSQL Java 关系型数据库
【Redis系列笔记】分布式锁
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。 分布式锁的核心思想就是让大家都使用同一把锁,只要大家使用的是同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分布式锁的核心思路
26 2
|
1天前
|
NoSQL Java 大数据
介绍redis分布式锁
分布式锁是解决多进程在分布式环境中争夺资源的问题,与本地锁相似但适用于不同进程。以Redis为例,通过`setIfAbsent`实现占锁,加锁同时设置过期时间避免死锁。然而,获取锁与设置过期时间非原子性可能导致并发问题,解决方案是使用`setIfAbsent`的超时参数。此外,释放锁前需验证归属,防止误删他人锁,可借助Lua脚本确保原子性。实际应用中还有锁续期、重试机制等复杂问题,现成解决方案如RedisLockRegistry和Redisson。
|
2天前
|
缓存 NoSQL Java
【亮剑】如何使用注解来实现 Redis 分布式锁的功能?
【4月更文挑战第30天】分布式锁是保证多服务实例同步的关键机制,常用于互斥访问共享资源、控制访问顺序和系统保护。基于 Redis 的分布式锁利用 SETNX 或 SET 命令实现,并考虑自动过期、可重入及原子性以确保可靠性。在 Java Spring Boot 中,可通过 `@EnableCaching`、`@Cacheable` 和 `@CacheEvict` 注解轻松实现 Redis 分布式锁功能。
|
3天前
|
NoSQL Redis 微服务
分布式锁_redis实现
分布式锁_redis实现
|
2月前
|
NoSQL Java Redis
如何通俗易懂的理解Redis分布式锁
在多线程并发的情况下,我们如何保证一个代码块在同一时间只能由一个线程访问呢?
39 2
|
23天前
|
NoSQL Java Redis
redis分布式锁
redis分布式锁
|
2月前
|
缓存 NoSQL Java
分布式项目中锁的应用(本地锁-_redis【setnx】-_redisson-_springcache)-fen-bu-shi-xiang-mu-zhong-suo-de-ying-yong--ben-de-suo--redissetnx-springcache-redisson(一)
分布式项目中锁的应用(本地锁-_redis【setnx】-_redisson-_springcache)-fen-bu-shi-xiang-mu-zhong-suo-de-ying-yong--ben-de-suo--redissetnx-springcache-redisson
59 0
|
5天前
|
存储 NoSQL Java
基于Redis实现分布式锁
基于Redis实现分布式锁
29 0
|
6天前
|
NoSQL Java Redis
Redis入门到通关之分布式锁Rediision
Redis入门到通关之分布式锁Rediision
10 0
|
6天前
|
NoSQL 关系型数据库 MySQL
Redis入门到通关之Redis实现分布式锁
Redis入门到通关之Redis实现分布式锁
11 1