从源码分析Redis分布式锁的原子性保证(一)

简介: 从源码分析Redis分布式锁的原子性保证

从代码实现看Redis分布式锁的原子性保证

分布式锁是 Redis 在实际业务场景中的一个重要应用。当有多个客户端并发访问某个共享资源时,比如要修改数据库中的某条记录,为了避免记录修改冲突,我们可以让所有客户端从 Redis 上获取分布式锁,只有拿到锁的客户端才能操作共享资源。

那么,对于分布式锁来说,它实现的关键就是要保证加锁和解锁两个操作是原子操作,这样才能保证多客户端访问时锁的正确性。而通过前面的学习,你知道 Redis 能通过事件驱动框架同时捕获多个客户端的可读事件,也就是命令请求。此外,在 Redis 6.0 版本中,多个 IO 线程会被用于并发地读取或写回数据。

而既然如此,你就可以来思考一个问题:分布式锁的原子性还能得到保证吗?

今天这篇文章呢,我就带你来了解下一条命令在 Redis server 中的执行过程,然后结合分布式锁的要求,来带你看下命令执行的原子性是如何保证的。同时,我们再来看看在有 IO 多路复用和多 IO 线程的情况下,分布式锁的原子性是否会受到影响。

这样一来,你就既可以掌握客户端的一条命令是如何完成执行的,其原子性是如何得到保证的,而且还可以把之前学习到的知识点串接应用起来。要知道,了解客户端命令的执行过程,对于日常排查 Redis 问题也是非常有帮助的,你可以在命令执行的过程中加入检测点,以便分析和排查运行问题。

好,那么接下来,我们就先来了解下分布式锁的实现方法,这样就能知道分布式锁对应的实现命令,以便进行进一步分析。

分布式锁的实现方法

这里,我再来简要介绍下分布式锁的加锁和解锁实现的命令。

首先,对于分布式锁的加锁操作来说,我们可以使用 Redis 的 SET 命令。Redis SET 命令提供了 NX 和 EX 选项,这两个选项的含义分别是:

  • NX,表示当操作的 key 不存在时,Redis 会直接创建;当操作的 key 已经存在了,则返回 NULL 值,Redis 对 key 不做任何修改。
  • EX,表示设置 key 的过期时间。

因此,我们可以让客户端发送以下命令来进行加锁。其中,lockKey 是锁的名称,uid 是客户端可以用来唯一标记自己的 ID,expireTime 是这个 key 所代表的锁的过期时间,当这个过期时间到了之后,这个 key 会被删除,相当于锁被释放了,这样就避免了锁一直无法释放的问题。

SET lockKey uid EX expireTime NX

而如果还没有客户端创建过锁,那么,假设客户端 A 发送了这个 SET 命令给 Redis,如下所示:

SET lockKey 1033 EX 30 NX

这样,Redis 就会创建对应的 key 为 lockKey,而键值对的 value 就是这个客户端的 ID 1033。此时,假设有另一个客户端 B 也发送了 SET 命令,如下所示,表示要把 key 为 lockKey的键值对值,改为客户端 B 的 ID 2033,也就是要加锁

SET lockKey 2033 EX 30 NX

由于使用了 NX 选项,如果 lockKey的 key 已经存在了,客户端 B 就无法对其进行修改了,也就无法获得锁了,这样就实现了加锁的效果。

而对于解锁来说,我们可以使用如下的 Lua 脚本来完成,而 Lua 脚本会以 EVAL 命令的形式在 Redis server 中执行。客户端会使用 GET 命令读取锁对应 key 的 value,并判断 value 是否等于客户端自身的 ID。如果等于,就表明当前客户端正拿着锁,此时可以执行 DEL 命令删除 key,也就是释放锁;如果 value 不等于客户端自身 ID,那么该脚本会直接返回。

if redis.call("get",lockKey) == uid then
   return redis.call("del",lockKey)
else
   return 0
end

这样一来,客户端就不会误删除别的客户端获得的锁了,从而保证了锁的安全性。

好,现在我们就了解了分布式锁的实现命令。那么在这里,我们需要搞明白的问题就是:无论是加锁的 SET 命令,还是解锁的 Lua 脚本和 EVAL 命令,在有 IO 多路复用时,会被同时执行吗?或者当我们使用了多 IO 线程后,会被多个线程同时执行吗?

这就和 Redis 中命令的执行过程有关了。下面,我们就来了解下,一条命令在 Redis 是如何完成执行的。同时,我们还会学习到,IO 多路复用引入的多个并发客户端,以及多 IO 线程是否会破坏命令的原子性。

