ZMQ之异步管家模式
上文那种实现管家模式的方法比较简单,client还是简单海盗模式中的那个,仅仅是用API重写了一下。我在测试机上运行了程序,处理10万条请求大约需要14秒的时间,这和代码也有一些关系,因为复制消息帧浪费了CPU处理时间。但真正的问题在于,我们总是逐个循环进行处理(round-trip),即发送-接收-发送-接收……ZMQ内部禁用了TCP发包优化算法(Nagle's algorithm),但逐个处理循环还是比较浪费。
理论归理论,还是需要由实践来检验。我们用一个简单的测试程序来看看逐个处理循环是否真的耗时。这个测试程序会发送一组消息,第一次它发一条收一条,第二次则一起发送再一起接收。两次结果应该是一样的,但速度截然不同。
tripping: Round-trip demonstrator in C
// // Round-trip 模拟 // // 本示例程序使用多线程的方式启动client、worker、以及代理, // 当client处理完毕时会发送信号给主程序。 // #include "czmq.h" static void client_task (void *args, zctx_t *ctx, void *pipe) { void *client = zsocket_new (ctx, ZMQ_DEALER); zmq_setsockopt (client, ZMQ_IDENTITY, "C", 1); zsocket_connect (client, "tcp://localhost:5555"); printf ("开始测试...\n"); zclock_sleep (100); int requests; int64_t start; printf ("同步 round-trip 测试...\n"); start = zclock_time (); for (requests = 0; requests < 10000; requests++) { zstr_send (client, "hello"); char *reply = zstr_recv (client); free (reply); } printf (" %d 次/秒\n", (1000 * 10000) / (int) (zclock_time () - start)); printf ("异步 round-trip 测试...\n"); start = zclock_time (); for (requests = 0; requests < 100000; requests++) zstr_send (client, "hello"); for (requests = 0; requests < 100000; requests++) { char *reply = zstr_recv (client); free (reply); } printf (" %d 次/秒\n", (1000 * 100000) / (int) (zclock_time () - start)); zstr_send (pipe, "完成"); } static void * worker_task (void *args) { zctx_t *ctx = zctx_new (); void *worker = zsocket_new (ctx, ZMQ_DEALER); zmq_setsockopt (worker, ZMQ_IDENTITY, "W", 1); zsocket_connect (worker, "tcp://localhost:5556"); while (1) { zmsg_t *msg = zmsg_recv (worker); zmsg_send (&msg, worker); } zctx_destroy (&ctx); return NULL; } static void * broker_task (void *args) { // 准备上下文和套接字 zctx_t *ctx = zctx_new (); void *frontend = zsocket_new (ctx, ZMQ_ROUTER); void *backend = zsocket_new (ctx, ZMQ_ROUTER); zsocket_bind (frontend, "tcp://*:5555"); zsocket_bind (backend, "tcp://*:5556"); // 初始化轮询对象 zmq_pollitem_t items [] = { { frontend, 0, ZMQ_POLLIN, 0 }, { backend, 0, ZMQ_POLLIN, 0 } }; while (1) { int rc = zmq_poll (items, 2, -1); if (rc == -1) break; // 中断 if (items [0].revents & ZMQ_POLLIN) { zmsg_t *msg = zmsg_recv (frontend); zframe_t *address = zmsg_pop (msg); zframe_destroy (&address); zmsg_pushstr (msg, "W"); zmsg_send (&msg, backend); } if (items [1].revents & ZMQ_POLLIN) { zmsg_t *msg = zmsg_recv 
(backend); zframe_t *address = zmsg_pop (msg); zframe_destroy (&address); zmsg_pushstr (msg, "C"); zmsg_send (&msg, frontend); } } zctx_destroy (&ctx); return NULL; } int main (void) { // 创建线程 zctx_t *ctx = zctx_new (); void *client = zthread_fork (ctx, client_task, NULL); zthread_new (ctx, worker_task, NULL); zthread_new (ctx, broker_task, NULL); // 等待client端管道的信号 char *signal = zstr_recv (client); free (signal); zctx_destroy (&ctx); return 0; }
在我的开发环境中运行结果如下:
Setting up test... Synchronous round-trip test... 9057 calls/second Asynchronous round-trip test... 173010 calls/second
需要注意的是client在运行开始会暂停一段时间,这是因为在向ROUTER套接字发送消息时,若指定标识的套接字没有连接,那么ROUTER会直接丢弃该消息。这个示例中我们没有使用LRU算法,所以当worker连接速度稍慢时就有可能丢失数据,影响测试结果。
我们可以看到,逐个处理循环比异步处理要慢将近20倍,让我们把它应用到管家模式中去。
首先,让我们修改client的API,添加独立的发送和接收方法:
//  Asynchronous Majordomo client API: sending a request and receiving
//  a reply are separate calls.
//  mdcli_new takes the broker endpoint and a verbose flag.
mdcli_t *mdcli_new (char *broker, int verbose);
void mdcli_destroy (mdcli_t **self_p);
//  Takes ownership of *request_p; the message is destroyed once sent.
int mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p);
//  Returns the next reply, or NULL if none arrived.
zmsg_t *mdcli_recv (mdcli_t *self);
然后花很短的时间就能将同步的client API改造成异步的API:
mdcliapi2: Majordomo asynchronous client API in C
/*  =====================================================================
    mdcliapi2.c

    Majordomo Protocol Client API (async version)
    Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7.

    ---------------------------------------------------------------------
    Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
    Copyright other contributors as noted in the AUTHORS file.

    This file is part of the ZeroMQ Guide: http://zguide.zeromq.org

    This is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or (at
    your option) any later version.

    This software is distributed in the hope that it will be useful, but
    WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public
    License along with this program. If not, see
    <http://www.gnu.org/licenses/>.
    =====================================================================
*/

