Swoole来实现实时异步任务队列

简介:

假如要发100封邮件,for循环100遍,用户直接揭竿而起,什么破网站!
但实际上,我们很可能有超过1万的邮件。怎么处理这个延迟的问题?
答案就是用异步。把“发邮件”这个操作封装,然后后台异步地执行1万遍。这样的话,用户提交网页后,他所等待的时间只是“把发邮件任务请求推送进队列里”的时间。而我们的后台服务将在用户看不见的地方跑。
在实现“异步队列”这点上,有人采用MySQL表或者redis来存放待发送的邮件,然后,每分钟定时读取待发送列表,然后处理。这便是定时异步任务队列。但当前提交的任务要一分钟后才能执行,在某些实时性要求应用场景里还是不快。有些场景要求,只有一提交任务,便马上执行,但用户不需要等待返回结果。
本文将探讨用php扩展swoole实现实时异步任务队列的方案。

服务端

在打算放置脚本的目录(你也可以自行新建)新建Server.php,代码如下

Java代码   收藏代码
  1. <?php  
  2.   
  3. class Server  
  4. {  
  5.     private $serv;  
  6.   
  7.     public function __construct()  
  8.     {  
  9.         $this->serv = new swoole_server("0.0.0.0"9501);  
  10.         $this->serv->set(array(  
  11.             'worker_num' => 1//一般设置为服务器CPU数的1-4倍  
  12.             'daemonize' => 1//以守护进程执行  
  13.             'max_request' => 10000,  
  14.             'dispatch_mode' => 2,  
  15.             'task_worker_num' => 8//task进程的数量  
  16.             "task_ipc_mode " => 3//使用消息队列通信,并设置为争抢模式  
  17.             //"log_file" => "log/taskqueueu.log" ,//日志  
  18.         ));  
  19.         $this->serv->on('Receive', array($this'onReceive'));  
  20.         // bind callback  
  21.         $this->serv->on('Task', array($this'onTask'));  
  22.         $this->serv->on('Finish', array($this'onFinish'));  
  23.         $this->serv->start();  
  24.     }  
  25.   
  26.     public function onReceive(swoole_server $serv, $fd, $from_id, $data)  
  27.     {  
  28.         //echo "Get Message From Client {$fd}:{$data}\n";  
  29.         // send a task to task worker.  
  30.         $serv->task($data);  
  31.     }  
  32.   
  33.     public function onTask($serv, $task_id, $from_id, $data)  
  34.     {  
  35.         $array = json_decode($data, true);  
  36.         if ($array['url']) {  
  37.             return $this->httpGet($array['url'], $array['param']);  
  38.         }  
  39.     }  
  40.   
  41.     public function onFinish($serv, $task_id, $data)  
  42.     {  
  43.         //echo "Task {$task_id} finish\n";  
  44.         //echo "Result: {$data}\n";  
  45.     }  
  46.   
  47.     protected function httpGet($url, $data)  
  48.     {  
  49.         if ($data) {  
  50.             $url .= '?' . http_build_query($data);  
  51.         }  
  52.         $curlObj = curl_init(); //初始化curl,  
  53.         curl_setopt($curlObj, CURLOPT_URL, $url); //设置网址  
  54.         curl_setopt($curlObj, CURLOPT_RETURNTRANSFER, 1); //将curl_exec的结果返回  
  55.         curl_setopt($curlObj, CURLOPT_SSL_VERIFYPEER, FALSE);  
  56.         curl_setopt($curlObj, CURLOPT_SSL_VERIFYHOST, FALSE);  
  57.         curl_setopt($curlObj, CURLOPT_HEADER, 0); //是否输出返回头信息  
  58.         $response = curl_exec($curlObj); //执行  
  59.         curl_close($curlObj); //关闭会话  
  60.         return $response;  
  61.     }  
  62.   
  63. }  
  64.   
  65. $server = new Server();  

客户端

启动服务后,让我们看看如何调用服务。新建测试文件Client_test.php

