ZMQ之面向服务的可靠队列(管家模式)

简介: ZMQ之面向服务的可靠队列(管家模式)

ZMQ之面向服务的可靠队列(管家模式)


管家模式协议(MDP)在扩展PPP协议时引入了一个有趣的特性:client发送的每一个请求都有一个“服务名称”,而worker在像队列装置注册时需要告知自己的服务类型。MDP的优势在于它来源于现实编程,协议简单,且容易提升。

       引入“服务名称”的机制,是对偏执海盗队列的一个简单补充,而结果是让其成为一个面向服务的代理。

在实施管家模式之前,我们需要为client和worker编写一个框架。如果程序员可以通过简单的API来实现这种模式,那就没有必要让他们去了解管家模式的协议内容和实现方法了。

       所以,我们第一个协议(即管家模式协议)定义了分布式架构中节点是如何互相交互的,第二个协议则要定义应用程序应该如何通过框架来使用这一协议。

       管家模式有两个端点,客户端和服务端。因为我们要为client和worker都撰写框架,所以就需要提供两套API。以下是用简单的面向对象方法设计的client端API雏形,使用的是C语言的ZFL library。

mdcli_t *mdcli_new     (char *broker);
void     mdcli_destroy (mdcli_t **self_p);
zmsg_t  *mdcli_send    (mdcli_t *self, char *service, zmsg_t **request_p);

就这么简单。我们创建了一个会话来和代理通信,发送并接收一个请求,最后关闭连接。以下是worker端API的雏形。

mdwrk_t *mdwrk_new     (char *broker,char *service);
void     mdwrk_destroy (mdwrk_t **self_p);
zmsg_t  *mdwrk_recv    (mdwrk_t *self, zmsg_t *reply);

上面两段代码看起来差不多,但是worker端API略有不同。worker第一次执行recv()后会传递一个空的应答,之后才传递当前的应答,并获得新的请求。

       两段的API都很容易开发,只需在偏执海盗模式代码的基础上修改即可。以下是client API:

       mdcliapi: Majordomo client API in C

