【一起学Rust | 进阶篇 | Fang库】后台任务处理库——Fang

简介: 【一起学Rust | 进阶篇 | Fang库】后台任务处理库——Fang



前言

Fang是一个Rust的后台任务处理库,采用Postgres DB作为任务队列。同时支持Asynk和Blocking任务。Asynk任务采用的是tokio的特性,Worker工作在tokio下。Blocking任务使用的是std::thread,Worker工作在一个单独的线程。


一、Fang安装

1. 添加依赖

添加Fang到你的Cargo.toml文件中

注意 Fang仅支持rust 1.62+版本

仅使用Blocking

[dependencies]
fang = { version = "0.7" , features = ["blocking"], default-features = false }

仅使用Asynk

[dependencies]
fang = { version = "0.7" , features = ["asynk"], default-features = false }

同时使用Blocking和Asynk

fang = { version = "0.7" }

2. 创建数据库

这里需要使用Diesel CLI来完成数据库的迁移,将在后面的文章中介绍

在你的Postgres DB中创建fang_tasks表,然后运行以下脚本

CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TYPE fang_task_state AS ENUM ('new', 'in_progress', 'failed', 'finished');
CREATE TABLE fang_tasks (
     id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
     metadata jsonb NOT NULL,
     error_message TEXT,
     state fang_task_state default 'new' NOT NULL,
     task_type VARCHAR default 'common' NOT NULL,
     created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
     updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
CREATE INDEX fang_tasks_state_index ON fang_tasks(state);
CREATE INDEX fang_tasks_type_index ON fang_tasks(task_type);
CREATE INDEX fang_tasks_created_at_index ON fang_tasks(created_at);
CREATE INDEX fang_tasks_metadata_index ON fang_tasks(metadata);
CREATE TABLE fang_periodic_tasks (
  id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
  metadata jsonb NOT NULL,
  period_in_seconds INTEGER NOT NULL,
  scheduled_at TIMESTAMP WITH TIME ZONE,
  created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
  updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
CREATE INDEX fang_periodic_tasks_scheduled_at_index ON fang_periodic_tasks(scheduled_at);
CREATE INDEX fang_periodic_tasks_metadata_index ON fang_periodic_tasks(metadata);

这些文件可以在源码目录migrations中找到,github搜Fang,然后进入下载源码。

二、使用

1.定义一个任务

Blocking任务

每个要被Fang执行的任务都必须实现fang::Runnable特质,特质实现#[typetag::serde]使之具有反序列化任务的属性。

use fang::Error;
use fang::Runnable;
use fang::typetag;
use fang::PgConnection;
use fang::serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
#[serde(crate = "fang::serde")]
struct MyTask {
    pub number: u16,
}
#[typetag::serde]
impl Runnable for MyTask {
    fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
        println!("the number is {}", self.number);
        Ok(())
    }
}

run函数的第二个参数是PgConnection,你可以重复使用它来操作任务队列,例如在当前作业执行期间添加一个新任务,或者,如果你要复用,可以在自己的查询中重新使用它。如果你不需要它,就忽略它。

Asynk任务

每个要被Fang执行的任务都必须实现fang::AsyncRunnable特质

注意 不要实现两个同名的AsyncRunnable,这会导致typetag失败

use fang::AsyncRunnable;
use fang::asynk::async_queue::AsyncQueueable;
use fang::serde::{Deserialize, Serialize};
use fang::async_trait;
#[derive(Serialize, Deserialize)]
#[serde(crate = "fang::serde")]
struct AsyncTask {
  pub number: u16,
}
#[typetag::serde]
#[async_trait]
impl AsyncRunnable for AsyncTask {
    async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
        Ok(())
    }
    // this func is optional to impl
    // Default task-type it is common
    fn task_type(&self) -> String {
        "my-task-type".to_string()
    }
}

2.任务队列

Blocking任务

需要使用Queue::enqueue_task来入队列

use fang::Queue;
...
Queue::enqueue_task(&MyTask { number: 10 }).unwrap();

上面的示例在每次调用时都会创建一个新的 postgres 连接

重用相同的 postgres 连接来将多个任务排入队列

let queue = Queue::new();
for id in &unsynced_feed_ids {
    queue.push_task(&SyncFeedMyTask { feed_id: *id }).unwrap();
}

或者使用PgConnection结构体

Queue::push_task_query(pg_connection, &new_task).unwrap();

Asynk任务

使用AsyncQueueable::insert_task来入队,可以根据你自己后端来进行操作,默认为Postgres

use fang::asynk::async_queue::AsyncQueue;
use fang::NoTls;
use fang::AsyncRunnable;
// 创建异步队列
let max_pool_size: u32 = 2;
let mut queue = AsyncQueue::builder()
    // Postgres 数据库 url
    .uri("postgres://postgres:postgres@localhost/fang")
    // 允许的最大连接数控i昂
    .max_pool_size(max_pool_size)
    // 如果希望任务中的唯一性,则为false
    .duplicated_tasks(true)
    .build();
