Rust 笔记:发布订阅模式及其 在 Rust 语言中的使用

简介: Rust 笔记:发布订阅模式及其 在 Rust 语言中的使用

Rust 笔记、设计模式发布订阅模式及其 在 Rust 语言中的使用


1. 引例:从我的一个经历说起

1.1 从 订阅 到 发布

记得一九年的时候我刚刚来到深圳工作,众所周知那时候还没有爆发 新冠疫情,身边的同事们组队去香港购物是常有的事情。但是那会儿我还没有办理港澳通行证,于是年底回老家的时候去当地政务中心办理了。

办证是需要时间的,万万没想到的是二零年春节前夕——新冠疫情爆发了。当我回到深圳后的某一天接到老家政务中心的电话,通知我由于疫情的原因,通信证的办理已经被暂停了并且什么时候恢复办理还不能确定,如果愿意等待,则需要到恢复办证的时候,再通知我们

—— 这就是一个 发布-订阅模式的典型例子。

发布-订阅 模式 模式中的多方可以分为两类,一类是消息的 发布者,另外一类是消息的 订阅者。在上面的案例中,政务中心的工作人员就是 发布者,当我表示愿意等到恢复通信证办理时,我就 向发布者订阅了 恢复办理的通知(消息),因此我时消息的 订阅者

这样有什么好处呢:

  • 对于我(订阅者)来说,不需要每隔几天就打电话到政务服务中心(发布者)去询问是否恢复办理的消息;
  • 对于政务服务中心(发布者)同样也不需要每天回答相同的问题——毕竟何时恢复办理他们也不能确定。
  • 一旦恢复办理,政务服务中心(发布者)可以一次性地通知所有和我一样地广大订阅者

看到了吗——相比于我们去轮询以获取消息,改用发布-订阅 模式 同时节省了我们双方地时间!

多么棒地思想!——运用于程序设计中岂不秒哉?

1.2 如果我不想继续订阅了

有一种情况也是非常常见的,那就是我不愿意继续等待消息了,也有可能是这个消息对我来说已经不重要了。这时我不再希望继续收到来自发布者的恢复办理通知,那就需要 退订

还记得吗——当我们订阅的时候,是将我们的订阅意愿登记在发布者那边的,这样就能实现发布者在适当的时候通过查询 所有的登记记录 然后逐一通知。

因此如果一旦有用户需要退订,其实很简单,只需要订阅者在他们所登记订阅的“订阅者登陆表”中将订阅信息删除掉即可,这样下一次广播通知的时候就不会再将消息发送给退订的用户。

2. 发布-订阅 的 实践、应用、思考

2.1 实践:用 Rust 来复现上面的场景

如果现在你好像明白 发布-订阅模式 的基本思想了——那么就请成热打铁,跟着我用程序来模拟一下证件办理的情景。

use std::collections::HashSet;
#[derive(Eq, Hash, PartialEq, Clone)]
struct Subscriber {
    name: String,
}
struct Publisher {
    subscribers: HashSet<Subscriber>,
    name: String,
}
impl Publisher {
    fn new(name: &str) -> Publisher {
        Publisher {
            subscribers: HashSet::new(),
            name: name.to_string(),
        }
    }
    fn add_subscriber(&mut self, subscriber: &Subscriber) {
        self.subscribers.insert(subscriber.clone());
    }
    fn remove_subscriber(&mut self, subscriber: &Subscriber) {
        self.subscribers.remove(subscriber);
        println!("\n=> {} 已取消订阅。\n", subscriber.name);
    }
    fn notify_all(&self, arg: &str) {
        for subscriber in &self.subscribers {
            subscriber.notify(self, arg);
        }
    }
}
impl Subscriber {
    fn new(name: &str) -> Subscriber {
        Subscriber {
            name: name.to_string(),
        }
    }
    fn notify(&self, publisher: &Publisher, arg: &str) {
        println!(
            "\"{}\"(订阅者) 收到的通知来自 \"{}\"(发布者)的通知: {}",
            self.name, publisher.name, arg
        );
    }
}
fn main() {
    let mut publisher = Publisher::new("政务服务中心");
    let jack_lee = Subscriber::new("jackLee");
    let jack_ma = Subscriber::new("jackMa");
    publisher.add_subscriber(&jack_lee);
    publisher.add_subscriber(&jack_ma);
    println!("------- 第一次发布消息 -------");
    publisher.notify_all("[通知] 恢复证件办理!");
    // 用户 jackMa 取消订阅
    publisher.remove_subscriber(&jack_ma);
    println!("------- 第二次发布消息 -------");
    publisher.notify_all("[通知] 恢复证件办理!");
}

