ZMQ之面向服务的可靠队列(管家模式)
管家模式协议(MDP)在扩展PPP协议时引入了一个有趣的特性:client发送的每一个请求都有一个“服务名称”,而worker在向队列装置注册时需要告知自己的服务类型。MDP的优势在于它来源于现实编程,协议简单,且易于改进。
引入“服务名称”的机制,是对偏执海盗队列的一个简单补充,而结果是让其成为一个面向服务的代理。
在实施管家模式之前,我们需要为client和worker编写一个框架。如果程序员可以通过简单的API来实现这种模式,那就没有必要让他们去了解管家模式的协议内容和实现方法了。
所以,我们第一个协议(即管家模式协议)定义了分布式架构中节点是如何互相交互的,第二个协议则要定义应用程序应该如何通过框架来使用这一协议。
管家模式有两个端点,客户端和服务端。因为我们要为client和worker都撰写框架,所以就需要提供两套API。以下是用简单的面向对象方法设计的client端API雏形,使用的是C语言的ZFL library。
//  Client API. The signatures match the implementation in mdcliapi.c.

//  Create a session with the broker at the given endpoint;
//  a non-zero verbose makes the API log its activity.
mdcli_t *mdcli_new (char *broker, int verbose);

//  Destroy the session and set the caller's pointer to NULL.
void mdcli_destroy (mdcli_t **self_p);

//  Send a request to the named service. Takes ownership of the request.
//  Returns the reply message, or NULL if no reply was obtained.
zmsg_t *mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p);
就这么简单。我们创建了一个会话来和代理通信,发送并接收一个请求,最后关闭连接。以下是worker端API的雏形。
//  Worker API. The signatures match the implementation in mdwrkapi.c.

//  Create a worker session that registers the given service with the
//  broker; a non-zero verbose makes the API log its activity.
mdwrk_t *mdwrk_new (char *broker, char *service, int verbose);

//  Destroy the session and set the caller's pointer to NULL.
void mdwrk_destroy (mdwrk_t **self_p);

//  Send the reply (if *reply_p is not NULL) to the broker, then wait for
//  the next request. Returns the request, or NULL if interrupted.
zmsg_t *mdwrk_recv (mdwrk_t *self, zmsg_t **reply_p);
上面两段代码看起来差不多,但是worker端API略有不同。worker第一次执行recv()时传入的是一个空的应答,之后每次调用都传入当前的应答,并获得新的请求。
两段的API都很容易开发,只需在偏执海盗模式代码的基础上修改即可。以下是client API:
mdcliapi: Majordomo client API in C
/*  =====================================================================
    mdcliapi.c

    Majordomo Protocol Client API
    Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7.

    ---------------------------------------------------------------------
    Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
    Copyright other contributors as noted in the AUTHORS file.

    This file is part of the ZeroMQ Guide: http://zguide.zeromq.org

    This is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or (at
    your option) any later version.

    This software is distributed in the hope that it will be useful, but
    WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public
    License along with this program. If not, see
    <http://www.gnu.org/licenses/>.
    =====================================================================
*/

#include "mdcliapi.h"

//  Structure of our class
//  These properties are accessed only through the class methods

struct _mdcli_t {
    zctx_t *ctx;                //  Our context
    char *broker;               //  Broker endpoint (private copy)
    void *client;               //  Socket to the broker
    int verbose;                //  Print activity to stdout
    int timeout;                //  Request timeout, in msec
    int retries;                //  Number of attempts per request
};


//  ---------------------------------------------------------------------
//  Connect or reconnect to the broker
//  Any existing socket is destroyed first, so that a fresh REQ socket
//  is used for the next request.

void s_mdcli_connect_to_broker (mdcli_t *self)
{
    if (self->client)
        zsocket_destroy (self->ctx, self->client);
    self->client = zsocket_new (self->ctx, ZMQ_REQ);
    zmq_connect (self->client, self->broker);
    if (self->verbose)
        zclock_log ("I: 正在连接至代理 %s...", self->broker);
}


//  ---------------------------------------------------------------------
//  Constructor
//  Creates a context, stores a copy of the broker endpoint, sets the
//  default timeout (2500 msec) and retries (3), and connects.

