Swoole来实现实时异步任务队列

简介:

假如要发100封邮件,for循环100遍,用户直接揭竿而起,什么破网站!
但实际上,我们很可能有超过1万的邮件。怎么处理这个延迟的问题?
答案就是用异步。把“发邮件”这个操作封装,然后后台异步地执行1万遍。这样的话,用户提交网页后,他所等待的时间只是“把发邮件任务请求推送进队列里”的时间。而我们的后台服务将在用户看不见的地方跑。
在实现“异步队列”这点上,有人采用MySQL表或者redis来存放待发送的邮件,然后,每分钟定时读取待发送列表,然后处理。这便是定时异步任务队列。但当前提交的任务要一分钟后才能执行,在某些实时性要求应用场景里还是不快。有些场景要求,只有一提交任务,便马上执行,但用户不需要等待返回结果。
本文将探讨用php扩展swoole实现实时异步任务队列的方案。

服务端

在打算放置脚本的目录(你也可以自行新建)新建Server.php,代码如下

Java代码   收藏代码
  1. <?php  
  2.   
  3. class Server  
  4. {  
  5.     private $serv;  
  6.   
  7.     public function __construct()  
  8.     {  
  9.         $this->serv = new swoole_server("0.0.0.0"9501);  
  10.         $this->serv->set(array(  
  11.             'worker_num' => 1//一般设置为服务器CPU数的1-4倍  
  12.             'daemonize' => 1//以守护进程执行  
  13.             'max_request' => 10000,  
  14.             'dispatch_mode' => 2,  
  15.             'task_worker_num' => 8//task进程的数量  
  16.             "task_ipc_mode " => 3//使用消息队列通信,并设置为争抢模式  
  17.             //"log_file" => "log/taskqueueu.log" ,//日志  
  18.         ));  
  19.         $this->serv->on('Receive', array($this'onReceive'));  
  20.         // bind callback  
  21.         $this->serv->on('Task', array($this'onTask'));  
  22.         $this->serv->on('Finish', array($this'onFinish'));  
  23.         $this->serv->start();  
  24.     }  
  25.   
  26.     public function onReceive(swoole_server $serv, $fd, $from_id, $data)  
  27.     {  
  28.         //echo "Get Message From Client {$fd}:{$data}\n";  
  29.         // send a task to task worker.  
  30.         $serv->task($data);  
  31.     }  
  32.   
  33.     public function onTask($serv, $task_id, $from_id, $data)  
  34.     {  
  35.         $array = json_decode($data, true);  
  36.         if ($array['url']) {  
  37.             return $this->httpGet($array['url'], $array['param']);  
  38.         }  
  39.     }  
  40.   
  41.     public function onFinish($serv, $task_id, $data)  
  42.     {  
  43.         //echo "Task {$task_id} finish\n";  
  44.         //echo "Result: {$data}\n";  
  45.     }  
  46.   
  47.     protected function httpGet($url, $data)  
  48.     {  
  49.         if ($data) {  
  50.             $url .= '?' . http_build_query($data);  
  51.         }  
  52.         $curlObj = curl_init(); //初始化curl,  
  53.         curl_setopt($curlObj, CURLOPT_URL, $url); //设置网址  
  54.         curl_setopt($curlObj, CURLOPT_RETURNTRANSFER, 1); //将curl_exec的结果返回  
  55.         curl_setopt($curlObj, CURLOPT_SSL_VERIFYPEER, FALSE);  
  56.         curl_setopt($curlObj, CURLOPT_SSL_VERIFYHOST, FALSE);  
  57.         curl_setopt($curlObj, CURLOPT_HEADER, 0); //是否输出返回头信息  
  58.         $response = curl_exec($curlObj); //执行  
  59.         curl_close($curlObj); //关闭会话  
  60.         return $response;  
  61.     }  
  62.   
  63. }  
  64.   
  65. $server = new Server();  

客户端

启动服务后,让我们看看如何调用服务。新建测试文件Client_test.php

