Swoole来实现实时异步任务队列

简介:

假如要发100封邮件,for循环100遍,用户直接揭竿而起,什么破网站!
但实际上,我们很可能有超过1万的邮件。怎么处理这个延迟的问题?
答案就是用异步。把“发邮件”这个操作封装,然后后台异步地执行1万遍。这样的话,用户提交网页后,他所等待的时间只是“把发邮件任务请求推送进队列里”的时间。而我们的后台服务将在用户看不见的地方跑。
在实现“异步队列”这点上,有人采用MySQL表或者redis来存放待发送的邮件,然后,每分钟定时读取待发送列表,然后处理。这便是定时异步任务队列。但当前提交的任务要一分钟后才能执行,在某些实时性要求应用场景里还是不快。有些场景要求,只有一提交任务,便马上执行,但用户不需要等待返回结果。
本文将探讨用php扩展swoole实现实时异步任务队列的方案。

服务端

在打算放置脚本的目录(你也可以自行新建)新建Server.php,代码如下

Java代码   收藏代码
  1. <?php  
  2.   
  3. class Server  
  4. {  
  5.     private $serv;  
  6.   
  7.     public function __construct()  
  8.     {  
  9.         $this->serv = new swoole_server("0.0.0.0"9501);  
  10.         $this->serv->set(array(  
  11.             'worker_num' => 1//一般设置为服务器CPU数的1-4倍  
  12.             'daemonize' => 1//以守护进程执行  
  13.             'max_request' => 10000,  
  14.             'dispatch_mode' => 2,  
  15.             'task_worker_num' => 8//task进程的数量  
  16.             "task_ipc_mode " => 3//使用消息队列通信,并设置为争抢模式  
  17.             //"log_file" => "log/taskqueueu.log" ,//日志  
  18.         ));  
  19.         $this->serv->on('Receive', array($this'onReceive'));  
  20.         // bind callback  
  21.         $this->serv->on('Task', array($this'onTask'));  
  22.         $this->serv->on('Finish', array($this'onFinish'));  
  23.         $this->serv->start();  
  24.     }  
  25.   
  26.     public function onReceive(swoole_server $serv, $fd, $from_id, $data)  
  27.     {  
  28.         //echo "Get Message From Client {$fd}:{$data}\n";  
  29.         // send a task to task worker.  
  30.         $serv->task($data);  
  31.     }  
  32.   
  33.     public function onTask($serv, $task_id, $from_id, $data)  
  34.     {  
  35.         $array = json_decode($data, true);  
  36.         if ($array['url']) {  
  37.             return $this->httpGet($array['url'], $array['param']);  
  38.         }  
  39.     }  
  40.   
  41.     public function onFinish($serv, $task_id, $data)  
  42.     {  
  43.         //echo "Task {$task_id} finish\n";  
  44.         //echo "Result: {$data}\n";  
  45.     }  
  46.   
  47.     protected function httpGet($url, $data)  
  48.     {  
  49.         if ($data) {  
  50.             $url .= '?' . http_build_query($data);  
  51.         }  
  52.         $curlObj = curl_init(); //初始化curl,  
  53.         curl_setopt($curlObj, CURLOPT_URL, $url); //设置网址  
  54.         curl_setopt($curlObj, CURLOPT_RETURNTRANSFER, 1); //将curl_exec的结果返回  
  55.         curl_setopt($curlObj, CURLOPT_SSL_VERIFYPEER, FALSE);  
  56.         curl_setopt($curlObj, CURLOPT_SSL_VERIFYHOST, FALSE);  
  57.         curl_setopt($curlObj, CURLOPT_HEADER, 0); //是否输出返回头信息  
  58.         $response = curl_exec($curlObj); //执行  
  59.         curl_close($curlObj); //关闭会话  
  60.         return $response;  
  61.     }  
  62.   
  63. }  
  64.   
  65. $server = new Server();  

客户端

启动服务后,让我们看看如何调用服务。新建测试文件Client_test.php

