ZMQ之异步管家模式

简介: ZMQ之异步管家模式

ZMQ之异步管家模式


上文那种实现管家模式的方法比较简单,client还是简单海盗模式中的,仅仅是用API重写了一下。我在测试机上运行了程序,处理10万条请求大约需要14秒的时间,这和代码也有一些关系,因为复制消息帧的时间浪费了CPU处理时间。但真正的问题在于,我们总是逐个循环进行处理(round-trip),即发送-接收-发送-接收……ZMQ内部禁用了TCP发包优化算法(Nagle's algorithm),但逐个处理循环还是比较浪费。

       理论归理论,还是需要由实践来检验。我们用一个简单的测试程序来看看逐个处理循环是否真的耗时。这个测试程序会发送一组消息,第一次它发一条收一条,第二次则一起发送再一起接收。两次结果应该是一样的,但速度截然不同。

       tripping: Round-trip demonstrator in C

//
//  Round-trip 模拟
//
//  本示例程序使用多线程的方式启动client、worker、以及代理,
//  当client处理完毕时会发送信号给主程序。
//
#include "czmq.h"
static void
client_task (void *args, zctx_t *ctx, void *pipe)
{
    void *client = zsocket_new (ctx, ZMQ_DEALER);
    zmq_setsockopt (client, ZMQ_IDENTITY, "C", 1);
    zsocket_connect (client, "tcp://localhost:5555");
    printf ("开始测试...\n");
    zclock_sleep (100);
    int requests;
    int64_t start;
    printf ("同步 round-trip 测试...\n");
    start = zclock_time ();
    for (requests = 0; requests < 10000; requests++) {
        zstr_send (client, "hello");
        char *reply = zstr_recv (client);
        free (reply);
    }
    printf (" %d 次/秒\n",
        (1000 * 10000) / (int) (zclock_time () - start));
    printf ("异步 round-trip 测试...\n");
    start = zclock_time ();
    for (requests = 0; requests < 100000; requests++)
        zstr_send (client, "hello");
    for (requests = 0; requests < 100000; requests++) {
        char *reply = zstr_recv (client);
        free (reply);
    }
    printf (" %d 次/秒\n",
        (1000 * 100000) / (int) (zclock_time () - start));
    zstr_send (pipe, "完成");
}
static void *
worker_task (void *args)
{
    zctx_t *ctx = zctx_new ();
    void *worker = zsocket_new (ctx, ZMQ_DEALER);
    zmq_setsockopt (worker, ZMQ_IDENTITY, "W", 1);
    zsocket_connect (worker, "tcp://localhost:5556");
    while (1) {
        zmsg_t *msg = zmsg_recv (worker);
        zmsg_send (&msg, worker);
    }
    zctx_destroy (&ctx);
    return NULL;
}
static void *
broker_task (void *args)
{
    //  准备上下文和套接字
    zctx_t *ctx = zctx_new ();
    void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
    void *backend = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (frontend, "tcp://*:5555");
    zsocket_bind (backend,  "tcp://*:5556");
    //  初始化轮询对象
    zmq_pollitem_t items [] = {
        { frontend, 0, ZMQ_POLLIN, 0 },
        { backend,  0, ZMQ_POLLIN, 0 }
    };
    while (1) {
        int rc = zmq_poll (items, 2, -1);
        if (rc == -1)
            break;              //  中断
        if (items [0].revents & ZMQ_POLLIN) {
            zmsg_t *msg = zmsg_recv (frontend);
            zframe_t *address = zmsg_pop (msg);
            zframe_destroy (&address);
            zmsg_pushstr (msg, "W");
            zmsg_send (&msg, backend);
        }
        if (items [1].revents & ZMQ_POLLIN) {
            zmsg_t *msg = zmsg_recv (backend);
            zframe_t *address = zmsg_pop (msg);
            zframe_destroy (&address);
            zmsg_pushstr (msg, "C");
            zmsg_send (&msg, frontend);
        }
    }
    zctx_destroy (&ctx);
    return NULL;
}
int main (void)
{
    //  创建线程
    zctx_t *ctx = zctx_new ();
    void *client = zthread_fork (ctx, client_task, NULL);
    zthread_new (ctx, worker_task, NULL);
    zthread_new (ctx, broker_task, NULL);
    //  等待client端管道的信号
    char *signal = zstr_recv (client);
    free (signal);
    zctx_destroy (&ctx);
    return 0;
}

在我的开发环境中运行结果如下:

Setting up test...
Synchronous round-trip test...
 9057 calls/second
Asynchronous round-trip test...
 173010 calls/second

需要注意的是client在运行开始会暂停一段时间,这是因为在向ROUTER套接字发送消息时,若指定标识的套接字没有连接,那么ROUTER会直接丢弃该消息。这个示例中我们没有使用LRU算法,所以当worker连接速度稍慢时就有可能丢失数据,影响测试结果。

       我们可以看到,逐个处理循环比异步处理要慢将近20倍,让我们把它应用到管家模式中去。

       首先,让我们修改client的API,添加独立的发送和接收方法:

mdcli_t *mdcli_new     (char *broker);
void     mdcli_destroy (mdcli_t **self_p);
int      mdcli_send    (mdcli_t *self, char *service, zmsg_t **request_p);
zmsg_t  *mdcli_recv    (mdcli_t *self);

然后花很短的时间就能将同步的client API改造成异步的API:

       mdcliapi2: Majordomo asynchronous client API in C

/*  =====================================================================
    mdcliapi2.c
    Majordomo Protocol Client API (async version)
    Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
    ---------------------------------------------------------------------
    Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
    Copyright other contributors as noted in the AUTHORS file.
    This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
    This is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or (at
    your option) any later version.
    This software is distributed in the hope that it will be useful, but
    WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    Lesser General Public License for more details.
    You should have received a copy of the GNU Lesser General Public
    License along with this program. If not, see
    <http://www.gnu.org/licenses/>.
    =====================================================================
*/
#include "mdcliapi2.h"
//  类结构
//  使用成员函数访问属性
struct _mdcli_t {
    zctx_t *ctx;                //  上下文
    char *broker;
    void *client;               //  连接至代理的套接字
    int verbose;                //  在标准输出打印运行状态
    int timeout;                //  请求超时时间
};
//  ---------------------------------------------------------------------
//  连接或重连代理
void s_mdcli_connect_to_broker (mdcli_t *self)
{
    if (self->client)
        zsocket_destroy (self->ctx, self->client);
    self->client = zsocket_new (self->ctx, ZMQ_DEALER);
    zmq_connect (self->client, self->broker);
    if (self->verbose)
        zclock_log ("I: 正在连接代理 %s...", self->broker);
}
//  ---------------------------------------------------------------------
//  构造函数
mdcli_t *
mdcli_new (char *broker, int verbose)
{
    assert (broker);
    mdcli_t *self = (mdcli_t *) zmalloc (sizeof (mdcli_t));
    self->ctx = zctx_new ();
    self->broker = strdup (broker);
    self->verbose = verbose;
    self->timeout = 2500;           //  毫秒
    s_mdcli_connect_to_broker (self);
    return self;
}
//  ---------------------------------------------------------------------
//  析构函数
void
mdcli_destroy (mdcli_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        mdcli_t *self = *self_p;
        zctx_destroy (&self->ctx);
        free (self->broker);
        free (self);
        *self_p = NULL;
    }
}
//  ---------------------------------------------------------------------
//  设置请求超时时间
void
mdcli_set_timeout (mdcli_t *self, int timeout)
{
    assert (self);
    self->timeout = timeout;
}
//  ---------------------------------------------------------------------
//  发送请求给代理
//  取得请求消息的所有权,发送后销毁
int
mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p)
{
    assert (self);
    assert (request_p);
    zmsg_t *request = *request_p;
    //  在消息顶部加入协议规定的帧
    //  Frame 0: empty (模拟REQ套接字的行为)
    //  Frame 1: "MDPCxy" (6个字节, MDP/Client x.y)
    //  Frame 2: Service name (看打印字符串)
    zmsg_pushstr (request, service);
    zmsg_pushstr (request, MDPC_CLIENT);
    zmsg_pushstr (request, "");
    if (self->verbose) {
        zclock_log ("I: 发送请求给 '%s' 服务:", service);
        zmsg_dump (request);
    }
    zmsg_send (&request, self->client);
    return 0;
}
//  ---------------------------------------------------------------------
//  获取应答消息,若无则返回NULL;
//  该函数不会尝试从代理的崩溃中恢复,
//  因为我们没有记录那些未收到应答的请求,所以也无法重发。
zmsg_t *
mdcli_recv (mdcli_t *self)
{
    assert (self);
    //  轮询套接字以获取应答
    zmq_pollitem_t items [] = { { self->client, 0, ZMQ_POLLIN, 0 } };
    int rc = zmq_poll (items, 1, self->timeout * ZMQ_POLL_MSEC);
    if (rc == -1)
        return NULL;            //  中断
    //  收到应答后进行处理
    if (items [0].revents & ZMQ_POLLIN) {
        zmsg_t *msg = zmsg_recv (self->client);
        if (self->verbose) {
            zclock_log ("I: received reply:");
            zmsg_dump (msg);
        }
        //  不要处理错误,直接报出
        assert (zmsg_size (msg) >= 4);
        zframe_t *empty = zmsg_pop (msg);
        assert (zframe_streq (empty, ""));
        zframe_destroy (&empty);
        zframe_t *header = zmsg_pop (msg);
        assert (zframe_streq (header, MDPC_CLIENT));
        zframe_destroy (&header);
        zframe_t *service = zmsg_pop (msg);
        zframe_destroy (&service);
        return msg;     //  Success
    }
    if (zctx_interrupted)
        printf ("W: 收到中断消息,正在中止client...\n");
    else
    if (self->verbose)
        zclock_log ("W: 严重错误,放弃请求");
    return NULL;
}

