redis-py 源码阅读

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: RESP(Redis Serialization Protocol)是Redis客户端和服务端的通讯协议。
  • redis协议规范


  • redis-py概述


  • redis-py基础使用


  • RedisCommand
  • Redis连接
  • 连接池


  • pipeline


  • LuaScript


  • lock


redis协议规范



RESP(Redis Serialization Protocol)是Redis客户端和服务端的通讯协议。数据示例如下:


+OK\r\n
-Error message\r\n
:1000\r\n
$6\r\nfoobar\r\n
*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n
*3\r\n:1\r\n:2\r\n:3\r\n


协议定义了5种类型:


  1. +前缀表示字符串,后接字符串文本,以\r\n结尾,通常用于命令结果


  1. -前缀表示异常信息,后接以空格连接的两个字符串,以\r\n结尾


  1. :前缀表示整数,后接整数,以\r\n结尾


  1. $前缀表示定长的字符串,后接字符串长度,\r\n和字符串文本,以\r\n结尾


  1. *前缀表示数组,后接数组的长度和\r\n,数组的每个元素可以由上面4种类型构成


协议还约定了Null等的实现,详情请看参考链接部分。下面示例了 LLEN mylist 的请求和响应


C: *2\r\n$4\r\nLLEN\r\n$6\r\nmylist\r\n
S: :48293\r\n


  • 客户端发送了 LLEN mylist指令,指令序列化成RESP长度为2的数组,2个定长字符串分别是llen和mylist。


  • 服务端响应整数48293,即mylist数据的长度。


Request-Response model是redis服务的请求响应模型,可以对比http协议的模式。redis服务端响应客户端的指令,处理后响应回复客户端,可以简单理解为一问一答。当然pipeline,pub/sub和monitor除外。


Redis-py 源码概述



本文使用的redis-py版本是3.5.3, 文件及包信息是:


名称 描述
client redis的api
connection 连接,连接池等
exceptions 异常和错误
lock 锁的实现
sentinel 扩展的哨兵连接
utils 工具
_compat 都版本适配包


redis-py未依赖其它的包,代码量虽然不多,6000行左右,但是100%理解还是需要一定的时间和基础。本文从redis-py日常使用出发,也是redis-py的README中内容,介绍这些基础功能在源码中的实现。


redis-py基础使用



RedisCommand


redis-py的简单使用:


>>> import redis
>>> r = redis.Redis(host='localhost', port=6379, db=0)
>>> r.set('foo', 'bar')
True
>>> r.get('foo')
b'bar'


追踪redis-py的实现:


# client.py
class Redis(object)
     def __init__(self, host='localhost', port=6379,
                 db=0, ..):
        ...
        connection_pool = ConnectionPool(**kwargs)
        self.connection_pool = connection_pool
        ...
    def set(self, name, value, ex=None, px=None, nx=False, xx=False, keepttl=False)
        ...
        return self.execute_command('SET', *pieces)
    def get(self, name):
        return self.execute_command('GET', name)
     # COMMAND EXECUTION AND PROTOCOL PARSING
    def execute_command(self, *args, **options):
        "Execute a command and return a parsed response"
        conn = self.connection or pool.get_connection(command_name, **options)
        conn.send_command(*args)
        return self.parse_response(conn, command_name, **options)


注意:为了便于理解,示例代码和实际的代码有出入,省去了复杂的逻辑和异常等


  • redis首先创造了一个到redis服务的连接,


  • redis包装了redis的所有指令,使用命令模式执行指令。


  • 执行命令就是使用创建的连接发送指令,然后解析和获取响应。这和redis协议上的Request-Response model行为一致。


Redis连接


继续查看连接的创建和执行:


