一文打尽python-web开发的signal机制

简介: signal在flask/django中都是很重要的解耦手段。flask的signal依赖blinker实现,django的signal也很类似。blinker库是纯python实现的代码简单,功能强大的signal库。本文我们从blinker开始,一起了解python-web开发的signal机制。

signal在flask/django中都是很重要的解耦手段。flask的signal依赖blinker实现,django的signal也很类似。blinker库是纯python实现的代码简单,功能强大的signal库。本文我们从blinker开始,一起了解python-web开发的signal机制:


  • blinker的api
  • blinker-signal的实现
  • flask-signal的实现
  • django-signal的实现
  • weakref介绍
  • 小结
  • 小技巧


blinker简介



blinker源码使用 1.4 版本, 项目结构如下:


文件 描述
base.py 核心逻辑
_saferef.py 安全引用相关逻辑
_utilities.py 工具类


blinker的API



blinker的api使用示例:


from blinker import signal
def subscriber1(sender):
    print("1 Got a signal sent by %r" % sender)
def subscriber2(sender):
    print("2 Got a signal sent by %r" % sender)
ready = signal('ready')
print(ready)
ready.connect(subscriber1)
ready.connect(subscriber2)
ready.send("go")


示例的日志输出:


<blinker.base.NamedSignal object at 0x7f93a805ad00; 'ready'>
1 Got a signal sent by 'go'
2 Got a signal sent by 'go'


可以看到signal是发布/订阅模式。或者换个更常见的说法,事件中心:


  • ready = signal('ready') 创建名为ready的事件中心
  • ready.connect(subscriber1) 给ready事件中心添加事件监听器
  • ready.send("go") 向ready事件中心派发事件,这样事件监听器会收到事件并进行处理


signal的实现



signal默认单例,提供开箱即用的API:


class NamedSignal(Signal):
    """A named generic notification emitter."""
    def __init__(self, name, doc=None):
        Signal.__init__(self, doc)
        self.name = name
class Namespace(dict):
    def signal(self, name, doc=None):
        try:
            return self[name]
        except KeyError:
            return self.setdefault(name, NamedSignal(name, doc))
signal = Namespace().signal


需要说明一下的是,signal的单例是和name绑定的。同一个名称得到同一个NamedSignal对象,不同名称得到的NamedSignal对象不一样。


NamedSignal的父类Signal的构造方法,包括1)事件接收器字典receivers:以事件接收器id为key和事件接收器为value;2)接收器ID-发送器ID的字典:以接收器ID为key和发送器ID集合为value;3)和2类似的字典,只不过是反向的,key为发送器ID,value为接收器集合。


ANY = symbol('ANY')
class Signal(object):
    ANY = ANY
    def __init__(self, doc=None)
        self.receivers = {}
        self._by_receiver = defaultdict(set)
        self._by_sender = defaultdict(set)
        ...


Signal的connect函数添加消息接收器,可以看到sender和receiver是多对多的关系。


def connect(self, receiver, sender=ANY, weak=True):
    receiver_id = hashable_identity(receiver)
    receiver_ref = receiver
    sender_id = ANY_ID
    self.receivers.setdefault(receiver_id, receiver_ref)
    self._by_sender[sender_id].add(receiver_id)
    self._by_receiver[receiver_id].add(sender_id)
    del receiver_ref
    return receiver


Signal的send函数将消息发送给所有关注该sender的接收器:


def send(self, *sender, **kwargs):
    sender = sender[0]
    # 循环执行所有的receiver
    return [(receiver, receiver(sender, **kwargs))
            for receiver in self.receivers_for(sender)]
def receivers_for(self, sender):
    sender_id = hashable_identity(sender)
    # 根据sender_id找receiver_id
    if sender_id in self._by_sender:
        # 2个set的合集 
        ids = (self._by_sender[ANY_ID] |
               self._by_sender[sender_id])
    else:
        ids = self._by_sender[ANY_ID].copy()
    for receiver_id in ids:
        receiver = self.receivers.get(receiver_id)
        if receiver is None:
            continue
        # 迭代器
        yield receiver


有始有终,Signal使用disconnect函数注销消息的接收器:


def disconnect(self, receiver, sender=ANY):
    sender_id = ANY_ID
    receiver_id = hashable_identity(receiver)
    self._disconnect(receiver_id, sender_id)
def _disconnect(self, receiver_id, sender_id):
    if sender_id == ANY_ID:
        if self._by_receiver.pop(receiver_id, False):
            for bucket in self._by_sender.values():
                bucket.discard(receiver_id)
        self.receivers.pop(receiver_id, None)
    else:
        self._by_sender[sender_id].discard(receiver_id)
        self._by_receiver[receiver_id].discard(sender_id)


为了便于理解signal机制,我们暂时忽略了weakref相关的代码,稍后再进行介绍。


flask-signal的实现



flask-signal依赖blinker的实现:


# flask.signals.py
from blinker import Namespace
_signals = Namespace()
template_rendered = _signals.signal("template-rendered")
before_render_template = _signals.signal("before-render-template")
request_started = _signals.signal("request-started")
request_finished = _signals.signal("request-finished")
request_tearing_down = _signals.signal("request-tearing-down")
got_request_exception = _signals.signal("got-request-exception")
appcontext_tearing_down = _signals.signal("appcontext-tearing-down")
appcontext_pushed = _signals.signal("appcontext-pushed")
appcontext_popped = _signals.signal("appcontext-popped")
message_flashed = _signals.signal("message-flashed")


从上面代码可以看到flask使用blinker预制了多个signal。以request_started为例, flask在处理request时候会向request_started派发事件:


# flask.app.py
from .signals import request_started
def full_dispatch_request(self):
    ...
    request_started.send(self)
    ...


我们可以在自己的代码中,这样注册事件监听:

def log_request(sender, **extra):
    sender.logger.debug('Request context is set up')
from flask import request_started
request_started.connect(log_request, app)


这样就可以很方便的使用signal获取到flask在各个阶段的数据。


django-signal的实现



django-signal虽然是独立实现,但是模式和blinker非常类似。Signal构造函数创建了一个对象,充当事件中心。


# django/dispatch/dispatcher.py
def _make_id(target):
    if hasattr(target, '__func__'):
        return (id(target.__self__), id(target.__func__))
    return id(target)
NONE_ID = _make_id(None)
# A marker for caching
NO_RECEIVERS = object()
class Signal:
    def __init__(self, providing_args=None, use_caching=False):
        """
        Create a new signal.
        """
        self.receivers = []
        self.lock = threading.Lock()
        self.use_caching = use_caching
        self.sender_receivers_cache = weakref.WeakKeyDictionary() if use_caching else {}
        self._dead_receivers = False


connect核心功能就是为事件监听器构建唯一标识(receiver_id,sender_id),然后加入receivers数组。


def connect(self, receiver, sender=None, weak=True, dispatch_uid=None):
    from django.conf import settings
    lookup_key = (_make_id(receiver), _make_id(sender))
    ref = weakref.ref
    receiver_object = receiver
    receiver = ref(receiver)
    with self.lock:
        if not any(r_key == lookup_key for r_key, _ in self.receivers):
            self.receivers.append((lookup_key, receiver))


send函数和blinker的send类似:


def send(self, sender, **named):
    return [
        (receiver, receiver(signal=self, sender=sender, **named))
        for receiver in self._live_receivers(sender)
    ]
def _live_receivers(self, sender):
    with self.lock:
        senderkey = _make_id(sender)
        receivers = []
        for (receiverkey, r_senderkey), receiver in self.receivers:
            if r_senderkey == NONE_ID or r_senderkey == senderkey:
                receivers.append(receiver)
    ...
    non_weak_receivers = []
    for receiver in receivers:
        non_weak_receivers.append(receiver)
    return non_weak_receivers


django-signal额外提供了一个receiver装饰器,方便业务使用:


def receiver(signal, **kwargs):
    def _decorator(func):
        if isinstance(signal, (list, tuple)):
            for s in signal:
                s.connect(func, **kwargs)
        else:
            signal.connect(func, **kwargs)
        return func
    return _decorator


django的model中额外包装了ModelSignal类并且预制了一些signal:



class ModelSignal(Signal):
    def _lazy_method(self, method, apps, receiver, sender, **kwargs):
        from django.db.models.options import Options
        # This partial takes a single optional argument named "sender".
        partial_method = partial(method, receiver, **kwargs)
        if isinstance(sender, str):
            apps = apps or Options.default_apps
            apps.lazy_model_operation(partial_method, make_model_tuple(sender))
        else:
            return partial_method(sender)
    def connect(self, receiver, sender=None, weak=True, dispatch_uid=None, apps=None):
        self._lazy_method(
            super().connect, apps, receiver, sender,
            weak=weak, dispatch_uid=dispatch_uid,
        )
...
# 定义模型各个阶段的signal
pre_init = ModelSignal(use_caching=True)
post_init = ModelSignal(use_caching=True)
pre_save = ModelSignal(use_caching=True)
post_save = ModelSignal(use_caching=True)
pre_delete = ModelSignal(use_caching=True)
post_delete = ModelSignal(use_caching=True)
m2m_changed = ModelSignal(use_caching=True)
pre_migrate = Signal()


signal的使用方式在receiver装饰器的注释中有介绍:


"""
A decorator for connecting receivers to signals. Used by passing in the
signal (or list of signals) and keyword arguments to connect::
    @receiver(post_save, sender=MyModel)
    def signal_receiver(sender, **kwargs):
        ...
    @receiver([post_save, post_delete], sender=MyModel)
    def signals_receiver(sender, **kwargs):
        ...
"""


这样利用signal机制,可以对MyModel进行一些额外的逻辑处理,又避免了代码的硬耦合。


weakref 介绍



了解了signal的各种实现和使用后,我们再回头学习blinker-signal中另外一个环节weakref。weakref可以显著提高signal的性能, 请看下面示例:


def test_weak_value_dict(cache):
    c_list = []
    class C:
        def method(self):
            return ("method called!", id(self))
    c1 = C()
    c2 = C()
    c3 = C()
    c_list.append(c1)
    c_list.append(c2)
    c_list.append(c3)
    del c1, c2, c3
    def do_cache(cache, name, target):
        cache[name] = target
    for idx, target in enumerate(c_list):
        do_cache(cache, idx, target)
    for k, v in cache.items():
        print("before", k, v.method())
    del c_list
    gc.collect()
    for x, y in cache.items():
        print("after", x, y.method())
test_weak_value_dict({})
print("==" * 10)
test_weak_value_dict(weakref.WeakValueDictionary())


在test_weak_value_dict函数中,创建了3个对象,将对象放到一个列表和cache中,完成后再删除对象和对象列表并进行gc。如果cache的实现是set,那么gc后cache中任然存在3个对象,也就是对象不会回收;如果是使用WeakValueDictionary实现的cache,则部分对象进行了回收。在一个事件中心,如果监听函数取消后却无法释放回收,内存会持续增长。


before 0 ('method called!', 140431874960640)
before 1 ('method called!', 140431874959440)
before 2 ('method called!', 140431874959968)
after 0 ('method called!', 140431874960640)
after 1 ('method called!', 140431874959440)
after 2 ('method called!', 140431874959968)
====================
before 0 ('method called!', 140431875860416)
before 1 ('method called!', 140431875860128)
before 2 ('method called!', 140431876163136)
after 2 ('method called!', 140431876163136)


为什么WeakValueDictionary还保留最后一个数据呢? 欢迎大家评论区交流


signal 小结



到这里我们可以知道blinker/flask/django的signal都是单纯的python消息中心,和我们之前在gunicorn中使用的系统 signal 完全不一样。消息中心,可以用来进行业务逻辑的解耦,一般就包括三步:


  • 注册监听器
  • 派发事件
  • 注销监听器


小技巧



blinker中提供了一种 单例模式 的实现参考,我把它叫做 分组单例 , 组名相同会得到同一个对象实例:


class _symbol(object):
    def __init__(self, group):
        """Construct a new group symbol."""
        # 原文是name,我把它换成了group,感觉这样更容易理解一些
        self.__group__ = self.group = group
    def __reduce__(self):
        return symbol, (self.group,)
    def __repr__(self):
        return self.group