下面是对应的测试代码:

       mdclient2: Majordomo client application in C

//
//  异步管家模式 - client示例程序
//  使用mdcli API隐藏MDP协议的具体实现
//
//  直接编译源码,而不创建类库
#include "mdcliapi2.c"
int main (int argc, char *argv [])
{
    int verbose = (argc > 1 && streq (argv [1], "-v"));
    mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);
    int count;
    for (count = 0; count < 100000; count++) {
        zmsg_t *request = zmsg_new ();
        zmsg_pushstr (request, "Hello world");
        mdcli_send (session, "echo", &request);
    }
    for (count = 0; count < 100000; count++) {
        zmsg_t *reply = mdcli_recv (session);
        if (reply)
            zmsg_destroy (&reply);
        else
            break;              //  使用Ctrl-C中断
    }
    printf ("收到 %d 个应答\n", count);
    mdcli_destroy (&session);
    return 0;
}

代理和worker的代码没有变,因为我们并没有改变MDP协议。经过对client的改造,我们可以明显看到速度的提升。如以下是同步状况下处理10万条请求的时间:

$ time mdclient
100000 requests/replies processed
real    0m14.088s
user    0m1.310s
sys     0m2.670s

以下是异步请求的情况:

$ time mdclient2
100000 replies received
real    0m8.730s
user    0m0.920s
sys     0m1.550s