#include "mdcliapi2.h"

//  Structure of our class
//  We access these properties only via class methods

struct _mdcli_t {
    zctx_t *ctx;                //  Our context
    char *broker;               //  Broker endpoint (owned copy)
    void *client;               //  Socket to broker
    int verbose;                //  Print activity to stdout
    int timeout;                //  Request timeout, in milliseconds
};


//  ---------------------------------------------------------------------
//  Connect or reconnect to broker
//  Any existing socket is destroyed first and a fresh DEALER socket is
//  created in its place.

void s_mdcli_connect_to_broker (mdcli_t *self)
{
    if (self->client)
        zsocket_destroy (self->ctx, self->client);
    self->client = zsocket_new (self->ctx, ZMQ_DEALER);
    zmq_connect (self->client, self->broker);
    if (self->verbose)
        zclock_log ("I: 正在连接代理 %s...", self->broker);
}


//  ---------------------------------------------------------------------
//  Constructor
//  broker:  endpoint to connect to (must not be NULL; copied)
//  verbose: non-zero to log activity
//  Returns a new session with a default timeout of 2500 msec.

mdcli_t *
mdcli_new (char *broker, int verbose)
{
    assert (broker);

    mdcli_t *self = (mdcli_t *) zmalloc (sizeof (mdcli_t));
    self->ctx = zctx_new ();
    self->broker = strdup (broker);
    self->verbose = verbose;
    self->timeout = 2500;           //  msecs

    s_mdcli_connect_to_broker (self);
    return self;
}


//  ---------------------------------------------------------------------
//  Destructor
//  Destroys the context, frees the session and sets *self_p to NULL.
//  Does nothing if *self_p is already NULL.

void
mdcli_destroy (mdcli_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        mdcli_t *self = *self_p;
        zctx_destroy (&self->ctx);
        free (self->broker);
        free (self);
        *self_p = NULL;
    }
}


//  ---------------------------------------------------------------------
//  Set request timeout (milliseconds, used by mdcli_recv)

void
mdcli_set_timeout (mdcli_t *self, int timeout)
{
    assert (self);
    self->timeout = timeout;
}


//  ---------------------------------------------------------------------
//  Send request to broker
//  Takes ownership of the request message and destroys it when sent.
//  Always returns 0.

int
mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p)
{
    assert (self);
    assert (request_p);
    zmsg_t *request = *request_p;

    //  Prefix request with protocol frames
    //  Frame 0: empty (REQ emulation)
    //  Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
    //  Frame 2: Service name (printable string)
    zmsg_pushstr (request, service);
    zmsg_pushstr (request, MDPC_CLIENT);
    zmsg_pushstr (request, "");
    if (self->verbose) {
        zclock_log ("I: 发送请求给 '%s' 服务:", service);
        zmsg_dump (request);
    }
    //  Send through the caller's own reference so that it is nullified
    //  along with the message; sending via a local copy would leave the
    //  caller holding a pointer to a destroyed message.
    zmsg_send (request_p, self->client);
    return 0;
}