Java代码   收藏代码
  1. <?php  
  2.   
  3. class Client  
  4. {  
  5.     private $client;  
  6.   
  7.     public function __construct()  
  8.     {  
  9.         $this->client = new swoole_client(SWOOLE_SOCK_TCP);  
  10.     }  
  11.   
  12.     public function connect()  
  13.     {  
  14.         if (!$this->client->connect("127.0.0.1"95011)) {  
  15.             throw new Exception(sprintf('Swoole Error: %s', $this->client->errCode));  
  16.         }  
  17.     }  
  18.   
  19.     public function send($data)  
  20.     {  
  21.         if ($this->client->isConnected()) {  
  22.             if (!is_string($data)) {  
  23.                 $data = json_encode($data);  
  24.             }  
  25.   
  26.             return $this->client->send($data);  
  27.         } else {  
  28.             throw new Exception('Swoole Server does not connected.');  
  29.         }  
  30.     }  
  31.   
  32.     public function close()  
  33.     {  
  34.         $this->client->close();  
  35.     }  
  36. }  
  37.   
  38. $data = array(  
  39.     "url" => "http://192.168.10.19/send_mail",  
  40.     "param" => array(  
  41.         "username" => 'test',  
  42.         "password" => 'test'  
  43.     )  
  44. );  
  45. $client = new Client();  
  46. $client->connect();  
  47. if ($client->send($data)) {  
  48.     echo 'success';  
  49. else {  
  50.     echo 'fail';  
  51. }  
  52. $client->close();  

在上面代码中,url即为任务所在地址,param为所需传递参数。
保存好代码,在命令行或者浏览器中执行Client_test.php,便实现了异步任务队列。你所填写的URL,将会在每次异步任务被提交后,以HTTP GET的方式异步执行。

相关文章
|
6月前
|
消息中间件 存储 负载均衡
拆解一下消息队列、任务队列、任务调度系统
拆解一下消息队列、任务队列、任务调度系统
138 0
|
4月前
|
消息中间件 监控 JavaScript
消息队列和事件循环
消息队列和事件循环
46 0
|
8月前
|
消息中间件 存储 监控
消息队列与任务队列的区别
消息队列和任务队列是我们在软件系统中常常遇到的概念。尽管它们的名字相似,但实际上它们有不同的用途和工作原理。本文将介绍消息队列和任务队列之间的区别。
333 0
|
消息中间件 JavaScript 前端开发
js的EventLoop事件循环机制调用栈、微任务、消息队列执行顺序优先级
js的EventLoop事件循环机制调用栈、微任务、消息队列执行顺序优先级
91 0
|
NoSQL JavaScript 前端开发
Redis+NodeJS实现能处理海量数据的异步任务队列系统
前言 在最近的业务中,接到了一个需要处理约十万条数据的需求。这些数据都以字符串的形式给到,并且处理它们的步骤是异步且耗时的(平均处理一条数据需要 25s 的时间)。如果以串行的方式实现,其耗时是相当长的: 总耗时时间 = 数据量 × 单条数据处理时间 T = N * t (N = 100,000; t = 25s) 总耗时时间 = 2,500,000 秒 ≈ 695 小时 ≈ 29 天 显然,我们不能简单地把数据一条一条地处理。那么有没有办法能够减少处理的时间呢?经过调研后发现,使用异步任务队列是个不错的办法。
423 0
Redis+NodeJS实现能处理海量数据的异步任务队列系统
|
消息中间件 监控 NoSQL
Hyperf结合Redis异步队列任务async-queue实现后台操作日志写入
Hyperf结合Redis异步队列任务async-queue实现后台操作日志写入
378 0
Hyperf结合Redis异步队列任务async-queue实现后台操作日志写入
|
消息中间件 Kafka 开发者
带回调函数的生产者 | 学习笔记
快速学习带回调函数的生产者
109 0
带回调函数的生产者 | 学习笔记
|
消息中间件 存储 负载均衡
解决方案之任务队列
在一些系统中,会有对某些任务状态进行跟踪,如果任务失败需要重新执行任务。本文主要是针对这种请求提出解决方案,因为时间原因,方案还没有在代码中实现。但是经过和 朋友 的推演,是目前能想到的比较有效的方案了。
133 0
解决方案之任务队列