让我们建立10个worker,看看效果如何:

$ time mdclient2
100000 replies received
real    0m3.863s
user    0m0.730s
sys     0m0.470s

由于worker获得消息需要通过LRU队列机制,所以并不能做到完全的异步。但是,worker越多其效果也会越好。在我的测试机上,当worker的数量达到8个时,速度就不再提升了——四核处理器只能做这么多。但是,我们仍然获得了近四倍的速度提升,而改造过程只有几分钟而已。此外,代理其实还没有进行优化,它仍会复制消息,而没有实现零拷贝。不过,我们已经做到每秒处理2.5万次请求-应答,已经很不错了。

       当然,异步的管家模式也并不完美,有一个显著的缺点:它无法从代理的崩溃中恢复。可以看到mdcliapi2的代码中并没有恢复连接的代码,重新连接需要有以下几点作为前提:

               1、每个请求都做了编号,每次应答也含有相应的编号,这就需要修改协议,明确定义。

               2、client的API需要保留并跟踪所有已发送、但仍未收到应答的请求。

               3、如果代理发生崩溃,client会重发所有消息。

       可以看到,高可靠性往往和复杂度成正比,值得在管家模式中应用这一机制吗?这就要看应用场景了。如果是一个名称查询服务,每次会话会调用一次,那不需要应用这一机制;如果是一个位于前端的网页服务,有数千个客户端相连,那可能就需要了。