Java代码   收藏代码
  1. <?php  
  2.   
  3. class Client  
  4. {  
  5.     private $client;  
  6.   
  7.     public function __construct()  
  8.     {  
  9.         $this->client = new swoole_client(SWOOLE_SOCK_TCP);  
  10.     }  
  11.   
  12.     public function connect()  
  13.     {  
  14.         if (!$this->client->connect("127.0.0.1"95011)) {  
  15.             throw new Exception(sprintf('Swoole Error: %s', $this->client->errCode));  
  16.         }  
  17.     }  
  18.   
  19.     public function send($data)  
  20.     {  
  21.         if ($this->client->isConnected()) {  
  22.             if (!is_string($data)) {  
  23.                 $data = json_encode($data);  
  24.             }  
  25.   
  26.             return $this->client->send($data);  
  27.         } else {  
  28.             throw new Exception('Swoole Server does not connected.');  
  29.         }  
  30.     }  
  31.   
  32.     public function close()  
  33.     {  
  34.         $this->client->close();  
  35.     }  
  36. }  
  37.   
  38. $data = array(  
  39.     "url" => "http://192.168.10.19/send_mail",  
  40.     "param" => array(  
  41.         "username" => 'test',  
  42.         "password" => 'test'  
  43.     )  
  44. );  
  45. $client = new Client();  
  46. $client->connect();  
  47. if ($client->send($data)) {  
  48.     echo 'success';  
  49. else {  
  50.     echo 'fail';  
  51. }  
  52. $client->close();  

在上面代码中,url即为任务所在地址,param为所需传递参数。
保存好代码,在命令行或者浏览器中执行Client_test.php,便实现了异步任务队列。你所填写的URL,将会在每次异步任务被提交后,以HTTP GET的方式异步执行。

相关文章
|
2月前
|
存储 安全 API
源码解密协程队列和线程队列的实现原理(二)
源码解密协程队列和线程队列的实现原理(二)
36 1
|
2月前
|
存储 运维 API
源码解密协程队列和线程队列的实现原理(一)
源码解密协程队列和线程队列的实现原理(一)
40 1
|
4月前
|
Java 数据库
异步&线程池 CompletableFuture 异步编排 实战应用 【终结篇】
这篇文章通过一个电商商品详情页的实战案例,展示了如何使用`CompletableFuture`进行异步编排,以解决在不同数据库表中查询商品信息的问题,并提供了详细的代码实现和遇到问题(如图片未显示)的解决方案。
异步&线程池 CompletableFuture 异步编排 实战应用 【终结篇】
|
4月前
|
Java
异步&线程池 CompletableFuture 异步编排 【下篇】
这篇文章深入探讨了Java中的`CompletableFuture`类,解释了如何创建异步操作、使用计算完成时的回调方法、异常处理、串行化方法、任务组合以及多任务组合的使用方式,并通过代码示例展示了各种场景下的应用。
异步&线程池 CompletableFuture 异步编排 【下篇】
|
5月前
|
存储 监控 NoSQL
Celery是一个基于分布式消息传递的异步任务队列/作业队列
Celery是一个基于分布式消息传递的异步任务队列/作业队列
|
6月前
|
消息中间件 存储 NoSQL
Celery:高效异步任务队列的深度解析与应用实践
Celery 是一个流行的 Python 分布式任务队列,用于处理耗时的异步任务,提升Web应用性能。它包括消息中间件(如RabbitMQ、Redis)、任务生产者和消费者。Celery支持异步处理、分布式执行、任务调度、结果存储和错误处理。通过一个发送邮件验证码的实例,展示了如何安装配置、定义任务、触发任务以及查看执行结果。Celery的使用能有效优化应用响应速度和资源管理。
1015 3
|
7月前
|
缓存 Java Spring
@EventPublisher + @Async 异步事件流详解
本文主要介绍Spring事件流和`@Async`异步线程池处理,以及`@Async`默认线程池可能会导致的问题及解决方法。 在@Async注解中value参数使用自定义线程池,能让开发工程师更加明确线程池的运行规则,选取适合的线程策略,规避资源耗尽的风险
3 # 通过回调函数处理异步并发问题
3 # 通过回调函数处理异步并发问题
50 0
|
Java 数据库 数据安全/隐私保护
【CompletableFuture事件驱动异步回调】
【CompletableFuture事件驱动异步回调】
|
安全 Java 容器
多线程案例(2)-阻塞式队列
多线程案例(2)-阻塞式队列
73 0