Rb(redis blaster),一个为 redis 实现 non-replicated 分片的 python 库

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
全局流量管理 GTM,标准版 1个月
简介: Rb(redis blaster),一个为 redis 实现 non-replicated 分片的 python 库

Rb,redis blaster,是一个为 redis 实现非复制分片(non-replicated sharding)的库。它在 python redis 之上实现了一个自定义路由系统,允许您自动定位不同的服务器,而无需手动将请求路由到各个节点。

它没有实现 redis 的所有功能,也没有尝试这样做。您可以随时将客户端连接到特定主机,但大多数情况下假设您的操作仅限于可以自动路由到不同节点的基本 key/value 操作。


你可以做什么:

  • 自动针对主机进行单 key 操作
  • 对所有或部分节点执行命令
  • 并行执行所有这些


   安装


rbPyPI 上可用,可以从那里安装:


$ pip install rb


   配置


开始使用 rb 非常简单。如果您之前一直在使用 py-redis,您会感到宾至如归。主要区别在于,不是连接到单个主机,而是将 cluster 配置为连接到多个:


from rb import Cluster
cluster = Cluster(hosts={
    0: {'port': 6379},
    1: {'port': 6380},
    2: {'port': 6381},
    3: {'port': 6382},
    4: {'port': 6379},
    5: {'port': 6380},
    6: {'port': 6381},
    7: {'port': 6382},
}, host_defaults={
    'host': '127.0.0.1',
})


在这种情况下,我们在同一主机上的四个不同服务器进程上设置了 8 个节点。hosts 参数是要连接的主机的映射。字典的 keyhost ID(整数),值是参数字典。


host_defaults 是为所有主机填写的可选默认值字典。如果您想共享一些重复的常见默认值(在这种情况下,所有主机都连接到 localhost),这很有用。


在默认配置中,PartitionRouter 用于路由。


   路由


现在集群已经构建好了,我们可以使用 Cluster.get_routing_client() 来获取一个 redis 客户端,它会为每个命令自动路由到正确的 redis 节点:


client = cluster.get_routing_client()
results = {}
for key in keys_to_look_up:
    results[key] = client.get(key)


该客户端的工作原理与标准的 pyredis StrictClient 非常相似,主要区别在于它只能执行只涉及一个 key 的命令。


然而,这个基本操作是串联运行的。使 rb 有用的是它可以自动构建 redis 管道并将查询并行发送到许多主机。但是,这会稍微改变用法,因为现在该值无法立即使用:


results = {}
with cluster.map() as client:
    for key in keys_to_look_up:
        results[key] = client.get(key)


虽然到目前为止看起来很相似,但不是将实际值存储在 result 字典中,而是存储 Promise 对象。当 map context manager 结束时,它们保证已经被执行,您可以访问 Promise.value 属性来获取值:


for key, promise in results.iteritems():
    print '%s: %s' % (key, promise.value)


如果要向所有参与的主机发送命令(例如删除数据库),可以使用 Cluster.all() 方法:


with cluster.all() as client:
    client.flushdb()


如果你这样做,promise 值是一个字典,其中 host ID 作为 key,结果作为 value。举个例子:


with cluster.all() as client:
    results = client.info()
for host_id, info in results.iteritems():
    print 'host %s is running %s' % (host_id, info['os'])


要明确针对某些主机,您可以使用 Cluster.fanout() 接受要将命令发送到 host ID 列表。


   API


这是公共 API 的完整参考。请注意,此库扩展了 Python redis 库,因此其中一些类具有更多功能,您需要查阅 py-redis 库。


Cluster


class rb.Cluster(hosts, host_defaults=None, pool_cls=None, pool_options=None, router_cls=None, router_options=None)

clusterrb 背后的核心对象。它保存到各个节点的连接池,并且可以在应用程序运行期间在中央位置共享。


具有默认 router 的四个 redis 实例上的集群的基本示例:


cluster = Cluster(hosts={
    0: {'port': 6379},
    1: {'port': 6380},
    2: {'port': 6381},
    3: {'port': 6382},
}, host_defaults={
    'host': '127.0.0.1',
})


