Swoole来实现实时异步任务队列

简介:

假如要发100封邮件,for循环100遍,用户直接揭竿而起,什么破网站!
但实际上,我们很可能有超过1万的邮件。怎么处理这个延迟的问题?
答案就是用异步。把“发邮件”这个操作封装,然后后台异步地执行1万遍。这样的话,用户提交网页后,他所等待的时间只是“把发邮件任务请求推送进队列里”的时间。而我们的后台服务将在用户看不见的地方跑。
在实现“异步队列”这点上,有人采用MySQL表或者redis来存放待发送的邮件,然后,每分钟定时读取待发送列表,然后处理。这便是定时异步任务队列。但当前提交的任务要一分钟后才能执行,在某些实时性要求应用场景里还是不快。有些场景要求,只有一提交任务,便马上执行,但用户不需要等待返回结果。
本文将探讨用php扩展swoole实现实时异步任务队列的方案。

服务端

在打算放置脚本的目录(你也可以自行新建)新建Server.php,代码如下

Java代码   收藏代码
  1. <?php  
  2.   
  3. class Server  
  4. {  
  5.     private $serv;  
  6.   
  7.     public function __construct()  
  8.     {  
  9.         $this->serv = new swoole_server("0.0.0.0"9501);  
  10.         $this->serv->set(array(  
  11.             'worker_num' => 1//一般设置为服务器CPU数的1-4倍  
  12.             'daemonize' => 1//以守护进程执行  
  13.             'max_request' => 10000,  
  14.             'dispatch_mode' => 2,  
  15.             'task_worker_num' => 8//task进程的数量  
  16.             "task_ipc_mode " => 3//使用消息队列通信,并设置为争抢模式  
  17.             //"log_file" => "log/taskqueueu.log" ,//日志  
  18.         ));  
  19.         $this->serv->on('Receive', array($this'onReceive'));  
  20.         // bind callback  
  21.         $this->serv->on('Task', array($this'onTask'));  
  22.         $this->serv->on('Finish', array($this'onFinish'));  
  23.         $this->serv->start();  
  24.     }  
  25.   
  26.     public function onReceive(swoole_server $serv, $fd, $from_id, $data)  
  27.     {  
  28.         //echo "Get Message From Client {$fd}:{$data}\n";  
  29.         // send a task to task worker.  
  30.         $serv->task($data);  
  31.     }  
  32.   
  33.     public function onTask($serv, $task_id, $from_id, $data)  
  34.     {  
  35.         $array = json_decode($data, true);  
  36.         if ($array['url']) {  
  37.             return $this->httpGet($array['url'], $array['param']);  
  38.         }  
  39.     }  
  40.   
  41.     public function onFinish($serv, $task_id, $data)  
  42.     {  
  43.         //echo "Task {$task_id} finish\n";  
  44.         //echo "Result: {$data}\n";  
  45.     }  
  46.   
  47.     protected function httpGet($url, $data)  
  48.     {  
  49.         if ($data) {  
  50.             $url .= '?' . http_build_query($data);  
  51.         }  
  52.         $curlObj = curl_init(); //初始化curl,  
  53.         curl_setopt($curlObj, CURLOPT_URL, $url); //设置网址  
  54.         curl_setopt($curlObj, CURLOPT_RETURNTRANSFER, 1); //将curl_exec的结果返回  
  55.         curl_setopt($curlObj, CURLOPT_SSL_VERIFYPEER, FALSE);  
  56.         curl_setopt($curlObj, CURLOPT_SSL_VERIFYHOST, FALSE);  
  57.         curl_setopt($curlObj, CURLOPT_HEADER, 0); //是否输出返回头信息  
  58.         $response = curl_exec($curlObj); //执行  
  59.         curl_close($curlObj); //关闭会话  
  60.         return $response;  
  61.     }  
  62.   
  63. }  
  64.   
  65. $server = new Server();  

客户端

启动服务后,让我们看看如何调用服务。新建测试文件Client_test.php