相关文章
|
1月前
|
消息中间件 监控 Java
RocketMQ 同步发送、异步发送和单向发送,如何选择?
本文详细分析了 RocketMQ 中同步发送、异步发送和单向发送三种消息发送方式的原理、优缺点及适用场景。同步发送可靠性高但延迟较大,适合订单系统等场景;异步发送非阻塞且延迟低,适用于实时数据处理等场景;单向发送高效但可靠性低,适用于日志收集等场景。文章还提供了示例代码和核心源码分析,帮助读者更好地理解每种发送方式的特点。
196 4
|
5月前
|
消息中间件 API RocketMQ
消息队列 MQ产品使用合集之设备在国外收不到指令,是什么原因
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
传感器 负载均衡 物联网
MQTT v5共享订阅是怎么回事?如何使用共享订阅提高消息订阅的灵活性和可伸缩性?
MQTT v5共享订阅是怎么回事?如何使用共享订阅提高消息订阅的灵活性和可伸缩性?
477 1
|
5月前
|
消息中间件 Serverless Windows
消息队列 MQ产品使用合集之MQTT协议是否可以应用于社交软件的系统通知场景
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
6月前
|
存储 负载均衡 安全
MQTT常见问题之MQTT使用共享订阅失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
6月前
|
移动开发 小程序 Go
【社区每周】小程序消息订阅插件升级为消息订阅接口(2022年8月第五期)
【社区每周】小程序消息订阅插件升级为消息订阅接口(2022年8月第五期)
42 0
|
消息中间件 网络协议 中间件
ZMQ请求应答模式之无中间件的可靠性--自由者模式
ZMQ请求应答模式之无中间件的可靠性--自由者模式
ZMQ请求应答模式之无中间件的可靠性--自由者模式
|
消息中间件 前端开发 NoSQL
Win11环境下使用Flask配合Celery异步推送实时/定时消息(Socket.io)
一般情况下,Celery被用来处理耗时任务,比如千篇一律的发邮件或者文件上传之类,本次使用Celery实时或者定时发送基于Websocket的消息队列,因为如果前端已经摒弃老旧的轮询策略,使用Websocket,后端则需要相应的配合Celery进行对持久化的Websocket链接主动推送消息,这种场景在生产环境中还是很常见的,但是网上却鲜有文章阐述,而Celery官方对此的说明是
Win11环境下使用Flask配合Celery异步推送实时/定时消息(Socket.io)
|
算法 程序员 API
ZMQ之面向服务的可靠队列(管家模式)
ZMQ之面向服务的可靠队列(管家模式)
ZMQ之面向服务的可靠队列(管家模式)
|
数据采集 负载均衡
我的mqtt协议和emqttd开源项目个人理解(18) - 一个客户端sub很多主题和数据,出现宕机?使用本地共享订阅解决!
我的mqtt协议和emqttd开源项目个人理解(18) - 一个客户端sub很多主题和数据,出现宕机?使用本地共享订阅解决!
444 0
我的mqtt协议和emqttd开源项目个人理解(18) - 一个客户端sub很多主题和数据,出现宕机?使用本地共享订阅解决!