Rust网络编程框架-深入理解Tokio中的管道

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 我们在上文《Rust网络编程框架-Tokio进阶》介绍了async/await和锁的基本用法,并完成了一个Server端的DEMO代码。本文继续来探讨这个话题。


我们在上文《Rust网络编程框架-Tokio进阶》介绍了async/await和锁的基本用法,并完成了一个Server端的DEMO代码。本文继续来探讨这个话题。

客户端代码DEMO

上文中依靠telnet来触发服务端代码的执行,本文我们将自己实现一个客户端。由于笔者也没有从之前比如GO、JAVA等语言的套路中完全走出来,我最初的实现是这样的

#[tokio::main]async fn main() {
      let mut client = client::connect("127.0.0.1:6379").await.unwrap();
    // 生成一个读取key的任务
    let t1 = tokio::spawn(async {
        let res = client.get("hello").await;
    });
    // 生成一个设置key的任务
    let t2 = tokio::spawn(async {
        client.set("foo", "bar".into()).await;
    });
    t1.await.unwrap();
    t2.await.unwrap();
}

image.gif

但是以上代码根本就无法编译,因为tokio任务T1和T2都需要使用client,但是client并没有像上文中Arc::<Mutex::<HashMap>>一样实现copy方法,你还不能clone一个client分别给t1和t2使用,当然我们可以使用Mutex来解决任务之间的矛盾问题但正如我们上文所说互斥锁的最大问题就是在同一时刻只能有一个任务执行到被加锁的关键代码,这样做法的效率又是问题。

使用消息传递的方案

使用channel管道进行消息传递其实就是我们在并发编程框架中常用的生产者消费者模式这个设计模式在本例当中其实就是生成两个任务,一个专门用来产生消息,另一个专门用来向服务端发送消息,channel管道其实就是一个消息的缓冲区在发送任务繁忙时,产生的消息其实都在消息队列中缓冲,一旦有发送任务缓过劲来,就可以从管道里取新消息进行发送,与Mutex的互斥锁方案相比,channel管理的方式明显可以做得更大的性能与吞吐量。

在Tokio中提供以下四种管道的工作模式
Mpsc:Multi-Producer,Single-Consumer,也就是多生产者,单一消费者模式。

Oneshot:单一模式,也就是单一生产者,单一消费模式。

广播broadcast)模式:Multi-producer multi-consumer。多生产者,消费者的多对多模式。

观察(watch)模式:Single-Producer,multi-consumer。单生产者,多消费者的模式,这个模式与其它模式略有不同,每个接收者都只能看到最近的值。

这里笔者要特别提示大家,注意Tokio当中的channel管道与Rust原生channel和crossbeam提供的Channel不是同一个概念,Tokio中对于消费者来说,调用recv API返回的还是一个Future对象,recv接收消息操作并不会阻塞进程,这也是Tokio设计的一贯风格。以MPSC为例,使用样例如下:

use tokio::sync::mpsc;
#[tokio::main]async fn main() {
    let (tx, mut rx) = mpsc::channel(32);
    let tx2 = tx.clone();//clone之后可以将channel指派给不同任务
    tokio::spawn(async move {
        tx.send("sending from first handle").await;//必须调用await才会阻塞
    });
    tokio::spawn(async move {
        tx2.send("sending from second handle").await;
    });
    while let Some(message) = rx.recv().await {
        println!("GOT = {}", message);
    }
}

image.gif

使用管道方式完整的客户端代码及注释如下:

use tokio::sync::mpsc;
use mini_redis::client;
use mini_redis::Command::*;
use bytes::Bytes;
//先定义redis的命令类型
#[derive(Debug)]
enum Command {
    Get {
        key: String,
    },
    Set {
        key: String,
        val: Bytes,
    }
}
#[tokio::main]
async fn main() {
//首先建立MPSC模式的通道
    let (tx, mut rx) = mpsc::channel(32);
//消费者允许多个,可以克隆
let tx2 = tx.clone();
//t1任务执行get操作
    let t1 = tokio::spawn(async move {
    let cmd = Command::Get {
        key: "hello".to_string(),
    };
    tx.send(cmd).await.unwrap();
   });
//t2任务执行set操作
    let t2 = tokio::spawn(async move {
    let cmd = Command::Set {
        key: "foo".to_string(),
        val: "bar".into(),
    };
    tx2.send(cmd).await.unwrap();
});
//manager任务是消费者,接收消息,并向服务端发送信息。
    let manager = tokio::spawn(async move {
    let mut client = client::connect("127.0.0.1:6379").await.unwrap();
    while let Some(cmd) = rx.recv().await {
       use Command::*;
        match cmd {
            Get { key } => {
                client.get(&key).await;
                println!("get command send");
            }
            Set { key, val } => {
                client.set(&key, val).await;
                 println!("set command send");
            }
        }
    }
});
t1.await.unwrap();
t2.await.unwrap();
manager.await.unwrap();
}