mdcli_t *
mdcli_new (char *broker, int verbose)
{
    assert (broker);

    mdcli_t *self = (mdcli_t *) zmalloc (sizeof (mdcli_t));
    self->ctx = zctx_new ();
    self->broker = strdup (broker);
    self->verbose = verbose;
    self->timeout = 2500;           //  msec
    self->retries = 3;              //  Attempts before giving up

    s_mdcli_connect_to_broker (self);
    return self;
}


//  ---------------------------------------------------------------------
//  Destructor
//  Safe to call with *self_p == NULL; always leaves *self_p NULL.

void
mdcli_destroy (mdcli_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        mdcli_t *self = *self_p;
        zctx_destroy (&self->ctx);
        free (self->broker);
        free (self);
        *self_p = NULL;
    }
}


//  ---------------------------------------------------------------------
//  Set request timeout, in msec

void
mdcli_set_timeout (mdcli_t *self, int timeout)
{
    assert (self);
    self->timeout = timeout;
}


//  ---------------------------------------------------------------------
//  Set number of attempts per request

void
mdcli_set_retries (mdcli_t *self, int retries)
{
    assert (self);
    self->retries = retries;
}


//  ---------------------------------------------------------------------
//  Send a request to the broker and try to get a reply.
//  Takes ownership of the request message: it is destroyed before
//  returning and *request_p is set to NULL.
//  Returns the reply message (with the protocol frames removed), or
//  NULL if there was no reply within the configured retries or the
//  call was interrupted.