# connection.py
class Connection(object)
    def __init__(...):
        self.host = host
        self.port = int(port)
        self._sock = connect()
    def connect():
        for res in socket.getaddrinfo(self.host, self.port, self.socket_type,
                                      socket.SOCK_STREAM):
            family, socktype, proto, canonname, socket_address = res
            sock = socket.socket(family, socktype, proto)
            ...
            # TCP_NODELAY
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            # connect
            sock.connect(socket_address)
            ...
            return sock
    def pack_command(self, *args):
        command = []
        args = tuple(args[0].encode().split()) + args[1:]
        ...
        buff = SYM_EMPTY.join((SYM_STAR, str(len(args)).encode(), SYM_CRLF))
        for arg in imap(self.encoder.encode, args):
            buff = SYM_EMPTY.join(
                        (buff, SYM_DOLLAR, str(arg_length).encode(), SYM_CRLF))
            output.append(buff)
            output.append(arg)
            ...
        return command
    def send_command(self, *args, **kwargs):
        command = self.pack_command(args)
        if isinstance(command, str):
                command = [command]
        for item in command:
            self._sock.sendall(*args, **kwargs)


  • connection维持了一个socket连接


  • 收到redis的命令使用pack_command进行RESP的序列化打包


  • 数据包使用socket发送


# connection.py
class Connection(object)
    def __init__(...,parser_class=PythonParser,...);
        self._parser = parser_class(socket_read_size=socket_read_size)
    def read_response(self):
        response = self._parser.read_response()
        return response
class PythonParser(BaseParser):
    "Plain Python parsing class"
    def __init__(self, socket_read_size):
        self.socket_read_size = socket_read_size
        ...
        self._sock = connection._sock
        self._buffer = SocketBuffer(self._sock,
                                    self.socket_read_size,
                                    connection.socket_timeout)
        self.encoder = connection.encoder
    def read_response(self):
        raw = self._buffer.readline()
        byte, response = raw[:1], raw[1:]
        # server returned an error
        if byte == b'-':
            response = nativestr(response)
            ...
        # single value
        elif byte == b'+':
            pass
        # int value
        elif byte == b':':
            response = long(response)
        # bulk response
        elif byte == b'$':
            length = int(response)
            response = self._buffer.read(length)
        # multi-bulk response
        elif byte == b'*':
            length = int(response)
            response = [self.read_response() for i in xrange(length)]
        if isinstance(response, bytes):
            response = self.encoder.decode(response)
        return response


  • connection创建了一个parser用于读取和解析服务响应


  • 默认的PythonParser使用SocketBuffer读取socket数据


  • read_response实现了RESP协议的解析过程。对于每行数据\r\n,第一个字符是响应类型,剩下的数据内容,如果是multi-bulk还需要循环读取多行。建议对比协议和发送请求进行详细阅读理解。


PythonParser是pure-python的实现,如果希望更高效,可以额外安装hiredis,会提供一个基于c的解析器HiredisParser


连接池


redis-py使用连接池来提高执行效率,主要的使用方法3个步骤,创建连接池,从连接池中获取有效连接执行命令,完成后释放连接,语句如下:


# redis.py
connection_pool = ConnectionPool(**kwargs)
pool.get_connection(command_name, **options)
try:
    conn.send_command(*args)
    ...
finally:
    ...
    pool.release(conn)


连接池一定要注意释放,可以用try/finally,也可以使用上下文装饰器,这里使用了前者


连接池的具体实现:


# connection.py
class ConnectionPool(object):
    def __init__(...):
        self._available_connections = []
        self._in_use_connections = set()
    def make_connection(self):
        "Create a new connection"
        return self.connection_class(**self.connection_kwargs)
    def get_connection(self, command_name, *keys, **options)
        try:
            connection = self._available_connections.pop()
        except IndexError:
            connection = self.make_connection()
        self._in_use_connections.add(connection)
        ...
        connection.connect()
        return connection
    def release(self, connection):
        "Releases the connection back to the pool"
        try:
            self._in_use_connections.remove(connection)
        except KeyError:
            # Gracefully fail when a connection is returned to this pool
            # that the pool doesn't actually own
            pass
        self._available_connections.append(connection)


  • 连接池内部使用可用连接数组和正在使用连接集合管理所有连接


  • 获取连接时候,优先从可用连接数组获取;没有可用连接会创建新的连接


  • 所有获取到的连接会加入正在使用连接, 如果当前连接未连接会先建立连接


  • 连接释放时会从正在使用连接集合中移除,然后加入可用连接数组数组,等待复用