// 要进行操作之前,总是要先连接
queue.connect(NoTls).await.unwrap();

举个简单例子我们用的是NoTls,如果你有特殊需求,如果出于某种原因你想加密 postgres 流量。

let task = AsyncTask { 8 };
let task_returned = queue
  .insert_task(&task as &dyn AsyncRunnable)
  .await
  .unwrap();

3. 启动Worker

Blocking任务

每个Worker都在一个单独的线程中运行。如果panic,会重新启动。

使用WorkerPool来启动Worker,WorkerPool::new接收一个整型参数,Worker的数量

use fang::WorkerPool;
WorkerPool::new(10).start();

使用shutdown停止线程

use fang::WorkerPool;
worker_pool = WorkerPool::new(10).start().unwrap;
worker_pool.shutdown()

Asynk任务

每个Worker都在一个单独的 tokio 任务中运行。如果panic,会重新启动。

使用AsyncWorkerPool来启动Worker

use fang::asynk::async_worker_pool::AsyncWorkerPool;
// 必须创建一个队列
// 插入一些任务
let mut pool: AsyncWorkerPool<AsyncQueue<NoTls>> = AsyncWorkerPool::builder()
        .number_of_workers(max_pool_size)
        .queue(queue.clone())
        .build();
pool.start().await;

4. 配置

Blocking任务

在创建Blocking任务任务的时候,默认只能传入Worker数量参数,如果想要进行自定义配置,需要使用WorkerPool.new_with_params来创建,它接受两个参数——工人数量和WorkerParams结构体。

WorkerParams的定义是这样的

pub struct WorkerParams {
    pub retention_mode: Option<RetentionMode>,
    pub sleep_params: Option<SleepParams>,
    pub task_type: Option<String>,
}
pub enum RetentionMode {
    KeepAll,
    RemoveAll,
    RemoveFinished,
}
pub struct SleepParams {
    pub sleep_period: u64,
    pub max_sleep_period: u64,
    pub min_sleep_period: u64,
    pub sleep_step: u64,
}

Asynk任务

使用AsyncWorkerPool的builder方法即可。需要链式调用,创建一个AsyncWorkerPool,然后调用.queue(…),.sleep_params(…)(可选),.retention_mode(…)(可选),.number_of_workers(…)配置,最后调用.build()构建对象。

5. 配置Worker类型

可以指定Worker类型,来指定指定类型Worker执行指定类型的任务

Blocking任务

Runnable特质中添加方法

...
#[typetag::serde]
impl Runnable for MyTask {
    fn run(&self) -> Result<(), Error> {
        println!("the number is {}", self.number);
        Ok(())
    }
    fn task_type(&self) -> String {
        "number".to_string()
    }
}

设置task_type

let mut worker_params = WorkerParams::new();
worker_params.set_task_type("number".to_string());
WorkerPool::new_with_params(10, worker_params).start();

没有设置task_type的Worker可以执行任何任务

Asynk任务

功能与Blocking任务相同。使用AsyncWorker的builer来设置

6. 配置保留模式

默认情况下,所有成功完成的任务都会从数据库中删除,失败的任务不会。可以使用三种保留模式:

pub enum RetentionMode {
    KeepAll,        \\ 不删除任务
    RemoveAll,      \\ 删除所有任务
    RemoveFinished, \\ 默认值,完成就删除
}

Blocking任务

使用set_retention_mode设置保留模式

let mut worker_params = WorkerParams::new();
worker_params.set_retention_mode(RetentionMode::RemoveAll);
WorkerPool::new_with_params(10, worker_params).start();

Asynk任务

使用AsyncWorker的builder。

7. 配置睡眠值

Blocking任务

使用 useSleepParams来配置睡眠值:

pub struct SleepParams {
    pub sleep_period: u64,     \\ 默认值 5
    pub max_sleep_period: u64, \\ 默认值 15
    pub min_sleep_period: u64, \\ 默认值 5
    pub sleep_step: u64,       \\ 默认值 5
}

如果数据库中没有任务,则Worker会休眠sleep_period,并且每次该值都会增加sleep_step,直到达到max_sleep_period. min_sleep_period是sleep_period的初始值。所有值都以秒为单位。

使用set_sleep_params来设置

let sleep_params = SleepParams {
    sleep_period: 2,
    max_sleep_period: 6,
    min_sleep_period: 2,
    sleep_step: 1,
};
let mut worker_params = WorkerParams::new();
worker_params.set_sleep_params(sleep_params);
WorkerPool::new_with_params(10, worker_params).start();

Asynk任务

