ZMQ请求应答模式之无中间件的可靠性--自由者模式
一、引言
我们讲了那么多关于中间件的示例,好像有些违背“ZMQ是无中间件”的说法。但要知道在现实生活中,中间件一直是让人又爱又恨的东西。实践中的很多消息架构能都在使用中间件进行分布式架构的搭建,所以说最终的决定还是需要你自己去权衡的。这也是为什么虽然我能驾车10分钟到一个大型商场里购买五箱音量,但我还是会选择走10分钟到楼下的便利店里去买。这种出于经济方面的考虑(时间、精力、成本等)不仅在日常生活中很常见,在软件架构中也很重要。
这就是为什么ZMQ不会强制使用带有中间件的架构,但仍提供了像内置装置这样的中间件供编程人员自由选用。
这一节我们会打破以往使用中间件进行可靠性设计的架构,转而使用点对点架构,即自由者模式,来进行可靠的消息传输。我们的示例程序会是一个名称解析服务。ZMQ中的一个常见问题是:我们如何得知需要连接的端点?在代码中直接写入TCP/IP地址肯定是不合适的;使用配置文件会造成管理上的不便。试想一下,你要在上百台计算机中进行配置,只是为了让它们知道google.com的IP地址是74.125.230.82。
一个ZMQ的名称解析服务需要实现的功能有:
1、将逻辑名称解析为一个或多个端点地址,包括绑定端和连接端。实际使用时,名称服务会提供一组端点。
2、允许我们在不同的环境下,即开发环境和生产环境,进行解析。
3、该服务必须是可靠的,否则应用程序将无法连接到网络。
为管家模式提供名称解析服务会很有用,虽然将代理程序的端点对外暴露也很简单,但是如果用好名称解析服务,那它将成为唯一一个对外暴露的接口,将更便于管理。
我们需要处理的故障类型有:服务崩溃或重启、服务过载、网络因素等。为获取可靠性,我们必须建立一个服务群,当某个服务端崩溃后,客户端可以连接其他的服务端。实践中,两个服务端就已经足够了,但事实上服务端的数量可以是任意个。
在这个架构中,大量客户端和少量服务端进行通信,服务端将套接字绑定至单独的端口,这和管家模式中的代理有很大不同。对于客户端来说,它有这样几种选择:
1、客户端可以使用REQ套接字和懒惰海盗模式,但需要有一个机制防止客户端不断地请求已停止的服务端。
2、客户端可以使用DEALER套接字,向所有的服务端发送请求。很简单,但并不太妙;
3、客户端使用ROUTER套接字,连接特定的服务端。但客户端如何得知服务端的套接字标识呢?一种方式是让服务端主动连接客户端(很复杂),或者将服务端标识写入代码进行固化(很混乱)。
二、模型一:简单重试
让我们先尝试简单的方案,重写懒惰海盗模式,让其能够和多个服务端进行通信。启动服务端时用命令行参数指定端口。然后启动多个服务端。
flserver1: Freelance server, Model One in C
// // 自由者模式 - 服务端 - 模型1 // 提供echo服务 // #include "czmq.h" int main (int argc, char *argv []) { if (argc < 2) { printf ("I: syntax: %s <endpoint>\n", argv [0]); exit (EXIT_SUCCESS); } zctx_t *ctx = zctx_new (); void *server = zsocket_new (ctx, ZMQ_REP); zsocket_bind (server, argv [1]); printf ("I: echo服务端点: %s\n", argv [1]); while (TRUE) { zmsg_t *msg = zmsg_recv (server); if (!msg) break; // 中断 zmsg_send (&msg, server); } if (zctx_interrupted) printf ("W: 中断\n"); zctx_destroy (&ctx); return 0; }
启动客户端,指定一个或多个端点:
flclient1: Freelance client, Model One in C
// // 自由者模式 - 客户端 - 模型1 // 使用REQ套接字请求一个或多个服务端 // #include "czmq.h" #define REQUEST_TIMEOUT 1000 #define MAX_RETRIES 3 // 尝试次数 static zmsg_t * s_try_request (zctx_t *ctx, char *endpoint, zmsg_t *request) { printf ("I: 在端点 %s 上尝试请求echo服务...\n", endpoint); void *client = zsocket_new (ctx, ZMQ_REQ); zsocket_connect (client, endpoint); // 发送请求,并等待应答 zmsg_t *msg = zmsg_dup (request); zmsg_send (&msg, client); zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } }; zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC); zmsg_t *reply = NULL; if (items [0].revents & ZMQ_POLLIN) reply = zmsg_recv (client); // 关闭套接字 zsocket_destroy (ctx, client); return reply; } int main (int argc, char *argv []) { zctx_t *ctx = zctx_new (); zmsg_t *request = zmsg_new (); zmsg_addstr (request, "Hello world"); zmsg_t *reply = NULL; int endpoints = argc - 1; if (endpoints == 0) printf ("I: syntax: %s <endpoint> ...\n", argv [0]); else if (endpoints == 1) { // 若只有一个端点,则尝试N次 int retries; for (retries = 0; retries < MAX_RETRIES; retries++) { char *endpoint = argv [1]; reply = s_try_request (ctx, endpoint, request); if (reply) break; // 成功 printf ("W: 没有收到 %s 的应答, 准备重试...\n", endpoint); } } else { // 若有多个端点,则每个尝试一次 int endpoint_nbr; for (endpoint_nbr = 0; endpoint_nbr < endpoints; endpoint_nbr++) { char *endpoint = argv [endpoint_nbr + 1]; reply = s_try_request (ctx, endpoint, request); if (reply) break; // Successful printf ("W: 没有收到 %s 的应答\n", endpoint); } } if (reply) printf ("服务运作正常\n"); zmsg_destroy (&request); zmsg_destroy (&reply); zctx_destroy (&ctx); return 0; }
可用如下命令运行:
flserver1 tcp://*:5555 & flserver1 tcp://*:5556 & flclient1 tcp://localhost:5555 tcp://localhost:5556
客户端的核心机制是懒惰海盗模式,即获得一次成功的应答后就结束。会有两种情况:
1、如果只有一个服务端,客户端会再尝试N次后停止,这和懒惰海盗模式的逻辑一致。
2、如果有多个服务端,客户端会每个尝试一次,收到应答后停止。
这种机制补充了海盗模式,使其能够克服只有一个服务端的情况。
但是,这种设计无法在现实程序中使用:当有很多客户端连接了服务端,而主服务端崩溃了,那所有客户端都需要在超时后才能继续执行。
三、模型二:批量发送
下面让我们使用DEALER套接字。我们的目标是能再最短的时间里收到一个应答,不能受主服务端崩溃的影响。可以采取以下措施:
1、连接所有的服务端。
2、当有请求时,一次性发送给所有的服务端。
3、等待第一个应答。
4、忽略其他应答。
这样设计客户端时,当发送请求后,所有的服务端都会收到这个请求,并返回应答。如果某个服务端断开连接了,ZMQ可能会将请求发给其他服务端,导致某些服务端会收到两次请求。
更麻烦的是客户端无法得知应答的数量,容易发生混乱。
我们可以为请求进行编号,忽略不匹配的应答。我们要对服务端进行改造,返回的消息中需要包含请求编号:
flserver2: Freelance server, Model Two in C
// // 自由者模式 - 服务端 - 模型2 // 返回带有请求编号的OK信息 // #include "czmq.h" int main (int argc, char *argv []) { if (argc < 2) { printf ("I: syntax: %s <endpoint>\n", argv [0]); exit (EXIT_SUCCESS); } zctx_t *ctx = zctx_new (); void *server = zsocket_new (ctx, ZMQ_REP); zsocket_bind (server, argv [1]); printf ("I: 服务已就绪 %s\n", argv [1]); while (TRUE) { zmsg_t *request = zmsg_recv (server); if (!request) break; // 中断 // 判断请求内容是否正确 assert (zmsg_size (request) == 2); zframe_t *address = zmsg_pop (request); zmsg_destroy (&request); zmsg_t *reply = zmsg_new (); zmsg_add (reply, address); zmsg_addstr (reply, "OK"); zmsg_send (&reply, server); } if (zctx_interrupted) printf ("W: interrupted\n"); zctx_destroy (&ctx); return 0; }
客户端代码:
flclient2: Freelance client, Model Two in C
// // 自由者模式 - 客户端 - 模型2 // 使用DEALER套接字发送批量消息 // #include "czmq.h" // 超时时间 #define GLOBAL_TIMEOUT 2500 // 将客户端API封装成一个类 #ifdef __cplusplus extern "C" { #endif // 声明类结构 typedef struct _flclient_t flclient_t; flclient_t * flclient_new (void); void flclient_destroy (flclient_t **self_p); void flclient_connect (flclient_t *self, char *endpoint); zmsg_t * flclient_request (flclient_t *self, zmsg_t **request_p); #ifdef __cplusplus } #endif int main (int argc, char *argv []) { if (argc == 1) { printf ("I: syntax: %s <endpoint> ...\n", argv [0]); exit (EXIT_SUCCESS); } // 创建自由者模式客户端 flclient_t *client = flclient_new (); // 连接至各个端点 int argn; for (argn = 1; argn < argc; argn++) flclient_connect (client, argv [argn]); // 发送一组请求,并记录时间 int requests = 10000; uint64_t start = zclock_time (); while (requests--) { zmsg_t *request = zmsg_new (); zmsg_addstr (request, "random name"); zmsg_t *reply = flclient_request (client, &request); if (!reply) { printf ("E: 名称解析服务不可用,正在退出\n"); break; } zmsg_destroy (&reply); } printf ("平均请求时间: %d 微秒\n", (int) (zclock_time () - start) / 10); flclient_destroy (&client); return 0; } // -------------------------------------------------------------------- // 类结构 struct _flclient_t { zctx_t *ctx; // 上下文 void *socket; // 用于和服务端通信的DEALER套接字 size_t servers; // 以连接的服务端数量 uint sequence; // 已发送的请求数 }; // -------------------------------------------------------------------- // Constructor flclient_t * flclient_new (void) { flclient_t *self; self = (flclient_t *) zmalloc (sizeof (flclient_t)); self->ctx = zctx_new (); self->socket = zsocket_new (self->ctx, ZMQ_DEALER); return self; } // -------------------------------------------------------------------- // 析构函数 void flclient_destroy (flclient_t **self_p) { assert (self_p); if (*self_p) { flclient_t *self = *self_p; zctx_destroy (&self->ctx); free (self); *self_p = NULL; } } // -------------------------------------------------------------------- // 连接至新的服务端端点 void flclient_connect (flclient_t *self, char *endpoint) { assert (self); zsocket_connect (self->socket, endpoint); self->servers++; } // -------------------------------------------------------------------- // 发送请求,接收应答 // 发送后销毁请求 zmsg_t * flclient_request (flclient_t *self, zmsg_t **request_p) { assert (self); assert (*request_p); zmsg_t *request = *request_p; // 向消息添加编号和空帧 char sequence_text [10]; sprintf (sequence_text, "%u", ++self->sequence); zmsg_pushstr (request, sequence_text); zmsg_pushstr (request, ""); // 向所有已连接的服务端发送请求 int server; for (server = 0; server < self->servers; server++) { zmsg_t *msg = zmsg_dup (request); zmsg_send (&msg, self->socket); } // 接收来自任何服务端的应答 // 因为我们可能poll多次,所以每次都进行计算 zmsg_t *reply = NULL; uint64_t endtime = zclock_time () + GLOBAL_TIMEOUT; while (zclock_time () < endtime) { zmq_pollitem_t items [] = { { self->socket, 0, ZMQ_POLLIN, 0 } }; zmq_poll (items, 1, (endtime - zclock_time ()) * ZMQ_POLL_MSEC); if (items [0].revents & ZMQ_POLLIN) { // 应答内容是 [empty][sequence][OK] reply = zmsg_recv (self->socket); assert (zmsg_size (reply) == 3); free (zmsg_popstr (reply)); char *sequence = zmsg_popstr (reply); int sequence_nbr = atoi (sequence); free (sequence); if (sequence_nbr == self->sequence) break; } } zmsg_destroy (request_p); return reply; }
几点说明:
1、客户端被封装成了一个API类,将复杂的代码都包装了起来。
2、户端会在几秒之后放弃寻找可用的服务端。
3、客户端需要创建一个合法的REP信封,所以需要添加一个空帧。
程序中,客户端发出了1万次名称解析请求(虽然是假的),并计算平均耗费时间。在我的测试机上,有一个服务端时,耗时60微妙;三个时80微妙。
该模型的优缺点是:
1、优点:简单,容易理解和编写。
2、优点:它工作迅速,有重试机制。
3、缺点:占用了额外的网络带宽。
4、缺点:我们不能为服务端设置优先级,如主服务、次服务等。
5、缺点:服务端不能同时处理多个请求。
四、模型三:Complex and Nasty
批量发送模型看起来不太真实,那就让我们来探索最后这个极度复杂的模型。很有可能在编写完之后我们又会转而使用批量发送,哈哈,这就是我的作风。
我们可以将客户端使用的套接字更换为ROUTER,让我们能够向特定的服务端发送请求,停止向已死亡的服务端发送请求,从而做得尽可能地智能。我们还可以将服务端的套接字更换为ROUTER,从而突破单线程的瓶颈。
但是,使用ROUTER-ROUTER套接字连接两个瞬时套接字是不可行的,节点只有在收到第一条消息时才会为对方生成套接字标识。唯一的方法是让其中一个节点使用持久化的套接字,比较好的方式是让客户端知道服务端的标识,即服务端作为持久化的套接字。
为了避免产生新的配置项,我们直接使用服务端的端点作为套接字标识。
回想一下ZMQ套接字标识是如何工作的。服务端的ROUTER套接字为自己设置一个标识(在绑定之前),当客户端连接时,通过一个握手的过程来交换双方的标识。客户端的ROUTER套接字会先发送一条空消息,服务端为客户端生成一个随机的UUID。然后,服务端会向客户端发送自己的标识。
这样一来,客户端就可以将消息发送给特定的服务端了。不过还有一个问题:我们不知道服务端会在什么时候完成这个握手的过程。如果服务端是在线的,那可能几毫秒就能完成。如果不在线,那可能需要很久很久。
这里有一个矛盾:我们需要知道服务端何时连接成功且能够开始工作。自由者模式不像中间件模式,它的服务端必须要先发送请求后才能的应答。所以在服务端发送消息给客户端之前,客户端必须要先请求服务端,这看似是不可能的。
我有一个解决方法,那就是批量发送。这里发送的不是真正的请求,而是一个试探性的心跳(PING-PONG)。当收到应答时,就说明对方是在线的。
下面让我们制定一个协议,来定义自由者模式是如何传递这种心跳的:10/FLP | ZeroMQ RFC
实现这个协议的服务端很方便,下面就是经过改造的echo服务:
flserver3: Freelance server, Model Three in C
// // 自由者模式 - 服务端 - 模型3 // 使用ROUTER-ROUTER套接字进行通信;单线程。 // #include "czmq.h" int main (int argc, char *argv []) { int verbose = (argc > 1 && streq (argv [1], "-v")); zctx_t *ctx = zctx_new (); // 准备服务端套接字,其标识和端点名相同 char *bind_endpoint = "tcp://*:5555"; char *connect_endpoint = "tcp://localhost:5555"; void *server = zsocket_new (ctx, ZMQ_ROUTER); zmq_setsockopt (server, ZMQ_IDENTITY, connect_endpoint, strlen (connect_endpoint)); zsocket_bind (server, bind_endpoint); printf ("I: 服务端已准备就绪 %s\n", bind_endpoint); while (!zctx_interrupted) { zmsg_t *request = zmsg_recv (server); if (verbose && request) zmsg_dump (request); if (!request) break; // 中断 // Frame 0: 客户端标识 // Frame 1: 心跳,或客户端控制信息帧 // Frame 2: 请求内容 zframe_t *address = zmsg_pop (request); zframe_t *control = zmsg_pop (request); zmsg_t *reply = zmsg_new (); if (zframe_streq (control, "PONG")) zmsg_addstr (reply, "PONG"); else { zmsg_add (reply, control); zmsg_addstr (reply, "OK"); } zmsg_destroy (&request); zmsg_push (reply, address); if (verbose && reply) zmsg_dump (reply); zmsg_send (&reply, server); } if (zctx_interrupted) printf ("W: 中断\n"); zctx_destroy (&ctx); return 0; }
但是,自由者模式的客户端会变得大一写。为了清晰期间,我们将其拆分为两个类来实现。首先是在上层使用的程序:
flclient3: Freelance client, Model Three in C
// // 自由者模式 - 客户端 - 模型3 // 使用flcliapi类来封装自由者模式 // // 直接编译,不建类库 #include "flcliapi.c" int main (void) { // 创建自由者模式实例 flcliapi_t *client = flcliapi_new (); // 链接至服务器端点 flcliapi_connect (client, "tcp://localhost:5555"); flcliapi_connect (client, "tcp://localhost:5556"); flcliapi_connect (client, "tcp://localhost:5557"); // 发送随机请求,计算时间 int requests = 1000; uint64_t start = zclock_time (); while (requests--) { zmsg_t *request = zmsg_new (); zmsg_addstr (request, "random name"); zmsg_t *reply = flcliapi_request (client, &request); if (!reply) { printf ("E: 名称解析服务不可用,正在退出\n"); break; } zmsg_destroy (&reply); } printf ("平均执行时间: %d usec\n", (int) (zclock_time () - start) / 10); flcliapi_destroy (&client); return 0; }
下面是该模式复杂的实现过程:
flcliapi: Freelance client API in C
/* ===================================================================== flcliapi - Freelance Pattern agent class Model 3: uses ROUTER socket to address specific services --------------------------------------------------------------------- Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com> Copyright other contributors as noted in the AUTHORS file. This file is part of the ZeroMQ Guide: http://zguide.zeromq.org This is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program. If not, see <http://www.gnu.org/licenses/>. ===================================================================== */ #include "flcliapi.h" // 请求超时时间 #define GLOBAL_TIMEOUT 3000 // msecs // 心跳间隔 #define PING_INTERVAL 2000 // msecs // 判定服务死亡的时间 #define SERVER_TTL 6000 // msecs // ===================================================================== // 同步部分,在应用程序层面运行 // --------------------------------------------------------------------- // 类结构 struct _flcliapi_t { zctx_t *ctx; // 上下文 void *pipe; // 用于和主线程通信的套接字 }; // 这是运行后台代理程序的线程 static void flcliapi_agent (void *args, zctx_t *ctx, void *pipe); // --------------------------------------------------------------------- // 构造函数 flcliapi_t * flcliapi_new (void) { flcliapi_t *self; self = (flcliapi_t *) zmalloc (sizeof (flcliapi_t)); self->ctx = zctx_new (); self->pipe = zthread_fork (self->ctx, flcliapi_agent, NULL); return self; } // --------------------------------------------------------------------- // 析构函数 void flcliapi_destroy (flcliapi_t **self_p) { assert (self_p); if (*self_p) { flcliapi_t *self = *self_p; zctx_destroy (&self->ctx); free (self); *self_p = NULL; } } // --------------------------------------------------------------------- // 连接至新服务器端点 // 消息内容:[CONNECT][endpoint] void flcliapi_connect (flcliapi_t *self, char *endpoint) { assert (self); assert (endpoint); zmsg_t *msg = zmsg_new (); zmsg_addstr (msg, "CONNECT"); zmsg_addstr (msg, endpoint); zmsg_send (&msg, self->pipe); zclock_sleep (100); // 等待连接 } // --------------------------------------------------------------------- // 发送并销毁请求,接收应答 zmsg_t * flcliapi_request (flcliapi_t *self, zmsg_t **request_p) { assert (self); assert (*request_p); zmsg_pushstr (*request_p, "REQUEST"); zmsg_send (request_p, self->pipe); zmsg_t *reply = zmsg_recv (self->pipe); if (reply) { char *status = zmsg_popstr (reply); if (streq (status, "FAILED")) zmsg_destroy (&reply); free (status); } return reply; } // ===================================================================== // 异步部分,在后台运行 // --------------------------------------------------------------------- // 单个服务端信息 typedef struct { char *endpoint; // 服务端端点/套接字标识 uint alive; // 是否在线 int64_t ping_at; // 下一次心跳时间 int64_t expires; // 过期时间 } server_t; server_t * server_new (char *endpoint) { server_t *self = (server_t *) zmalloc (sizeof (server_t)); self->endpoint = strdup (endpoint); self->alive = 0; self->ping_at = zclock_time () + PING_INTERVAL; self->expires = zclock_time () + SERVER_TTL; return self; } void server_destroy (server_t **self_p) { assert (self_p); if (*self_p) { server_t *self = *self_p; free (self->endpoint); free (self); *self_p = NULL; } } int server_ping (char *key, void *server, void *socket) { server_t *self = (server_t *) server; if (zclock_time () >= self->ping_at) { zmsg_t *ping = zmsg_new (); zmsg_addstr (ping, self->endpoint); zmsg_addstr (ping, "PING"); zmsg_send (&ping, socket); self->ping_at = zclock_time () + PING_INTERVAL; } return 0; } int server_tickless (char *key, void *server, void *arg) { server_t *self = (server_t *) server; uint64_t *tickless = (uint64_t *) arg; if (*tickless > self->ping_at) *tickless = self->ping_at; return 0; } // --------------------------------------------------------------------- // 后台处理程序信息 typedef struct { zctx_t *ctx; // 上下文 void *pipe; // 用于应用程序通信的套接字 void *router; // 用于服务端通信的套接字 zhash_t *servers; // 已连接的服务端 zlist_t *actives; // 在线的服务端 uint sequence; // 请求编号 zmsg_t *request; // 当前请求 zmsg_t *reply; // 当前应答 int64_t expires; // 请求过期时间 } agent_t; agent_t * agent_new (zctx_t *ctx, void *pipe) { agent_t *self = (agent_t *) zmalloc (sizeof (agent_t)); self->ctx = ctx; self->pipe = pipe; self->router = zsocket_new (self->ctx, ZMQ_ROUTER); self->servers = zhash_new (); self->actives = zlist_new (); return self; } void agent_destroy (agent_t **self_p) { assert (self_p); if (*self_p) { agent_t *self = *self_p; zhash_destroy (&self->servers); zlist_destroy (&self->actives); zmsg_destroy (&self->request); zmsg_destroy (&self->reply); free (self); *self_p = NULL; } } // 当服务端从列表中移除时,回调该函数。 static void s_server_free (void *argument) { server_t *server = (server_t *) argument; server_destroy (&server); } void agent_control_message (agent_t *self) { zmsg_t *msg = zmsg_recv (self->pipe); char *command = zmsg_popstr (msg); if (streq (command, "CONNECT")) { char *endpoint = zmsg_popstr (msg); printf ("I: connecting to %s...\n", endpoint); int rc = zmq_connect (self->router, endpoint); assert (rc == 0); server_t *server = server_new (endpoint); zhash_insert (self->servers, endpoint, server); zhash_freefn (self->servers, endpoint, s_server_free); zlist_append (self->actives, server); server->ping_at = zclock_time () + PING_INTERVAL; server->expires = zclock_time () + SERVER_TTL; free (endpoint); } else if (streq (command, "REQUEST")) { assert (!self->request); // 遵循请求-应答循环 // 将请求编号和空帧加入消息顶部 char sequence_text [10]; sprintf (sequence_text, "%u", ++self->sequence); zmsg_pushstr (msg, sequence_text); // 获取请求消息的所有权 self->request = msg; msg = NULL; // 设置请求过期时间 self->expires = zclock_time () + GLOBAL_TIMEOUT; } free (command); zmsg_destroy (&msg); } void agent_router_message (agent_t *self) { zmsg_t *reply = zmsg_recv (self->router); // 第一帧是应答的服务端标识 char *endpoint = zmsg_popstr (reply); server_t *server = (server_t *) zhash_lookup (self->servers, endpoint); assert (server); free (endpoint); if (!server->alive) { zlist_append (self->actives, server); server->alive = 1; } server->ping_at = zclock_time () + PING_INTERVAL; server->expires = zclock_time () + SERVER_TTL; // 第二帧是应答的编号 char *sequence = zmsg_popstr (reply); if (atoi (sequence) == self->sequence) { zmsg_pushstr (reply, "OK"); zmsg_send (&reply, self->pipe); zmsg_destroy (&self->request); } else zmsg_destroy (&reply); } // --------------------------------------------------------------------- // 异步的后台代理会维护一个服务端池,处理请求和应答。 static void flcliapi_agent (void *args, zctx_t *ctx, void *pipe) { agent_t *self = agent_new (ctx, pipe); zmq_pollitem_t items [] = { { self->pipe, 0, ZMQ_POLLIN, 0 }, { self->router, 0, ZMQ_POLLIN, 0 } }; while (!zctx_interrupted) { // 计算超时时间 uint64_t tickless = zclock_time () + 1000 * 3600; if (self->request && tickless > self->expires) tickless = self->expires; zhash_foreach (self->servers, server_tickless, &tickless); int rc = zmq_poll (items, 2, (tickless - zclock_time ()) * ZMQ_POLL_MSEC); if (rc == -1) break; // 上下文对象被关闭 if (items [0].revents & ZMQ_POLLIN) agent_control_message (self); if (items [1].revents & ZMQ_POLLIN) agent_router_message (self); // 如果我们需要处理一项请求,将其发送给下一个可用的服务端 if (self->request) { if (zclock_time () >= self->expires) { // 请求超时 zstr_send (self->pipe, "FAILED"); zmsg_destroy (&self->request); } else { // 寻找可用的服务端 while (zlist_size (self->actives)) { server_t *server = (server_t *) zlist_first (self->actives); if (zclock_time () >= server->expires) { zlist_pop (self->actives); server->alive = 0; } else { zmsg_t *request = zmsg_dup (self->request); zmsg_pushstr (request, server->endpoint); zmsg_send (&request, self->router); break; } } } } // 断开并删除已过期的服务端 // 发送心跳给空闲服务器 zhash_foreach (self->servers, server_ping, self->router); } agent_destroy (&self); }