前言
Fang是一个Rust的后台任务处理库,采用Postgres DB作为任务队列。同时支持Asynk和Blocking任务。Asynk任务
采用的是tokio
的特性,Worker工作在tokio
下。Blocking任务
使用的是std::thread
,Worker工作在一个单独的线程。
一、Fang安装
1. 添加依赖
添加Fang到你的Cargo.toml
文件中
注意
Fang仅支持rust 1.62+版本
仅使用Blocking
[dependencies] fang = { version = "0.7" , features = ["blocking"], default-features = false }
仅使用Asynk
[dependencies] fang = { version = "0.7" , features = ["asynk"], default-features = false }
同时使用Blocking和Asynk
fang = { version = "0.7" }
2. 创建数据库
这里需要使用
Diesel CLI
来完成数据库的迁移,将在后面的文章中介绍
在你的Postgres DB中创建fang_tasks
表,然后运行以下脚本
CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; CREATE TYPE fang_task_state AS ENUM ('new', 'in_progress', 'failed', 'finished'); CREATE TABLE fang_tasks ( id uuid PRIMARY KEY DEFAULT uuid_generate_v4(), metadata jsonb NOT NULL, error_message TEXT, state fang_task_state default 'new' NOT NULL, task_type VARCHAR default 'common' NOT NULL, created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW() ); CREATE INDEX fang_tasks_state_index ON fang_tasks(state); CREATE INDEX fang_tasks_type_index ON fang_tasks(task_type); CREATE INDEX fang_tasks_created_at_index ON fang_tasks(created_at); CREATE INDEX fang_tasks_metadata_index ON fang_tasks(metadata); CREATE TABLE fang_periodic_tasks ( id uuid PRIMARY KEY DEFAULT uuid_generate_v4(), metadata jsonb NOT NULL, period_in_seconds INTEGER NOT NULL, scheduled_at TIMESTAMP WITH TIME ZONE, created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW() ); CREATE INDEX fang_periodic_tasks_scheduled_at_index ON fang_periodic_tasks(scheduled_at); CREATE INDEX fang_periodic_tasks_metadata_index ON fang_periodic_tasks(metadata);
这些文件可以在源码目录
migrations
中找到,github搜Fang,然后进入下载源码。
二、使用
1.定义一个任务
Blocking任务
每个要被Fang执行的任务都必须实现fang::Runnable
特质,特质实现#[typetag::serde]
使之具有反序列化任务的属性。
use fang::Error; use fang::Runnable; use fang::typetag; use fang::PgConnection; use fang::serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize)] #[serde(crate = "fang::serde")] struct MyTask { pub number: u16, } #[typetag::serde] impl Runnable for MyTask { fn run(&self, _connection: &PgConnection) -> Result<(), Error> { println!("the number is {}", self.number); Ok(()) } }
run函数的第二个参数是PgConnection,你可以重复使用它来操作任务队列,例如在当前作业执行期间添加一个新任务,或者,如果你要复用,可以在自己的查询中重新使用它。如果你不需要它,就忽略它。
Asynk任务
每个要被Fang执行的任务都必须实现fang::AsyncRunnable
特质
注意
不要实现两个同名的AsyncRunnable,这会导致typetag失败
use fang::AsyncRunnable; use fang::asynk::async_queue::AsyncQueueable; use fang::serde::{Deserialize, Serialize}; use fang::async_trait; #[derive(Serialize, Deserialize)] #[serde(crate = "fang::serde")] struct AsyncTask { pub number: u16, } #[typetag::serde] #[async_trait] impl AsyncRunnable for AsyncTask { async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { Ok(()) } // this func is optional to impl // Default task-type it is common fn task_type(&self) -> String { "my-task-type".to_string() } }
2.任务队列
Blocking任务
需要使用Queue::enqueue_task
来入队列
use fang::Queue; ... Queue::enqueue_task(&MyTask { number: 10 }).unwrap();
上面的示例在每次调用时都会创建一个新的 postgres 连接
重用相同的 postgres 连接来将多个任务排入队列
let queue = Queue::new(); for id in &unsynced_feed_ids { queue.push_task(&SyncFeedMyTask { feed_id: *id }).unwrap(); }
或者使用PgConnection结构体
Queue::push_task_query(pg_connection, &new_task).unwrap();
Asynk任务
使用AsyncQueueable::insert_task
来入队,可以根据你自己后端来进行操作,默认为Postgres
use fang::asynk::async_queue::AsyncQueue; use fang::NoTls; use fang::AsyncRunnable; // 创建异步队列 let max_pool_size: u32 = 2; let mut queue = AsyncQueue::builder() // Postgres 数据库 url .uri("postgres://postgres:postgres@localhost/fang") // 允许的最大连接数控i昂 .max_pool_size(max_pool_size) // 如果希望任务中的唯一性,则为false .duplicated_tasks(true) .build(); // 要进行操作之前,总是要先连接 queue.connect(NoTls).await.unwrap();
举个简单例子我们用的是NoTls,如果你有特殊需求,如果出于某种原因你想加密 postgres 流量。
let task = AsyncTask { 8 }; let task_returned = queue .insert_task(&task as &dyn AsyncRunnable) .await .unwrap();
3. 启动Worker
Blocking任务
每个Worker都在一个单独的线程中运行。如果panic,会重新启动。
使用WorkerPool
来启动Worker,WorkerPool::new
接收一个整型参数,Worker的数量
use fang::WorkerPool; WorkerPool::new(10).start();
使用shutdown
停止线程
use fang::WorkerPool; worker_pool = WorkerPool::new(10).start().unwrap; worker_pool.shutdown()
Asynk任务
每个Worker都在一个单独的 tokio 任务中运行。如果panic,会重新启动。
使用AsyncWorkerPool
来启动Worker
use fang::asynk::async_worker_pool::AsyncWorkerPool; // 必须创建一个队列 // 插入一些任务 let mut pool: AsyncWorkerPool<AsyncQueue<NoTls>> = AsyncWorkerPool::builder() .number_of_workers(max_pool_size) .queue(queue.clone()) .build(); pool.start().await;
4. 配置
Blocking任务
在创建Blocking任务任务的时候,默认只能传入Worker数量参数,如果想要进行自定义配置,需要使用WorkerPool.new_with_params
来创建,它接受两个参数——工人数量和WorkerParams结构体。
WorkerParams
的定义是这样的
pub struct WorkerParams { pub retention_mode: Option<RetentionMode>, pub sleep_params: Option<SleepParams>, pub task_type: Option<String>, } pub enum RetentionMode { KeepAll, RemoveAll, RemoveFinished, } pub struct SleepParams { pub sleep_period: u64, pub max_sleep_period: u64, pub min_sleep_period: u64, pub sleep_step: u64, }
Asynk任务
使用AsyncWorkerPool
的builder方法即可。需要链式调用,创建一个AsyncWorkerPool
,然后调用.queue(…),.sleep_params(…)(可选),.retention_mode(…)(可选),.number_of_workers(…)配置,最后调用.build()构建对象。
5. 配置Worker类型
可以指定Worker类型,来指定指定类型Worker执行指定类型的任务
Blocking任务
在Runnable
特质中添加方法
... #[typetag::serde] impl Runnable for MyTask { fn run(&self) -> Result<(), Error> { println!("the number is {}", self.number); Ok(()) } fn task_type(&self) -> String { "number".to_string() } }
设置task_type
let mut worker_params = WorkerParams::new(); worker_params.set_task_type("number".to_string()); WorkerPool::new_with_params(10, worker_params).start();
没有设置task_type
的Worker可以执行任何任务
Asynk任务
功能与Blocking任务
相同。使用AsyncWorker
的builer来设置
6. 配置保留模式
默认情况下,所有成功完成的任务都会从数据库中删除,失败的任务不会。可以使用三种保留模式:
pub enum RetentionMode { KeepAll, \\ 不删除任务 RemoveAll, \\ 删除所有任务 RemoveFinished, \\ 默认值,完成就删除 }
Blocking任务
使用set_retention_mode
设置保留模式
let mut worker_params = WorkerParams::new(); worker_params.set_retention_mode(RetentionMode::RemoveAll); WorkerPool::new_with_params(10, worker_params).start();
Asynk任务
使用AsyncWorker
的builder。
7. 配置睡眠值
Blocking任务
使用 useSleepParams
来配置睡眠值:
pub struct SleepParams { pub sleep_period: u64, \\ 默认值 5 pub max_sleep_period: u64, \\ 默认值 15 pub min_sleep_period: u64, \\ 默认值 5 pub sleep_step: u64, \\ 默认值 5 }
如果数据库中没有任务,则Worker会休眠sleep_period,并且每次该值都会增加sleep_step,直到达到max_sleep_period. min_sleep_period是sleep_period的初始值。所有值都以秒为单位。
使用set_sleep_params
来设置
let sleep_params = SleepParams { sleep_period: 2, max_sleep_period: 6, min_sleep_period: 2, sleep_step: 1, }; let mut worker_params = WorkerParams::new(); worker_params.set_sleep_params(sleep_params); WorkerPool::new_with_params(10, worker_params).start();
Asynk任务
使用AsyncWorker
的builder。
8. 定时任务
如果你从头到尾看的本文,那么什么也不需要做,否则你需要创建
fang_periodic_tasks
表,就在本文安装那个部分。
Blocking任务
use fang::Scheduler; use fang::Queue; let queue = Queue::new(); queue .push_periodic_task(&SyncMyTask::default(), 120) .unwrap(); queue .push_periodic_task(&DeliverMyTask::default(), 60) .unwrap(); Scheduler::start(10, 5);
在上面的示例中,push_periodic_task用于将指定的任务保存到表fang_periodic_tasks中,该表将fang_tasks每隔指定的秒数排队(保存到表中)。
Scheduler::start(10, 5)启动调度程序。它接受两个参数:
- 数据库检查周期(以秒为单位)
- 可接受的错误限制(以秒为单位)
Asynk任务
use fang::asynk::async_scheduler::Scheduler; use fang::asynk::async_queue::AsyncQueueable; use fang::asynk::async_queue::AsyncQueue; // 在此之前构建一个Async队列 let schedule_in_future = Utc::now() + OtherDuration::seconds(5); let _periodic_task = queue.insert_periodic_task( &AsyncTask { number: 1 }, schedule_in_future, 10, ) .await; let check_period: u64 = 1; let error_margin_seconds: u64 = 2; let mut scheduler = Scheduler::builder() .check_period(check_period) .error_margin_seconds(error_margin_seconds) .queue(&mut queue as &mut dyn AsyncQueueable) .build(); // 在其他线程或循环之前添加更多任务 // 调度程序循环 scheduler.start().await.unwrap();
总结
以上就是本文的所有内容,介绍了Rust中借助Fang库
来实现后台任务,进行后台任务的处理,还有定时任务,配置等。