运行该脚本,输出如下:

------- 第一次发布消息 -------
"jackLee"(订阅者) 收到的通知来自 "政务服务中心"(发布者)的通知: [通知] 恢复证件办理!
"jackMa"(订阅者) 收到的通知来自 "政务服务中心"(发布者)的通知: [通 知] 恢复证件办理!
=> jackMa 已取消订阅。
------- 第二次发布消息 -------
"jackLee"(订阅者) 收到的通知来自 "政务服务中心"(发布者)的通知: [通知] 恢复证件办理!

可以看到订阅者有两个订阅者实例:jackLee(本人)、jackMa(可能是阿里出来的)共同订阅了证件办理信息。

政务服务中心(发布者)第一次发布恢复通知的时候,jackLeejackMa 这两个同学都订阅了消息,因此都受到了来自该中心的通知。后来,jackMa 可能由于已经派小弟火速前往该中心取走了他的证件不需要继续订阅了,于是该中心的工作人(发布者)员调用publisher.remove_subscriber(jackMa) 从该中心的订阅者记录表中移除了 jackMa 的订阅记录。

于是,到了该中心第二次发布消息的时候,jackMa 已经不会再收到恢复证件办理消息,而 jackLee 还可以接收到恢复证件办理的消息。

3. 通用型发布者对象的改进

3.1 从 Subscriber 的服务员 到 事件的发布者

在阅读本小节前请读者先自己尝试回答这个问题:Subscriber 类真的有必要实现吗?

在我们上面的代码中,Subscriber 类 实现了几乎唯一一个有用的方法:update,它的作用却是给 Publisher 类的 notifyAll 方法进行调用。

从现实生活中给一个解释:

notifyAll 是消息的发布这发布消息的工具,update 是订阅用户接受到的新的定制化消息,比如同样是订阅了售房信息,但是由于不同类别的购房者订阅时所选定的楼层、大小等参数不一样,则这些不同订阅者接收到的发布结果不一样——也就表面原始的消息需要为不同的订阅者做一些 定制化 处理。在之前的代码实现中,这个消息的定制化工作就是使用 Subscriber 类的 update 方法实现的。

很显然,上表面的代码要真正实现定制化,往往不仅是参数值的不同,可能对参数的处理也不一样。因此仅仅依赖参数data是不合理的。因此我们大概是需要写多个仅仅 update 方法的实现不同的 Subscriber 类——这不太好。略好一点的办法是,让 update 方法接受的不是单纯的数据 data,而是一个 回调函数 传入 update 方法中。

先不着急修改我们的代码。对于发布者来说,似乎可以提供 更加周到 的服务——直接登记好订阅着的定制化需求处理方式,使用订阅者要求的处理方式处理好定制的消息后,直接告诉订阅者。——因此 update 这个接受表示用户定制化需求处理方式的方法可以直接合并到 发布者那边。

于是 Subscriber 类 就不需要了,现在我们只需要更新一下我们的 Publisher。更新的思路是这样的:

  • 添加订阅者时(Publisher.addSubscriber)不仅需要记录订阅者名字,还要记录一个对应的响应函数用以消息发布后给订阅者提供定制化服务。

Publisher 看,需要登记的内容又多了一些。不过好在 订阅者名称(认为是唯一标识符)和 与之对于的服务(回调函数),是对应的关系,既可以一对一,也可以一对多(表示这个订阅者需要多个定制化服务)。因此我们将 Publisher 的 “记录本”改成下面的类型:

HashMap<String, Vec<EventCallback>>

这个映射(Map)的 key 就表示 订阅者名称,而value 部分是一组函数,表示该订阅者需要的各种服务。

另外,到了这里,对于 Publisher来说,添加订阅者就转化为了 为订阅者订阅各种定制化服务 。同时反过来看,对于某个具体的订阅者 Subscriber,一旦它的服务定制数组 (Function[])为空数组,表明他已经没有任何订阅,也不再需要接收发布者的任何消息了。

因此先前我们使用的方法名addSubscriber 不适用了,从含以上换成 addListener 似乎更加合适

为什么呢? 我们接下来对此做进一步说明。