Java代码   收藏代码
  1. <?php  
  2.   
  3. class Client  
  4. {  
  5.     private $client;  
  6.   
  7.     public function __construct()  
  8.     {  
  9.         $this->client = new swoole_client(SWOOLE_SOCK_TCP);  
  10.     }  
  11.   
  12.     public function connect()  
  13.     {  
  14.         if (!$this->client->connect("127.0.0.1"95011)) {  
  15.             throw new Exception(sprintf('Swoole Error: %s', $this->client->errCode));  
  16.         }  
  17.     }  
  18.   
  19.     public function send($data)  
  20.     {  
  21.         if ($this->client->isConnected()) {  
  22.             if (!is_string($data)) {  
  23.                 $data = json_encode($data);  
  24.             }  
  25.   
  26.             return $this->client->send($data);  
  27.         } else {  
  28.             throw new Exception('Swoole Server does not connected.');  
  29.         }  
  30.     }  
  31.   
  32.     public function close()  
  33.     {  
  34.         $this->client->close();  
  35.     }  
  36. }  
  37.   
  38. $data = array(  
  39.     "url" => "http://192.168.10.19/send_mail",  
  40.     "param" => array(  
  41.         "username" => 'test',  
  42.         "password" => 'test'  
  43.     )  
  44. );  
  45. $client = new Client();  
  46. $client->connect();  
  47. if ($client->send($data)) {  
  48.     echo 'success';  
  49. else {  
  50.     echo 'fail';  
  51. }  
  52. $client->close();  

在上面代码中,url即为任务所在地址,param为所需传递参数。
保存好代码,在命令行或者浏览器中执行Client_test.php,便实现了异步任务队列。你所填写的URL,将会在每次异步任务被提交后,以HTTP GET的方式异步执行。

相关文章
|
编译器 程序员 C++
2023-4-6-C++11、C++14、C++17、C++20版本新特性系统全面的学习!(二)
2023-4-6-C++11、C++14、C++17、C++20版本新特性系统全面的学习!
670 0
2023-4-6-C++11、C++14、C++17、C++20版本新特性系统全面的学习!(二)
|
存储 监控 NoSQL
快速认识OTS
## 什么是OTS   OTS 是Open Table Service的简称,现在已更名为表格存储Table Store,官网对它的解释为:OTS是构建在阿里云飞天分布式系统之上的 NoSQL 数据库服务,提供海量结构化数据的存储和实时访问。OTS 以实例和表的形式组织数据,通过数据分片和负载均衡技术,达到规模的无缝扩展。OTS 向应用程序屏蔽底层硬件平台的故障和错误,能自动从各类错误中快速
47154 2
|
8月前
|
JSON 人工智能 前端开发
前端开发中使用whistle代理工具
Whistle是一款强大的代理工具,相比Charles、Fiddler更轻量且功能丰富。它适用于前端开发中的多种场景,如接口数据Mock、接口代理、静态资源代理等。通过简单的规则配置,可将接口指向本地JSON文件,解决跨域问题,或代理静态资源以满足特定域名访问需求。此外,Whistle还支持本地端口间转发与移动端请求抓包,搭配SwitchyOmega插件使用效果更佳。需注意,使用前请确保已安装Node环境并参考官方文档完成基础配置。
|
9月前
|
机器学习/深度学习 人工智能 搜索推荐
《探秘AI驱动的个性化推荐系统:精准触达用户的科技密码》
在这个信息爆炸的时代,AI驱动的个性化推荐系统应运而生,通过数据收集与处理、构建用户画像、核心算法(协同过滤与基于内容的推荐)及深度学习技术,精准洞察用户需求。它广泛应用于电商、视频平台等领域,提升用户体验和商业效益。尽管面临数据稀疏性、隐私保护等挑战,未来将更加精准、实时并注重用户隐私。
577 1
《探秘AI驱动的个性化推荐系统:精准触达用户的科技密码》
|
算法
路径规划算法 - 求解最短路径 - Dijkstra(迪杰斯特拉)算法
路径规划算法 - 求解最短路径 - Dijkstra(迪杰斯特拉)算法
710 0
|
存储 前端开发
canvas自定义绘制顺序解决遮挡问题
canvas自定义绘制顺序解决遮挡问题
502 0
|
安全 编译器 C语言
一文讲清楚内联函数 inline
在C语言中,如果一些函数被频繁调用,不断地有函数入栈,即函数栈,会造成栈空间或栈内存的大量消耗。 为了解决这个问题,特别的引入了inline修饰符,表示为内联函数。
|
缓存 JavaScript 测试技术
Vue 3实战:打造交互丰富的任务管理应用
Vue 3实战:打造交互丰富的任务管理应用
327 0
|
编解码
一文带你了解 嵌入式Typec 接口切换开关
一文带你了解 嵌入式Typec 接口切换开关
381 0
|
Python
Python中绘制K线图
要在Python中绘制K线图,可以使用matplotlib和mplfinance库。mplfinance库是一个用于绘制金融数据的强大工具,可以方便地绘制K线图。
330 2