使用AsyncWorker的builder。

8. 定时任务

如果你从头到尾看的本文,那么什么也不需要做,否则你需要创建fang_periodic_tasks表,就在本文安装那个部分。

Blocking任务

use fang::Scheduler;
use fang::Queue;
let queue = Queue::new();
queue
     .push_periodic_task(&SyncMyTask::default(), 120)
     .unwrap();
queue
     .push_periodic_task(&DeliverMyTask::default(), 60)
     .unwrap();
Scheduler::start(10, 5);

在上面的示例中,push_periodic_task用于将指定的任务保存到表fang_periodic_tasks中,该表将fang_tasks每隔指定的秒数排队(保存到表中)。

Scheduler::start(10, 5)启动调度程序。它接受两个参数:

  • 数据库检查周期(以秒为单位)
  • 可接受的错误限制(以秒为单位)

Asynk任务

use fang::asynk::async_scheduler::Scheduler;
use fang::asynk::async_queue::AsyncQueueable;
use fang::asynk::async_queue::AsyncQueue;
// 在此之前构建一个Async队列
let schedule_in_future = Utc::now() + OtherDuration::seconds(5);
let _periodic_task = queue.insert_periodic_task(
    &AsyncTask { number: 1 },
    schedule_in_future,
    10,
)
.await;
let check_period: u64 = 1;
let error_margin_seconds: u64 = 2;
let mut scheduler = Scheduler::builder()
    .check_period(check_period)
    .error_margin_seconds(error_margin_seconds)
    .queue(&mut queue as &mut dyn AsyncQueueable)
    .build();
// 在其他线程或循环之前添加更多任务
// 调度程序循环
scheduler.start().await.unwrap();

总结

以上就是本文的所有内容,介绍了Rust中借助Fang库来实现后台任务,进行后台任务的处理,还有定时任务,配置等。

目录
相关文章
|
2天前
|
Rust 安全 开发者
Rust之旅:打造并发布你的首个Rust库
本文将引导读者走进Rust的世界,从基础概念讲起,逐步深入到如何创建、测试、打包和发布自己的Rust库。通过实际操作,我们将理解Rust的独特之处,并学会如何分享自己的代码到Rust社区,从而为开源世界做出贡献。
|
2天前
|
存储 JSON Rust
【一起学Rust | 进阶篇 | reqwest库】纯 Rust 编写的 HTTP 客户端——reqwest
【一起学Rust | 进阶篇 | reqwest库】纯 Rust 编写的 HTTP 客户端——reqwest
327 0
|
2天前
|
存储 Rust 自然语言处理
【一起学Rust | 进阶篇 | thesaurus-rs库】Rust 的离线同义词库——thesaurus-rs
【一起学Rust | 进阶篇 | thesaurus-rs库】Rust 的离线同义词库——thesaurus-rs
36 0
|
2天前
|
Rust Unix Linux
【一起学Rust | 进阶篇 | Service Manager库】Rust专用跨平台服务管理库
【一起学Rust | 进阶篇 | Service Manager库】Rust专用跨平台服务管理库
199 0
|
2天前
|
Rust Ubuntu Linux
【一起学Rust | 进阶篇 | RMQTT库】RMQTT消息服务器——安装与集群配置
【一起学Rust | 进阶篇 | RMQTT库】RMQTT消息服务器——安装与集群配置
156 0
|
2天前
|
存储 Rust 安全
Rust标准库概览:集合、IO、时间与更多
本文将带领读者深入了解Rust标准库中的一些核心模块,包括集合类型、输入/输出处理、时间日期功能等。我们将通过实例和解释,探讨这些模块如何使Rust成为高效且安全的系统编程语言。
|
2天前
|
Rust Java Linux
【一起学Rust | 进阶篇 | jni库】JNI实现Java与Rust进行交互
【一起学Rust | 进阶篇 | jni库】JNI实现Java与Rust进行交互
89 0
|
2天前
|
存储 Rust C语言
【一起学Rust | 进阶篇 | Grid库】二维表数据结构——Grid
【一起学Rust | 进阶篇 | Grid库】二维表数据结构——Grid
95 0
|
2天前
|
Rust 数据安全/隐私保护
rust每日一库 rand 生成随机数
rust每日一库 rand 生成随机数
61 0
|
10月前
|
Rust 程序员 索引
Rust 标准库字符串类型String及其46种常用方法
String是一个可变引用,而&str是对该字符串的不可变引用,即可以更改String的数据,但是不能操作&str的数据。String 类型来自标准库,它是可修改、可变长度、可拥有所有权的同样使用UTF-8编码,且它不以空(null)值终止,实际上就是对Vec的包装,在堆内存上分配一个字符串。由&[u8]表示,UTF-8编码的字符串的引用,字符串字面值,也称作字符串切片。
206 1