image.gif

客户端执行结果如下:

get command send
set command send

image.gif

注意:客户端需要在服务端启动的情况下才能运行,完整的服务端代码如下:

use tokio::net::TcpListener;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};
use bytes::Bytes;
type Db =Arc<std::sync::Mutex<HashMap<String, Bytes>>>;
#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
    println!("Listening");
    let db = Arc::new(Mutex::new(HashMap::new()));
    loop {
        let (socket, _) = listener.accept().await.unwrap();
        // Clone the handle to the hash map.
        let db = db.clone();
        println!("Accepted");
        tokio::spawn(async move {
            process(socket, db).await;
        });
    }
}
async fn process(socket: TcpStream, db: Db) {
    use mini_redis::Command::{self, Get, Set};
    let mut connection = Connection::new(socket);
    while let Some(frame) = connection.read_frame().await.unwrap() {
        let response = match Command::from_frame(frame).unwrap() {
            Set(cmd) => {
                let mut db = db.lock().unwrap();
                println!("set command got");
                db.insert(cmd.key().to_string(), cmd.value().clone());
                Frame::Simple("OK".to_string())
            }           
            Get(cmd) => {
                let db = db.lock().unwrap();
                println!("get command got");
                if let Some(value) = db.get(cmd.key()) {
                    Frame::Bulk(value.clone())
                } else {
                    Frame::Null
                }
            }
            cmd => panic!("unimplemented {:?}", cmd),
        };
        // Write the response to the client
        connection.write_frame(&response).await.unwrap();
    }
}

image.gif

读写分离

Tokio中对于I/O的读写操作方式与标准Rust的API基本相同,只是Tokio的读写都是异步的在使用Tokio的读(AsyncRead)和写(AsyncWrite)等API,必须与.await一起使用才能阻塞比如下列代码是肯定不能编译通过的。

use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};
#[tokio::main]
async fn main() -> io::Result<()> {
    let mut f = File::open("beyondma.txt");
    let mut buffer = [0; 10];
    // read up to 10 bytes
    let n = f.read(&mut buffer[..]);
    println!("The bytes: {:?}", &buffer[..n]);
    Ok(())
}
上述代码需要进行修改才能运行,如下:
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};
#[tokio::main]
async fn main() -> io::Result<()> {
    let mut f = File::open("beyondma.txt").await?;
    let mut buffer = [0; 10];
    // read up to 10 bytes
    let n = f.read(&mut buffer[..]).await?;
    println!("The bytes: {:?}", &buffer[..n]);
    Ok(())
}

image.gif

另外注意:当read()返回Ok(0)时,表示Stream已经关闭,对于TcpStream实例,Ok(0)代表socket已关闭如果代码运行在一个循环当中,此时应该退出循环。

在上一节的示例代码中,对于socket的读写都是由一个任务完成的,为了通过读写分离,来达到更高效率的,我们必须将TcpStream拆分为读和写两个handle对于tokio的框架来看,读写分享使用io::split来实现例程如下:

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
    loop {
        let (mut socket, _) = listener.accept().await?;
        tokio::spawn(async move {
            let mut buf = vec![0; 1024];
            loop {
                match socket.read(&mut buf).await {
                    //记住ok(0)需要直接返回
                    Ok(0) => return,
                    Ok(n) => {
                        // Copy the data back to socket
                        if socket.write_all(&buf[..n]).await.is_err() {
                            return;
                        }
                    }
                    Err(_) => {
                        return;
                    }
                }
            }
        });
    }
}

image.gif

这是一个典型的回显输入的Echo Server,另外启动一个终端执行telnett 结果如下:

telnet 127.0.0.1 6380
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
skldjsfl
skldjsfl
sksdkj
sksdkj

