MemQ 实现异步任务

简介: 这几天在做推送相关的任务的时候发现了一段神奇的代码。$pushmsg = new NormalPushMsg($userid, $content, $clickurl,"");PushService::getInstance()->sendPushToMemq($pushmsg);一开始的时候我还纳闷,为什么不直接发呢,走这么大一圈子弯路到底是为了啥,后来想了想,发送push动辄几十上百万的用户,会是一个很耗时的操作。

"MemQ异步任务"

这几天在做推送相关的任务的时候发现了一段神奇的代码。

$pushmsg = new NormalPushMsg($userid, $content, $clickurl,"");
PushService::getInstance()->sendPushToMemq($pushmsg);

一开始的时候我还纳闷,为什么不直接发呢,走这么大一圈子弯路到底是为了啥,后来想了想,发送push动辄几十上百万的用户,会是一个很耗时的操作。短时间会对服务器造成不小的压力。而通过memq这样一种曲线救国的套路,倒也还是一个不错的方法。

下面就自己动手,也来实现一下这样的功能。

准备

总用量 16
-rw-r--r-- 1 root root  82 12月 19 14:15 config.ini
-rw-r--r-- 1 root root 339 12月 19 14:36 publish_tasks.php
-rw-r--r-- 1 root root 387 12月 19 14:38 runjobs_background.php
-rw-r--r-- 1 root root 176 12月 19 14:13 worker.php

config.ini

内容格式大概可以是下面的形式。

queuename = default
queueip = 192.168.1.1
queueport = 22201
classname = Worker

下面来讲讲每个参数对应的含义。
- queuename 是我们要进行存放的队列的名称
- queueip 即会在那台主机上执行对应的任务
- queueport 对应主机上memcache的端口
- classname 通过PHP的哪个类来执行对应的“异步”方法。

确切的来讲,对公司而言queueip和queueport是必须的,因为你会管理很多的主机,如果不进行限制,异步任务变多的时候,那就是一个灾难。但是对于本次试验而言,就没什么必要了。

worker.php

这个文件就是我们会具体调用的类文件,代码可以根据具体的业务需求而定。我这里只是演示一下,就随便写了。

<?php
class Worker {

    public function __construct() {
    }
    public function saveToLocal($msg="default data") {
        LiveLog("./worker.log", $msg);
    }
}

这个类的名称和上面的config.ini文件的classname保持一致就好了。否则会出现load错误的问题。

publish_tasks.php

按照一开始描述的,这个文件起到一个发布任务的作用。好比开启了一个给XXX用户群发送push召回的任务。这里还是简单的写一下。

<?php
header("Content-Type:text/html;charset=UTF-8");
require "/home/wwwroot/api.newtv.com/common/common.inc.php";

$memq = new useLiveMemQueue();
$key = "my:memq:test";
$msg = "哈哈哈哈哈啊哈哈哈,尴尬不?";
$result = $memq->cache($key, "saveToLocal('$msg')");
var_dump($result);

这里面有一些公司封装好的API,比如useLiveMemQueue类以及内部对应的cache方法。涉及到公司隐私性,就不再粘贴了。但是这都是对memcache的简单的封装,你自己花一点点时间也能写得出来的。

runjobs_background.php

任务已经被发布到了memq相应的队列中了,接下来就是要去消费它,否则很有可能导致服务器内存吃紧,那你的手机报警就哐哐的响了吧。

<?php
header("Content-Type:text/html;charset=UTF-8");
require "/home/wwwroot/api.newtv.com/common/common.inc.php";
require __DIR__."/worker.php";

$executor = null;
$memq = new useLiveMemQueue();
$method = trim($memq->get("my:memq:test"));
$classname = "Worker";
$executor = new $classname();

$execstr = "\$executor->$method".";";

$result = eval($execstr);
var_dump($result);

具体的原理,看完代码应该就明白了。其中最关键的就是PHP这门动态语言的优势。否则要使用Java这种编译性语言的话,还需要一套相应的反射机制了。

定时任务

一般来说,我们会写一个crontab脚本来定期的去“消费”memq中的“异步”任务,比如针对上面的实验,我们就可以写这样的一个crontab。

*/10 * * * * cd /home/wwwroot/workspace/mars/background/ && php runjobs_background.php 2>&1

每10分钟执行一次,相应的队列就会被消费掉了。但是这个10分钟只是一个频率。因为我们要进行消费的任务量会很大,10分钟内根本跑不完,在第二个10分钟的时候cron会再次开启一个进程,来进行消费。以此类推,就有可能会出现多个进程来消费相同的任务。

这个只要在不影响服务器性能的情况下,是没什么影响的,了解这么个情况就可以了。

验证结果

简单验证

好了,crontab部署后的10分钟已经到了,下面就来看看有没有生成
底层API
对应的worker.log文件吧。
成功生成了日志文件
异步运行结果

大规模测试

刚才只是在memq里面存放了一条“异步”任务,下面我们可以来试试多放点任务来试试。比如我们循环1000次cache操作,相当于在队列中存放了1000个待消费的任务。经过消费后,得到的结果如下。
查看worker.log结果

总结

到此,本次试验就结束了。相比较于自己随意的写代码,公司的代码库更具有实际意义,很多工程上的思维是学校的老师教不了的。多实践下,才能明白自身的不足之处。

这里使用的是PHP,所以在runjobs_background.php中执行起来才会很方便。如果使用其他的动态(解释性)语言,也会有如此感受。当然使用Java的话,就需要再写一个反射的工具类,多了一步,但也还是很方便的。

目录
相关文章
|
1月前
|
Java
使用线程池异步执行
使用线程池异步执行
18 0
|
1月前
|
Java
Future:异步任务结果获取
Future:异步任务结果获取
48 0
|
8月前
|
Java 数据库 数据安全/隐私保护
【CompletableFuture事件驱动异步回调】
【CompletableFuture事件驱动异步回调】
|
10月前
|
前端开发 JavaScript
说说你对事件循环的理解?
说说你对事件循环的理解?
|
11月前
|
Java BI Spring
定时任务和异步任务
定时任务和异步任务
|
11月前
|
JavaScript Java UED
CompletableFuture 异步处理
CompletableFuture 异步处理
103 0
|
Java C++
c++基于ThreadPool实现灵活的异步任务
c++基于ThreadPool实现灵活的异步任务
|
Java 数据处理 C++
c++11线程池的实现原理及回调函数的使用
c++11线程池的实现原理及回调函数的使用
|
Java 开发者 Spring
异步任务|学习笔记
快速学习异步任务
61 0
|
Java
从源码看异步任务计算FutureTask
大家是否熟悉FutureTask呢?或者说你有没有异步计算的需求呢?FutureTask就能够很好的帮助你实现异步计算,并且可以实现同步获取异步任务的计算结果。下面我们就一起从源码分析一下FutureTask。
81 0