一条命令的处理过程

现在我们知道,Redis server 一旦和一个客户端建立连接后,就会在事件驱动框架中注册可读事件,这就对应了客户端的命令请求。而对于整个命令处理的过程来说,我认为主要可以分成四个阶段,它们分别对应了 Redis 源码中的不同函数。这里,我把它们对应的入口函数,也就是它们是从哪个函数开始进行执行的,罗列如下:

  • 命令读取,对应 readQueryFromClient 函数;
  • 命令解析,对应 processInputBufferAndReplicate 函数;
  • 命令执行,对应 processCommand 函数;
  • 结果返回,对应 addReply 函数;

那么下面,我们就来分别看下这四个入口函数的基本流程,以及为了完成命令执行,它们内部的主要调用关系都是怎样的。

命令读取阶段:readQueryFromClient 函数

首先,我们来了解下 readQueryFromClient 函数的基本流程。

readQueryFromClient 函数会从客户端连接的 socket 中,读取最大为 readlen 长度的数据,readlen 值大小是宏定义 PROTO_IOBUF_LEN。该宏定义是在server.h文件中定义的,默认值为 16KB。

紧接着,readQueryFromClient 函数会根据读取数据的情况,进行一些异常处理,比如数据读取失败或是客户端连接关闭等。此外,如果当前客户端是主从复制中的主节点,readQueryFromClient 函数还会把读取的数据,追加到用于主从节点命令同步的缓冲区中。

最后,readQueryFromClient 函数会调用 processInputBuffer函数,这就进入到了命令处理的下一个阶段,也就是命令解析阶段。

整个函数如下:

void readQueryFromClient(connection *conn) {
    //从连接数据结构中获取客户端
    client *c = connGetPrivateData(conn);
    int nread, readlen;
    size_t qblen;
    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
    // 判断是否推迟从客户端读取数据
    if (postponeClientRead(c)) return;
    /* Update total number of reads on server */
    // 更新服务器上的读取总数
    atomicIncr(server.stat_total_reads_processed, 1);
    // 从客户端socket中读取的数据长度,默认为16KB
    readlen = PROTO_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    // 如果这是一个多批量请求,并且我们正在处理一个足够大的批量回复,请尝试最大化查询缓冲区恰好包含表示对象的 SDS 字符串的概率
    // 即使可能需要更多的 read(2) 调用.这样,函数 processMultiBulkBuffer() 可以避免复制缓冲区来创建表示参数的 Redis 对象
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
        /* Note that the 'remaining' variable may be zero in some edge case,
         * for example once we resume a blocked client after CLIENT PAUSE. */
        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }
    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    // 给缓冲区分配空间
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    // 调用read从描述符为fd的客户端socket中读取数据
    nread = connRead(c->conn, c->querybuf+qblen, readlen);
    // 进行连接的情况处理
    if (nread == -1) {
        if (connGetState(conn) == CONN_STATE_CONNECTED) {
            return;
        } else {
            // 客户端连接错误
            serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
            freeClientAsync(c);
            return;
        }
    } else if (nread == 0) {
        // 关闭客户端连接
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClientAsync(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
        /* Append the query buffer to the pending (not applied) buffer
         * of the master. We'll use this buffer later in order to have a
         * copy of the string applied by the last command executed. */
        // 将查询缓冲区附加到主设备的挂起(未应用)缓冲区。稍后我们将使用此缓冲区,以便获得由执行的最后一个命令应用的字符串的副本。
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }
    sdsIncrLen(c->querybuf,nread);
    c->lastinteraction = server.unixtime;
    // 当前客户端是主从复制中的主节点,还会把读取的数据,追加到用于主从节点命令同步的缓冲区中
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    atomicIncr(server.stat_net_input_bytes, nread);
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClientAsync(c);
        return;
    }
    /* There is more data in the client input buffer, continue parsing it
     * in case to check if there is a full command to execute. */
    // 客户端输入缓冲区中有更多数据,继续解析以检查是否有完整的命令要执行
     processInputBuffer(c);
}

命令解析阶段:processInputBuffer 函数