一直以来,我们聚焦点都在于 发布者订阅者,而忽略了 引起发布者发布的事件。 这个方法接受两个参数,一个是用户名,一个是为用户新增的回调。同时必须指出的是,这个 回调 往往是需要再其调用时接受一些数据的,比如由发布者发布的某些原始数据,他们就像是时时刻刻地 监听着守候着 发布者 发布一个事件一旦这个 事件/消息 被发布,就 完成消息发布后为 订阅者 所提供地服务

换一下思路,我们接着把聚焦点转移到 事件 上来。

其实从现实中看,同一个事件发生,可能意味着可能需 要干很多事,既可以 服务更多的订阅者,也可以干其它任何的工作——我们一味地想着在发布者处登记订阅者的id然后完成订阅者的需求,那么 没有区别为何事件而需要去发布这些消息

更好的做法是 不再记录订阅者,而是记录为什么要发布消息给订阅者——也就是记录 事件这样我们就可以在同一个事件发生的时候,通过一系列的属于该事件函数(可能一个或多个回调函数服务于同一个订阅者),完成该事件的响应,也就是回调函数们。

从这个意义上看,我们所关注所谓的 订阅者 可以看作 一个事件发布后,发布者需要调用的一组函数。而所谓 发布,实际上就是调用这组函数以 完成事件(的回调)。因此我们接下来该用 listener 表示监听事件以待执行的回调函数, event 表示事件名, emit 表示这个事件发生后需要由发布者调用函数的过程。

至此,我们的 Publisher 从一个 Subscriber 的服务员 转型成为了职业 事件的管理者,不妨给它改个名——EventEmitter

现在我们实现一个最基础的 EventEmitter 对象:

use std::collections::HashMap;
type EventCallback = Box<dyn Fn()>;
struct EventEmitter {
    /// 事件-监听器数组容器
    _events: HashMap<String, Vec<EventCallback>>,
}
impl EventEmitter {
    fn new() -> EventEmitter {
        EventEmitter {
            events: HashMap::new(),
        }
    }
  /// 添加事件监听器,监听器是一个回调函数,表示用户订阅的具体服务
    fn add_listener(&mut self, event: &str, callback: EventCallback) {
        let callbacks = self._events.entry(event.to_string()).or_insert(Vec::new());
        callbacks.push(callback);
    }
  /// 移除事件监听器:相当于用户取消订阅
    fn remove_listener(&mut self, event: &str, callback: &EventCallback) {
        if let Some(callbacks) = self._events.get_mut(event) {
            callbacks.retain(|cb| cb != callback);
        }
    }
  /// 触发事件:相当于发布消息或服务,也就是事件发生时,将订阅者订阅的服务一一为订阅者执行
    fn emit(&self, event: &str) {
        if let Some(callbacks) = self._events.get(event) {
            for callback in callbacks {
                callback();
            }
        }
    }
}

3.2 一个比较初步的功能增强

不过很多时候我们还不满足于此,比如能够限制监听器的数量。从现实生活中打个比方,就像我们只服务一定数量的客户,一旦订满,由于资源有限,不再接收其它订阅。更贴切实际地说,就像节假日你去旅游景区地酒店订房间,对于酒店来说,一旦所有房间都预定满了,就不再接收新的订阅了——除非,有已经订阅地客人退订了它们先前已经预定地房间。

实现这样一个功能,只需要一个变量 _max_listeners 作为最大监听器数量的控制变量。在外部相应的我们需要允许用户修改和读取该变量的值,因此还要提供 set_max_listenersget_max_listeners 两个方法。

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
type EventCallback = Arc<dyn Fn() + Send + Sync>;
pub struct EventEmitter {
    _events: Mutex<HashMap<String, Vec<EventCallback>>>,
    _max_listeners: usize,
}
impl EventEmitter {
    pub fn new() -> Self {
        EventEmitter {
            _events: Mutex::new(HashMap::new()),
            _max_listeners: usize::MAX,
        }
    }
    /// 设置最大监听器数量
    /// Set the maximum number of listeners
    pub fn set_max_listeners(&mut self, max_listeners: usize) {
        self._max_listeners = max_listeners;
    }
    /// 获取最大监听器数量
    pub fn get_max_listeners(&self) -> usize {
        self._max_listeners
    }
    /// 添加事件监听器
    pub fn add_listener(&self, event: &str, callback: EventCallback) {
        let mut events = self._events.lock().unwrap();
        let callbacks = events.entry(event.to_string()).or_insert(Vec::new());
        callbacks.push(callback);
    }
  /// 移除事件监听器
    pub fn remove_listener(&self, event: &str, callback: &EventCallback) {
        let mut events = self._events.lock().unwrap();
        if let Some(callbacks) = events.get_mut(event) {
            callbacks.retain(|cb| !Arc::ptr_eq(cb, callback));
        }
    }
    /// 触发事件
    pub fn emit(&self, event: &str) {
        let events = self._events.lock().unwrap();
        if let Some(callbacks) = events.get(event) {
            for callback in callbacks {
                let callback_clone = callback.clone();
                // Spawn a new thread to run each callback asynchronously
                std::thread::spawn(move || {
                    (*callback_clone)();
                });
            }
        }
    }
}