Java代码   收藏代码
  1. <?php  
  2.   
  3. class Client  
  4. {  
  5.     private $client;  
  6.   
  7.     public function __construct()  
  8.     {  
  9.         $this->client = new swoole_client(SWOOLE_SOCK_TCP);  
  10.     }  
  11.   
  12.     public function connect()  
  13.     {  
  14.         if (!$this->client->connect("127.0.0.1"95011)) {  
  15.             throw new Exception(sprintf('Swoole Error: %s', $this->client->errCode));  
  16.         }  
  17.     }  
  18.   
  19.     public function send($data)  
  20.     {  
  21.         if ($this->client->isConnected()) {  
  22.             if (!is_string($data)) {  
  23.                 $data = json_encode($data);  
  24.             }  
  25.   
  26.             return $this->client->send($data);  
  27.         } else {  
  28.             throw new Exception('Swoole Server does not connected.');  
  29.         }  
  30.     }  
  31.   
  32.     public function close()  
  33.     {  
  34.         $this->client->close();  
  35.     }  
  36. }  
  37.   
  38. $data = array(  
  39.     "url" => "http://192.168.10.19/send_mail",  
  40.     "param" => array(  
  41.         "username" => 'test',  
  42.         "password" => 'test'  
  43.     )  
  44. );  
  45. $client = new Client();  
  46. $client->connect();  
  47. if ($client->send($data)) {  
  48.     echo 'success';  
  49. else {  
  50.     echo 'fail';  
  51. }  
  52. $client->close();  

在上面代码中,url即为任务所在地址,param为所需传递参数。
保存好代码,在命令行或者浏览器中执行Client_test.php,便实现了异步任务队列。你所填写的URL,将会在每次异步任务被提交后,以HTTP GET的方式异步执行。

相关文章
|
1月前
|
设计模式 NoSQL Go
Redis 实现高效任务队列:异步队列与延迟队列详解
本文介绍了如何使用 Redis 实现异步队列和延迟队列。通过 Go 语言的 `github.com/go-redis/redis` 客户端,详细讲解了 Redis 客户端的初始化、异步队列的实现和测试、以及延迟队列的实现和测试。文章从基础连接开始,逐步构建了完整的队列系统,帮助读者更好地理解和应用这些概念,提升系统的响应速度和性能。
44 6
|
4月前
|
存储 前端开发 JavaScript
事件循环机制是什么
【8月更文挑战第3天】事件循环机制是什么
39 1
|
5月前
|
存储 监控 NoSQL
Celery是一个基于分布式消息传递的异步任务队列/作业队列
Celery是一个基于分布式消息传递的异步任务队列/作业队列
|
6月前
|
消息中间件 存储 NoSQL
Celery:高效异步任务队列的深度解析与应用实践
Celery 是一个流行的 Python 分布式任务队列,用于处理耗时的异步任务,提升Web应用性能。它包括消息中间件(如RabbitMQ、Redis)、任务生产者和消费者。Celery支持异步处理、分布式执行、任务调度、结果存储和错误处理。通过一个发送邮件验证码的实例,展示了如何安装配置、定义任务、触发任务以及查看执行结果。Celery的使用能有效优化应用响应速度和资源管理。
986 3
|
5月前
|
JavaScript 前端开发 API
js 运行机制(含异步机制、同步任务、异步任务、宏任务、微任务、Event Loop)
js 运行机制(含异步机制、同步任务、异步任务、宏任务、微任务、Event Loop)
49 0
|
7月前
|
消息中间件 监控 JavaScript
消息队列和事件循环
消息队列和事件循环
77 0
3 # 通过回调函数处理异步并发问题
3 # 通过回调函数处理异步并发问题
50 0
|
设计模式 Java 测试技术
异步模式之工作线程
异步模式之工作线程
|
消息中间件 JavaScript 前端开发
js的EventLoop事件循环机制调用栈、微任务、消息队列执行顺序优先级
js的EventLoop事件循环机制调用栈、微任务、消息队列执行顺序优先级
114 0
|
消息中间件 Web App开发 存储
浅谈浏览器架构、单线程js、事件循环、消息队列、宏任务和微任务
关键词:多进程、单线程、事件循环、消息队列、宏任务、微任务
浅谈浏览器架构、单线程js、事件循环、消息队列、宏任务和微任务