Swoole来实现实时异步任务队列-阿里云开发者社区

开发者社区> 航空母舰> 正文

Swoole来实现实时异步任务队列

简介:
+关注继续查看

假如要发100封邮件,for循环100遍,用户直接揭竿而起,什么破网站!
但实际上,我们很可能有超过1万的邮件。怎么处理这个延迟的问题?
答案就是用异步。把“发邮件”这个操作封装,然后后台异步地执行1万遍。这样的话,用户提交网页后,他所等待的时间只是“把发邮件任务请求推送进队列里”的时间。而我们的后台服务将在用户看不见的地方跑。
在实现“异步队列”这点上,有人采用MySQL表或者redis来存放待发送的邮件,然后,每分钟定时读取待发送列表,然后处理。这便是定时异步任务队列。但当前提交的任务要一分钟后才能执行,在某些实时性要求应用场景里还是不快。有些场景要求,只有一提交任务,便马上执行,但用户不需要等待返回结果。
本文将探讨用php扩展swoole实现实时异步任务队列的方案。

服务端

在打算放置脚本的目录(你也可以自行新建)新建Server.php,代码如下

Java代码  收藏代码
  1. <?php  
  2.   
  3. class Server  
  4. {  
  5.     private $serv;  
  6.   
  7.     public function __construct()  
  8.     {  
  9.         $this->serv = new swoole_server("0.0.0.0"9501);  
  10.         $this->serv->set(array(  
  11.             'worker_num' => 1//一般设置为服务器CPU数的1-4倍  
  12.             'daemonize' => 1//以守护进程执行  
  13.             'max_request' => 10000,  
  14.             'dispatch_mode' => 2,  
  15.             'task_worker_num' => 8//task进程的数量  
  16.             "task_ipc_mode " => 3//使用消息队列通信,并设置为争抢模式  
  17.             //"log_file" => "log/taskqueueu.log" ,//日志  
  18.         ));  
  19.         $this->serv->on('Receive', array($this'onReceive'));  
  20.         // bind callback  
  21.         $this->serv->on('Task', array($this'onTask'));  
  22.         $this->serv->on('Finish', array($this'onFinish'));  
  23.         $this->serv->start();  
  24.     }  
  25.   
  26.     public function onReceive(swoole_server $serv, $fd, $from_id, $data)  
  27.     {  
  28.         //echo "Get Message From Client {$fd}:{$data}\n";  
  29.         // send a task to task worker.  
  30.         $serv->task($data);  
  31.     }  
  32.   
  33.     public function onTask($serv, $task_id, $from_id, $data)  
  34.     {  
  35.         $array = json_decode($data, true);  
  36.         if ($array['url']) {  
  37.             return $this->httpGet($array['url'], $array['param']);  
  38.         }  
  39.     }  
  40.   
  41.     public function onFinish($serv, $task_id, $data)  
  42.     {  
  43.         //echo "Task {$task_id} finish\n";  
  44.         //echo "Result: {$data}\n";  
  45.     }  
  46.   
  47.     protected function httpGet($url, $data)  
  48.     {  
  49.         if ($data) {  
  50.             $url .= '?' . http_build_query($data);  
  51.         }  
  52.         $curlObj = curl_init(); //初始化curl,  
  53.         curl_setopt($curlObj, CURLOPT_URL, $url); //设置网址  
  54.         curl_setopt($curlObj, CURLOPT_RETURNTRANSFER, 1); //将curl_exec的结果返回  
  55.         curl_setopt($curlObj, CURLOPT_SSL_VERIFYPEER, FALSE);  
  56.         curl_setopt($curlObj, CURLOPT_SSL_VERIFYHOST, FALSE);  
  57.         curl_setopt($curlObj, CURLOPT_HEADER, 0); //是否输出返回头信息  
  58.         $response = curl_exec($curlObj); //执行  
  59.         curl_close($curlObj); //关闭会话  
  60.         return $response;  
  61.     }  
  62.   
  63. }  
  64.   
  65. $server = new Server();  

客户端

启动服务后,让我们看看如何调用服务。新建测试文件Client_test.php