到这里,我们基本理顺了一个redis指令执行的流程:


r = redis.Redis(host='localhost', port=6379, db=0)
r.set('foo', 'bar')


pipeline



redis还支持pipeline管线模式,可以批量发送一些命令,然后获取所有的结果:


>>> r = redis.Redis(...)
>>> pipe = r.pipeline()
>>> pipe.set('foo', 'bar').sadd('faz', 'baz').incr('auto_number').execute()
[True, True, 6]


pipeline的继承自redis,做了一些扩展


class Pipeline(Redis)
    def __init__(...):
        self.command_stack = []
    def execute_command(self, *args, **kwargs):
        self.command_stack.append((args, options))
        return self
    def execute(self, raise_on_error=True):
        "Execute all the commands in the current pipeline"
        stack = self.command_stack
        execute = self._execute_pipeline
        execute(conn, stack, raise_on_error)
    def _execute_pipeline(self, connection, commands, raise_on_error):
        # build up all commands into a single request to increase network perf
        all_cmds = connection.pack_commands([args for args, _ in commands])
        connection.send_packed_command(all_cmds)
        response = []
        for args, options in commands:
            response.append(
                self.parse_response(connection, args[0], **options))
        return response


  • pipeline使用一个stack来临时存储批量发送的命令,同时返回自身,这样可以支持链式语法


  • execute时候才正式发送指


  • 发送指令后再依次获取服务响应,打包称一个数组统一返回


LuaScript



redis使用lua脚本来处理事务,使用方法如下:


>>> r = redis.Redis()
>>> lua = """
... local value = redis.call('GET', KEYS[1])
... value = tonumber(value)
... return value * ARGV[1]"""
>>> multiply = r.register_script(lua)
>>> r.set('foo', 2)
>>> multiply(keys=['foo'], args=[5])
10


  • lua脚本中定义了KEYS和ARGV两个数组用于接受参数,KEY的第一个值(lua数组从1开始)是key的名称,ARGV的第一个值是倍数


  • 脚本需要进行注册


  • redis-py中把参数传递给脚本并执行得到结果


脚本的实现原理:


# client.py
class Redis(object):
    def register_script(self, script):
        return Script(self, script
    def script_load(self, script):
        "Load a Lua ``script`` into the script cache. Returns the SHA."
        return self.execute_command('SCRIPT LOAD', script)
    def evalsha(self, sha, numkeys, *keys_and_args):
        return self.execute_command('EVALSHA', sha, numkeys, *keys_and_args)
class Script(object):
    "An executable Lua script object returned by ``register_script``"
    def __init__(self, registered_client, script):
        self.registered_client = registered_client
        self.script = script
        # Precalculate and store the SHA1 hex digest of the script.
        ...
        self.sha = hashlib.sha1(script).hexdigest()
    def __call__(self, keys=[], args=[], client=None):
        "Execute the script, passing any required ``args``"
        args = tuple(keys) + tuple(args)
        # make sure the Redis server knows about the script
        ...
        try:
            return client.evalsha(self.sha, len(keys), *args)
        except NoScriptError:
            # Maybe the client is pointed to a differnet server than the client
            # that created this instance?
            # Overwrite the sha just in case there was a discrepancy.
            self.sha = client.script_load(self.script)
            return client.evalsha(self.sha, len(keys), *args


  • lua脚本通过 script load 加载到redis服务,并获得一个sha值,sha值可以重用,避免多次加载同一脚本


  • 通过 evalsha 执行脚本


lock



redis-py还提供了一个全局锁的实现, 可以跨进程同步:


try:
    with r.lock('my-lock-key', blocking_timeout=5) as lock:
        # code you want executed only after the lock has been acquired
except LockError:
    # the lock wasn't acquired


下面是其实实现:


# lock.py
class Lock(object):
    LUA_RELEASE_SCRIPT = """
        local token = redis.call('get', KEYS[1])
        if not token or token ~= ARGV[1] then
            return 0
        end
        redis.call('del', KEYS[1])
        return 1
    ""
    def __init__(...):
        ...
        self.redis = redis
        self.name = name
        self.local = threading.local() if self.thread_local else dummy()
        self.local.token = None
        cls = self.__class__
        cls.lua_release = client.register_script(cls.LUA_RELEASE_SCRIPT)
    def __enter__(self):
        # force blocking, as otherwise the user would have to check whether
        # the lock was actually acquired or not.
        if self.acquire(blocking=True):
            return self
        raise LockError("Unable to acquire lock within the time specified")
    def __exit__(self, exc_type, exc_value, traceback):
        self.release()
    def acquire(self, blocking=None, blocking_timeout=None, token=None):
        ...
        token = uuid.uuid1().hex.encode()
        self.redis.set(self.name, token, nx=True, px=timeout)
        ...
        self.local.token = token
        ...
    def release(self):
        expected_token = self.local.token
        self.local.token = None
        self.lua_release(keys=[self.name],
                                     args=[expected_token],
                                     client=self.redis)


  • LUA_RELEASE_SCRIPT使用lua脚本来处理删除token的事务


  • lock使用线程变量来存储token值,保证多线程并发可以正常


  • __enter__和__exit__是装饰器语法,保证可以合法的获取和释放


  • 申请锁的时候获取一个临时的token,然后设置到redis服务中,这个token是有生命周期的,可以超时自动释放。


  • 释放的时候清理线程本地变量和redis服务中的变量


TODO



源码中的 publish/subscibeMonitor ,Sentinel 和事务等内容,个人认为并不在主线任务上,留待后续再行介绍。


参考链接






原文链接



game404.github.io/post/python…

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
3月前
|
存储 NoSQL 算法
Redis系列-13.Redis经典五大类型源码及底层实现(一)(上)
Redis系列-13.Redis经典五大类型源码及底层实现(一)
75 0
|
3月前
|
存储 NoSQL Redis
redis源码学习
redis源码学习
|
28天前
|
存储 NoSQL 算法
【Redis技术进阶之路】「底层源码解析」揭秘高效存储模型与数据结构底层实现(字典)(二)
【Redis技术进阶之路】「底层源码解析」揭秘高效存储模型与数据结构底层实现(字典)
45 0
|
3月前
|
NoSQL 网络协议 Linux
Redis单线程源码深入解析
Redis单线程源码深入解析
|
3月前
|
NoSQL Linux Redis
redis源码调试---vscode使用技巧-----C语言跳转到函数定义
redis源码调试---vscode使用技巧-----C语言跳转到函数定义
70 0
|
8天前
|
人工智能 前端开发 Java
Java语言开发的AI智慧导诊系统源码springboot+redis 3D互联网智导诊系统源码
智慧导诊解决盲目就诊问题,减轻分诊工作压力。降低挂错号比例,优化就诊流程,有效提高线上线下医疗机构接诊效率。可通过人体画像选择症状部位,了解对应病症信息和推荐就医科室。
148 10
|
1月前
|
存储 NoSQL 网络协议
读懂Redis源码,我总结了这7点心得
读懂Redis源码,我总结了这7点心得
|
3月前
|
缓存 NoSQL 关系型数据库
Redis 7.0 源码调试环境搭建与源码导读技巧
Redis 7.0 源码调试环境搭建与源码导读技巧
54 0
|
3月前
|
NoSQL 算法 Redis
redis7.0源码阅读(五):跳表(skiplist)
redis7.0源码阅读(五):跳表(skiplist)
62 1
|
3月前
|
负载均衡 NoSQL Java
redis7.0源码阅读(四):Redis中的IO多线程(线程池)
redis7.0源码阅读(四):Redis中的IO多线程(线程池)
77 0

热门文章

最新文章