好了,我们刚才了解了,命令解析实际是在 processInputBuffer 函数中执行的,所以下面,我们还需要清楚这个函数的基本流程是什么样的。首先,processInputBuffer 函数会执行一个 while 循环,不断地从客户端的输入缓冲区中读取数据。然后,它会判断读取到的命令格式,是否以“*”开头。

  • 如果命令是以“*”开头,那就表明这个命令是 PROTO_REQ_MULTIBULK 类型的命令请求,也就是符合 RESP 协议(Redis 客户端与服务器端的标准通信协议)的请求。那么,processInputBuffer 函数就会进一步调用 processMultibulkBuffer(在 networking.c 文件中)函数,来解析读取到的命令。
  • 而如果命令不是以“*”开头,那则表明这个命令是 PROTO_REQ_INLINE 类型的命令请求,并不是 RESP 协议请求。这类命令也被称为管道命令,命令和命令之间是使用换行符“\r\n”分隔开来的。比如,我们使用 Telnet 发送给 Redis 的命令,就是属于 PROTO_REQ_INLINE 类型的命令。在这种情况下,processInputBuffer 函数会调用 processInlineBuffer(在 networking.c 文件中)函数,来实际解析命令。

这样,等命令解析完成后,processInputBuffer 函数就会调用 processCommand 函数,开始进入命令处理的第三个阶段,也就是命令执行阶段。下面的代码展示了 processInputBuffer 函数解析命令时的主要流程,你可以看下。

/* This function is called every time, in the client structure 'c', there is
 * more query buffer to process, because we read more data from the socket
 * or because a client was blocked and later reactivated, so there could be
 * pending query buffer, already representing a full command, to process. */
// 每次调用这个函数,在客户端structure 'c'中,有更多的查询缓冲区要处理,因为我们从套接字读取了更多的数据,或者因为客户端被阻塞并稍后重新激活,所以可能已经有待处理的查询缓冲区代表一个完整的命令去处理
void processInputBuffer(client *c) {
    /* Keep processing while there is something in the input buffer */
    while(c->qb_pos < sdslen(c->querybuf)) {
        /* Immediately abort if the client is in the middle of something. */
        if (c->flags & CLIENT_BLOCKED) break;
        /* Don't process more buffers from clients that have already pending
         * commands to execute in c->argv. */
        if (c->flags & CLIENT_PENDING_COMMAND) break;
        /* Don't process input from the master while there is a busy script
         * condition on the slave. We want just to accumulate the replication
         * stream (instead of replying -BUSY like we do with other clients) and
         * later resume the processing. */
        if (server.lua_timedout && c->flags & CLIENT_MASTER) break;
        /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been set (i.e. don't process more commands).
         *
         * The same applies for clients we want to terminate ASAP. */
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
        /* Determine request type when unknown. */
        // 根据客户端输入缓冲区的命令开头字符判断命令类型
        if (!c->reqtype) {
            if (c->querybuf[c->qb_pos] == '*') {
                // 符合RESP协议的命令
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                // 管道类型命令
                c->reqtype = PROTO_REQ_INLINE;
            }
        }
        //对于管道类型命令,调用processInlineBuffer函数解析
        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
            /* If the Gopher mode and we got zero or one argument, process
             * the request in Gopher mode. To avoid data race, Redis won't
             * support Gopher if enable io threads to read queries. */
            if (server.gopher_enabled && !server.io_threads_do_reads &&
                ((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||
                  c->argc == 0))
            {
                processGopherRequest(c);
                resetClient(c);
                c->flags |= CLIENT_CLOSE_AFTER_REPLY;
                break;
            }
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            // 对于RESP协议命令,调用processMultibulkBuffer函数解析
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            // 否则为未知的请求类型
            serverPanic("Unknown request type");
        }
        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* If we are in the context of an I/O thread, we can't really
             * execute the command here. All we can do is to flag the client
             * as one that needs to process the command. */
            // 如果我们在一个 IO 线程的上下文中,我们不能真正执行这里的命令。我们所能做的就是将客户端标记为需要处理命令的客户端。
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }
            /* We are finally ready to execute the command. */
            // 终于准备好执行命令了
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid exiting this
                 * loop and trimming the client buffer later. So we return
                 * ASAP in that case. */
                // 如果客户端不再有效,我们避免退出此循环并稍后调整客户端缓冲区。所以我们在这种情况下尽快返回
                return;
            }
        }
    }
    /* Trim to pos */
    if (c->qb_pos) {
        sdsrange(c->querybuf,c->qb_pos,-1);
        c->qb_pos = 0;
    }
}

下图展示了 processInputBuffer 函数的基本执行流程,你可以再回顾下。

好,那么下面,我们接着来看第三个阶段,也就是命令执行阶段的 processCommand 函数的基本处理流程。