_symbol.__group__ = 'symbol'
class symbol(object):
    """A constant symbol.
    # group相同的symbol是同一个对象
    >>> symbol('foo') is symbol('foo')
    True
    >>> symbol('foo')
    foo
    """
    symbols = {}
    def __new__(cls, group):
        try:
            return cls.symbols[group]
        except KeyError:
            return cls.symbols.setdefault(group, _symbol(group))
ANY = symbol('ANY')  # 单例


参考链接:




目录
相关文章
|
8天前
|
存储 数据库连接 API
Python环境变量在开发和运行Python应用程序时起着重要的作用
Python环境变量在开发和运行Python应用程序时起着重要的作用
48 15
|
20天前
|
设计模式 前端开发 数据库
Python Web开发:Django框架下的全栈开发实战
【10月更文挑战第27天】本文介绍了Django框架在Python Web开发中的应用,涵盖了Django与Flask等框架的比较、项目结构、模型、视图、模板和URL配置等内容,并展示了实际代码示例,帮助读者快速掌握Django全栈开发的核心技术。
113 45
|
15天前
|
JSON 安全 API
如何使用Python开发API接口?
在现代软件开发中,API(应用程序编程接口)用于不同软件组件之间的通信和数据交换,实现系统互操作性。Python因其简单易用和强大功能,成为开发API的热门选择。本文详细介绍了Python开发API的基础知识、优势、实现方式(如Flask和Django框架)、实战示例及注意事项,帮助读者掌握高效、安全的API开发技巧。
41 3
如何使用Python开发API接口?
|
1天前
|
开发框架 JavaScript 前端开发
TypeScript 是一种静态类型的编程语言,它扩展了 JavaScript,为 Web 开发带来了强大的类型系统、组件化开发支持、与主流框架的无缝集成、大型项目管理能力和提升开发体验等多方面优势
TypeScript 是一种静态类型的编程语言,它扩展了 JavaScript,为 Web 开发带来了强大的类型系统、组件化开发支持、与主流框架的无缝集成、大型项目管理能力和提升开发体验等多方面优势。通过明确的类型定义,TypeScript 能够在编码阶段发现潜在错误,提高代码质量;支持组件的清晰定义与复用,增强代码的可维护性;与 React、Vue 等框架结合,提供更佳的开发体验;适用于大型项目,优化代码结构和性能。随着 Web 技术的发展,TypeScript 的应用前景广阔,将继续引领 Web 开发的新趋势。
13 2
|
8天前
|
JSON API 数据格式
如何使用Python开发1688商品详情API接口?
本文介绍了如何使用Python开发1688商品详情API接口,获取商品的标题、价格、销量和评价等详细信息。主要内容包括注册1688开放平台账号、安装必要Python模块、了解API接口、生成签名、编写Python代码、解析返回数据以及错误处理和日志记录。通过这些步骤,开发者可以轻松地集成1688商品数据到自己的应用中。
24 1
|
14天前
|
数据采集 存储 JSON
Python爬虫开发中的分析与方案制定
Python爬虫开发中的分析与方案制定
|
16天前
|
前端开发 API 开发者
Python Web开发者必看!AJAX、Fetch API实战技巧,让前后端交互如丝般顺滑!
在Web开发中,前后端的高效交互是提升用户体验的关键。本文通过一个基于Flask框架的博客系统实战案例,详细介绍了如何使用AJAX和Fetch API实现不刷新页面查看评论的功能。从后端路由设置到前端请求处理,全面展示了这两种技术的应用技巧,帮助Python Web开发者提升项目质量和开发效率。
31 1
|
18天前
|
XML 安全 PHP
PHP与SOAP Web服务开发:基础与进阶教程
本文介绍了PHP与SOAP Web服务的基础和进阶知识,涵盖SOAP的基本概念、PHP中的SoapServer和SoapClient类的使用方法,以及服务端和客户端的开发示例。此外,还探讨了安全性、性能优化等高级主题,帮助开发者掌握更高效的Web服务开发技巧。
|
19天前
|
算法 测试技术 开发者
性能优化与代码审查:提升Python开发效率
性能优化与代码审查:提升Python开发效率
28 1
|
20天前
|
监控 Java 开发者
Python的垃圾收集机制有哪些?
Python的垃圾收集机制有哪些?
下一篇
无影云桌面