hosts 是一个主机字典,它将 host ID 数量映射到配置参数。参数对应于 add_host() 函数的签名。这些参数的默认值是从 host_defaults 中提取的。要覆盖 pool 类,可以使用 pool_clspool_options 参数。这同样适用于 routerrouter_clsrouter_optionspool 选项对于设置 socket 超时和类似参数很有用。


  • add_host(host_id=None, host='localhost', port=6379, unix_socket_path=None, db=0, password=None, ssl=False, ssl_options=None)
  • 将新主机添加到集群。这仅对单元测试真正有用,因为通常主机是通过构造函数添加的,并且在第一次使用集群后进行更改不太可能有意义。
  • all(timeout=None, max_concurrency=64, auto_batch=True)
  • 扇出到所有主机。其他方面与 fanout() 完全一样。
  • 例子:


with cluster.all() as client:
client.flushdb()


  • disconnect_pools()
  • 断开与内部池的所有连接。
  • execute_commands(mapping, *args, **kwargs)
  • 同时在 Redis 集群上执行与路由 key 关联的一系列命令,返回一个新映射,其中值是与同一位置的命令对应的结果列表。例如:


>>> cluster.execute_commands({
...   'foo': [
...     ('PING',),
...     ('TIME',),
...   ],
...   'bar': [
...     ('CLIENT', 'GETNAME'),
...   ],
... })
{'bar': [<Promise None>],
 'foo': [<Promise True>, <Promise (1454446079, 418404)>]}


  • 作为 redis.client.Script 实例的命令将首先检查它们在目标节点上的存在,然后在执行之前加载到目标上,并且可以与其他命令交错:


>>> from redis.client import Script
>>> TestScript = Script(None, 'return {KEYS, ARGV}')
>>> cluster.execute_commands({
...   'foo': [
...     (TestScript, ('key:1', 'key:2'), range(0, 3)),
...   ],
...   'bar': [
...     (TestScript, ('key:3', 'key:4'), range(3, 6)),
...   ],
... })
{'bar': [<Promise [['key:3', 'key:4'], ['3', '4', '5']]>],
 'foo': [<Promise [['key:1', 'key:2'], ['0', '1', '2']]>]}


  • 在内部,FanoutClient用于发出命令。
  • fanout(hosts=None, timeout=None, max_concurrency=64, auto_batch=True)
  • 用于获取路由客户端、开始扇出操作并 join 结果的快捷上下文管理器。
  • 在上下文管理器中,可用的客户端是 FanoutClient。示例用法:


with cluster.fanout(hosts='all') as client:
    client.flushdb()


  • get_local_client(host_id)
  • 返回特定主机 ID 的本地化 client。这个 client 就像一个普通的 Python redis 客户端一样工作,并立即返回结果。
  • get_local_client_for_key(key)
  • 类似于 get_local_client_for_key() 但根据 router 所说的 key 目的地返回 client
  • get_pool_for_host(host_id)
  • 返回给定主机的连接池。
  • redis 客户端使用此连接池来确保它不必不断地重新连接。如果要使用自定义 redis 客户端,可以手动将其作为连接池传入。
  • get_router()
  • 返回 clusterrouter 。如果 cluster 重新配置,router 将被重新创建。通常,您不需要自己与 router 交互,因为集群的路由客户端会自动执行此操作。
  • 这将返回 BaseRouter 的一个实例。
  • get_routing_client(auto_batch=True)
  • 返回一个路由客户端。该客户端能够自动将请求路由到各个主机。它是线程安全的,可以类似于主机本地客户端使用,但它会拒绝执行无法直接路由到单个节点的命令。
  • 路由客户端的默认行为是尝试将符合条件的命令批处理成批处理版本。例如,路由到同一节点的多个 GET 命令最终可以合并为一个 MGET 命令。可以通过将 auto_batch 设置为 False 来禁用此行为。这对于调试很有用,因为 MONITOR 将更准确地反映代码中发出的命令。
  • 有关详细信息,请参阅 RoutingClient
  • map(timeout=None, max_concurrency=64, auto_batch=True)
  • 用于获取路由客户端、开始映射操作并 join 结果的快捷上下文管理器。max_concurrency 定义在隐式连接发生之前可以存在多少未完成的并行查询。
  • 在上下文管理器中,可用的客户端是 MappingClient。示例用法:


results = {}
with cluster.map() as client:
    for key in keys_to_fetch:
        results[key] = client.get(key)
for key, promise in results.iteritems():
    print '%s => %s' % (key, promise.value)


  • remove_host(host_id)
  • client 中删除 host。这仅对单元测试真正有用。


Clients


class rb.RoutingClient(cluster, auto_batch=True)

可以路由到单个目标的客户端。


有关参数,请参见 Cluster.get_routing_client()

  • execute_command(*args, **options)
  • 执行命令并返回解析后的响应
  • fanout(hosts=None, timeout=None, max_concurrency=64, auto_batch=None)
  • 返回映射操作的 context manager,该操作扇出到手动指定的主机,而不是使用路由系统。例如,这可用于清空所有主机上的数据库。context manager 返回一个 FanoutClient。示例用法:


with cluster.fanout(hosts=[0, 1, 2, 3]) as client:
    results = client.info()
for host_id, info in results.value.iteritems():
    print '%s -> %s' % (host_id, info['is'])


  • 返回的 promise 将所有结果累积到由 host_id 键入的字典中。
  • hosts 参数是一个 host_id 列表,或者是字符串 'all' ,用于将命令发送到所有主机。
  • fanout API 需要非常小心地使用,因为当 key 被写入不期望它们的主机时,它可能会造成很多损坏。
  • get_fanout_client(hosts, max_concurrency=64, auto_batch=None)
  • 返回线程不安全的扇出客户端。
  • 返回 FanoutClient 的实例。
  • get_mapping_client(max_concurrency=64, auto_batch=None)
  • 返回一个线程不安全的映射客户端。此客户端的工作方式类似于 redis 管道并返回最终结果对象。它需要 join 才能正常工作。您应该使用自动 joinmap() 上下文管理器,而不是直接使用它。
  • 返回 MappingClient 的一个实例。
  • map(timeout=None, max_concurrency=64, auto_batch=None)
  • 返回映射操作的 context manager。这会并行运行多个查询,然后最后 join 以收集所有结果。
  • 在上下文管理器中,可用的客户端是 MappingClient。示例用法:


results = {}
with cluster.map() as client:
    for key in keys_to_fetch:
        results[key] = client.get(key)
for key, promise in results.iteritems():
    print '%s => %s' % (key, promise.value)


class rb.MappingClient(connection_pool, max_concurrency=None, auto_batch=True)

路由客户端使用 clusterrouter 根据执行的 redis 命令的 key 自动定位单个节点。


有关参数,请参见 Cluster.map()


  • cancel()
  • 取消所有未完成的请求。
  • execute_command(*args, **options)
  • 执行命令并返回解析后的响应
  • join(timeout=None)
  • 等待所有未完成的响应返回或超时
  • mget(keys, *args)
  • 返回与 key 顺序相同的值列表
  • mset(*args, **kwargs)
  • 根据映射设置 key/value。映射是 key/value 对的字典。keyvalue 都应该是可以通过 str() 转换为 string 的字符串或类型。

class rb.FanoutClient(hosts, connection_pool, max_concurrency=None, auto_batch=True)

这与 MappingClient 的工作方式相似,但它不是使用 router 来定位主机,而是将命令发送到所有手动指定的主机。


结果累积在由 host_id 键入的字典中。

有关参数,请参见 Cluster.fanout()

  • execute_command(*args, **options)
  • 执行命令并返回解析后的响应
  • target(hosts)
  • 为一次调用临时重新定位 client。当必须为一次调用处理主机 subset 时,这很有用。
  • target_key(key)
  • 临时重新定位客户端以进行一次调用,以专门路由到给定 key 路由到的一台主机。在这种情况下,promise 的结果只是一个主机的值而不是字典。
  • 1.3 版中的新功能。


Promise


class rb.Promise