/*  =====================================================================
    mdcliapi.c
    Majordomo Protocol Client API
    Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
    ---------------------------------------------------------------------
    Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
    Copyright other contributors as noted in the AUTHORS file.
    This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
    This is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or (at
    your option) any later version.
    This software is distributed in the hope that it will be useful, but
    WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    Lesser General Public License for more details.
    You should have received a copy of the GNU Lesser General Public
    License along with this program. If not, see
    <http://www.gnu.org/licenses/>.
    =====================================================================
*/
#include "mdcliapi.h"
//  类结构
//  我们会通过成员方法来访问这些属性
struct _mdcli_t {
    zctx_t *ctx;                //  上下文
    char *broker;
    void *client;               //  连接至代理的套接字
    int verbose;                //  使用标准输出打印当前活动
    int timeout;                //  请求超时时间
    int retries;                //  请求重试次数
};
//  ---------------------------------------------------------------------
//  连接或重连代理
void s_mdcli_connect_to_broker (mdcli_t *self)
{
    if (self->client)
        zsocket_destroy (self->ctx, self->client);
    self->client = zsocket_new (self->ctx, ZMQ_REQ);
    zmq_connect (self->client, self->broker);
    if (self->verbose)
        zclock_log ("I: 正在连接至代理 %s...", self->broker);
}
//  ---------------------------------------------------------------------
//  构造函数
mdcli_t *
mdcli_new (char *broker, int verbose)
{
    assert (broker);
    mdcli_t *self = (mdcli_t *) zmalloc (sizeof (mdcli_t));
    self->ctx = zctx_new ();
    self->broker = strdup (broker);
    self->verbose = verbose;
    self->timeout = 2500;           //  毫秒
    self->retries = 3;              //  尝试次数
    s_mdcli_connect_to_broker (self);
    return self;
}
//  ---------------------------------------------------------------------
//  析构函数
void
mdcli_destroy (mdcli_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        mdcli_t *self = *self_p;
        zctx_destroy (&self->ctx);
        free (self->broker);
        free (self);
        *self_p = NULL;
    }
}
//  ---------------------------------------------------------------------
//  设定请求超时时间
void
mdcli_set_timeout (mdcli_t *self, int timeout)
{
    assert (self);
    self->timeout = timeout;
}
//  ---------------------------------------------------------------------
//  设定请求重试次数
void
mdcli_set_retries (mdcli_t *self, int retries)
{
    assert (self);
    self->retries = retries;
}
//  ---------------------------------------------------------------------
//  向代理发送请求,并尝试获取应答;
//  对消息保持所有权,发送后销毁;
//  返回应答消息,或NULL。
zmsg_t *
mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p)
{
    assert (self);
    assert (request_p);
    zmsg_t *request = *request_p;
    //  用协议前缀包装消息
    //  Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
    //  Frame 2: 服务名称 (可打印字符串)
    zmsg_pushstr (request, service);
    zmsg_pushstr (request, MDPC_CLIENT);
    if (self->verbose) {
        zclock_log ("I: 发送请求给 '%s' 服务:", service);
        zmsg_dump (request);
    }
    int retries_left = self->retries;
    while (retries_left && !zctx_interrupted) {
        zmsg_t *msg = zmsg_dup (request);
        zmsg_send (&msg, self->client);
        while (TRUE) {
            //  轮询套接字以接收应答,有超时时间
            zmq_pollitem_t items [] = {
                { self->client, 0, ZMQ_POLLIN, 0 } };
            int rc = zmq_poll (items, 1, self->timeout * ZMQ_POLL_MSEC);
            if (rc == -1)
                break;          //  中断
            //  收到应答后进行处理
            if (items [0].revents & ZMQ_POLLIN) {
                zmsg_t *msg = zmsg_recv (self->client);
                if (self->verbose) {
                    zclock_log ("I: received reply:");
                    zmsg_dump (msg);
                }
                //  不要尝试处理错误,直接报错即可
                assert (zmsg_size (msg) >= 3);
                zframe_t *header = zmsg_pop (msg);
                assert (zframe_streq (header, MDPC_CLIENT));
                zframe_destroy (&header);
                zframe_t *reply_service = zmsg_pop (msg);
                assert (zframe_streq (reply_service, service));
                zframe_destroy (&reply_service);
                zmsg_destroy (&request);
                return msg;     //  成功
            }
            else
            if (--retries_left) {
                if (self->verbose)
                    zclock_log ("W: no reply, reconnecting...");
                //  重连并重发消息
                s_mdcli_connect_to_broker (self);
                zmsg_t *msg = zmsg_dup (request);
                zmsg_send (&msg, self->client);
            }
            else {
                if (self->verbose)
                    zclock_log ("W: 发生严重错误,放弃重试。");
                break;          //  放弃
            }
        }
    }
    if (zctx_interrupted)
        printf ("W: 收到中断消息,结束client进程...\n");
    zmsg_destroy (&request);
    return NULL;
}

以下测试程序会执行10万次请求应答:

       mdclient: Majordomo client application in C

//
//  管家模式协议 - 客户端示例
//  使用mdcli API隐藏管家模式协议的内部实现
//
//  让我们直接编译这段代码,不生成类库
#include "mdcliapi.c"
int main (int argc, char *argv [])
{
    int verbose = (argc > 1 && streq (argv [1], "-v"));
    mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);
    int count;
    for (count = 0; count < 100000; count++) {
        zmsg_t *request = zmsg_new ();
        zmsg_pushstr (request, "Hello world");
        zmsg_t *reply = mdcli_send (session, "echo", &request);
        if (reply)
            zmsg_destroy (&reply);
        else
            break;              //  中断或停止
    }
    printf ("已处理 %d 次请求-应答\n", count);
    mdcli_destroy (&session);
    return 0;
}

下面是worker的API:

       mdwrkapi: Majordomo worker API in C

