一文打尽python-web开发的signal机制

简介: signal在flask/django中都是很重要的解耦手段。flask的signal依赖blinker实现,django的signal也很类似。blinker库是纯python实现的代码简单,功能强大的signal库。本文我们从blinker开始,一起了解python-web开发的signal机制。

signal在flask/django中都是很重要的解耦手段。flask的signal依赖blinker实现,django的signal也很类似。blinker库是纯python实现的代码简单,功能强大的signal库。本文我们从blinker开始,一起了解python-web开发的signal机制:


  • blinker的api
  • blinker-signal的实现
  • flask-signal的实现
  • django-signal的实现
  • weakref介绍
  • 小结
  • 小技巧


blinker简介



blinker源码使用 1.4 版本, 项目结构如下:


文件 描述
base.py 核心逻辑
_saferef.py 安全引用相关逻辑
_utilities.py 工具类


blinker的API



blinker的api使用示例:


from blinker import signal
def subscriber1(sender):
    print("1 Got a signal sent by %r" % sender)
def subscriber2(sender):
    print("2 Got a signal sent by %r" % sender)
ready = signal('ready')
print(ready)
ready.connect(subscriber1)
ready.connect(subscriber2)
ready.send("go")


示例的日志输出:


<blinker.base.NamedSignal object at 0x7f93a805ad00; 'ready'>
1 Got a signal sent by 'go'
2 Got a signal sent by 'go'


可以看到signal是发布/订阅模式。或者换个更常见的说法,事件中心:


  • ready = signal('ready') 创建名为ready的事件中心
  • ready.connect(subscriber1) 给ready事件中心添加事件监听器
  • ready.send("go") 向ready事件中心派发事件,这样事件监听器会收到事件并进行处理


signal的实现



signal默认单例,提供开箱即用的API:


class NamedSignal(Signal):
    """A named generic notification emitter."""
    def __init__(self, name, doc=None):
        Signal.__init__(self, doc)
        self.name = name
class Namespace(dict):
    def signal(self, name, doc=None):
        try:
            return self[name]
        except KeyError:
            return self.setdefault(name, NamedSignal(name, doc))
signal = Namespace().signal


需要说明一下的是,signal的单例是和name绑定的。同一个名称得到同一个NamedSignal对象,不同名称得到的NamedSignal对象不一样。


NamedSignal的父类Signal的构造方法,包括1)事件接收器字典receivers:以事件接收器id为key和事件接收器为value;2)接收器ID-发送器ID的字典:以接收器ID为key和发送器ID集合为value;3)和2类似的字典,只不过是反向的,key为发送器ID,value为接收器集合。


ANY = symbol('ANY')
class Signal(object):
    ANY = ANY
    def __init__(self, doc=None)
        self.receivers = {}
        self._by_receiver = defaultdict(set)
        self._by_sender = defaultdict(set)
        ...


Signal的connect函数添加消息接收器,可以看到sender和receiver是多对多的关系。


def connect(self, receiver, sender=ANY, weak=True):
    receiver_id = hashable_identity(receiver)
    receiver_ref = receiver
    sender_id = ANY_ID
    self.receivers.setdefault(receiver_id, receiver_ref)
    self._by_sender[sender_id].add(receiver_id)
    self._by_receiver[receiver_id].add(sender_id)
    del receiver_ref
    return receiver


Signal的send函数将消息发送给所有关注该sender的接收器:


def send(self, *sender, **kwargs):
    sender = sender[0]
    # 循环执行所有的receiver
    return [(receiver, receiver(sender, **kwargs))
            for receiver in self.receivers_for(sender)]
def receivers_for(self, sender):
    sender_id = hashable_identity(sender)
    # 根据sender_id找receiver_id
    if sender_id in self._by_sender:
        # 2个set的合集 
        ids = (self._by_sender[ANY_ID] |
               self._by_sender[sender_id])
    else:
        ids = self._by_sender[ANY_ID].copy()
    for receiver_id in ids:
        receiver = self.receivers.get(receiver_id)
        if receiver is None:
            continue
        # 迭代器
        yield receiver


有始有终,Signal使用disconnect函数注销消息的接收器:


def disconnect(self, receiver, sender=ANY):
    sender_id = ANY_ID
    receiver_id = hashable_identity(receiver)
    self._disconnect(receiver_id, sender_id)
def _disconnect(self, receiver_id, sender_id):
    if sender_id == ANY_ID:
        if self._by_receiver.pop(receiver_id, False):
            for bucket in self._by_sender.values():
                bucket.discard(receiver_id)
        self.receivers.pop(receiver_id, None)
    else:
        self._by_sender[sender_id].discard(receiver_id)
        self._by_receiver[receiver_id].discard(sender_id)