一个尝试为 Promise 对象镜像 ES6 APIPromise 对象。与 ES6Promise 不同,这个 Promise 也直接提供对底层值的访问,并且它有一些稍微不同的静态方法名称,因为这个 Promise 可以在外部解析。


  • static all(iterable_or_dict)
  • 当所有传递的 promise 都解决时,promise 就解决了。你可以传递一个 promise 列表或一个 promise 字典。
  • done(on_success=None, on_failure=None)
  • 将一些回调附加到 Promise 并返回 Promise
  • is_pending
  • 如果 promise 仍然等待,则为 True,否则为 False
  • is_rejected
  • 如果 promise 被拒绝,则为 True,否则为 False
  • is_resolved
  • 如果 promise 已解决,则为 True,否则为 False
  • reason
  • 如果它被拒绝,这个 promise 的原因。
  • reject(reason)
  • 以给定的理由拒绝 promise
  • static rejected(reason)
  • 创建一个以特定值被拒绝的 promise 对象。
  • resolve(value)
  • 用给定的值解决 promise
  • static resolved(value)
  • 创建一个以特定值解析的 promise 对象。
  • then(success=None, failure=None)
  • Promise 添加成功和/或失败回调的实用方法,该方法还将在此过程中返回另一个 Promise
  • value
  • 如果它被解决,这个 promise 所持有的值。


Routers


class rb.BaseRouter(cluster)

所有路由的基类。如果你想实现一个自定义路由,这就是你的子类。


  • cluster
  • 引用回此 router 所属的 Cluster
  • get_host_for_command(command, args)
  • 返回应执行此命令的主机。
  • get_host_for_key(key)
  • 执行路由并返回目标的 host_id
  • 子类需要实现这一点。
  • get_key(command, args)
  • 返回命令操作的 key

class rb.ConsistentHashingRouter(cluster)

基于一致哈希算法返回 host_idrouter。一致的哈希算法仅在提供 key 参数时才有效。


router 要求主机是无间隙的,这意味着 N 台主机的 ID 范围从 0N-1

  • get_host_for_key(key)
  • 执行路由并返回目标的 host_id
  • 子类需要实现这一点。

class rb.PartitionRouter(cluster)

一个简单的 router,仅根据简单的 crc32 % node_count 设置将命令单独路由到单个节点。

router 要求主机是无间隙的,这意味着 N 台主机的 ID 范围从 0N-1

  • get_host_for_key(key)
  • 执行路由并返回目标的 host_id
  • 子类需要实现这一点。

exception rb.UnroutableCommand

如果发出的命令无法通过 router 路由到单个主机,则引发。


Testing

class rb.testing.TestSetup(servers=4, databases_each=8, server_executable='redis-server')

测试设置是生成多个 redis 服务器进行测试并自动关闭它们的便捷方式。这可以用作 context manager 来自动终止客户端。


  • rb.testing.make_test_cluster(*args, **kwargs)
  • 用于创建测试设置然后从中创建 cluster 的便捷快捷方式。这必须用作 context manager