4. 使用现成的第三方模块:EventEmitter

4.1 EventEmitter 的安装

你可以直接使用 cargo 包管理器安装 EventEmitter:

cargo add EventEmitter

4.2 在你的项目中使用 EventEmitter

以下是一段包括引入 EventEmitter 和使用的例子:

use std::sync::{Arc};
use EventEmitter::EventEmitter;
fn main() {
    let emitter = EventEmitter::new();
    let callback1 = Arc::new(|| println!("[event1 emitted]: The first callback of event1 has been called."));
    let callback2 = Arc::new(|| println!("[event1 emitted]: The second callback of event1 has been called."));
    let callback3 = Arc::new(|| println!("[event2 emitted]: The only one callbask of event2 has been called."));
    // Add event listener
    emitter.on("event1", callback1);
    emitter.on("event1", callback2);
    emitter.on("event2", callback3);
    let ct1 = emitter.listener_count("event1");
    let ct2 = emitter.listener_count("event2");
    println!("Number of Listeners for event1 is: {ct1}, \nNumber of Listeners for event2 is: {ct2}");
    emitter.emit("event1"); // Emit event1
    emitter.emit("event2"); // Emit event1
}

运行项目:

cargo run

可以看到控制台打印结果:

Number of Listeners for event1 is: 2, 
Number of Listeners for event2 is: 1

4.3 EventEmitter 实例上的方法

4.3.1 set_max_listeners 方法

pub fn set_max_listeners(&mut self, max_listeners: usize)

设置最大监听器数量。

4.3.2 set_max_listeners 方法

pub fn get_max_listeners(&self) -> usize

获取最大监听器数量。

4.3.3 on 方法

pub fn on(&self, event: &str, callback: Arc<dyn Fn() + Send + Sync>)

添加事件监听器。

4.3.4 add_listener 方法

pub fn add_listener(&self, event: &str, callback: Arc<dyn Fn() + Send + Sync>)

添加事件监听器,是 on 方法的别名。

4.3.5 off 方法

pub fn off(&self, event: &str, callback: &Arc<dyn Fn() + Send + Sync>)

移除事件监听器。

4.3.6 remove_listener 方法

pub fn remove_listener(
    &self,
    event: &str,
    callback: &Arc<dyn Fn() + Send + Sync>
)

移除事件监听器,是 off 方法的别名。

4.3.7 emit 方法

pub fn emit(&self, event: &str)

触发事件。

触发相当于“发布-订阅”模式中的“发布”,一但某个事件被触发,该事件对应得所有监听器函数都会被执行。监听器就相当于“订阅者”。

4.3.8 remove_all_listeners 方法

pub fn remove_all_listeners(&self, event: &str)

移除所有事件的所有监听器。

4.3.9 prepend_listener 方法

pub fn prepend_listener(
    &self,
    event: &str,
    callback: Arc<dyn Fn() + Send + Sync>
)

从指定事件监听器向量的前方插入新的监听器。该方法与使用 onadd_listener 方法添加新的监听器时,插入监听器向量的方向相反。

4.3.10 listeners 方法

pub fn listeners(&self, event: &str) -> Vec<Arc<dyn Fn() + Send + Sync>>

获取指定事件的监听器。

4.3.11 listener_count 方法

pub fn listener_count(&self, event: &str) -> usize

获取指定事件的监听器数量。