为了便于理解signal机制,我们暂时忽略了weakref相关的代码,稍后再进行介绍。


flask-signal的实现



flask-signal依赖blinker的实现:


# flask.signals.py
from blinker import Namespace
_signals = Namespace()
template_rendered = _signals.signal("template-rendered")
before_render_template = _signals.signal("before-render-template")
request_started = _signals.signal("request-started")
request_finished = _signals.signal("request-finished")
request_tearing_down = _signals.signal("request-tearing-down")
got_request_exception = _signals.signal("got-request-exception")
appcontext_tearing_down = _signals.signal("appcontext-tearing-down")
appcontext_pushed = _signals.signal("appcontext-pushed")
appcontext_popped = _signals.signal("appcontext-popped")
message_flashed = _signals.signal("message-flashed")


从上面代码可以看到flask使用blinker预制了多个signal。以request_started为例, flask在处理request时候会向request_started派发事件:


# flask.app.py
from .signals import request_started
def full_dispatch_request(self):
    ...
    request_started.send(self)
    ...


我们可以在自己的代码中,这样注册事件监听:

def log_request(sender, **extra):
    sender.logger.debug('Request context is set up')
from flask import request_started
request_started.connect(log_request, app)


这样就可以很方便的使用signal获取到flask在各个阶段的数据。


django-signal的实现



django-signal虽然是独立实现,但是模式和blinker非常类似。Signal构造函数创建了一个对象,充当事件中心。


# django/dispatch/dispatcher.py
def _make_id(target):
    if hasattr(target, '__func__'):
        return (id(target.__self__), id(target.__func__))
    return id(target)
NONE_ID = _make_id(None)
# A marker for caching
NO_RECEIVERS = object()
class Signal:
    def __init__(self, providing_args=None, use_caching=False):
        """
        Create a new signal.
        """
        self.receivers = []
        self.lock = threading.Lock()
        self.use_caching = use_caching
        self.sender_receivers_cache = weakref.WeakKeyDictionary() if use_caching else {}
        self._dead_receivers = False


connect核心功能就是为事件监听器构建唯一标识(receiver_id,sender_id),然后加入receivers数组。


def connect(self, receiver, sender=None, weak=True, dispatch_uid=None):
    from django.conf import settings
    lookup_key = (_make_id(receiver), _make_id(sender))
    ref = weakref.ref
    receiver_object = receiver
    receiver = ref(receiver)
    with self.lock:
        if not any(r_key == lookup_key for r_key, _ in self.receivers):
            self.receivers.append((lookup_key, receiver))


send函数和blinker的send类似:


def send(self, sender, **named):
    return [
        (receiver, receiver(signal=self, sender=sender, **named))
        for receiver in self._live_receivers(sender)
    ]
def _live_receivers(self, sender):
    with self.lock:
        senderkey = _make_id(sender)
        receivers = []
        for (receiverkey, r_senderkey), receiver in self.receivers:
            if r_senderkey == NONE_ID or r_senderkey == senderkey:
                receivers.append(receiver)
    ...
    non_weak_receivers = []
    for receiver in receivers:
        non_weak_receivers.append(receiver)
    return non_weak_receivers


django-signal额外提供了一个receiver装饰器,方便业务使用:


def receiver(signal, **kwargs):
    def _decorator(func):
        if isinstance(signal, (list, tuple)):
            for s in signal:
                s.connect(func, **kwargs)
        else:
            signal.connect(func, **kwargs)
        return func
    return _decorator


django的model中额外包装了ModelSignal类并且预制了一些signal:



class ModelSignal(Signal):
    def _lazy_method(self, method, apps, receiver, sender, **kwargs):
        from django.db.models.options import Options
        # This partial takes a single optional argument named "sender".
        partial_method = partial(method, receiver, **kwargs)
        if isinstance(sender, str):
            apps = apps or Options.default_apps
            apps.lazy_model_operation(partial_method, make_model_tuple(sender))
        else:
            return partial_method(sender)
    def connect(self, receiver, sender=None, weak=True, dispatch_uid=None, apps=None):
        self._lazy_method(
            super().connect, apps, receiver, sender,
            weak=weak, dispatch_uid=dispatch_uid,
        )
...
# 定义模型各个阶段的signal
pre_init = ModelSignal(use_caching=True)
post_init = ModelSignal(use_caching=True)
pre_save = ModelSignal(use_caching=True)
post_save = ModelSignal(use_caching=True)
pre_delete = ModelSignal(use_caching=True)
post_delete = ModelSignal(use_caching=True)
m2m_changed = ModelSignal(use_caching=True)
pre_migrate = Signal()


signal的使用方式在receiver装饰器的注释中有介绍:


"""
A decorator for connecting receivers to signals. Used by passing in the
signal (or list of signals) and keyword arguments to connect::
    @receiver(post_save, sender=MyModel)
    def signal_receiver(sender, **kwargs):
        ...
    @receiver([post_save, post_delete], sender=MyModel)
    def signals_receiver(sender, **kwargs):
        ...
"""


这样利用signal机制,可以对MyModel进行一些额外的逻辑处理,又避免了代码的硬耦合。


weakref 介绍



了解了signal的各种实现和使用后,我们再回头学习blinker-signal中另外一个环节weakref。weakref可以显著提高signal的性能, 请看下面示例:


def test_weak_value_dict(cache):
    c_list = []
    class C:
        def method(self):
            return ("method called!", id(self))
    c1 = C()
    c2 = C()
    c3 = C()
    c_list.append(c1)
    c_list.append(c2)
    c_list.append(c3)
    del c1, c2, c3
    def do_cache(cache, name, target):
        cache[name] = target
    for idx, target in enumerate(c_list):
        do_cache(cache, idx, target)
    for k, v in cache.items():
        print("before", k, v.method())
    del c_list
    gc.collect()
    for x, y in cache.items():
        print("after", x, y.method())
test_weak_value_dict({})
print("==" * 10)
test_weak_value_dict(weakref.WeakValueDictionary())


在test_weak_value_dict函数中,创建了3个对象,将对象放到一个列表和cache中,完成后再删除对象和对象列表并进行gc。如果cache的实现是set,那么gc后cache中任然存在3个对象,也就是对象不会回收;如果是使用WeakValueDictionary实现的cache,则部分对象进行了回收。在一个事件中心,如果监听函数取消后却无法释放回收,内存会持续增长。


before 0 ('method called!', 140431874960640)
before 1 ('method called!', 140431874959440)
before 2 ('method called!', 140431874959968)
after 0 ('method called!', 140431874960640)
after 1 ('method called!', 140431874959440)
after 2 ('method called!', 140431874959968)
====================
before 0 ('method called!', 140431875860416)
before 1 ('method called!', 140431875860128)
before 2 ('method called!', 140431876163136)
after 2 ('method called!', 140431876163136)


为什么WeakValueDictionary还保留最后一个数据呢? 欢迎大家评论区交流


signal 小结



到这里我们可以知道blinker/flask/django的signal都是单纯的python消息中心,和我们之前在gunicorn中使用的系统 signal 完全不一样。消息中心,可以用来进行业务逻辑的解耦,一般就包括三步:


  • 注册监听器
  • 派发事件
  • 注销监听器


小技巧



blinker中提供了一种 单例模式 的实现参考,我把它叫做 分组单例 , 组名相同会得到同一个对象实例:


class _symbol(object):
    def __init__(self, group):
        """Construct a new group symbol."""
        # 原文是name,我把它换成了group,感觉这样更容易理解一些
        self.__group__ = self.group = group
    def __reduce__(self):
        return symbol, (self.group,)
    def __repr__(self):
        return self.group
_symbol.__group__ = 'symbol'
class symbol(object):
    """A constant symbol.
    # group相同的symbol是同一个对象
    >>> symbol('foo') is symbol('foo')
    True
    >>> symbol('foo')
    foo
    """
    symbols = {}
    def __new__(cls, group):
        try:
            return cls.symbols[group]
        except KeyError:
            return cls.symbols.setdefault(group, _symbol(group))
ANY = symbol('ANY')  # 单例


参考链接:




目录
相关文章
|
1月前
|
人工智能 Python
【02】做一个精美的打飞机小游戏,python开发小游戏-鹰击长空—优雅草央千澈-持续更新-分享源代码和游戏包供游玩-记录完整开发过程-用做好的素材来完善鹰击长空1.0.1版本
【02】做一个精美的打飞机小游戏,python开发小游戏-鹰击长空—优雅草央千澈-持续更新-分享源代码和游戏包供游玩-记录完整开发过程-用做好的素材来完善鹰击长空1.0.1版本
56 7
|
4天前
|
JavaScript 搜索推荐 Android开发
【01】仿站技术之python技术,看完学会再也不用去购买收费工具了-用python扒一个app下载落地页-包括安卓android下载(简单)-ios苹果plist下载(稍微麻烦一丢丢)-客户的麻将软件需要下载落地页并且要做搜索引擎推广-本文用python语言快速开发爬取落地页下载-优雅草卓伊凡
【01】仿站技术之python技术,看完学会再也不用去购买收费工具了-用python扒一个app下载落地页-包括安卓android下载(简单)-ios苹果plist下载(稍微麻烦一丢丢)-客户的麻将软件需要下载落地页并且要做搜索引擎推广-本文用python语言快速开发爬取落地页下载-优雅草卓伊凡
23 8
【01】仿站技术之python技术,看完学会再也不用去购买收费工具了-用python扒一个app下载落地页-包括安卓android下载(简单)-ios苹果plist下载(稍微麻烦一丢丢)-客户的麻将软件需要下载落地页并且要做搜索引擎推广-本文用python语言快速开发爬取落地页下载-优雅草卓伊凡
|
18天前
|
并行计算 安全 Java
Python GIL(全局解释器锁)机制对多线程性能影响的深度分析
在Python开发中,GIL(全局解释器锁)一直备受关注。本文基于CPython解释器,探讨GIL的技术本质及其对程序性能的影响。GIL确保同一时刻只有一个线程执行代码,以保护内存管理的安全性,但也限制了多线程并行计算的效率。文章分析了GIL的必要性、局限性,并介绍了多进程、异步编程等替代方案。尽管Python 3.13计划移除GIL,但该特性至少要到2028年才会默认禁用,因此理解GIL仍至关重要。
97 16
Python GIL(全局解释器锁)机制对多线程性能影响的深度分析
|
4天前
|
人工智能 测试技术 数据处理
通义灵码 2.0 体验报告:Deepseek 加持下的 Python 开发之旅
通义灵码 2.0 体验报告:Deepseek 加持下的 Python 开发之旅
57 11
|
1月前
|
前端开发 搜索推荐 编译器
【01】python开发之实例开发讲解-如何获取影视网站中经过保护后的视频-用python如何下载无法下载的视频资源含m3u8-python插件之dlp-举例几种-详解优雅草央千澈
【01】python开发之实例开发讲解-如何获取影视网站中经过保护后的视频-用python如何下载无法下载的视频资源含m3u8-python插件之dlp-举例几种-详解优雅草央千澈
111 34
【01】python开发之实例开发讲解-如何获取影视网站中经过保护后的视频-用python如何下载无法下载的视频资源含m3u8-python插件之dlp-举例几种-详解优雅草央千澈
|
8天前
|
API Python
python泛微e9接口开发
通过POST请求向指定IP的API注册设备以获取`secrit`和`spk`。请求需包含`appid`、`loginid`、`pwd`等头信息。响应中包含状态码、消息及`secrit`(注意拼写)、`secret`和`spk`字段。示例代码使用`curl`命令发送请求,成功后返回相关信息。
30 5
|
29天前
|
缓存 JSON 数据处理
Python进阶:深入理解import机制与importlib的妙用
本文深入解析了Python的`import`机制及其背后的原理,涵盖基本用法、模块缓存、导入搜索路径和导入钩子等内容。通过理解这些机制,开发者可以优化模块加载速度并确保代码的一致性。文章还介绍了`importlib`的强大功能,如动态模块导入、实现插件系统及重新加载模块,展示了如何利用这些特性编写更加灵活和高效的代码。掌握这些知识有助于提升编程技能,充分利用Python的强大功能。
28 4
|
2月前
|
IDE 测试技术 开发工具
10个必备Python调试技巧:从pdb到单元测试的开发效率提升指南
在Python开发中,调试是提升效率的关键技能。本文总结了10个实用的调试方法,涵盖内置调试器pdb、breakpoint()函数、断言机制、logging模块、列表推导式优化、IPython调试、警告机制、IDE调试工具、inspect模块和单元测试框架的应用。通过这些技巧,开发者可以更高效地定位和解决问题,提高代码质量。
333 8
10个必备Python调试技巧:从pdb到单元测试的开发效率提升指南
|
1月前
|
人工智能 编译器 Python
python已经安装有其他用途如何用hbuilerx配置环境-附带实例demo-python开发入门之hbuilderx编译器如何配置python环境—hbuilderx配置python环境优雅草央千澈
python已经安装有其他用途如何用hbuilerx配置环境-附带实例demo-python开发入门之hbuilderx编译器如何配置python环境—hbuilderx配置python环境优雅草央千澈
41 0
python已经安装有其他用途如何用hbuilerx配置环境-附带实例demo-python开发入门之hbuilderx编译器如何配置python环境—hbuilderx配置python环境优雅草央千澈
|
2月前
|
存储 API 数据库
使用Python开发获取商品销量详情API接口
本文介绍了使用Python开发获取商品销量详情的API接口方法,涵盖API接口概述、技术选型(Flask与FastAPI)、环境准备、API接口创建及调用淘宝开放平台API等内容。通过示例代码,详细说明了如何构建和调用API,以及开发过程中需要注意的事项,如数据库连接、API权限、错误处理、安全性和性能优化等。
167 5

热门文章

最新文章

推荐镜像

更多