from rb.testing import make_test_cluster
with make_test_cluster() as cluster:
    ...
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
12天前
|
调度 开发者 Python
Python中的异步编程:理解asyncio库
在Python的世界里,异步编程是一种高效处理I/O密集型任务的方法。本文将深入探讨Python的asyncio库,它是实现异步编程的核心。我们将从asyncio的基本概念出发,逐步解析事件循环、协程、任务和期货的概念,并通过实例展示如何使用asyncio来编写异步代码。不同于传统的同步编程,异步编程能够让程序在等待I/O操作完成时释放资源去处理其他任务,从而提高程序的整体效率和响应速度。
|
15天前
|
数据采集 存储 数据挖掘
Python数据分析:Pandas库的高效数据处理技巧
【10月更文挑战第27天】在数据分析领域,Python的Pandas库因其强大的数据处理能力而备受青睐。本文介绍了Pandas在数据导入、清洗、转换、聚合、时间序列分析和数据合并等方面的高效技巧,帮助数据分析师快速处理复杂数据集,提高工作效率。
44 0
|
8天前
|
数据库 Python
异步编程不再难!Python asyncio库实战,让你的代码流畅如丝!
在编程中,随着应用复杂度的提升,对并发和异步处理的需求日益增长。Python的asyncio库通过async和await关键字,简化了异步编程,使其变得流畅高效。本文将通过实战示例,介绍异步编程的基本概念、如何使用asyncio编写异步代码以及处理多个异步任务的方法,帮助你掌握异步编程技巧,提高代码性能。
26 4
|
8天前
|
API 数据处理 Python
探秘Python并发新世界:asyncio库,让你的代码并发更优雅!
在Python编程中,随着网络应用和数据处理需求的增长,并发编程变得愈发重要。asyncio库作为Python 3.4及以上版本的标准库,以其简洁的API和强大的异步编程能力,成为提升性能和优化资源利用的关键工具。本文介绍了asyncio的基本概念、异步函数的定义与使用、并发控制和资源管理等核心功能,通过具体示例展示了如何高效地编写并发代码。
19 2
|
14天前
|
数据采集 JSON 测试技术
Python爬虫神器requests库的使用
在现代编程中,网络请求是必不可少的部分。本文详细介绍 Python 的 requests 库,一个功能强大且易用的 HTTP 请求库。内容涵盖安装、基本功能(如发送 GET 和 POST 请求、设置请求头、处理响应)、高级功能(如会话管理和文件上传)以及实际应用场景。通过本文,你将全面掌握 requests 库的使用方法。🚀🌟
36 7
|
30天前
|
网络协议 数据库连接 Python
python知识点100篇系列(17)-替换requests的python库httpx
【10月更文挑战第4天】Requests 是基于 Python 开发的 HTTP 库,使用简单,功能强大。然而,随着 Python 3.6 的发布,出现了 Requests 的替代品 —— httpx。httpx 继承了 Requests 的所有特性,并增加了对异步请求的支持,支持 HTTP/1.1 和 HTTP/2,能够发送同步和异步请求,适用于 WSGI 和 ASGI 应用。安装使用 httpx 需要 Python 3.6 及以上版本,异步请求则需要 Python 3.8 及以上。httpx 提供了 Client 和 AsyncClient,分别用于优化同步和异步请求的性能。
python知识点100篇系列(17)-替换requests的python库httpx
|
14天前
|
机器学习/深度学习 数据采集 算法
Python机器学习:Scikit-learn库的高效使用技巧
【10月更文挑战第28天】Scikit-learn 是 Python 中最受欢迎的机器学习库之一,以其简洁的 API、丰富的算法和良好的文档支持而受到开发者喜爱。本文介绍了 Scikit-learn 的高效使用技巧,包括数据预处理(如使用 Pipeline 和 ColumnTransformer)、模型选择与评估(如交叉验证和 GridSearchCV)以及模型持久化(如使用 joblib)。通过这些技巧,你可以在机器学习项目中事半功倍。
21 3
|
17天前
|
数据采集 数据可视化 数据处理
如何使用Python实现一个交易策略。主要步骤包括:导入所需库(如`pandas`、`numpy`、`matplotlib`)
本文介绍了如何使用Python实现一个交易策略。主要步骤包括:导入所需库(如`pandas`、`numpy`、`matplotlib`),加载历史数据,计算均线和其他技术指标,实现交易逻辑,记录和可视化交易结果。示例代码展示了如何根据均线交叉和价格条件进行开仓、止损和止盈操作。实际应用时需注意数据质量、交易成本和风险管理。
37 5
|
16天前
|
存储 数据挖掘 数据处理
Python数据分析:Pandas库的高效数据处理技巧
【10月更文挑战第26天】Python 是数据分析领域的热门语言,Pandas 库以其高效的数据处理功能成为数据科学家的利器。本文介绍 Pandas 在数据读取、筛选、分组、转换和合并等方面的高效技巧,并通过示例代码展示其实际应用。
30 2
|
25天前
|
数据可视化 数据挖掘 Python
Seaborn 库创建吸引人的统计图表
【10月更文挑战第11天】本文介绍了如何使用 Seaborn 库创建多种统计图表,包括散点图、箱线图、直方图、线性回归图、热力图等。通过具体示例和代码,展示了 Seaborn 在数据可视化中的强大功能和灵活性,帮助读者更好地理解和应用这一工具。
36 3