image.gif


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
3月前
|
监控 安全
从 Racket 语言出发,创新员工网络监控软件的框架
在数字化企业环境中,员工网络监控软件对于保障信息安全和提升效率至关重要。Racket 语言凭借其独特特性和强大功能,为开发创新的监控软件提供了新可能。通过捕获和分析网络数据包、记录员工网络活动日志,甚至构建复杂的监控框架,Racket 能够满足企业的定制化需求,为企业信息安全和管理提供强有力支持。未来,基于 Racket 的创新解决方案将不断涌现。
50 6
|
2月前
|
数据采集 存储 JSON
Python网络爬虫:Scrapy框架的实战应用与技巧分享
【10月更文挑战第27天】本文介绍了Python网络爬虫Scrapy框架的实战应用与技巧。首先讲解了如何创建Scrapy项目、定义爬虫、处理JSON响应、设置User-Agent和代理,以及存储爬取的数据。通过具体示例,帮助读者掌握Scrapy的核心功能和使用方法,提升数据采集效率。
123 6
|
3月前
|
机器学习/深度学习 人工智能
类人神经网络再进一步!DeepMind最新50页论文提出AligNet框架:用层次化视觉概念对齐人类
【10月更文挑战第18天】这篇论文提出了一种名为AligNet的框架,旨在通过将人类知识注入神经网络来解决其与人类认知的不匹配问题。AligNet通过训练教师模型模仿人类判断,并将人类化的结构和知识转移至预训练的视觉模型中,从而提高模型在多种任务上的泛化能力和稳健性。实验结果表明,人类对齐的模型在相似性任务和出分布情况下表现更佳。
80 3
|
3月前
|
安全 网络安全 区块链
网络安全与信息安全:构建数字世界的防线在当今数字化时代,网络安全已成为维护个人隐私、企业机密和国家安全的重要屏障。随着网络攻击手段的不断升级,从社交工程到先进的持续性威胁(APT),我们必须采取更加严密的防护措施。本文将深入探讨网络安全漏洞的形成原因、加密技术的应用以及提高公众安全意识的重要性,旨在为读者提供一个全面的网络安全知识框架。
在这个数字信息日益膨胀的时代,网络安全问题成为了每一个网民不可忽视的重大议题。从个人信息泄露到企业数据被盗,再到国家安全受到威胁,网络安全漏洞如同隐藏在暗处的“黑洞”,时刻准备吞噬掉我们的信息安全。而加密技术作为守护网络安全的重要工具之一,其重要性不言而喻。同时,提高公众的安全意识,也是防范网络风险的关键所在。本文将从网络安全漏洞的定义及成因出发,解析当前主流的加密技术,并强调提升安全意识的必要性,为读者提供一份详尽的网络安全指南。
|
21天前
|
机器学习/深度学习 算法 PyTorch
基于图神经网络的大语言模型检索增强生成框架研究:面向知识图谱推理的优化与扩展
本文探讨了图神经网络(GNN)与大型语言模型(LLM)结合在知识图谱问答中的应用。研究首先基于G-Retriever构建了探索性模型,然后深入分析了GNN-RAG架构,通过敏感性研究和架构改进,显著提升了模型的推理能力和答案质量。实验结果表明,改进后的模型在多个评估指标上取得了显著提升,特别是在精确率和召回率方面。最后,文章提出了反思机制和教师网络的概念,进一步增强了模型的推理能力。
50 4
基于图神经网络的大语言模型检索增强生成框架研究:面向知识图谱推理的优化与扩展
|
4月前
|
Rust 网络协议 Java
30天拿下Rust之网络编程
30天拿下Rust之网络编程
83 0
|
2月前
|
人工智能 自然语言处理
WebDreamer:基于大语言模型模拟网页交互增强网络规划能力的框架
WebDreamer是一个基于大型语言模型(LLMs)的网络智能体框架,通过模拟网页交互来增强网络规划能力。它利用GPT-4o作为世界模型,预测用户行为及其结果,优化决策过程,提高性能和安全性。WebDreamer的核心在于“做梦”概念,即在实际采取行动前,用LLM预测每个可能步骤的结果,并选择最有可能实现目标的行动。
66 1
WebDreamer:基于大语言模型模拟网页交互增强网络规划能力的框架
|
2月前
|
JSON 数据处理 Swift
Swift 中的网络编程,主要介绍了 URLSession 和 Alamofire 两大框架的特点、用法及实际应用
本文深入探讨了 Swift 中的网络编程,主要介绍了 URLSession 和 Alamofire 两大框架的特点、用法及实际应用。URLSession 由苹果提供,支持底层网络控制;Alamofire 则是在 URLSession 基础上增加了更简洁的接口和功能扩展。文章通过具体案例对比了两者的使用方法,帮助开发者根据需求选择合适的网络编程工具。
38 3
|
2月前
|
存储 安全 网络安全
网络安全法律框架:全球视角下的合规性分析
网络安全法律框架:全球视角下的合规性分析
60 1
|
2月前
|
数据采集 前端开发 中间件
Python网络爬虫:Scrapy框架的实战应用与技巧分享
【10月更文挑战第26天】Python是一种强大的编程语言,在数据抓取和网络爬虫领域应用广泛。Scrapy作为高效灵活的爬虫框架,为开发者提供了强大的工具集。本文通过实战案例,详细解析Scrapy框架的应用与技巧,并附上示例代码。文章介绍了Scrapy的基本概念、创建项目、编写简单爬虫、高级特性和技巧等内容。
101 4

热门文章

最新文章