相关文章
|
8月前
|
存储 负载均衡 NoSQL
【赵渝强老师】Redis Cluster分布式集群
Redis Cluster是Redis的分布式存储解决方案,通过哈希槽(slot)实现数据分片,支持水平扩展,具备高可用性和负载均衡能力,适用于大规模数据场景。
521 2
|
8月前
|
存储 缓存 NoSQL
【📕分布式锁通关指南 12】源码剖析redisson如何利用Redis数据结构实现Semaphore和CountDownLatch
本文解析 Redisson 如何通过 Redis 实现分布式信号量(RSemaphore)与倒数闩(RCountDownLatch),利用 Lua 脚本与原子操作保障分布式环境下的同步控制,帮助开发者更好地理解其原理与应用。
469 6
|
9月前
|
存储 缓存 NoSQL
Redis核心数据结构与分布式锁实现详解
Redis 是高性能键值数据库,支持多种数据结构,如字符串、列表、集合、哈希、有序集合等,广泛用于缓存、消息队列和实时数据处理。本文详解其核心数据结构及分布式锁实现,帮助开发者提升系统性能与并发控制能力。
|
7月前
|
NoSQL Java 调度
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
分布式锁是分布式系统中用于同步多节点访问共享资源的机制,防止并发操作带来的冲突。本文介绍了基于Spring Boot和Redis实现分布式锁的技术方案,涵盖锁的获取与释放、Redis配置、服务调度及多实例运行等内容,通过Docker Compose搭建环境,验证了锁的有效性与互斥特性。
571 0
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
|
7月前
|
缓存 NoSQL 关系型数据库
Redis缓存和分布式锁
Redis 是一种高性能的键值存储系统,广泛用于缓存、消息队列和内存数据库。其典型应用包括缓解关系型数据库压力,通过缓存热点数据提高查询效率,支持高并发访问。此外,Redis 还可用于实现分布式锁,解决分布式系统中的资源竞争问题。文章还探讨了缓存的更新策略、缓存穿透与雪崩的解决方案,以及 Redlock 算法等关键技术。
|
数据采集 存储 数据可视化
分布式爬虫框架Scrapy-Redis实战指南
本文介绍如何使用Scrapy-Redis构建分布式爬虫系统,采集携程平台上热门城市的酒店价格与评价信息。通过代理IP、Cookie和User-Agent设置规避反爬策略,实现高效数据抓取。结合价格动态趋势分析,助力酒店业优化市场策略、提升服务质量。技术架构涵盖Scrapy-Redis核心调度、代理中间件及数据解析存储,提供完整的技术路线图与代码示例。
1486 0
分布式爬虫框架Scrapy-Redis实战指南
|
NoSQL Java 中间件
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
本文介绍了从单机锁到分布式锁的演变,重点探讨了使用Redis实现分布式锁的方法。分布式锁用于控制分布式系统中多个实例对共享资源的同步访问,需满足互斥性、可重入性、锁超时防死锁和锁释放正确防误删等特性。文章通过具体示例展示了如何利用Redis的`setnx`命令实现加锁,并分析了简化版分布式锁存在的问题,如锁超时和误删。为了解决这些问题,文中提出了设置锁过期时间和在解锁前验证持有锁的线程身份的优化方案。最后指出,尽管当前设计已解决部分问题,但仍存在进一步优化的空间,将在后续章节继续探讨。
1534 131
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
|
11月前
|
数据采集 存储 NoSQL
基于Scrapy-Redis的分布式景点数据爬取与热力图生成
基于Scrapy-Redis的分布式景点数据爬取与热力图生成
727 67
|
9月前
|
NoSQL Redis
Lua脚本协助Redis分布式锁实现命令的原子性
利用Lua脚本确保Redis操作的原子性是分布式锁安全性的关键所在,可以大幅减少由于网络分区、客户端故障等导致的锁无法正确释放的情况,从而在分布式系统中保证数据操作的安全性和一致性。在将这些概念应用于生产环境前,建议深入理解Redis事务与Lua脚本的工作原理以及分布式锁的可能问题和解决方案。
319 8
|
10月前
|
缓存 NoSQL 算法
高并发秒杀系统实战(Redis+Lua分布式锁防超卖与库存扣减优化)
秒杀系统面临瞬时高并发、资源竞争和数据一致性挑战。传统方案如数据库锁或应用层锁存在性能瓶颈或分布式问题,而基于Redis的分布式锁与Lua脚本原子操作成为高效解决方案。通过Redis的`SETNX`实现分布式锁,结合Lua脚本完成库存扣减,确保操作原子性并大幅提升性能(QPS从120提升至8,200)。此外,分段库存策略、多级限流及服务降级机制进一步优化系统稳定性。最佳实践包括分层防控、黄金扣减法则与容灾设计,强调根据业务特性灵活组合技术手段以应对高并发场景。
2651 7