zmsg_t *
mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p)
{
    assert (self);
    assert (service);
    assert (request_p);
    assert (*request_p);
    zmsg_t *request = *request_p;

    //  Prefix the request with the protocol frames
    //  Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
    //  Frame 2: service name (printable string)
    zmsg_pushstr (request, service);
    zmsg_pushstr (request, MDPC_CLIENT);
    if (self->verbose) {
        zclock_log ("I: 发送请求给 '%s' 服务:", service);
        zmsg_dump (request);
    }

    zmsg_t *reply = NULL;
    int retries_left = self->retries;
    while (retries_left && !zctx_interrupted) {
        //  Each attempt sends a copy, so the request can be sent again
        zmsg_t *attempt = zmsg_dup (request);
        zmsg_send (&attempt, self->client);

        //  Poll the socket for a reply, with timeout
        zmq_pollitem_t items [] = { { self->client, 0, ZMQ_POLLIN, 0 } };
        int rc = zmq_poll (items, 1, self->timeout * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  Interrupted: do not resend on this socket

        if (items [0].revents & ZMQ_POLLIN) {
            reply = zmsg_recv (self->client);
            if (!reply)
                break;          //  Interrupted while receiving
            if (self->verbose) {
                zclock_log ("I: received reply:");
                zmsg_dump (reply);
            }
            //  Don't try to handle errors, just assert noisily
            assert (zmsg_size (reply) >= 3);

            zframe_t *header = zmsg_pop (reply);
            assert (zframe_streq (header, MDPC_CLIENT));
            zframe_destroy (&header);

            zframe_t *reply_service = zmsg_pop (reply);
            assert (zframe_streq (reply_service, service));
            zframe_destroy (&reply_service);
            break;              //  Success
        }
        //  Timed out: reconnect and let the loop resend, or give up
        if (--retries_left) {
            if (self->verbose)
                zclock_log ("W: no reply, reconnecting...");
            s_mdcli_connect_to_broker (self);
        }
        else
        if (self->verbose)
            zclock_log ("W: 发生严重错误,放弃重试。");
    }
    if (!reply && zctx_interrupted)
        printf ("W: 收到中断消息,结束client进程...\n");

    //  Destroy through the caller's pointer so it is not left dangling
    zmsg_destroy (request_p);
    return reply;
}
以下测试程序会执行10万次请求应答:
mdclient: Majordomo client application in C
// // 管家模式协议 - 客户端示例 // 使用mdcli API隐藏管家模式协议的内部实现 // // 让我们直接编译这段代码,不生成类库 #include "mdcliapi.c" int main (int argc, char *argv []) { int verbose = (argc > 1 && streq (argv [1], "-v")); mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose); int count; for (count = 0; count < 100000; count++) { zmsg_t *request = zmsg_new (); zmsg_pushstr (request, "Hello world"); zmsg_t *reply = mdcli_send (session, "echo", &request); if (reply) zmsg_destroy (&reply); else break; // 中断或停止 } printf ("已处理 %d 次请求-应答\n", count); mdcli_destroy (&session); return 0; }
下面是worker的API:
mdwrkapi: Majordomo worker API in C
/*  =====================================================================
    mdwrkapi.c

    Majordomo Protocol Worker API
    Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.

    ---------------------------------------------------------------------
    Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
    Copyright other contributors as noted in the AUTHORS file.

    This file is part of the ZeroMQ Guide: http://zguide.zeromq.org

    This is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or (at
    your option) any later version.

    This software is distributed in the hope that it will be useful, but
    WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public
    License along with this program. If not, see
    <http://www.gnu.org/licenses/>.
    =====================================================================
*/

#include "mdwrkapi.h"

//  Reliability parameters
#define HEARTBEAT_LIVENESS  3       //  3-5 is reasonable

//  Structure of our class
//  These properties are accessed only through the class methods

struct _mdwrk_t {
    zctx_t *ctx;                //  Our context
    char *broker;               //  Broker endpoint (private copy)
    char *service;              //  Service name (private copy)
    void *worker;               //  Socket to the broker
    int verbose;                //  Print activity to stdout

    //  Heartbeat management
    uint64_t heartbeat_at;      //  When to send the next heartbeat
    size_t liveness;            //  Silent poll intervals left before reconnecting
    int heartbeat;              //  Heartbeat delay, msec
    int reconnect;              //  Reconnect delay, msec

    //  Internal state
    int expect_reply;           //  Zero only until the first mdwrk_recv call

    //  Return address of the request being processed, if any
    zframe_t *reply_to;
};


//  ---------------------------------------------------------------------
//  Send a message to the broker
//  If no message is provided, an empty one is created internally. A
//  provided message is duplicated, so the caller keeps ownership.

static void
s_mdwrk_send_to_broker (mdwrk_t *self, char *command, char *option,
                        zmsg_t *msg)
{
    msg = msg? zmsg_dup (msg): zmsg_new ();

    //  Stack the protocol envelope on top of the message
    if (option)
        zmsg_pushstr (msg, option);
    zmsg_pushstr (msg, command);
    zmsg_pushstr (msg, MDPW_WORKER);
    zmsg_pushstr (msg, "");

    if (self->verbose) {
        zclock_log ("I: sending %s to broker",
            mdps_commands [(int) *command]);
        zmsg_dump (msg);
    }
    zmsg_send (&msg, self->worker);
}


//  ---------------------------------------------------------------------
//  Connect or reconnect to the broker
//  Replaces the socket, registers the service with a READY command and
//  resets the liveness counter and heartbeat timer.

void s_mdwrk_connect_to_broker (mdwrk_t *self)
{
    if (self->worker)
        zsocket_destroy (self->ctx, self->worker);
    self->worker = zsocket_new (self->ctx, ZMQ_DEALER);
    zmq_connect (self->worker, self->broker);
    if (self->verbose)
        zclock_log ("I: 正在连接代理 %s...", self->broker);

    //  Register the service with the broker
    s_mdwrk_send_to_broker (self, MDPW_READY, self->service, NULL);

    //  When liveness reaches zero the broker is considered disconnected
    self->liveness = HEARTBEAT_LIVENESS;
    self->heartbeat_at = zclock_time () + self->heartbeat;
}


//  ---------------------------------------------------------------------
//  Constructor
//  Heartbeat and reconnect delays default to 2500 msec.

mdwrk_t *
mdwrk_new (char *broker,char *service, int verbose)
{
    assert (broker);
    assert (service);

    mdwrk_t *self = (mdwrk_t *) zmalloc (sizeof (mdwrk_t));
    self->ctx = zctx_new ();
    self->broker = strdup (broker);
    self->service = strdup (service);
    self->verbose = verbose;
    self->heartbeat = 2500;     //  msec
    self->reconnect = 2500;     //  msec

    s_mdwrk_connect_to_broker (self);
    return self;
}


//  ---------------------------------------------------------------------
//  Destructor
//  Safe to call with *self_p == NULL; always leaves *self_p NULL.

void
mdwrk_destroy (mdwrk_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        mdwrk_t *self = *self_p;
        zctx_destroy (&self->ctx);
        //  Release the return address of a request that was never answered
        zframe_destroy (&self->reply_to);
        free (self->broker);
        free (self->service);
        free (self);
        *self_p = NULL;
    }
}