/*  =====================================================================
    mdwrkapi.c
    Majordomo Protocol Worker API
    Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
    ---------------------------------------------------------------------
    Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
    Copyright other contributors as noted in the AUTHORS file.
    This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
    This is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or (at
    your option) any later version.
    This software is distributed in the hope that it will be useful, but
    WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    Lesser General Public License for more details.
    You should have received a copy of the GNU Lesser General Public
    License along with this program. If not, see
    <http://www.gnu.org/licenses/>.
    =====================================================================
*/
#include "mdwrkapi.h"
//  可靠性参数
#define HEARTBEAT_LIVENESS  3       //  合理值:3-5
//  类结构
//  使用成员函数访问属性
struct _mdwrk_t {
    zctx_t *ctx;                //  上下文
    char *broker;
    char *service;
    void *worker;               //  连接至代理的套接字
    int verbose;                //  使用标准输出打印活动
    //  心跳设置
    uint64_t heartbeat_at;      //  发送心跳的时间
    size_t liveness;            //  尝试次数
    int heartbeat;              //  心跳延时,单位:毫秒
    int reconnect;              //  重连延时,单位:毫秒
    //  内部状态
    int expect_reply;           //  初始值为0
    //  应答地址,如果存在的话
    zframe_t *reply_to;
};
//  ---------------------------------------------------------------------
//  发送消息给代理
//  如果没有提供消息,则内部创建一个
static void
s_mdwrk_send_to_broker (mdwrk_t *self, char *command, char *option,
                        zmsg_t *msg)
{
    msg = msg? zmsg_dup (msg): zmsg_new ();
    //  将协议信封压入消息顶部
    if (option)
        zmsg_pushstr (msg, option);
    zmsg_pushstr (msg, command);
    zmsg_pushstr (msg, MDPW_WORKER);
    zmsg_pushstr (msg, "");
    if (self->verbose) {
        zclock_log ("I: sending %s to broker",
            mdps_commands [(int) *command]);
        zmsg_dump (msg);
    }
    zmsg_send (&msg, self->worker);
}
//  ---------------------------------------------------------------------
//  连接或重连代理
void s_mdwrk_connect_to_broker (mdwrk_t *self)
{
    if (self->worker)
        zsocket_destroy (self->ctx, self->worker);
    self->worker = zsocket_new (self->ctx, ZMQ_DEALER);
    zmq_connect (self->worker, self->broker);
    if (self->verbose)
        zclock_log ("I: 正在连接代理 %s...", self->broker);
    //  向代理注册服务类型
    s_mdwrk_send_to_broker (self, MDPW_READY, self->service, NULL);
    //  当心跳健康度为零,表示代理已断开连接
    self->liveness = HEARTBEAT_LIVENESS;
    self->heartbeat_at = zclock_time () + self->heartbeat;
}
//  ---------------------------------------------------------------------
//  构造函数
mdwrk_t *
mdwrk_new (char *broker,char *service, int verbose)
{
    assert (broker);
    assert (service);
    mdwrk_t *self = (mdwrk_t *) zmalloc (sizeof (mdwrk_t));
    self->ctx = zctx_new ();
    self->broker = strdup (broker);
    self->service = strdup (service);
    self->verbose = verbose;
    self->heartbeat = 2500;     //  毫秒
    self->reconnect = 2500;     //  毫秒
    s_mdwrk_connect_to_broker (self);
    return self;
}
//  ---------------------------------------------------------------------
//  析构函数
void
mdwrk_destroy (mdwrk_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        mdwrk_t *self = *self_p;
        zctx_destroy (&self->ctx);
        free (self->broker);
        free (self->service);
        free (self);
        *self_p = NULL;
    }
}
//  ---------------------------------------------------------------------
//  设置心跳延迟
void
mdwrk_set_heartbeat (mdwrk_t *self, int heartbeat)
{
    self->heartbeat = heartbeat;
}
//  ---------------------------------------------------------------------
//  设置重连延迟
void
mdwrk_set_reconnect (mdwrk_t *self, int reconnect)
{
    self->reconnect = reconnect;
}
//  ---------------------------------------------------------------------
//  若有应答则发送给代理,并等待新的请求
zmsg_t *
mdwrk_recv (mdwrk_t *self, zmsg_t **reply_p)
{
    //  格式化并发送请求传入的应答
    assert (reply_p);
    zmsg_t *reply = *reply_p;
    assert (reply || !self->expect_reply);
    if (reply) {
        assert (self->reply_to);
        zmsg_wrap (reply, self->reply_to);
        s_mdwrk_send_to_broker (self, MDPW_REPLY, NULL, reply);
        zmsg_destroy (reply_p);
    }
    self->expect_reply = 1;
    while (TRUE) {
        zmq_pollitem_t items [] = {
            { self->worker,  0, ZMQ_POLLIN, 0 } };
        int rc = zmq_poll (items, 1, self->heartbeat * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  中断
        if (items [0].revents & ZMQ_POLLIN) {
            zmsg_t *msg = zmsg_recv (self->worker);
            if (!msg)
                break;          //  中断
            if (self->verbose) {
                zclock_log ("I: 从代理处获得消息:");
                zmsg_dump (msg);
            }
            self->liveness = HEARTBEAT_LIVENESS;
            //  不要处理错误,直接报错即可
            assert (zmsg_size (msg) >= 3);
            zframe_t *empty = zmsg_pop (msg);
            assert (zframe_streq (empty, ""));
            zframe_destroy (&empty);
            zframe_t *header = zmsg_pop (msg);
            assert (zframe_streq (header, MDPW_WORKER));
            zframe_destroy (&header);
            zframe_t *command = zmsg_pop (msg);
            if (zframe_streq (command, MDPW_REQUEST)) {
                //  这里需要将消息中空帧之前的所有地址都保存起来,
                //  但在这里我们暂时只保存一个
                self->reply_to = zmsg_unwrap (msg);
                zframe_destroy (&command);
                return msg;     //  处理请求
            }
            else
            if (zframe_streq (command, MDPW_HEARTBEAT))
                ;               //  不对心跳做任何处理
            else
            if (zframe_streq (command, MDPW_DISCONNECT))
                s_mdwrk_connect_to_broker (self);
            else {
                zclock_log ("E: 消息不合法");
                zmsg_dump (msg);
            }
            zframe_destroy (&command);
            zmsg_destroy (&msg);
        }
        else
        if (--self->liveness == 0) {
            if (self->verbose)
                zclock_log ("W: 失去与代理的连接 - 正在重试...");
            zclock_sleep (self->reconnect);
            s_mdwrk_connect_to_broker (self);
        }
        //  适时地发送消息
        if (zclock_time () > self->heartbeat_at) {
            s_mdwrk_send_to_broker (self, MDPW_HEARTBEAT, NULL, NULL);
            self->heartbeat_at = zclock_time () + self->heartbeat;
        }
    }
    if (zctx_interrupted)
        printf ("W: 收到中断消息,中止worker...\n");
    return NULL;
}

以下测试程序实现了名为echo的服务:

       mdworker: Majordomo worker application in C

//
//  管家模式协议 - worker示例
//  使用mdwrk API隐藏MDP协议的内部实现
//
//  让我们直接编译代码,而不创建类库
#include "mdwrkapi.c"
int main (int argc, char *argv [])
{
    int verbose = (argc > 1 && streq (argv [1], "-v"));
    mdwrk_t *session = mdwrk_new (
        "tcp://localhost:5555", "echo", verbose);
    zmsg_t *reply = NULL;
    while (1) {
        zmsg_t *request = mdwrk_recv (session, &reply);
        if (request == NULL)
            break;              //  worker被中止
        reply = request;        //  echo服务……其实很复杂:)
    }
    mdwrk_destroy (&session);
    return 0;
}

几点说明:

               1、API是单线程的,所以说worker不会再后台发送心跳,而这也是我们所期望的:如果worker应用程序停止了,心跳就会跟着中止,代理便会停止向该worker发送新的请求。

               2、wroker API没有做回退算法的设置,因为这里不值得使用这一复杂的机制。

               3、API没有提供任何报错机制,如果出现问题,它会直接报断言(或异常,依语言而定)。这一做法对实验性的编程是有用的,这样可以立刻看到执行结果。但在真实编程环境中,API应该足够健壮,合适地处理非法消息。

       也许你会问,worker API为什么要关闭它的套接字并新开一个呢?特别是ZMQ是有重连机制的,能够在节点归来后进行重连。我们可以回顾一下简单海盗模式中的worker,以及偏执海盗模式中的worker来加以理解。ZMQ确实会进行自动重连,但如果代理死亡并重连,worker并不会重新进行注册。这个问题有两种解决方案:一是我们这里用到的较为简便的方案,即当worker判断代理已经死亡时,关闭它的套接字并重头来过;另一个方案是当代理收到未知worker的心跳时要求该worker对其提供的服务类型进行注册,这样一来就需要在协议中说明这一规则。

       下面让我们设计管家模式的代理,它的核心代码是一组队列,每种服务对应一个队列。我们会在worker出现时创建相应的队列(worker消失时应该销毁对应的队列,不过我们这里暂时不考虑)。额外的,我们会为每种服务维护一个worker的队列。

       为了让C语言代码更为易读易写,我使用了ZFL项目提供的哈希和链表容器,并命名为zhash和zlist。如果使用现代语言编写,那自然可以使用其内置的容器。

//
//  管家模式协议 - 代理
//  协议 http://rfc.zeromq.org/spec:7 和 spec:8 的最简实现
//
#include "czmq.h"
#include "mdp.h"
//  一般我们会从配置文件中获取以下值
#define HEARTBEAT_LIVENESS  3       //  合理值:3-5
#define HEARTBEAT_INTERVAL  2500    //  单位:毫秒
#define HEARTBEAT_EXPIRY    HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS
//  定义一个代理
typedef struct {
    zctx_t *ctx;                //  上下文
    void *socket;               //  用于连接client和worker的套接字
    int verbose;                //  使用标准输出打印活动信息
    char *endpoint;             //  代理绑定到的端点
    zhash_t *services;          //  已知服务的哈希表
    zhash_t *workers;           //  已知worker的哈希表
    zlist_t *waiting;           //  正在等待的worker队列
    uint64_t heartbeat_at;      //  发送心跳的时间
} broker_t;
//  定义一个服务
typedef struct {
    char *name;                 //  服务名称
    zlist_t *requests;          //  客户端请求队列
    zlist_t *waiting;           //  正在等待的worker队列
    size_t workers;             //  可用worker数
} service_t;
//  定义一个worker,状态为空闲或占用
typedef struct {
    char *identity;             //  worker的标识
    zframe_t *address;          //  地址帧
    service_t *service;         //  所属服务
    int64_t expiry;             //  过期时间,从未收到心跳起计时
} worker_t;
//  ---------------------------------------------------------------------
//  代理使用的函数
static broker_t *
    s_broker_new (int verbose);
static void
    s_broker_destroy (broker_t **self_p);
static void
    s_broker_bind (broker_t *self, char *endpoint);
static void
    s_broker_purge_workers (broker_t *self);
//  服务使用的函数
static service_t *
    s_service_require (broker_t *self, zframe_t *service_frame);
static void
    s_service_destroy (void *argument);
static void
    s_service_dispatch (broker_t *self, service_t *service, zmsg_t *msg);
static void
    s_service_internal (broker_t *self, zframe_t *service_frame, zmsg_t *msg);
//  worker使用的函数
static worker_t *
    s_worker_require (broker_t *self, zframe_t *address);
static void
    s_worker_delete (broker_t *self, worker_t *worker, int disconnect);
static void
    s_worker_destroy (void *argument);
static void
    s_worker_process (broker_t *self, zframe_t *sender, zmsg_t *msg);
static void
    s_worker_send (broker_t *self, worker_t *worker, char *command,
                   char *option, zmsg_t *msg);
static void
    s_worker_waiting (broker_t *self, worker_t *worker);
//  客户端使用的函数
static void
    s_client_process (broker_t *self, zframe_t *sender, zmsg_t *msg);
//  ---------------------------------------------------------------------
//  主程序
int main (int argc, char *argv [])
{
    int verbose = (argc > 1 && streq (argv [1], "-v"));
    broker_t *self = s_broker_new (verbose);
    s_broker_bind (self, "tcp://*:5555");
    //  接受并处理消息,直至程序被中止
    while (TRUE) {
        zmq_pollitem_t items [] = {
            { self->socket,  0, ZMQ_POLLIN, 0 } };
        int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  中断
        //  Process next input message, if any
        if (items [0].revents & ZMQ_POLLIN) {
            zmsg_t *msg = zmsg_recv (self->socket);
            if (!msg)
                break;          //  中断
            if (self->verbose) {
                zclock_log ("I: 收到消息:");
                zmsg_dump (msg);
            }
            zframe_t *sender = zmsg_pop (msg);
            zframe_t *empty  = zmsg_pop (msg);
            zframe_t *header = zmsg_pop (msg);
            if (zframe_streq (header, MDPC_CLIENT))
                s_client_process (self, sender, msg);
            else
            if (zframe_streq (header, MDPW_WORKER))
                s_worker_process (self, sender, msg);
            else {
                zclock_log ("E: 非法消息:");
                zmsg_dump (msg);
                zmsg_destroy (&msg);
            }
            zframe_destroy (&sender);
            zframe_destroy (&empty);
            zframe_destroy (&header);
        }
        //  断开并删除过期的worker
        //  适时地发送心跳给worker
        if (zclock_time () > self->heartbeat_at) {
            s_broker_purge_workers (self);
            worker_t *worker = (worker_t *) zlist_first (self->waiting);
            while (worker) {
                s_worker_send (self, worker, MDPW_HEARTBEAT, NULL, NULL);
                worker = (worker_t *) zlist_next (self->waiting);
            }
            self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
        }
    }
    if (zctx_interrupted)
        printf ("W: 收到中断消息,关闭中...\n");
    s_broker_destroy (&self);
    return 0;
}
//  ---------------------------------------------------------------------
//  代理对象的构造函数
static broker_t *
s_broker_new (int verbose)
{
    broker_t *self = (broker_t *) zmalloc (sizeof (broker_t));
    //  初始化代理状态
    self->ctx = zctx_new ();
    self->socket = zsocket_new (self->ctx, ZMQ_ROUTER);
    self->verbose = verbose;
    self->services = zhash_new ();
    self->workers = zhash_new ();
    self->waiting = zlist_new ();
    self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
    return self;
}
//  ---------------------------------------------------------------------
//  代理对象的析构函数
static void
s_broker_destroy (broker_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        broker_t *self = *self_p;
        zctx_destroy (&self->ctx);
        zhash_destroy (&self->services);
        zhash_destroy (&self->workers);
        zlist_destroy (&self->waiting);
        free (self);
        *self_p = NULL;
    }
}
//  ---------------------------------------------------------------------
//  将代理套接字绑定至端点,可以重复调用该函数
//  我们使用一个套接字来同时处理client和worker
void
s_broker_bind (broker_t *self, char *endpoint)
{
    zsocket_bind (self->socket, endpoint);
    zclock_log ("I: MDP broker/0.1.1 is active at %s", endpoint);
}
//  ---------------------------------------------------------------------
//  删除空闲状态中过期的worker
static void
s_broker_purge_workers (broker_t *self)
{
    worker_t *worker = (worker_t *) zlist_first (self->waiting);
    while (worker) {
        if (zclock_time () < worker->expiry)
            continue;              //  该worker未过期,停止搜索
        if (self->verbose)
            zclock_log ("I: 正在删除过期的worker: %s",
                worker->identity);
        s_worker_delete (self, worker, 0);
        worker = (worker_t *) zlist_first (self->waiting);
    }
}
//  ---------------------------------------------------------------------
//  定位或创建新的服务项
static service_t *
s_service_require (broker_t *self, zframe_t *service_frame)
{
    assert (service_frame);
    char *name = zframe_strdup (service_frame);
    service_t *service =
        (service_t *) zhash_lookup (self->services, name);
    if (service == NULL) {
        service = (service_t *) zmalloc (sizeof (service_t));
        service->name = name;
        service->requests = zlist_new ();
        service->waiting = zlist_new ();
        zhash_insert (self->services, name, service);
        zhash_freefn (self->services, name, s_service_destroy);
        if (self->verbose)
            zclock_log ("I: 收到消息:");
    }
    else
        free (name);
    return service;
}
//  ---------------------------------------------------------------------
//  当服务从broker->services中移除时销毁该服务对象
static void
s_service_destroy (void *argument)
{
    service_t *service = (service_t *) argument;
    //  销毁请求队列中的所有项目
    while (zlist_size (service->requests)) {
        zmsg_t *msg = zlist_pop (service->requests);
        zmsg_destroy (&msg);
    }
    zlist_destroy (&service->requests);
    zlist_destroy (&service->waiting);
    free (service->name);
    free (service);
}
//  ---------------------------------------------------------------------
//  可能时,分发请求给等待中的worker
static void
s_service_dispatch (broker_t *self, service_t *service, zmsg_t *msg)
{
    assert (service);
    if (msg)                    //  将消息加入队列
        zlist_append (service->requests, msg);
    s_broker_purge_workers (self);
    while (zlist_size (service->waiting)
        && zlist_size (service->requests))
    {
        worker_t *worker = zlist_pop (service->waiting);
        zlist_remove (self->waiting, worker);
        zmsg_t *msg = zlist_pop (service->requests);
        s_worker_send (self, worker, MDPW_REQUEST, NULL, msg);
        zmsg_destroy (&msg);
    }
}
//  ---------------------------------------------------------------------
//  使用8/MMI协定处理内部服务
static void
s_service_internal (broker_t *self, zframe_t *service_frame, zmsg_t *msg)
{
    char *return_code;
    if (zframe_streq (service_frame, "mmi.service")) {
        char *name = zframe_strdup (zmsg_last (msg));
        service_t *service =
            (service_t *) zhash_lookup (self->services, name);
        return_code = service && service->workers? "200": "404";
        free (name);
    }
    else
        return_code = "501";
    zframe_reset (zmsg_last (msg), return_code, strlen (return_code));
    //  移除并保存返回给client的信封,插入协议头信息和服务名称,并重新包装信封
    zframe_t *client = zmsg_unwrap (msg);
    zmsg_push (msg, zframe_dup (service_frame));
    zmsg_pushstr (msg, MDPC_CLIENT);
    zmsg_wrap (msg, client);
    zmsg_send (&msg, self->socket);
}
//  ---------------------------------------------------------------------
//  按需创建worker
static worker_t *
s_worker_require (broker_t *self, zframe_t *address)
{
    assert (address);
    //  self->workers使用wroker的标识为键
    char *identity = zframe_strhex (address);
    worker_t *worker =
        (worker_t *) zhash_lookup (self->workers, identity);
    if (worker == NULL) {
        worker = (worker_t *) zmalloc (sizeof (worker_t));
        worker->identity = identity;
        worker->address = zframe_dup (address);
        zhash_insert (self->workers, identity, worker);
        zhash_freefn (self->workers, identity, s_worker_destroy);
        if (self->verbose)
            zclock_log ("I: 正在注册新的worker: %s", identity);
    }
    else
        free (identity);
    return worker;
}
//  ---------------------------------------------------------------------
//  从所有数据结构中删除wroker,并销毁worker对象
static void
s_worker_delete (broker_t *self, worker_t *worker, int disconnect)
{
    assert (worker);
    if (disconnect)
        s_worker_send (self, worker, MDPW_DISCONNECT, NULL, NULL);
    if (worker->service) {
        zlist_remove (worker->service->waiting, worker);
        worker->service->workers--;
    }
    zlist_remove (self->waiting, worker);
    //  以下方法间接调用了s_worker_destroy()方法
    zhash_delete (self->workers, worker->identity);
}
//  ---------------------------------------------------------------------
//  当worker从broker->workers中移除时,销毁worker对象
static void
s_worker_destroy (void *argument)
{
    worker_t *worker = (worker_t *) argument;
    zframe_destroy (&worker->address);
    free (worker->identity);
    free (worker);
}
//  ---------------------------------------------------------------------
//  处理worker发送来的消息
static void
s_worker_process (broker_t *self, zframe_t *sender, zmsg_t *msg)
{
    assert (zmsg_size (msg) >= 1);     //  消息中至少包含命令帧
    zframe_t *command = zmsg_pop (msg);
    char *identity = zframe_strhex (sender);
    int worker_ready = (zhash_lookup (self->workers, identity) != NULL);
    free (identity);
    worker_t *worker = s_worker_require (self, sender);
    if (zframe_streq (command, MDPW_READY)) {
        //  若worker队列中已有该worker,但仍收到了它的“已就绪”消息,则删除这个worker。
        if (worker_ready)
            s_worker_delete (self, worker, 1);
        else
        if (zframe_size (sender) >= 4  //  服务名称为保留的服务
        &&  memcmp (zframe_data (sender), "mmi.", 4) == 0)
            s_worker_delete (self, worker, 1);
        else {
            //  将worker对应到服务,并置为空闲状态
            zframe_t *service_frame = zmsg_pop (msg);
            worker->service = s_service_require (self, service_frame);
            worker->service->workers++;
            s_worker_waiting (self, worker);
            zframe_destroy (&service_frame);
        }
    }
    else
    if (zframe_streq (command, MDPW_REPLY)) {
        if (worker_ready) {
            //  移除并保存返回给client的信封,插入协议头信息和服务名称,并重新包装信封
            zframe_t *client = zmsg_unwrap (msg);
            zmsg_pushstr (msg, worker->service->name);
            zmsg_pushstr (msg, MDPC_CLIENT);
            zmsg_wrap (msg, client);
            zmsg_send (&msg, self->socket);
            s_worker_waiting (self, worker);
        }
        else
            s_worker_delete (self, worker, 1);
    }
    else
    if (zframe_streq (command, MDPW_HEARTBEAT)) {
        if (worker_ready)
            worker->expiry = zclock_time () + HEARTBEAT_EXPIRY;
        else
            s_worker_delete (self, worker, 1);
    }
    else
    if (zframe_streq (command, MDPW_DISCONNECT))
        s_worker_delete (self, worker, 0);
    else {
        zclock_log ("E: 非法消息");
        zmsg_dump (msg);
    }
    free (command);
    zmsg_destroy (&msg);
}
//  ---------------------------------------------------------------------
//  发送消息给worker
//  如果指针指向了一条消息,发送它,但不销毁它,因为这是调用者的工作
static void
s_worker_send (broker_t *self, worker_t *worker, char *command,
               char *option, zmsg_t *msg)
{
    msg = msg? zmsg_dup (msg): zmsg_new ();
    //  将协议信封压入消息顶部
    if (option)
        zmsg_pushstr (msg, option);
    zmsg_pushstr (msg, command);
    zmsg_pushstr (msg, MDPW_WORKER);
    //  在消息顶部插入路由帧
    zmsg_wrap (msg, zframe_dup (worker->address));
    if (self->verbose) {
        zclock_log ("I: 正在发送消息给worker %s",
            mdps_commands [(int) *command]);
        zmsg_dump (msg);
    }
    zmsg_send (&msg, self->socket);
}
//  ---------------------------------------------------------------------
//  正在等待的worker
static void
s_worker_waiting (broker_t *self, worker_t *worker)
{
    //  将worker加入代理和服务的等待队列
    zlist_append (self->waiting, worker);
    zlist_append (worker->service->waiting, worker);
    worker->expiry = zclock_time () + HEARTBEAT_EXPIRY;
    s_service_dispatch (self, worker->service, NULL);
}
//  ---------------------------------------------------------------------
//  处理client发来的请求
static void
s_client_process (broker_t *self, zframe_t *sender, zmsg_t *msg)
{
    assert (zmsg_size (msg) >= 2);     //  服务名称 + 请求内容
    zframe_t *service_frame = zmsg_pop (msg);
    service_t *service = s_service_require (self, service_frame);
    //  为应答内容设置请求方的地址
    zmsg_wrap (msg, zframe_dup (sender));
    if (zframe_size (service_frame) >= 4
    &&  memcmp (zframe_data (service_frame), "mmi.", 4) == 0)
        s_service_internal (self, service_frame, msg);
    else
        s_service_dispatch (self, service, msg);
    zframe_destroy (&service_frame);
}