//  ---------------------------------------------------------------------
//  Returns the reply message or NULL if there was no reply.
//  Does not attempt to recover from a broker failure: requests that
//  were sent but not answered are not recorded, so they cannot be
//  resent.

zmsg_t *
mdcli_recv (mdcli_t *self)
{
    assert (self);

    //  Poll socket for a reply, with timeout
    zmq_pollitem_t items [] = { { self->client, 0, ZMQ_POLLIN, 0 } };
    int rc = zmq_poll (items, 1, self->timeout * ZMQ_POLL_MSEC);
    if (rc == -1)
        return NULL;            //  Interrupted

    //  If we got a reply, process it
    if (items [0].revents & ZMQ_POLLIN) {
        zmsg_t *msg = zmsg_recv (self->client);
        //  A NULL message means the receive itself was interrupted;
        //  fall through to the reporting code below instead of
        //  dumping or asserting on NULL.
        if (msg) {
            if (self->verbose) {
                zclock_log ("I: received reply:");
                zmsg_dump (msg);
            }
            //  Don't try to handle errors, just assert noisily
            assert (zmsg_size (msg) >= 4);

            zframe_t *empty = zmsg_pop (msg);
            assert (zframe_streq (empty, ""));
            zframe_destroy (&empty);

            zframe_t *header = zmsg_pop (msg);
            assert (zframe_streq (header, MDPC_CLIENT));
            zframe_destroy (&header);

            //  The service name frame is discarded; the caller gets
            //  only the remaining frames.
            zframe_t *service = zmsg_pop (msg);
            zframe_destroy (&service);

            return msg;         //  Success
        }
    }
    if (zctx_interrupted)
        printf ("W: 收到中断消息,正在中止client...\n");
    else
    if (self->verbose)
        zclock_log ("W: 严重错误,放弃请求");

    return NULL;
}
下面是对应的测试代码:
mdclient2: Majordomo client application in C
// // 异步管家模式 - client示例程序 // 使用mdcli API隐藏MDP协议的具体实现 // // 直接编译源码,而不创建类库 #include "mdcliapi2.c" int main (int argc, char *argv []) { int verbose = (argc > 1 && streq (argv [1], "-v")); mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose); int count; for (count = 0; count < 100000; count++) { zmsg_t *request = zmsg_new (); zmsg_pushstr (request, "Hello world"); mdcli_send (session, "echo", &request); } for (count = 0; count < 100000; count++) { zmsg_t *reply = mdcli_recv (session); if (reply) zmsg_destroy (&reply); else break; // 使用Ctrl-C中断 } printf ("收到 %d 个应答\n", count); mdcli_destroy (&session); return 0; }
代理和worker的代码没有变,因为我们并没有改变MDP协议。经过对client的改造,我们可以明显看到速度的提升。以下是同步状况下处理10万条请求所需的时间:
$ time mdclient 100000 requests/replies processed real 0m14.088s user 0m1.310s sys 0m2.670s
以下是异步请求的情况:
$ time mdclient2 100000 replies received real 0m8.730s user 0m0.920s sys 0m1.550s
让我们建立10个worker,看看效果如何:
$ time mdclient2 100000 replies received real 0m3.863s user 0m0.730s sys 0m0.470s
由于worker获得消息需要通过LRU队列机制,所以并不能做到完全的异步。但是,worker越多其效果也会越好。在我的测试机上,当worker的数量达到8个时,速度就不再提升了——四核处理器只能做这么多。但是,我们仍然获得了近四倍的速度提升,而改造过程只有几分钟而已。此外,代理其实还没有进行优化,它仍会复制消息,而没有实现零拷贝。不过,我们已经做到每秒处理2.5万次请求-应答,已经很不错了。
当然,异步的管家模式也并不完美,有一个显著的缺点:它无法从代理的崩溃中恢复。可以看到mdcliapi2的代码中并没有恢复连接的代码,重新连接需要有以下几点作为前提:
1、每个请求都做了编号,每次应答也含有相应的编号,这就需要修改协议,明确定义。
2、client的API需要保留并跟踪所有已发送、但仍未收到应答的请求。
3、如果代理发生崩溃,client会重发所有消息。
可以看到,高可靠性往往和复杂度成正比,值得在管家模式中应用这一机制吗?这就要看应用场景了。如果是一个名称查询服务,每次会话会调用一次,那不需要应用这一机制;如果是一个位于前端的网页服务,有数千个客户端相连,那可能就需要了。