//  ---------------------------------------------------------------------
//  Set heartbeat delay, in msec

void
mdwrk_set_heartbeat (mdwrk_t *self, int heartbeat)
{
    assert (self);
    self->heartbeat = heartbeat;
}


//  ---------------------------------------------------------------------
//  Set reconnect delay, in msec

void
mdwrk_set_reconnect (mdwrk_t *self, int reconnect)
{
    assert (self);
    self->reconnect = reconnect;
}


//  ---------------------------------------------------------------------
//  Send the reply, if any, to the broker and wait for the next request.
//  *reply_p may be NULL only on the first call; a non-NULL reply is
//  destroyed and *reply_p set to NULL. Returns the next request with
//  the protocol envelope removed, or NULL if interrupted.

zmsg_t *
mdwrk_recv (mdwrk_t *self, zmsg_t **reply_p)
{
    assert (self);
    assert (reply_p);
    zmsg_t *reply = *reply_p;
    assert (reply || !self->expect_reply);
    if (reply) {
        assert (self->reply_to);
        //  The reply takes ownership of the address frame, so drop our
        //  pointer; it would dangle once the reply is destroyed below
        zmsg_wrap (reply, self->reply_to);
        self->reply_to = NULL;
        s_mdwrk_send_to_broker (self, MDPW_REPLY, NULL, reply);
        zmsg_destroy (reply_p);
    }
    self->expect_reply = 1;

    while (TRUE) {
        zmq_pollitem_t items [] = { { self->worker, 0, ZMQ_POLLIN, 0 } };
        int rc = zmq_poll (items, 1, self->heartbeat * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  Interrupted

        if (items [0].revents & ZMQ_POLLIN) {
            zmsg_t *msg = zmsg_recv (self->worker);
            if (!msg)
                break;          //  Interrupted
            if (self->verbose) {
                zclock_log ("I: 从代理处获得消息:");
                zmsg_dump (msg);
            }
            //  Any traffic from the broker proves it is alive
            self->liveness = HEARTBEAT_LIVENESS;

            //  Don't try to handle errors, just assert noisily
            assert (zmsg_size (msg) >= 3);

            zframe_t *empty = zmsg_pop (msg);
            assert (zframe_streq (empty, ""));
            zframe_destroy (&empty);

            zframe_t *header = zmsg_pop (msg);
            assert (zframe_streq (header, MDPW_WORKER));
            zframe_destroy (&header);

            zframe_t *command = zmsg_pop (msg);
            if (zframe_streq (command, MDPW_REQUEST)) {
                //  We should save all addresses up to the empty frame,
                //  but for now only one is kept
                zframe_destroy (&self->reply_to);
                self->reply_to = zmsg_unwrap (msg);
                zframe_destroy (&command);
                return msg;     //  We have a request to process
            }
            else
            if (zframe_streq (command, MDPW_HEARTBEAT))
                ;               //  Nothing to do for heartbeats
            else
            if (zframe_streq (command, MDPW_DISCONNECT))
                s_mdwrk_connect_to_broker (self);
            else {
                zclock_log ("E: 消息不合法");
                zmsg_dump (msg);
            }
            zframe_destroy (&command);
            zmsg_destroy (&msg);
        }
        else
        if (--self->liveness == 0) {
            if (self->verbose)
                zclock_log ("W: 失去与代理的连接 - 正在重试...");
            zclock_sleep (self->reconnect);
            s_mdwrk_connect_to_broker (self);
        }
        //  Send a heartbeat to the broker when it is due
        if (zclock_time () > self->heartbeat_at) {
            s_mdwrk_send_to_broker (self, MDPW_HEARTBEAT, NULL, NULL);
            self->heartbeat_at = zclock_time () + self->heartbeat;
        }
    }
    if (zctx_interrupted)
        printf ("W: 收到中断消息,中止worker...\n");
    return NULL;
}
以下测试程序实现了名为echo的服务:
mdworker: Majordomo worker application in C
// // 管家模式协议 - worker示例 // 使用mdwrk API隐藏MDP协议的内部实现 // // 让我们直接编译代码,而不创建类库 #include "mdwrkapi.c" int main (int argc, char *argv []) { int verbose = (argc > 1 && streq (argv [1], "-v")); mdwrk_t *session = mdwrk_new ( "tcp://localhost:5555", "echo", verbose); zmsg_t *reply = NULL; while (1) { zmsg_t *request = mdwrk_recv (session, &reply); if (request == NULL) break; // worker被中止 reply = request; // echo服务……其实很复杂:) } mdwrk_destroy (&session); return 0; }
几点说明:
1、API是单线程的,所以说worker不会在后台发送心跳,而这也是我们所期望的:如果worker应用程序停止了,心跳就会跟着中止,代理便会停止向该worker发送新的请求。
2、worker API没有做回退算法的设置,因为这里不值得使用这一复杂的机制。
3、API没有提供任何报错机制,如果出现问题,它会直接报断言(或异常,依语言而定)。这一做法对实验性的编程是有用的,这样可以立刻看到执行结果。但在真实编程环境中,API应该足够健壮,合适地处理非法消息。
也许你会问,worker API为什么要关闭它的套接字并新开一个呢?特别是ZMQ是有重连机制的,能够在节点归来后进行重连。我们可以回顾一下简单海盗模式中的worker,以及偏执海盗模式中的worker来加以理解。ZMQ确实会进行自动重连,但如果代理死亡并重连,worker并不会重新进行注册。这个问题有两种解决方案:一是我们这里用到的较为简便的方案,即当worker判断代理已经死亡时,关闭它的套接字并从头来过;另一个方案是当代理收到未知worker的心跳时要求该worker对其提供的服务类型进行注册,这样一来就需要在协议中说明这一规则。
下面让我们设计管家模式的代理,它的核心代码是一组队列,每种服务对应一个队列。我们会在worker出现时创建相应的队列(worker消失时应该销毁对应的队列,不过我们这里暂时不考虑)。额外的,我们会为每种服务维护一个worker的队列。
为了让C语言代码更为易读易写,我使用了ZFL项目提供的哈希和链表容器,并命名为zhash和zlist。如果使用现代语言编写,那自然可以使用其内置的容器。
// // 管家模式协议 - 代理 // 协议 http://rfc.zeromq.org/spec:7 和 spec:8 的最简实现 // #include "czmq.h" #include "mdp.h" // 一般我们会从配置文件中获取以下值 #define HEARTBEAT_LIVENESS 3 // 合理值:3-5 #define HEARTBEAT_INTERVAL 2500 // 单位:毫秒 #define HEARTBEAT_EXPIRY HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS // 定义一个代理 typedef struct { zctx_t *ctx; // 上下文 void *socket; // 用于连接client和worker的套接字 int verbose; // 使用标准输出打印活动信息 char *endpoint; // 代理绑定到的端点 zhash_t *services; // 已知服务的哈希表 zhash_t *workers; // 已知worker的哈希表 zlist_t *waiting; // 正在等待的worker队列 uint64_t heartbeat_at; // 发送心跳的时间 } broker_t; // 定义一个服务 typedef struct { char *name; // 服务名称 zlist_t *requests; // 客户端请求队列 zlist_t *waiting; // 正在等待的worker队列 size_t workers; // 可用worker数 } service_t; // 定义一个worker,状态为空闲或占用 typedef struct { char *identity; // worker的标识 zframe_t *address; // 地址帧 service_t *service; // 所属服务 int64_t expiry; // 过期时间,从未收到心跳起计时 } worker_t; // --------------------------------------------------------------------- // 代理使用的函数 static broker_t * s_broker_new (int verbose); static void s_broker_destroy (broker_t **self_p); static void s_broker_bind (broker_t *self, char *endpoint); static void s_broker_purge_workers (broker_t *self); // 服务使用的函数 static service_t * s_service_require (broker_t *self, zframe_t *service_frame); static void s_service_destroy (void *argument); static void s_service_dispatch (broker_t *self, service_t *service, zmsg_t *msg); static void s_service_internal (broker_t *self, zframe_t *service_frame, zmsg_t *msg); // worker使用的函数 static worker_t * s_worker_require (broker_t *self, zframe_t *address); static void s_worker_delete (broker_t *self, worker_t *worker, int disconnect); static void s_worker_destroy (void *argument); static void s_worker_process (broker_t *self, zframe_t *sender, zmsg_t *msg); static void s_worker_send (broker_t *self, worker_t *worker, char *command, char *option, zmsg_t *msg); static void s_worker_waiting (broker_t *self, worker_t *worker); // 客户端使用的函数 static void s_client_process (broker_t 
*self, zframe_t *sender, zmsg_t *msg); // --------------------------------------------------------------------- // 主程序 int main (int argc, char *argv []) { int verbose = (argc > 1 && streq (argv [1], "-v")); broker_t *self = s_broker_new (verbose); s_broker_bind (self, "tcp://*:5555"); // 接受并处理消息,直至程序被中止 while (TRUE) { zmq_pollitem_t items [] = { { self->socket, 0, ZMQ_POLLIN, 0 } }; int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC); if (rc == -1) break; // 中断 // Process next input message, if any if (items [0].revents & ZMQ_POLLIN) { zmsg_t *msg = zmsg_recv (self->socket); if (!msg) break; // 中断 if (self->verbose) { zclock_log ("I: 收到消息:"); zmsg_dump (msg); } zframe_t *sender = zmsg_pop (msg); zframe_t *empty = zmsg_pop (msg); zframe_t *header = zmsg_pop (msg); if (zframe_streq (header, MDPC_CLIENT)) s_client_process (self, sender, msg); else if (zframe_streq (header, MDPW_WORKER)) s_worker_process (self, sender, msg); else { zclock_log ("E: 非法消息:"); zmsg_dump (msg); zmsg_destroy (&msg); } zframe_destroy (&sender); zframe_destroy (&empty); zframe_destroy (&header); } // 断开并删除过期的worker // 适时地发送心跳给worker if (zclock_time () > self->heartbeat_at) { s_broker_purge_workers (self); worker_t *worker = (worker_t *) zlist_first (self->waiting); while (worker) { s_worker_send (self, worker, MDPW_HEARTBEAT, NULL, NULL); worker = (worker_t *) zlist_next (self->waiting); } self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL; } } if (zctx_interrupted) printf ("W: 收到中断消息,关闭中...\n"); s_broker_destroy (&self); return 0; } // --------------------------------------------------------------------- // 代理对象的构造函数 static broker_t * s_broker_new (int verbose) { broker_t *self = (broker_t *) zmalloc (sizeof (broker_t)); // 初始化代理状态 self->ctx = zctx_new (); self->socket = zsocket_new (self->ctx, ZMQ_ROUTER); self->verbose = verbose; self->services = zhash_new (); self->workers = zhash_new (); self->waiting = zlist_new (); self->heartbeat_at = zclock_time () + 
HEARTBEAT_INTERVAL; return self; } // --------------------------------------------------------------------- // 代理对象的析构函数 static void s_broker_destroy (broker_t **self_p) { assert (self_p); if (*self_p) { broker_t *self = *self_p; zctx_destroy (&self->ctx); zhash_destroy (&self->services); zhash_destroy (&self->workers); zlist_destroy (&self->waiting); free (self); *self_p = NULL; } } // --------------------------------------------------------------------- // 将代理套接字绑定至端点,可以重复调用该函数 // 我们使用一个套接字来同时处理client和worker void s_broker_bind (broker_t *self, char *endpoint) { zsocket_bind (self->socket, endpoint); zclock_log ("I: MDP broker/0.1.1 is active at %s", endpoint); } // --------------------------------------------------------------------- // 删除空闲状态中过期的worker static void s_broker_purge_workers (broker_t *self) { worker_t *worker = (worker_t *) zlist_first (self->waiting); while (worker) { if (zclock_time () < worker->expiry) continue; // 该worker未过期,停止搜索 if (self->verbose) zclock_log ("I: 正在删除过期的worker: %s", worker->identity); s_worker_delete (self, worker, 0); worker = (worker_t *) zlist_first (self->waiting); } } // --------------------------------------------------------------------- // 定位或创建新的服务项 static service_t * s_service_require (broker_t *self, zframe_t *service_frame) { assert (service_frame); char *name = zframe_strdup (service_frame); service_t *service = (service_t *) zhash_lookup (self->services, name); if (service == NULL) { service = (service_t *) zmalloc (sizeof (service_t)); service->name = name; service->requests = zlist_new (); service->waiting = zlist_new (); zhash_insert (self->services, name, service); zhash_freefn (self->services, name, s_service_destroy); if (self->verbose) zclock_log ("I: 收到消息:"); } else free (name); return service; } // --------------------------------------------------------------------- // 当服务从broker->services中移除时销毁该服务对象 static void s_service_destroy (void *argument) { service_t *service = (service_t *) argument; // 
销毁请求队列中的所有项目 while (zlist_size (service->requests)) { zmsg_t *msg = zlist_pop (service->requests); zmsg_destroy (&msg); } zlist_destroy (&service->requests); zlist_destroy (&service->waiting); free (service->name); free (service); } // --------------------------------------------------------------------- // 可能时,分发请求给等待中的worker static void s_service_dispatch (broker_t *self, service_t *service, zmsg_t *msg) { assert (service); if (msg) // 将消息加入队列 zlist_append (service->requests, msg); s_broker_purge_workers (self); while (zlist_size (service->waiting) && zlist_size (service->requests)) { worker_t *worker = zlist_pop (service->waiting); zlist_remove (self->waiting, worker); zmsg_t *msg = zlist_pop (service->requests); s_worker_send (self, worker, MDPW_REQUEST, NULL, msg); zmsg_destroy (&msg); } } // --------------------------------------------------------------------- // 使用8/MMI协定处理内部服务 static void s_service_internal (broker_t *self, zframe_t *service_frame, zmsg_t *msg) { char *return_code; if (zframe_streq (service_frame, "mmi.service")) { char *name = zframe_strdup (zmsg_last (msg)); service_t *service = (service_t *) zhash_lookup (self->services, name); return_code = service && service->workers? 
"200": "404"; free (name); } else return_code = "501"; zframe_reset (zmsg_last (msg), return_code, strlen (return_code)); // 移除并保存返回给client的信封,插入协议头信息和服务名称,并重新包装信封 zframe_t *client = zmsg_unwrap (msg); zmsg_push (msg, zframe_dup (service_frame)); zmsg_pushstr (msg, MDPC_CLIENT); zmsg_wrap (msg, client); zmsg_send (&msg, self->socket); } // --------------------------------------------------------------------- // 按需创建worker static worker_t * s_worker_require (broker_t *self, zframe_t *address) { assert (address); // self->workers使用wroker的标识为键 char *identity = zframe_strhex (address); worker_t *worker = (worker_t *) zhash_lookup (self->workers, identity); if (worker == NULL) { worker = (worker_t *) zmalloc (sizeof (worker_t)); worker->identity = identity; worker->address = zframe_dup (address); zhash_insert (self->workers, identity, worker); zhash_freefn (self->workers, identity, s_worker_destroy); if (self->verbose) zclock_log ("I: 正在注册新的worker: %s", identity); } else free (identity); return worker; } // --------------------------------------------------------------------- // 从所有数据结构中删除wroker,并销毁worker对象 static void s_worker_delete (broker_t *self, worker_t *worker, int disconnect) { assert (worker); if (disconnect) s_worker_send (self, worker, MDPW_DISCONNECT, NULL, NULL); if (worker->service) { zlist_remove (worker->service->waiting, worker); worker->service->workers--; } zlist_remove (self->waiting, worker); // 以下方法间接调用了s_worker_destroy()方法 zhash_delete (self->workers, worker->identity); } // --------------------------------------------------------------------- // 当worker从broker->workers中移除时,销毁worker对象 static void s_worker_destroy (void *argument) { worker_t *worker = (worker_t *) argument; zframe_destroy (&worker->address); free (worker->identity); free (worker); } // --------------------------------------------------------------------- // 处理worker发送来的消息 static void s_worker_process (broker_t *self, zframe_t *sender, zmsg_t *msg) { assert (zmsg_size (msg) >= 1); 
// 消息中至少包含命令帧 zframe_t *command = zmsg_pop (msg); char *identity = zframe_strhex (sender); int worker_ready = (zhash_lookup (self->workers, identity) != NULL); free (identity); worker_t *worker = s_worker_require (self, sender); if (zframe_streq (command, MDPW_READY)) { // 若worker队列中已有该worker,但仍收到了它的“已就绪”消息,则删除这个worker。 if (worker_ready) s_worker_delete (self, worker, 1); else if (zframe_size (sender) >= 4 // 服务名称为保留的服务 && memcmp (zframe_data (sender), "mmi.", 4) == 0) s_worker_delete (self, worker, 1); else { // 将worker对应到服务,并置为空闲状态 zframe_t *service_frame = zmsg_pop (msg); worker->service = s_service_require (self, service_frame); worker->service->workers++; s_worker_waiting (self, worker); zframe_destroy (&service_frame); } } else if (zframe_streq (command, MDPW_REPLY)) { if (worker_ready) { // 移除并保存返回给client的信封,插入协议头信息和服务名称,并重新包装信封 zframe_t *client = zmsg_unwrap (msg); zmsg_pushstr (msg, worker->service->name); zmsg_pushstr (msg, MDPC_CLIENT); zmsg_wrap (msg, client); zmsg_send (&msg, self->socket); s_worker_waiting (self, worker); } else s_worker_delete (self, worker, 1); } else if (zframe_streq (command, MDPW_HEARTBEAT)) { if (worker_ready) worker->expiry = zclock_time () + HEARTBEAT_EXPIRY; else s_worker_delete (self, worker, 1); } else if (zframe_streq (command, MDPW_DISCONNECT)) s_worker_delete (self, worker, 0); else { zclock_log ("E: 非法消息"); zmsg_dump (msg); } free (command); zmsg_destroy (&msg); } // --------------------------------------------------------------------- // 发送消息给worker // 如果指针指向了一条消息,发送它,但不销毁它,因为这是调用者的工作 static void s_worker_send (broker_t *self, worker_t *worker, char *command, char *option, zmsg_t *msg) { msg = msg? 
zmsg_dup (msg): zmsg_new (); // 将协议信封压入消息顶部 if (option) zmsg_pushstr (msg, option); zmsg_pushstr (msg, command); zmsg_pushstr (msg, MDPW_WORKER); // 在消息顶部插入路由帧 zmsg_wrap (msg, zframe_dup (worker->address)); if (self->verbose) { zclock_log ("I: 正在发送消息给worker %s", mdps_commands [(int) *command]); zmsg_dump (msg); } zmsg_send (&msg, self->socket); } // --------------------------------------------------------------------- // 正在等待的worker static void s_worker_waiting (broker_t *self, worker_t *worker) { // 将worker加入代理和服务的等待队列 zlist_append (self->waiting, worker); zlist_append (worker->service->waiting, worker); worker->expiry = zclock_time () + HEARTBEAT_EXPIRY; s_service_dispatch (self, worker->service, NULL); } // --------------------------------------------------------------------- // 处理client发来的请求 static void s_client_process (broker_t *self, zframe_t *sender, zmsg_t *msg) { assert (zmsg_size (msg) >= 2); // 服务名称 + 请求内容 zframe_t *service_frame = zmsg_pop (msg); service_t *service = s_service_require (self, service_frame); // 为应答内容设置请求方的地址 zmsg_wrap (msg, zframe_dup (sender)); if (zframe_size (service_frame) >= 4 && memcmp (zframe_data (service_frame), "mmi.", 4) == 0) s_service_internal (self, service_frame, msg); else s_service_dispatch (self, service, msg); zframe_destroy (&service_frame); }
这个例子应该是我们见过最复杂的一个示例了,大约有500行代码。编写这段代码并让其变得健壮,大约花费了两天的时间。但是,这也仅仅是一个完整的面向服务代理的一部分。
几点说明:
1、管家模式协议要求我们在一个套接字中同时处理client和worker,这一点对部署和管理代理很有益处:它只会在一个ZMQ端点上收发请求,而不是两个。
2、代理很好地实现了MDP/0.1协议中规范的内容,包括在收到非法命令时断开连接,以及心跳机制等。
3、可以将这段代码扩充为多线程,每个线程管理一个套接字、一组client和worker。这种做法在大型架构的拆分中显得很有趣。C语言代码已经是这样的格式了,因此很容易实现。
4、还可以将这段代码扩充为主备模式、双在线模式,进一步提高可靠性。因为从本质上来说,代理是无状态的,只是保存了服务的存在与否,因此client和worker可以自行选择除此之外的代理来进行通信。
5、示例代码中心跳的间隔为2.5秒(即代码中的2500毫秒),主要是为了减少调试时的输出。现实中的值应该设得低一些,但是,重试的过程应该设置得稍长一些,让服务有足够的时间启动,如10秒钟。