Java代码  收藏代码
  1. <?php  
  2.   
  3. class Client  
  4. {  
  5.     private $client;  
  6.   
  7.     public function __construct()  
  8.     {  
  9.         $this->client = new swoole_client(SWOOLE_SOCK_TCP);  
  10.     }  
  11.   
  12.     public function connect()  
  13.     {  
  14.         if (!$this->client->connect("127.0.0.1"95011)) {  
  15.             throw new Exception(sprintf('Swoole Error: %s', $this->client->errCode));  
  16.         }  
  17.     }  
  18.   
  19.     public function send($data)  
  20.     {  
  21.         if ($this->client->isConnected()) {  
  22.             if (!is_string($data)) {  
  23.                 $data = json_encode($data);  
  24.             }  
  25.   
  26.             return $this->client->send($data);  
  27.         } else {  
  28.             throw new Exception('Swoole Server does not connected.');  
  29.         }  
  30.     }  
  31.   
  32.     public function close()  
  33.     {  
  34.         $this->client->close();  
  35.     }  
  36. }  
  37.   
  38. $data = array(  
  39.     "url" => "http://192.168.10.19/send_mail",  
  40.     "param" => array(  
  41.         "username" => 'test',  
  42.         "password" => 'test'  
  43.     )  
  44. );  
  45. $client = new Client();  
  46. $client->connect();  
  47. if ($client->send($data)) {  
  48.     echo 'success';  
  49. else {  
  50.     echo 'fail';  
  51. }  
  52. $client->close();  

在上面代码中,url即为任务所在地址,param为所需传递参数。
保存好代码,在命令行或者浏览器中执行Client_test.php,便实现了异步任务队列。你所填写的URL,将会在每次异步任务被提交后,以HTTP GET的方式异步执行。

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
精通SpringBoot——第十篇:使用Quartz实现动态配置定时任务
spring boot 整合quartz实现数据库动态配置定时任务
17268 0
有赞实时任务优化:Flink Checkpoint 异常解析与应用实践
本文结合 Flink 1.9 版本,重点讲述 Flink Checkpoint 原理流程以及常见原因分析,让用户能够更好的理解 Flink Checkpoint,从而开发出更健壮的实时任务。
1282 0
【换脸AI升级版】面部表情、身体动作、视线方向都能实时迁移
“变脸”技术已经不新奇,来自德国慕尼黑工业大学、斯坦福大学等的一组研究人员最近开发了一个叫“HeadOn”的AI,它可以“变人”——根据输入人物的动作,实时地改变视频中人物的面部表情、眼球运动和身体动作,使得图像中的人看起来像是真的在说话和移动一样。
1424 0
TableStore: 海量结构化数据实时备份实战
# TableStore: 海量结构化数据实时备份实战 ## 数据备份简介 在信息技术与数据管理领域,备份是指将文件系统或数据库系统中的数据加以复制,一旦发生灾难或者错误操作时,得以方便而及时地恢复系统的有效数据和正常运作。
16965 0
有赞实时任务优化:Flink Checkpoint 异常解析与应用实践
本文结合 Flink 1.9 版本,重点讲述 Flink Checkpoint 原理流程以及常见原因分析,让用户能够更好的理解 Flink Checkpoint,从而开发出更健壮的实时任务。
778 0
阿里云服务器端口号设置
阿里云服务器初级使用者可能面临的问题之一. 使用tomcat或者其他服务器软件设置端口号后,比如 一些不是默认的, mysql的 3306, mssql的1433,有时候打不开网页, 原因是没有在ecs安全组去设置这个端口号. 解决: 点击ecs下网络和安全下的安全组 在弹出的安全组中,如果没有就新建安全组,然后点击配置规则 最后如上图点击添加...或快速创建.   have fun!  将编程看作是一门艺术,而不单单是个技术。
4518 0
深入 Java Timer 定时任务调度器实现原理
使用 Java 来调度定时任务时,我们经常会使用 Timer 类搞定。Timer 简单易用,其源码阅读起来也非常清晰,本节我们来仔细分析一下 Timer 类,来看看 JDK 源码的编写者是如何实现一个稳定可靠的简单调度器。
1130 0
springboot整合Quartz实现动态配置定时任务
在我们日常的开发中,很多时候,定时任务都不是写死的,而是写到数据库中,从而实现定时任务的动态配置,下面就通过一个简单的示例,来实现这个功能。
2596 0
Spring整合Quartz实现动态定时任务
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/hans201507/article/details/50593834 1.
723 0
+关注
514
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
文娱运维技术
立即下载
《SaaS模式云原生数据仓库应用场景实践》
立即下载
《看见新力量:二》电子书
立即下载