这个例子应该是我们见过最复杂的一个示例了,大约有500行代码。编写这段代码并让其变的健壮,大约花费了两天的时间。但是,这也仅仅是一个完整的面向服务代理的一部分。

       几点说明:

               1、管家模式协议要求我们在一个套接字中同时处理client和worker,这一点对部署和管理代理很有益处:它只会在一个ZMQ端点上收发请求,而不是两个。

               2、代理很好地实现了MDP/0.1协议中规范的内容,包括当代理发送非法命令和心跳时断开的机制。

               3、可以将这段代码扩充为多线程,每个线程管理一个套接字、一组client和worker。这种做法在大型架构的拆分中显得很有趣。C语言代码已经是这样的格式了,因此很容易实现。

               4、还可以将这段代码扩充为主备模式、双在线模式,进一步提高可靠性。因为从本质上来说,代理是无状态的,只是保存了服务的存在与否,因此client和worker可以自行选择除此之外的代理来进行通信。

               5、示例代码中心跳的间隔为5秒,主要是为了减少调试时的输出。现实中的值应该设得低一些,但是,重试的过程应该设置得稍长一些,让服务有足够的时间启动,如10秒钟。

相关文章
|
消息中间件 Java Kafka
一款消息队列的客户端框架——启明信息车联网MQ演进实践分享
一款消息队列的客户端框架——启明信息车联网MQ演进实践分享 分享人:阿里云MVP曾宪宇,2014开始 就职于启明信息,负责车联网平台的架构和建设,坐标吉林长春。 分享内容:结合主流MQ,介绍一款基于Java的开源消息队列客户端框架。
2984 0
一款消息队列的客户端框架——启明信息车联网MQ演进实践分享
|
25天前
|
中间件 数据库
MQ如何保障发送消息可靠
简易做法包括开启生产者重试及确认机制。更可靠但复杂的方案则涉及将消息存入数据库,通过状态码管理发送状态,结合定时任务检查并重发未成功发送的消息,同时利用确认回调确保消息发送成功。
47 4
|
6月前
|
消息中间件 存储 监控
中间件消息队列协议的缓冲能力
【6月更文挑战第5天】
30 2
|
6月前
|
消息中间件 网络协议
【消息队列开发】 设计网络通信协议
【消息队列开发】 设计网络通信协议
|
7月前
|
消息中间件 缓存 API
|
7月前
|
消息中间件 网络协议 Ubuntu
实现高效消息传递:使用RabbitMQ构建可复用的企业级消息系统
RabbitMQ是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。
|
消息中间件 存储 监控
消息队列设计:构建高效可靠的分布式系统
在现代分布式系统中,消息队列被广泛应用于解耦和异步处理。它提供了高可靠性、高性能的消息传递机制,使得系统组件之间可以松耦合地协作。在本文中,我们将探讨如何设计一个高效可靠的消息队列系统。 架构设计原则
183 0
|
消息中间件 网络协议 中间件
ZMQ请求应答模式之无中间件的可靠性--自由者模式
ZMQ请求应答模式之无中间件的可靠性--自由者模式
ZMQ请求应答模式之无中间件的可靠性--自由者模式
|
算法 网络协议 前端开发
ZMQ之异步管家模式
ZMQ之异步管家模式