signal在flask/django中都是很重要的解耦手段。flask的signal依赖blinker实现,django的signal也很类似。blinker库是纯python实现的代码简单,功能强大的signal库。本文我们从blinker开始,一起了解python-web开发的signal机制:
- blinker的api
- blinker-signal的实现
- flask-signal的实现
- django-signal的实现
- weakref介绍
- 小结
- 小技巧
blinker简介
blinker源码使用 1.4
版本, 项目结构如下:
文件 | 描述 |
base.py | 核心逻辑 |
_saferef.py | 安全引用相关逻辑 |
_utilities.py | 工具类 |
blinker的API
blinker的api使用示例:
from blinker import signal def subscriber1(sender): print("1 Got a signal sent by %r" % sender) def subscriber2(sender): print("2 Got a signal sent by %r" % sender) ready = signal('ready') print(ready) ready.connect(subscriber1) ready.connect(subscriber2) ready.send("go")
示例的日志输出:
<blinker.base.NamedSignal object at 0x7f93a805ad00; 'ready'> 1 Got a signal sent by 'go' 2 Got a signal sent by 'go'
可以看到signal是发布/订阅模式。或者换个更常见的说法,事件中心:
ready = signal('ready')
创建名为ready的事件中心ready.connect(subscriber1)
给ready事件中心添加事件监听器ready.send("go")
向ready事件中心派发事件,这样事件监听器会收到事件并进行处理
signal的实现
signal默认单例,提供开箱即用的API:
class NamedSignal(Signal): """A named generic notification emitter.""" def __init__(self, name, doc=None): Signal.__init__(self, doc) self.name = name class Namespace(dict): def signal(self, name, doc=None): try: return self[name] except KeyError: return self.setdefault(name, NamedSignal(name, doc)) signal = Namespace().signal
需要说明一下的是,signal的单例是和name绑定的。同一个名称得到同一个NamedSignal对象,不同名称得到的NamedSignal对象不一样。
NamedSignal的父类Signal的构造方法,包括1)事件接收器字典receivers:以事件接收器id为key和事件接收器为value;2)接收器ID-发送器ID的字典:以接收器ID为key和发送器ID集合为value;3)和2类似的字典,只不过是反向的,key为发送器ID,value为接收器集合。
ANY = symbol('ANY') class Signal(object): ANY = ANY def __init__(self, doc=None) self.receivers = {} self._by_receiver = defaultdict(set) self._by_sender = defaultdict(set) ...
Signal的connect函数添加消息接收器,可以看到sender和receiver是多对多的关系。
def connect(self, receiver, sender=ANY, weak=True): receiver_id = hashable_identity(receiver) receiver_ref = receiver sender_id = ANY_ID self.receivers.setdefault(receiver_id, receiver_ref) self._by_sender[sender_id].add(receiver_id) self._by_receiver[receiver_id].add(sender_id) del receiver_ref return receiver
Signal的send函数将消息发送给所有关注该sender的接收器:
def send(self, *sender, **kwargs): sender = sender[0] # 循环执行所有的receiver return [(receiver, receiver(sender, **kwargs)) for receiver in self.receivers_for(sender)] def receivers_for(self, sender): sender_id = hashable_identity(sender) # 根据sender_id找receiver_id if sender_id in self._by_sender: # 2个set的合集 ids = (self._by_sender[ANY_ID] | self._by_sender[sender_id]) else: ids = self._by_sender[ANY_ID].copy() for receiver_id in ids: receiver = self.receivers.get(receiver_id) if receiver is None: continue # 迭代器 yield receiver
有始有终,Signal使用disconnect函数注销消息的接收器:
def disconnect(self, receiver, sender=ANY): sender_id = ANY_ID receiver_id = hashable_identity(receiver) self._disconnect(receiver_id, sender_id) def _disconnect(self, receiver_id, sender_id): if sender_id == ANY_ID: if self._by_receiver.pop(receiver_id, False): for bucket in self._by_sender.values(): bucket.discard(receiver_id) self.receivers.pop(receiver_id, None) else: self._by_sender[sender_id].discard(receiver_id) self._by_receiver[receiver_id].discard(sender_id)
为了便于理解signal机制,我们暂时忽略了weakref相关的代码,稍后再进行介绍。
flask-signal的实现
flask-signal依赖blinker的实现:
# flask.signals.py from blinker import Namespace _signals = Namespace() template_rendered = _signals.signal("template-rendered") before_render_template = _signals.signal("before-render-template") request_started = _signals.signal("request-started") request_finished = _signals.signal("request-finished") request_tearing_down = _signals.signal("request-tearing-down") got_request_exception = _signals.signal("got-request-exception") appcontext_tearing_down = _signals.signal("appcontext-tearing-down") appcontext_pushed = _signals.signal("appcontext-pushed") appcontext_popped = _signals.signal("appcontext-popped") message_flashed = _signals.signal("message-flashed")
从上面代码可以看到flask使用blinker预制了多个signal。以request_started为例, flask在处理request时候会向request_started派发事件:
# flask.app.py from .signals import request_started def full_dispatch_request(self): ... request_started.send(self) ...
我们可以在自己的代码中,这样注册事件监听:
def log_request(sender, **extra): sender.logger.debug('Request context is set up') from flask import request_started request_started.connect(log_request, app)
这样就可以很方便的使用signal获取到flask在各个阶段的数据。
django-signal的实现
django-signal虽然是独立实现,但是模式和blinker非常类似。Signal构造函数创建了一个对象,充当事件中心。
# django/dispatch/dispatcher.py def _make_id(target): if hasattr(target, '__func__'): return (id(target.__self__), id(target.__func__)) return id(target) NONE_ID = _make_id(None) # A marker for caching NO_RECEIVERS = object() class Signal: def __init__(self, providing_args=None, use_caching=False): """ Create a new signal. """ self.receivers = [] self.lock = threading.Lock() self.use_caching = use_caching self.sender_receivers_cache = weakref.WeakKeyDictionary() if use_caching else {} self._dead_receivers = False
connect核心功能就是为事件监听器构建唯一标识(receiver_id,sender_id),然后加入receivers数组。
def connect(self, receiver, sender=None, weak=True, dispatch_uid=None): from django.conf import settings lookup_key = (_make_id(receiver), _make_id(sender)) ref = weakref.ref receiver_object = receiver receiver = ref(receiver) with self.lock: if not any(r_key == lookup_key for r_key, _ in self.receivers): self.receivers.append((lookup_key, receiver))
send函数和blinker的send类似:
def send(self, sender, **named): return [ (receiver, receiver(signal=self, sender=sender, **named)) for receiver in self._live_receivers(sender) ] def _live_receivers(self, sender): with self.lock: senderkey = _make_id(sender) receivers = [] for (receiverkey, r_senderkey), receiver in self.receivers: if r_senderkey == NONE_ID or r_senderkey == senderkey: receivers.append(receiver) ... non_weak_receivers = [] for receiver in receivers: non_weak_receivers.append(receiver) return non_weak_receivers
django-signal额外提供了一个receiver装饰器,方便业务使用:
def receiver(signal, **kwargs): def _decorator(func): if isinstance(signal, (list, tuple)): for s in signal: s.connect(func, **kwargs) else: signal.connect(func, **kwargs) return func return _decorator
django的model中额外包装了ModelSignal类并且预制了一些signal:
class ModelSignal(Signal): def _lazy_method(self, method, apps, receiver, sender, **kwargs): from django.db.models.options import Options # This partial takes a single optional argument named "sender". partial_method = partial(method, receiver, **kwargs) if isinstance(sender, str): apps = apps or Options.default_apps apps.lazy_model_operation(partial_method, make_model_tuple(sender)) else: return partial_method(sender) def connect(self, receiver, sender=None, weak=True, dispatch_uid=None, apps=None): self._lazy_method( super().connect, apps, receiver, sender, weak=weak, dispatch_uid=dispatch_uid, ) ... # 定义模型各个阶段的signal pre_init = ModelSignal(use_caching=True) post_init = ModelSignal(use_caching=True) pre_save = ModelSignal(use_caching=True) post_save = ModelSignal(use_caching=True) pre_delete = ModelSignal(use_caching=True) post_delete = ModelSignal(use_caching=True) m2m_changed = ModelSignal(use_caching=True) pre_migrate = Signal()
signal的使用方式在receiver装饰器的注释中有介绍:
""" A decorator for connecting receivers to signals. Used by passing in the signal (or list of signals) and keyword arguments to connect:: @receiver(post_save, sender=MyModel) def signal_receiver(sender, **kwargs): ... @receiver([post_save, post_delete], sender=MyModel) def signals_receiver(sender, **kwargs): ... """
这样利用signal机制,可以对MyModel进行一些额外的逻辑处理,又避免了代码的硬耦合。
weakref 介绍
了解了signal的各种实现和使用后,我们再回头学习blinker-signal中另外一个环节weakref。weakref可以显著提高signal的性能, 请看下面示例:
def test_weak_value_dict(cache): c_list = [] class C: def method(self): return ("method called!", id(self)) c1 = C() c2 = C() c3 = C() c_list.append(c1) c_list.append(c2) c_list.append(c3) del c1, c2, c3 def do_cache(cache, name, target): cache[name] = target for idx, target in enumerate(c_list): do_cache(cache, idx, target) for k, v in cache.items(): print("before", k, v.method()) del c_list gc.collect() for x, y in cache.items(): print("after", x, y.method()) test_weak_value_dict({}) print("==" * 10) test_weak_value_dict(weakref.WeakValueDictionary())
在test_weak_value_dict函数中,创建了3个对象,将对象放到一个列表和cache中,完成后再删除对象和对象列表并进行gc。如果cache的实现是set,那么gc后cache中任然存在3个对象,也就是对象不会回收;如果是使用WeakValueDictionary实现的cache,则部分对象进行了回收。在一个事件中心,如果监听函数取消后却无法释放回收,内存会持续增长。
before 0 ('method called!', 140431874960640) before 1 ('method called!', 140431874959440) before 2 ('method called!', 140431874959968) after 0 ('method called!', 140431874960640) after 1 ('method called!', 140431874959440) after 2 ('method called!', 140431874959968) ==================== before 0 ('method called!', 140431875860416) before 1 ('method called!', 140431875860128) before 2 ('method called!', 140431876163136) after 2 ('method called!', 140431876163136)
为什么WeakValueDictionary还保留最后一个数据呢? 欢迎大家评论区交流
signal 小结
到这里我们可以知道blinker/flask/django的signal都是单纯的python消息中心,和我们之前在gunicorn中使用的系统 signal
完全不一样。消息中心,可以用来进行业务逻辑的解耦,一般就包括三步:
- 注册监听器
- 派发事件
- 注销监听器
小技巧
blinker中提供了一种 单例模式 的实现参考,我把它叫做 分组单例 , 组名相同会得到同一个对象实例:
class _symbol(object): def __init__(self, group): """Construct a new group symbol.""" # 原文是name,我把它换成了group,感觉这样更容易理解一些 self.__group__ = self.group = group def __reduce__(self): return symbol, (self.group,) def __repr__(self): return self.group _symbol.__group__ = 'symbol' class symbol(object): """A constant symbol. # group相同的symbol是同一个对象 >>> symbol('foo') is symbol('foo') True >>> symbol('foo') foo """ symbols = {} def __new__(cls, group): try: return cls.symbols[group] except KeyError: return cls.symbols.setdefault(group, _symbol(group)) ANY = symbol('ANY') # 单例
参考链接:
- pythonhosted.org/blinker/
- stackify.com/python-garb…
- www.cnblogs.com/TM0831/p/10…
- www.geeksforgeeks.org/weak-refere…
- pymotw.com/2/weakref/