目录
相关文章
|
1天前
|
Rust 安全 云计算
Rust语言入门:安全性与并发性的完美结合
【10月更文挑战第25天】Rust 是一种系统级编程语言,以其独特的安全性和并发性保障而著称。它提供了与 C 和 C++ 相当的性能,同时确保内存安全,避免了常见的安全问题。Rust 的所有权系统通过编译时检查保证内存安全,其零成本抽象设计使得抽象不会带来额外的性能开销。Rust 还提供了强大的并发编程工具,如线程、消息传递和原子操作,确保了数据竞争的编译时检测。这些特性使 Rust 成为编写高效、安全并发代码的理想选择。
7 0
|
2月前
|
Rust 安全 开发者
30天拿下Rust之模式与模式匹配
30天拿下Rust之模式与模式匹配
48 1
|
18天前
|
Rust 安全 网络安全
在 Rust 语言中,寻找企业上网行为管理软件的突破
在数字化企业环境中,上网行为管理软件对于保障网络安全和提升工作效率至关重要。Rust 语言凭借其安全性、高性能和并发性,为开发此类软件提供了新机遇。本文通过几个 Rust 代码示例,展示了如何实现网址检查、访问频率统计及访问控制等功能,旨在探索 Rust 在企业上网行为管理中的应用潜力。
29 0
|
2月前
|
Rust Linux Go
Rust/Go语言学习
Rust/Go语言学习
|
3月前
|
开发者 vr&ar 机器学习/深度学习
Xamarin 开发者的未来趋势展望:掌握跨平台开发新机遇,引领移动应用创新潮流与技术变革方向
【8月更文挑战第31天】Xamarin 作为领先的跨平台开发框架,通过 C# 和 .NET 框架实现一次编写、多平台运行,简化了 iOS、Android 和 Windows 应用的开发流程。未来几年,Xamarin 开发者将面临跨平台开发普及、云集成、机器学习、AR/VR、性能优化及安全性等关键趋势。通过学习新技术并积极采用新工具,开发者能够提升应用质量和用户体验,如利用 Azure AD B2C 实现身份认证,从而在竞争激烈的市场中脱颖而出。
56 0
|
3月前
|
Rust 安全 开发者
惊爆!Xamarin 携手机器学习,开启智能应用新纪元,个性化体验与跨平台优势完美融合大揭秘!
【8月更文挑战第31天】随着互联网的发展,Web应用对性能和安全性要求不断提高。Rust凭借卓越的性能、内存安全及丰富生态,成为构建高性能Web服务器的理想选择。本文通过一个简单示例,展示如何使用Rust和Actix-web框架搭建基本Web服务器,从创建项目到运行服务器全程指导,帮助读者领略Rust在Web后端开发中的强大能力。通过实践,读者可以体验到Rust在性能和安全性方面的优势,以及其在Web开发领域的巨大潜力。
37 0
|
3月前
|
开发者 API 开发框架
Xamarin 在教育应用开发中的应用:从课程笔记到互动测验,全面解析使用Xamarin.Forms构建多功能教育平台的技术细节与实战示例
【8月更文挑战第31天】Xamarin 作为一款强大的跨平台移动开发框架,在教育应用开发中展现了巨大潜力。它允许开发者使用单一的 C# 代码库构建 iOS、Android 和 Windows 应用,确保不同设备上的一致体验。Xamarin 提供广泛的 API 支持,便于访问摄像头、GPS 等原生功能。本文通过一个简单的教育应用示例——课程笔记和测验功能,展示了 Xamarin 在实际开发中的应用过程。从定义用户界面到实现保存笔记和检查答案的逻辑,Xamarin 展现了其在教育应用开发中的高效性和灵活性。
40 0
|
3月前
|
Rust 开发者 C#
解锁Rust高手的秘密武器:模式匹配与宏,学会这一招,编程效率翻倍!
【8月更文挑战第31天】Xamarin 是移动应用开发领域的强大跨平台工具,采用 C# 语言,具备高代码复用性、熟悉开发语言及接近原生性能等优势。开发者可通过共享项目实现多平台业务逻辑复用,简化开发流程。然而,Xamarin 也存在学习曲线陡峭、需处理平台差异及第三方库兼容性等问题。总体而言,Xamarin 在提高开发效率的同时,也对开发者提出了新的挑战。
24 0
|
3月前
|
Rust 安全 Go
揭秘Rust语言:为何它能让你在编程江湖中,既安全驰骋又高效超车,颠覆你的编程世界观!
【8月更文挑战第31天】Rust 是一门新兴的系统级编程语言,以其卓越的安全性、高性能和强大的并发能力著称。它通过独特的所有权和借用检查机制解决了内存安全问题,使开发者既能享受 C/C++ 的性能,又能避免常见的内存错误。Rust 支持零成本抽象,确保高级抽象不牺牲性能,同时提供模块化和并发编程支持,适用于系统应用、嵌入式设备及网络服务等多种场景。从简单的 “Hello World” 程序到复杂的系统开发,Rust 正逐渐成为现代软件开发的热门选择。
61 1