- Redis protocol specification
- redis-py overview
- Basic redis-py usage
- RedisCommand
- Redis connection
- Connection pool
- pipeline
- LuaScript
- lock
Redis protocol specification
RESP (Redis Serialization Protocol) is the wire protocol used between Redis clients and the Redis server. Sample data:
    +OK\r\n
    -Error message\r\n
    :1000\r\n
    $6\r\nfoobar\r\n
    *2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n
    *3\r\n:1\r\n:2\r\n:3\r\n
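The protocol defines 5 types:
- The `+` prefix marks a simple string: the string text follows and is terminated by `\r\n`. It is typically used for command results.
- The `-` prefix marks an error: two strings joined by a space follow (an error type and a message), terminated by `\r\n`.
- The `:` prefix marks an integer: the integer follows, terminated by `\r\n`.
- The `$` prefix marks a length-prefixed (bulk) string: the string length, `\r\n`, and the string text follow, terminated by `\r\n`.
- The `*` prefix marks an array: the number of elements and `\r\n` follow. Each element can be any of the 4 types above, or another array.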
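The protocol also specifies how Null and similar values are represented; see the references section for details. The following shows the request and response for `LLEN mylist`: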
    C: *2\r\n$4\r\nLLEN\r\n$6\r\nmylist\r\n
    S: :48293\r\n
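- The client sends the `LLEN mylist` command. It is serialized as a RESP array of length 2 whose two bulk strings are `LLEN` and `mylist`.
- The server replies with the integer 48293, which is the length of `mylist`.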
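The request serialization described above can be sketched in a few lines. This is a minimal illustration, not redis-py's own code: `encode_command` is a hypothetical helper name, and converting non-bytes arguments with `str()` and UTF-8 is an assumption made for the example.

```python
def encode_command(*args):
    """Serialize a command as a RESP array of bulk strings."""
    # Array header: '*' + number of elements + CRLF.
    parts = [b"*" + str(len(args)).encode() + b"\r\n"]
    for arg in args:
        # Assumption: non-bytes arguments are converted with str() and UTF-8.
        data = arg if isinstance(arg, bytes) else str(arg).encode("utf-8")
        # Bulk string: '$' + byte length + CRLF + payload + CRLF.
        parts.append(b"$" + str(len(data)).encode() + b"\r\n" + data + b"\r\n")
    return b"".join(parts)


request = encode_command("LLEN", "mylist")
print(request)
```

The length after `$` counts bytes, not characters, which is why the argument is encoded before it is measured.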
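Redis follows a request-response model, comparable to HTTP: the client sends a command, and the server processes it and sends back a reply. Put simply, it is one question, one answer. Pipelining, pub/sub, and MONITOR are the exceptions.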
redis-py source overview
This article uses redis-py version 3.5.3. Its modules are:
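Name | Description |
client | the Redis API |
connection | connections, connection pools, etc. |
exceptions | exceptions and errors |
lock | the lock implementation |
sentinel | Sentinel-aware connections (extension) |
utils | utilities |
_compat | compatibility layer for multiple Python versions |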
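redis-py does not depend on any other package. The code base is small, about 6,000 lines, but understanding all of it still takes some time and background. This article starts from everyday redis-py usage, which is also what the redis-py README covers, and shows how these basic features are implemented in the source.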
Basic redis-py usage
RedisCommand
Simple usage of redis-py:
    >>> import redis
    >>> r = redis.Redis(host='localhost', port=6379, db=0)
    >>> r.set('foo', 'bar')
    True
    >>> r.get('foo')
    b'bar'
Tracing this through the redis-py implementation:
    # client.py
    class Redis(object):
        def __init__(self, host='localhost', port=6379, db=0, ...):
            ...
            connection_pool = ConnectionPool(**kwargs)
            self.connection_pool = connection_pool
            ...

        def set(self, name, value, ex=None, px=None, nx=False, xx=False, keepttl=False):
            ...
            return self.execute_command('SET', *pieces)

        def get(self, name):
            return self.execute_command('GET', name)

        # COMMAND EXECUTION AND PROTOCOL PARSING
        def execute_command(self, *args, **options):
            "Execute a command and return a parsed response"
            pool = self.connection_pool
            command_name = args[0]
            conn = self.connection or pool.get_connection(command_name, **options)
            conn.send_command(*args)
            return self.parse_response(conn, command_name, **options)
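Note: to keep things readable, the sample code differs from the real code; complex logic, exception handling, and similar details are omitted.
- The `Redis` object first sets up a connection pool, through which it obtains connections to the Redis server.
- `Redis` wraps every Redis command as a method and executes them using the command pattern.
- Executing a command means sending it over a connection, then reading and parsing the response. This matches the request-response model of the Redis protocol.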
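The three points above can be illustrated with a small stand-alone sketch. `RecordingConnection` and `MiniClient` are hypothetical names invented for this example; the connection is an in-memory stand-in rather than a socket, so the sketch shows only the shape of the command pattern, not redis-py's actual classes.

```python
class RecordingConnection:
    """Hypothetical stand-in for a real connection; keeps data in a dict."""

    def __init__(self):
        self.sent = []       # every command that was "sent"
        self._data = {}
        self._reply = None

    def send_command(self, *args):
        self.sent.append(args)
        name = args[0]
        if name == "SET":
            self._data[args[1]] = str(args[2]).encode()
            self._reply = True
        elif name == "GET":
            self._reply = self._data.get(args[1])

    def read_response(self):
        return self._reply


class MiniClient:
    def __init__(self, connection):
        self.connection = connection

    def execute_command(self, *args):
        # Single choke point: one request out, one response back.
        self.connection.send_command(*args)
        return self.connection.read_response()

    # Each public method is a thin wrapper that names the command.
    def set(self, name, value):
        return self.execute_command("SET", name, value)

    def get(self, name):
        return self.execute_command("GET", name)


client = MiniClient(RecordingConnection())
set_result = client.set("foo", "bar")
get_result = client.get("foo")
print(set_result, get_result)
```

Because every command funnels through `execute_command`, concerns such as connection handling and response parsing live in one place.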
Redis connection
Next, look at how a connection is created and how it sends a command:
    # connection.py
    class Connection(object):
        def __init__(...):
            self.host = host
            self.port = int(port)
            self._sock = self.connect()

        def connect(self):
            for res in socket.getaddrinfo(self.host, self.port, self.socket_type,
                                          socket.SOCK_STREAM):
                family, socktype, proto, canonname, socket_address = res
                sock = socket.socket(family, socktype, proto)
                ...
                # TCP_NODELAY
                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
                # connect
                sock.connect(socket_address)
                ...
                return sock

        def pack_command(self, *args):
            output = []
            args = tuple(args[0].encode().split()) + args[1:]
            ...
            # array header: *<number of arguments>\r\n
            buff = SYM_EMPTY.join((SYM_STAR, str(len(args)).encode(), SYM_CRLF))
            for arg in imap(self.encoder.encode, args):
                arg_length = len(arg)
                # bulk string header: $<length>\r\n, followed by the payload
                buff = SYM_EMPTY.join(
                    (buff, SYM_DOLLAR, str(arg_length).encode(), SYM_CRLF))
                output.append(buff)
                output.append(arg)
                buff = SYM_CRLF
            ...
            output.append(buff)
            return output

        def send_command(self, *args, **kwargs):
            command = self.pack_command(*args)
            if isinstance(command, str):
                command = [command]
            for item in command:
                self._sock.sendall(item)
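- A `Connection` holds one socket connection.
- When it receives a Redis command, it serializes the command into RESP with `pack_command`.
- The packed data is sent over the socket.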
    # connection.py
    class Connection(object):
        def __init__(..., parser_class=PythonParser, ...):
            self._parser = parser_class(socket_read_size=socket_read_size)

        def read_response(self):
            response = self._parser.read_response()
            return response

    class PythonParser(BaseParser):
        "Plain Python parsing class"
        def __init__(self, socket_read_size):
            self.socket_read_size = socket_read_size
            ...

        def on_connect(self, connection):
            # called once the socket is connected
            self._sock = connection._sock
            self._buffer = SocketBuffer(self._sock,
                                        self.socket_read_size,
                                        connection.socket_timeout)
            self.encoder = connection.encoder

        def read_response(self):
            raw = self._buffer.readline()
            byte, response = raw[:1], raw[1:]
            # server returned an error
            if byte == b'-':
                response = nativestr(response)
                ...
            # single value
            elif byte == b'+':
                pass
            # int value
            elif byte == b':':
                response = long(response)
            # bulk response
            elif byte == b'$':
                length = int(response)
                response = self._buffer.read(length)
            # multi-bulk response
            elif byte == b'*':
                length = int(response)
                response = [self.read_response() for i in xrange(length)]
            if isinstance(response, bytes):
                response = self.encoder.decode(response)
            return response
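- The connection creates a parser that reads and parses server responses.
- The default `PythonParser` reads socket data through a `SocketBuffer`.
- `read_response` implements RESP parsing. For each line terminated by `\r\n`, the first byte is the response type and the rest is the content; a multi-bulk response additionally requires reading the following lines in a loop. Reading this side by side with the protocol description and the request-sending code is recommended.
`PythonParser` is a pure-Python implementation. For better performance you can additionally install hiredis, which provides a C-based parser, `HiredisParser`.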
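To make the parsing steps concrete, here is a minimal stand-alone sketch that reads replies from an in-memory byte stream instead of a socket. `read_reply` and `RespError` are hypothetical names for this example, and the handling of length `-1` as Null follows the RESP convention for Null bulk strings and arrays; it is an illustration of the technique, not redis-py's `PythonParser`.

```python
import io


class RespError(Exception):
    """Hypothetical exception used for '-' replies in this sketch."""


def read_reply(stream):
    """Read one RESP reply from a binary file-like object."""
    line = stream.readline()
    if not line.endswith(b"\r\n"):
        raise ValueError("incomplete reply")
    # First byte is the type marker; the rest (minus CRLF) is the payload.
    prefix, payload = line[:1], line[1:-2]
    if prefix == b"+":                      # simple string
        return payload
    if prefix == b"-":                      # error
        raise RespError(payload.decode())
    if prefix == b":":                      # integer
        return int(payload)
    if prefix == b"$":                      # bulk string
        length = int(payload)
        if length == -1:                    # Null bulk string
            return None
        data = stream.read(length + 2)      # payload plus trailing CRLF
        return data[:-2]
    if prefix == b"*":                      # array: each element is a reply
        length = int(payload)
        if length == -1:                    # Null array
            return None
        return [read_reply(stream) for _ in range(length)]
    raise ValueError("unknown prefix %r" % prefix)


replies = io.BytesIO(
    b"+OK\r\n:1000\r\n$6\r\nfoobar\r\n*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n")
parsed = [read_reply(replies) for _ in range(4)]
print(parsed)
```

Bulk strings are read by length rather than by line, so a payload that itself contains `\r\n` is still parsed correctly.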
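Connection pool
redis-py uses a connection pool to improve efficiency. Usage comes down to 3 steps: create the pool, get a usable connection from the pool to execute the command, and release the connection when done: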
    # client.py
    pool = ConnectionPool(**kwargs)
    conn = pool.get_connection(command_name, **options)
    try:
        conn.send_command(*args)
        ...
    finally:
        ...
        pool.release(conn)
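Always make sure connections are released back to the pool. You can use try/finally or a context manager; the code here uses the former.
The pool itself is implemented as follows: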
    # connection.py
    class ConnectionPool(object):
        def __init__(...):
            self._available_connections = []
            self._in_use_connections = set()

        def make_connection(self):
            "Create a new connection"
            return self.connection_class(**self.connection_kwargs)

        def get_connection(self, command_name, *keys, **options):
            try:
                connection = self._available_connections.pop()
            except IndexError:
                connection = self.make_connection()
            self._in_use_connections.add(connection)
            ...
            connection.connect()
            return connection

        def release(self, connection):
            "Releases the connection back to the pool"
            try:
                self._in_use_connections.remove(connection)
            except KeyError:
                # Gracefully fail when a connection is returned to this pool
                # that the pool doesn't actually own
                pass
            self._available_connections.append(connection)
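- Internally the pool tracks all connections with a list of available connections and a set of in-use connections.
- When a connection is requested, the pool first tries the available list; if it is empty, a new connection is created.
- Every connection handed out is added to the in-use set; if it is not yet connected, it is connected first.
- On release, the connection is removed from the in-use set and appended to the available list, where it waits to be reused.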
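The bookkeeping described above can be sketched without a real server. `FakeConnection` and `MiniPool` are hypothetical names for this example, and the `connection()` context manager is an illustrative helper added here to show the context-manager style of release; it is not presented as part of redis-py's API.

```python
from contextlib import contextmanager


class FakeConnection:
    """Hypothetical connection that only tracks whether it is connected."""

    def __init__(self):
        self.connected = False

    def connect(self):
        self.connected = True


class MiniPool:
    def __init__(self, connection_class=FakeConnection):
        self.connection_class = connection_class
        self._available = []      # idle connections, reused last-in first-out
        self._in_use = set()      # connections currently handed out

    def get_connection(self):
        try:
            conn = self._available.pop()
        except IndexError:
            conn = self.connection_class()
        self._in_use.add(conn)
        if not conn.connected:
            conn.connect()
        return conn

    def release(self, conn):
        # discard() tolerates connections this pool never handed out.
        self._in_use.discard(conn)
        self._available.append(conn)

    @contextmanager
    def connection(self):
        """Hypothetical helper: release even if the body raises."""
        conn = self.get_connection()
        try:
            yield conn
        finally:
            self.release(conn)


pool = MiniPool()
with pool.connection() as first:
    pass
with pool.connection() as second:
    pass
print(first is second)
```

The second `with` block receives the same object as the first, which is the whole point of pooling: the cost of connecting is paid once.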
At this point we have traced essentially the whole path that one Redis command takes:
    r = redis.Redis(host='localhost', port=6379, db=0)
    r.set('foo', 'bar')
pipeline
Redis also supports pipelining, which sends a batch of commands together and then collects all the results:
    >>> r = redis.Redis(...)
    >>> pipe = r.pipeline()
    >>> pipe.set('foo', 'bar').sadd('faz', 'baz').incr('auto_number').execute()
    [True, True, 6]
`Pipeline` inherits from `Redis` and adds a few extensions:
    class Pipeline(Redis):
        def __init__(...):
            self.command_stack = []

        def execute_command(self, *args, **options):
            # queue the command instead of sending it; return self for chaining
            self.command_stack.append((args, options))
            return self

        def execute(self, raise_on_error=True):
            "Execute all the commands in the current pipeline"
            stack = self.command_stack
            execute = self._execute_pipeline
            conn = self.connection_pool.get_connection('MULTI', self.shard_hint)
            return execute(conn, stack, raise_on_error)

        def _execute_pipeline(self, connection, commands, raise_on_error):
            # build up all commands into a single request to increase network perf
            all_cmds = connection.pack_commands([args for args, _ in commands])
            connection.send_packed_command(all_cmds)

            response = []
            for args, options in commands:
                response.append(
                    self.parse_response(connection, args[0], **options))
            return response
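- The pipeline uses a stack to hold the commands queued for batch sending, and each call returns the pipeline itself, which enables the chained syntax.
- The commands are actually sent only when `execute` is called.
- After sending, the pipeline reads the server responses one by one and returns them together as a list.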
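The buffering and chaining behaviour can be reproduced in a small stand-alone sketch. `EchoBackend` and `MiniPipeline` are hypothetical names for this example; the backend simply answers each command with its upper-cased name, an assumption made so the sketch runs without a server.

```python
class EchoBackend:
    """Hypothetical backend: answers each command with its upper-cased name."""

    def __init__(self):
        self.round_trips = 0

    def send_all(self, commands):
        # One call stands in for one network round trip.
        self.round_trips += 1
        return [args[0].upper() for args in commands]


class MiniPipeline:
    def __init__(self, backend):
        self.backend = backend
        self.command_stack = []

    def execute_command(self, *args):
        # Queue only; returning self is what makes chaining possible.
        self.command_stack.append(args)
        return self

    def set(self, name, value):
        return self.execute_command("set", name, value)

    def incr(self, name):
        return self.execute_command("incr", name)

    def execute(self):
        # Send everything at once and clear the queue for reuse.
        commands, self.command_stack = self.command_stack, []
        return self.backend.send_all(commands)


backend = EchoBackend()
results = MiniPipeline(backend).set("foo", "bar").incr("n").incr("n").execute()
print(results, backend.round_trips)
```

Three commands are queued but the backend is contacted once, which is where the network savings of pipelining come from.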
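LuaScript
Redis can run Lua scripts, which execute atomically on the server and are therefore a common way to implement transaction-like logic. Usage: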
    >>> r = redis.Redis()
    >>> lua = """
    ... local value = redis.call('GET', KEYS[1])
    ... value = tonumber(value)
    ... return value * ARGV[1]"""
    >>> multiply = r.register_script(lua)
    >>> r.set('foo', 2)
    >>> multiply(keys=['foo'], args=[5])
    10
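- Inside the Lua script, the two arrays `KEYS` and `ARGV` receive the parameters. The first element of `KEYS` (Lua arrays start at 1) is the key name, and the first element of `ARGV` is the multiplier.
- The script has to be registered.
- redis-py passes the parameters to the script, executes it, and returns the result.
How scripts are implemented: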
    # client.py
    class Redis(object):
        def register_script(self, script):
            return Script(self, script)

        def script_load(self, script):
            "Load a Lua ``script`` into the script cache. Returns the SHA."
            return self.execute_command('SCRIPT LOAD', script)

        def evalsha(self, sha, numkeys, *keys_and_args):
            return self.execute_command('EVALSHA', sha, numkeys, *keys_and_args)

    class Script(object):
        "An executable Lua script object returned by ``register_script``"
        def __init__(self, registered_client, script):
            self.registered_client = registered_client
            self.script = script
            # Precalculate and store the SHA1 hex digest of the script.
            ...
            self.sha = hashlib.sha1(script).hexdigest()

        def __call__(self, keys=[], args=[], client=None):
            "Execute the script, passing any required ``args``"
            if client is None:
                client = self.registered_client
            args = tuple(keys) + tuple(args)
            # make sure the Redis server knows about the script
            ...
            try:
                return client.evalsha(self.sha, len(keys), *args)
            except NoScriptError:
                # Maybe the client is pointed to a different server than the client
                # that created this instance?
                # Overwrite the sha just in case there was a discrepancy.
                self.sha = client.script_load(self.script)
                return client.evalsha(self.sha, len(keys), *args)
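- The Lua script is loaded into the Redis server with `script load`, which returns a SHA value. The SHA can be reused, which avoids loading the same script repeatedly.
- The script is executed with `evalsha`.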
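The "try `evalsha` first, load on a cache miss" flow can be sketched with an in-memory stand-in for the server. `FakeScriptServer` and `MiniScript` are hypothetical names for this example, and the fake server does not run Lua at all: it only splits the parameters into keys and arguments, which is enough to show the SHA-based caching.

```python
import hashlib


class NoScriptError(Exception):
    """Stand-in for the error raised when the server does not know a SHA."""


class FakeScriptServer:
    """Hypothetical server: caches script bodies by SHA1, runs no Lua."""

    def __init__(self):
        self.cache = {}
        self.loads = 0

    def script_load(self, script):
        self.loads += 1
        sha = hashlib.sha1(script.encode()).hexdigest()
        self.cache[sha] = script
        return sha

    def evalsha(self, sha, numkeys, *keys_and_args):
        if sha not in self.cache:
            raise NoScriptError(sha)
        # The first numkeys values are KEYS, the remainder are ARGV.
        return list(keys_and_args[:numkeys]), list(keys_and_args[numkeys:])


class MiniScript:
    def __init__(self, client, script):
        self.client = client
        self.script = script
        # The SHA is computed locally, before the server has seen the script.
        self.sha = hashlib.sha1(script.encode()).hexdigest()

    def __call__(self, keys=(), args=()):
        params = tuple(keys) + tuple(args)
        try:
            return self.client.evalsha(self.sha, len(keys), *params)
        except NoScriptError:
            # Cache miss: upload the script once, then retry.
            self.sha = self.client.script_load(self.script)
            return self.client.evalsha(self.sha, len(keys), *params)


server = FakeScriptServer()
multiply = MiniScript(server, "return redis.call('GET', KEYS[1]) * ARGV[1]")
first = multiply(keys=["foo"], args=[5])
second = multiply(keys=["foo"], args=[5])
print(server.loads)
```

Only the first call uploads the script body; later calls send just the 40-character digest.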
lock
redis-py also provides a global lock implementation that can synchronize across processes:
    try:
        with r.lock('my-lock-key', blocking_timeout=5) as lock:
            # code you want executed only after the lock has been acquired
            pass
    except LockError:
        # the lock wasn't acquired
        pass
Its implementation is as follows:
    # lock.py
    class Lock(object):
        LUA_RELEASE_SCRIPT = """
            local token = redis.call('get', KEYS[1])
            if not token or token ~= ARGV[1] then
                return 0
            end
            redis.call('del', KEYS[1])
            return 1
        """

        def __init__(...):
            ...
            self.redis = redis
            self.name = name
            self.local = threading.local() if self.thread_local else dummy()
            self.local.token = None
            cls = self.__class__
            cls.lua_release = self.redis.register_script(cls.LUA_RELEASE_SCRIPT)

        def __enter__(self):
            # force blocking, as otherwise the user would have to check whether
            # the lock was actually acquired or not.
            if self.acquire(blocking=True):
                return self
            raise LockError("Unable to acquire lock within the time specified")

        def __exit__(self, exc_type, exc_value, traceback):
            self.release()

        def acquire(self, blocking=None, blocking_timeout=None, token=None):
            ...
            token = uuid.uuid1().hex.encode()
            self.redis.set(self.name, token, nx=True, px=timeout)
            ...
            self.local.token = token
            ...

        def release(self):
            expected_token = self.local.token
            self.local.token = None
            self.lua_release(keys=[self.name],
                             args=[expected_token],
                             client=self.redis)
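- `LUA_RELEASE_SCRIPT` uses a Lua script so that checking and deleting the token happen as one atomic operation.
- The lock stores the token in a thread-local variable so that it behaves correctly under multi-threaded concurrency.
- `__enter__` and `__exit__` implement the context manager protocol (the `with` statement), which ensures the lock is acquired and released properly.
- Acquiring the lock generates a temporary token and sets it on the Redis server. The token has a lifetime, so the lock is released automatically when it expires.
- Releasing the lock clears both the thread-local variable and the key on the Redis server.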
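The token logic can be sketched against an in-memory store. `FakeStore` and `MiniLock` are hypothetical names for this example; `set_nx` stands in for `SET ... NX`, `delete_if_equal` mirrors what the Lua release script does, and key expiry and thread-local storage are deliberately left out to keep the sketch short.

```python
import uuid


class FakeStore:
    """Hypothetical in-memory store; expiry (px) is not simulated."""

    def __init__(self):
        self.data = {}

    def set_nx(self, key, value):
        # Like SET key value NX: succeed only if the key does not exist.
        if key in self.data:
            return False
        self.data[key] = value
        return True

    def delete_if_equal(self, key, expected):
        # Mirrors the release script: delete only when the token matches.
        current = self.data.get(key)
        if current is None or current != expected:
            return 0
        del self.data[key]
        return 1


class MiniLock:
    def __init__(self, store, name):
        self.store = store
        self.name = name
        self.token = None

    def acquire(self):
        token = uuid.uuid1().hex.encode()
        if self.store.set_nx(self.name, token):
            self.token = token
            return True
        return False

    def release(self):
        expected, self.token = self.token, None
        return self.store.delete_if_equal(self.name, expected) == 1


store = FakeStore()
owner = MiniLock(store, "my-lock-key")
rival = MiniLock(store, "my-lock-key")
owner_acquired = owner.acquire()
rival_acquired = rival.acquire()
rival_released = rival.release()   # rival holds no matching token
owner_released = owner.release()
print(owner_acquired, rival_acquired, rival_released, owner_released)
```

The compare-before-delete step is what stops one client from releasing a lock held by another, and it is the reason the real implementation needs an atomic script rather than a separate `GET` followed by `DEL`.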
TODO
Publish/subscribe, Monitor, Sentinel, transactions, and similar parts of the source are, in my view, off the main path and are left for a later article.