ZMQ之克隆模式的可靠性

简介: ZMQ之克隆模式的可靠性

ZMQ之克隆模式的可靠性


克隆服务器的可靠性

       克隆模型1至5相对比较简单,下面我们会探讨一个非常复杂的模型。可以发现,为了构建可靠的消息队列,我们需要花费非常多的精力。所以我们经常会问:有必要这么做吗?如果说你能够接受可靠性不够高的、或者说已经足够好的架构,那恭喜你,你在成本和收益之间找到了平衡。虽然我们会偶尔丢失一些消息,但从经济的角度来说还是合理的。不管怎样,下面我们就来介绍这个复杂的模型。

       在模型3中,你会关闭和重启服务,这会导致数据的丢失。任何后续加入的客户端只能得到重启之后的那些数据,而非所有的。下面就让我们想办法让克隆模式能够承担服务器重启的故障。

       以下列举我们需要处理的问题:

               1、克隆服务器进程崩溃并自动或手工重启。进程丢失了所有数据,所以必须从别处进行恢复。

               2、克隆服务器硬件故障,长时间不能恢复。客户端需要切换至另一个可用的服务端。

               3、克隆服务器从网络上断开,如交换机发生故障等。它会在某个时点重连,但期间的数据就需要替代的服务器负责处理。

       第一步我们需要增加一个服务器。我们可以使用第四章中提到的双子星模式,它是一个反应堆,而我们的程序经过整理后也是一个反应堆,因此可以互相协作。

       我们需要保证更新事件在主服务器崩溃时仍能保留,最简单的机制就是同时发送给两台服务器。

       备机就可以当做一台客户端来运行,像其他客户端一样从主机获取更新事件。同时它又能从客户端获取更新事件——虽然不应该以此更新数据,但可以先暂存起来。

       所以,相较于模型5,模型6中引入了以下特性:

               1、客户端发送更新事件改用PUB-SUB套接字,而非PUSH-PULL。原因是PUSH套接字会在没有接收方时阻塞,且会进行负载均衡——我们需要两台服务器都接收到消息。我们会在服务器端绑定SUB套接字,在客户端连接PUB套接字。

               2、我们在服务器发送给客户端的更新事件中加入心跳,这样客户端可以知道主机是否已死,然后切换至备机。

               3、我们使用双子星模式的bstar反应堆类来创建主机和备机。双子星模式中需要有一个“投票”套接字,来协助判定对方节点是否已死。这里我们使用快照请求来作为“投票”。

               4、我们将为所有的更新事件添加UUID属性,它由客户端生成,服务端会将其发布给所有客户端。

               5、备机将维护一个“待处理列表”,保存来自客户端、尚未由服务端发布的更新事件;或者反过来,来自服务端、尚未从客户端收到的更新事件。这个列表从旧到新排列,这样就能方便地从顶部删除消息。

       我们可以为客户端设计一个有限状态机,它有三种状态:

               1、客户端打开并连接了套接字,然后向服务端发送快照请求。为了避免消息风暴,它只会请求两次。

               2、客户端等待快照应答,如果获得了则保存它;如果没有获得,则向第二个服务器发送请求。

               3、客户端收到快照,便开始等待更新事件。如果在一定时间内没有收到服务端响应,则会连接第二个服务端。

       客户端会一直循环下去,可能在程序刚启动时,部分客户端会试图连接主机,部分连接备机,相信双子星模式会很好地处理这一情况的。

       我们可以将客户端状态图绘制出来:

故障恢复的步骤如下:

               1、客户端检测到主机不再发送心跳,因此转而连接备机,并请求一份新的快照;

               2、备机开始接收快照请求,并检测到主机死亡,于是开始作为主机运行;

               3、备机将待处理列表中的更新事件写入自身状态中,然后开始处理快照请求。

       当主机恢复连接时:

               1、启动为slave状态,并作为克隆模式客户端连接备机;

               2、同时,使用SUB套接字从客户端接收更新事件。

       我们做两点假设:

               1、至少有一台主机会继续运行。如果两台主机都崩溃了,那我们将丢失所有的服务端数据,无法恢复。

               2、不同的客户端不会同时更新同一个键值对。客户端的更新事件会先后到达两个服务器,因此更新的顺序可能会不一致。单个客户端的更新事件到达两台服务器的顺序是相同的,所以不用担心。

       下面是整体架构图:


开始编程之前,我们需要将客户端重构成一个可复用的类。在ZMQ中写异步类有时是为了练习如何写出优雅的代码,但这里我们确实是希望克隆模式可以成为一种易于使用的程序。上述架构的伸缩性来源于客户端的正确行为,因此有必要将其封装成一份API。要在客户端中进行故障恢复还是比较复杂的,试想一下自由者模式和克隆模式结合起来会是什么样的吧。

       按照我的习惯,我会先写出一份API的列表,然后加以实现。让我们假想一个名为clone的API,在其基础之上编写克隆模式客户端API。将代码封装为API显然会提升代码的稳定性,就以模型5为例,客户端需要打开三个套接字,端点名称直接写在了代码里。我们可以创建这样一组API:

    //  为每个套接字指定端点
    clone_subscribe (clone, "tcp://localhost:5556");
    clone_snapshot  (clone, "tcp://localhost:5557");
    clone_updates   (clone, "tcp://localhost:5558");
    //  由于有两个服务端,因此再执行一次
    clone_subscribe (clone, "tcp://localhost:5566");
    clone_snapshot  (clone, "tcp://localhost:5567");
    clone_updates   (clone, "tcp://localhost:5568");

但这种写法还是比较啰嗦的,因为没有必要将API内部的一些设计暴露给编程人员。现在我们会使用三个套接字,而将来可能就会使用两个,或者四个。我们不可能让所有的应用程序都相应地修改吧?让我们把这些信息包装到API中:

    //  指定主备服务器端点
    clone_connect (clone, "tcp://localhost:5551");
    clone_connect (clone, "tcp://localhost:5561");

这样一来代码就变得非常简洁,不过也会对现有代码的内部就够造成影响。我们需要从一个端点中推算出三个端点。一种方法是假设客户端和服务端使用三个连续的端点通信,并将这个规则写入协议;另一个方法是向服务器索取缺少的端点信息。我们使用第一种较为简单的方法:

               1、服务器状态ROUTER在端点P;

               2、服务器更新事件PUB在端点P + 1;

               3、服务器更新事件SUB在端点P + 2。

       clone类和第四章的flcliapi类很类似,由两部分组成:

               1、一个在后台运行的异步克隆模式代理。该代理处理所有的I/O操作,实时地和服务器进行通信;

               2、一个在前台应用程序中同步运行的clone类。当你创建了一个clone对象后,它会自动创建后台的clone线程;当你销毁clone对象,该后台线程也会被销毁。

       前台的clone类会使用inproc管道和后台的代理进行通信。C语言中,czmq线程会自动为我们创建这个管道。这也是ZMQ多线程编程的常规方式。

       如果没有ZMQ,这种异步的设计将很难处理高压工作,而ZMQ会让其变得简单。编写出来额代码会相对比较复杂。我们可以用反应堆的模式来编写,但这会进一步增加复杂度,且影响应用程序的使用。因此,我们的设计的API将更像是一个能够和服务器进行通信的键值表:

clone_t *clone_new (void);
void clone_destroy (clone_t **self_p);
void clone_connect (clone_t *self, char *address, char *service);
void clone_set (clone_t *self, char *key, char *value);
char *clone_get (clone_t *self, char *key);

下面就是克隆模式客户端模型6的代码,因为调用了API,所以非常简短:

       clonecli6: Clone client, Model Six in C

//
//  克隆模式 - 客户端 - 模型6
//
//  直接编译,不建类库
#include "clone.c"
#define SUBTREE "/client/"
int main (void)
{
    //  创建分布式哈希表
    clone_t *clone = clone_new ();
    //  配置
    clone_subtree (clone, SUBTREE);
    clone_connect (clone, "tcp://localhost", "5556");
    clone_connect (clone, "tcp://localhost", "5566");
    //  插入随机键值
    while (!zctx_interrupted) {
        //  生成随机值
        char key [255];
        char value [10];
        sprintf (key, "%s%d", SUBTREE, randof (10000));
        sprintf (value, "%d", randof (1000000));
        clone_set (clone, key, value, randof (30));
        sleep (1);
    }
    clone_destroy (&clone);
    return 0;
}

以下是clone类的实现:

       clone: Clone class in C

/*  =====================================================================
    clone - client-side Clone Pattern class
    ---------------------------------------------------------------------
    Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
    Copyright other contributors as noted in the AUTHORS file.
    This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
    This is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or (at
    your option) any later version.
    This software is distributed in the hope that it will be useful, but
    WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    Lesser General Public License for more details.
    You should have received a copy of the GNU Lesser General Public
    License along with this program. If not, see
    <http://www.gnu.org/licenses/>.
    =====================================================================
*/
#include "clone.h"
//  请求超时时间
#define GLOBAL_TIMEOUT  4000    //  msecs
//  判定服务器死亡的时间
#define SERVER_TTL      5000    //  msecs
//  服务器数量
#define SERVER_MAX      2
//  =====================================================================
//  同步部分,在应用程序线程中工作
//  ---------------------------------------------------------------------
//  类结构
struct _clone_t {
    zctx_t *ctx;                //  上下文
    void *pipe;                 //  和后台代理间的通信套接字
};
//  该线程用于处理真正的clone类
static void clone_agent (void *args, zctx_t *ctx, void *pipe);
//  ---------------------------------------------------------------------
//  构造函数
clone_t *
clone_new (void)
{
    clone_t
        *self;
    self = (clone_t *) zmalloc (sizeof (clone_t));
    self->ctx = zctx_new ();
    self->pipe = zthread_fork (self->ctx, clone_agent, NULL);
    return self;
}
//  ---------------------------------------------------------------------
//  析构函数
void
clone_destroy (clone_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        clone_t *self = *self_p;
        zctx_destroy (&self->ctx);
        free (self);
        *self_p = NULL;
    }
}
//  ---------------------------------------------------------------------
//  在链接之前指定快照和更新事件的子树
//  发送给后台代理的消息内容为[SUBTREE][subtree]
void clone_subtree (clone_t *self, char *subtree)
{
    assert (self);
    zmsg_t *msg = zmsg_new ();
    zmsg_addstr (msg, "SUBTREE");
    zmsg_addstr (msg, subtree);
    zmsg_send (&msg, self->pipe);
}
//  ---------------------------------------------------------------------
//  连接至新的服务器端点
//  消息内容:[CONNECT][endpoint][service]
void
clone_connect (clone_t *self, char *address, char *service)
{
    assert (self);
    zmsg_t *msg = zmsg_new ();
    zmsg_addstr (msg, "CONNECT");
    zmsg_addstr (msg, address);
    zmsg_addstr (msg, service);
    zmsg_send (&msg, self->pipe);
}
//  ---------------------------------------------------------------------
//  设置新值
//  消息内容:[SET][key][value][ttl]
void
clone_set (clone_t *self, char *key, char *value, int ttl)
{
    char ttlstr [10];
    sprintf (ttlstr, "%d", ttl);
    assert (self);
    zmsg_t *msg = zmsg_new ();
    zmsg_addstr (msg, "SET");
    zmsg_addstr (msg, key);
    zmsg_addstr (msg, value);
    zmsg_addstr (msg, ttlstr);
    zmsg_send (&msg, self->pipe);
}
//  ---------------------------------------------------------------------
//  取值
//  消息内容:[GET][key]
//  如果没有clone可用,会返回NULL
char *
clone_get (clone_t *self, char *key)
{
    assert (self);
    assert (key);
    zmsg_t *msg = zmsg_new ();
    zmsg_addstr (msg, "GET");
    zmsg_addstr (msg, key);
    zmsg_send (&msg, self->pipe);
    zmsg_t *reply = zmsg_recv (self->pipe);
    if (reply) {
        char *value = zmsg_popstr (reply);
        zmsg_destroy (&reply);
        return value;
    }
    return NULL;
}
//  =====================================================================
//  异步部分,在后台运行
//  ---------------------------------------------------------------------
//  单个服务端信息
typedef struct {
    char *address;              //  服务端地址
    int port;                   //  端口
    void *snapshot;             //  快照套接字
    void *subscriber;           //  接收更新事件的套接字
    uint64_t expiry;            //  服务器过期时间
    uint requests;              //  收到的快照请求数
} server_t;
static server_t *
server_new (zctx_t *ctx, char *address, int port, char *subtree)
{
    server_t *self = (server_t *) zmalloc (sizeof (server_t));
    zclock_log ("I: adding server %s:%d...", address, port);
    self->address = strdup (address);
    self->port = port;
    self->snapshot = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_connect (self->snapshot, "%s:%d", address, port);
    self->subscriber = zsocket_new (ctx, ZMQ_SUB);
    zsocket_connect (self->subscriber, "%s:%d", address, port + 1);
    zsockopt_set_subscribe (self->subscriber, subtree);
    return self;
}
static void
server_destroy (server_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        server_t *self = *self_p;
        free (self->address);
        free (self);
        *self_p = NULL;
    }
}
//  ---------------------------------------------------------------------
//  后台代理类
//  状态
#define STATE_INITIAL       0   //  连接之前
#define STATE_SYNCING       1   //  正在同步
#define STATE_ACTIVE        2   //  正在更新
typedef struct {
    zctx_t *ctx;                //  上下文
    void *pipe;                 //  与主线程通信的套接字
    zhash_t *kvmap;             //  键值表
    char *subtree;              //  子树
    server_t *server [SERVER_MAX];
    uint nbr_servers;           //  范围:0 - SERVER_MAX
    uint state;                 //  当前状态
    uint cur_server;            //  当前master,0/1
    int64_t sequence;           //  键值对编号
    void *publisher;            //  发布更新事件的套接字
} agent_t;
static agent_t *
agent_new (zctx_t *ctx, void *pipe)
{
    agent_t *self = (agent_t *) zmalloc (sizeof (agent_t));
    self->ctx = ctx;
    self->pipe = pipe;
    self->kvmap = zhash_new ();
    self->subtree = strdup ("");
    self->state = STATE_INITIAL;
    self->publisher = zsocket_new (self->ctx, ZMQ_PUB);
    return self;
}
static void
agent_destroy (agent_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        agent_t *self = *self_p;
        int server_nbr;
        for (server_nbr = 0; server_nbr < self->nbr_servers; server_nbr++)
            server_destroy (&self->server [server_nbr]);
        zhash_destroy (&self->kvmap);
        free (self->subtree);
        free (self);
        *self_p = NULL;
    }
}
//  若线程被中断则返回-1
static int
agent_control_message (agent_t *self)
{
    zmsg_t *msg = zmsg_recv (self->pipe);
    char *command = zmsg_popstr (msg);
    if (command == NULL)
        return -1;
    if (streq (command, "SUBTREE")) {
        free (self->subtree);
        self->subtree = zmsg_popstr (msg);
    }
    else
    if (streq (command, "CONNECT")) {
        char *address = zmsg_popstr (msg);
        char *service = zmsg_popstr (msg);
        if (self->nbr_servers < SERVER_MAX) {
            self->server [self->nbr_servers++] = server_new (
                self->ctx, address, atoi (service), self->subtree);
            //  广播更新事件
            zsocket_connect (self->publisher, "%s:%d",
                address, atoi (service) + 2);
        }
        else
            zclock_log ("E: too many servers (max. %d)", SERVER_MAX);
        free (address);
        free (service);
    }
    else
    if (streq (command, "SET")) {
        char *key = zmsg_popstr (msg);
        char *value = zmsg_popstr (msg);
        char *ttl = zmsg_popstr (msg);
        zhash_update (self->kvmap, key, (byte *) value);
        zhash_freefn (self->kvmap, key, free);
        //  向服务端发送键值对
        kvmsg_t *kvmsg = kvmsg_new (0);
        kvmsg_set_key  (kvmsg, key);
        kvmsg_set_uuid (kvmsg);
        kvmsg_fmt_body (kvmsg, "%s", value);
        kvmsg_set_prop (kvmsg, "ttl", ttl);
        kvmsg_send     (kvmsg, self->publisher);
        kvmsg_destroy (&kvmsg);
puts (key);
        free (ttl);
        free (key);             //  键值对实际由哈希表对象控制
    }
    else
    if (streq (command, "GET")) {
        char *key = zmsg_popstr (msg);
        char *value = zhash_lookup (self->kvmap, key);
        if (value)
            zstr_send (self->pipe, value);
        else
            zstr_send (self->pipe, "");
        free (key);
        free (value);
    }
    free (command);
    zmsg_destroy (&msg);
    return 0;
}
//  ---------------------------------------------------------------------
//  异步的后台代理会维护一个服务端池,并处理来自应用程序的请求或应答。
static void
clone_agent (void *args, zctx_t *ctx, void *pipe)
{
    agent_t *self = agent_new (ctx, pipe);
    while (TRUE) {
        zmq_pollitem_t poll_set [] = {
            { pipe, 0, ZMQ_POLLIN, 0 },
            { 0,    0, ZMQ_POLLIN, 0 }
        };
        int poll_timer = -1;
        int poll_size = 2;
        server_t *server = self->server [self->cur_server];
        switch (self->state) {
            case STATE_INITIAL:
                //  该状态下,如果有可用服务,会发送快照请求
                if (self->nbr_servers > 0) {
                    zclock_log ("I: 正在等待服务器 %s:%d...",
                        server->address, server->port);
                    if (server->requests < 2) {
                        zstr_sendm (server->snapshot, "ICANHAZ?");
                        zstr_send  (server->snapshot, self->subtree);
                        server->requests++;
                    }
                    server->expiry = zclock_time () + SERVER_TTL;
                    self->state = STATE_SYNCING;
                    poll_set [1].socket = server->snapshot;
                }
                else
                    poll_size = 1;
                break;
            case STATE_SYNCING:
                //  该状态下我们从服务器端接收快照内容,若失败则尝试其他服务器
                poll_set [1].socket = server->snapshot;
                break;
            case STATE_ACTIVE:
                //  该状态下我们从服务器获取更新事件,失败则尝试其他服务器
                poll_set [1].socket = server->subscriber;
                break;
        }
        if (server) {
            poll_timer = (server->expiry - zclock_time ())
                       * ZMQ_POLL_MSEC;
            if (poll_timer < 0)
                poll_timer = 0;
        }
        //  ------------------------------------------------------------
        //  poll循环
        int rc = zmq_poll (poll_set, poll_size, poll_timer);
        if (rc == -1)
            break;              //  上下文已被关闭
        if (poll_set [0].revents & ZMQ_POLLIN) {
            if (agent_control_message (self))
                break;          //  中断
        }
        else
        if (poll_set [1].revents & ZMQ_POLLIN) {
            kvmsg_t *kvmsg = kvmsg_recv (poll_set [1].socket);
            if (!kvmsg)
                break;          //  中断
            //  任何服务端的消息将重置它的过期时间
            server->expiry = zclock_time () + SERVER_TTL;
            if (self->state == STATE_SYNCING) {
                //  保存快照内容
                server->requests = 0;
                if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
                    self->sequence = kvmsg_sequence (kvmsg);
                    self->state = STATE_ACTIVE;
                    zclock_log ("I: received from %s:%d snapshot=%d",
                        server->address, server->port,
                        (int) self->sequence);
                    kvmsg_destroy (&kvmsg);
                }
                else
                    kvmsg_store (&kvmsg, self->kvmap);
            }
            else
            if (self->state == STATE_ACTIVE) {
                //  丢弃过期的更新事件
                if (kvmsg_sequence (kvmsg) > self->sequence) {
                    self->sequence = kvmsg_sequence (kvmsg);
                    kvmsg_store (&kvmsg, self->kvmap);
                    zclock_log ("I: received from %s:%d update=%d",
                        server->address, server->port,
                        (int) self->sequence);
                }
                else
                    kvmsg_destroy (&kvmsg);
            }
        }
        else {
            //  服务端已死,尝试其他服务器
            zclock_log ("I: 服务器 %s:%d 无响应",
                    server->address, server->port);
            self->cur_server = (self->cur_server + 1) % self->nbr_servers;
            self->state = STATE_INITIAL;
        }
    }
    agent_destroy (&self);
}

最后是克隆服务器的模型6代码:

       clonesrv6: Clone server, Model Six in C

//
// 克隆模式 - 服务端 - 模型6
//
//  直接编译,不建类库
#include "bstar.c"
#include "kvmsg.c"
//  bstar反应堆API
static int s_snapshots  (zloop_t *loop, void *socket, void *args);
static int s_collector  (zloop_t *loop, void *socket, void *args);
static int s_flush_ttl  (zloop_t *loop, void *socket, void *args);
static int s_send_hugz  (zloop_t *loop, void *socket, void *args);
static int s_new_master (zloop_t *loop, void *unused, void *args);
static int s_new_slave  (zloop_t *loop, void *unused, void *args);
static int s_subscriber (zloop_t *loop, void *socket, void *args);
//  服务端属性
typedef struct {
    zctx_t *ctx;                //  上下文
    zhash_t *kvmap;             //  存放键值对
    bstar_t *bstar;             //  bstar反应堆核心
    int64_t sequence;           //  更新事件编号
    int port;                   //  主端口
    int peer;                   //  同伴端口
    void *publisher;            //  发布更新事件的端口
    void *collector;            //  接收客户端更新事件的端口
    void *subscriber;           //  接受同伴更新事件的端口
    zlist_t *pending;           //  延迟的更新事件
    Bool primary;               //  是否为主机
    Bool master;                //  是否为master
    Bool slave;                 //  是否为slave
} clonesrv_t;
int main (int argc, char *argv [])
{
    clonesrv_t *self = (clonesrv_t *) zmalloc (sizeof (clonesrv_t));
    if (argc == 2 && streq (argv [1], "-p")) {
        zclock_log ("I: 作为主机master运行,正在等待备机slave连接。");
        self->bstar = bstar_new (BSTAR_PRIMARY, "tcp://*:5003",
                                 "tcp://localhost:5004");
        bstar_voter (self->bstar, "tcp://*:5556", ZMQ_ROUTER,
                     s_snapshots, self);
        self->port = 5556;
        self->peer = 5566;
        self->primary = TRUE;
    }
    else
    if (argc == 2 && streq (argv [1], "-b")) {
        zclock_log ("I: 作为备机slave运行,正在等待主机master连接。");
        self->bstar = bstar_new (BSTAR_BACKUP, "tcp://*:5004",
                                 "tcp://localhost:5003");
        bstar_voter (self->bstar, "tcp://*:5566", ZMQ_ROUTER,
                     s_snapshots, self);
        self->port = 5566;
        self->peer = 5556;
        self->primary = FALSE;
    }
    else {
        printf ("Usage: clonesrv4 { -p | -b }\n");
        free (self);
        exit (0);
    }
    //  主机将成为master
    if (self->primary)
        self->kvmap = zhash_new ();
    self->ctx = zctx_new ();
    self->pending = zlist_new ();
    bstar_set_verbose (self->bstar, TRUE);
    //  设置克隆服务端套接字
    self->publisher = zsocket_new (self->ctx, ZMQ_PUB);
    self->collector = zsocket_new (self->ctx, ZMQ_SUB);
    zsocket_bind (self->publisher, "tcp://*:%d", self->port + 1);
    zsocket_bind (self->collector, "tcp://*:%d", self->port + 2);
    //  作为克隆客户端连接同伴
    self->subscriber = zsocket_new (self->ctx, ZMQ_SUB);
    zsocket_connect (self->subscriber, "tcp://localhost:%d", self->peer + 1);
    //  注册状态事件处理器
    bstar_new_master (self->bstar, s_new_master, self);
    bstar_new_slave (self->bstar, s_new_slave, self);
    //  注册bstar反应堆其他事件处理器
    zloop_reader (bstar_zloop (self->bstar), self->collector, s_collector, self);
    zloop_timer  (bstar_zloop (self->bstar), 1000, 0, s_flush_ttl, self);
    zloop_timer  (bstar_zloop (self->bstar), 1000, 0, s_send_hugz, self);
    //  开启bstar反应堆
    bstar_start (self->bstar);
    //  中断,终止。
    while (zlist_size (self->pending)) {
        kvmsg_t *kvmsg = (kvmsg_t *) zlist_pop (self->pending);
        kvmsg_destroy (&kvmsg);
    }
    zlist_destroy (&self->pending);
    bstar_destroy (&self->bstar);
    zhash_destroy (&self->kvmap);
    zctx_destroy (&self->ctx);
    free (self);
    return 0;
}
//  ---------------------------------------------------------------------
//  发送快照内容
static int s_send_single (char *key, void *data, void *args);
//  请求方信息
typedef struct {
    void *socket;           //  ROUTER套接字
    zframe_t *identity;     //  请求放标识
    char *subtree;          //  子树
} kvroute_t;
static int
s_snapshots (zloop_t *loop, void *snapshot, void *args)
{
    clonesrv_t *self = (clonesrv_t *) args;
    zframe_t *identity = zframe_recv (snapshot);
    if (identity) {
        //  请求在消息的第二帧中
        char *request = zstr_recv (snapshot);
        char *subtree = NULL;
        if (streq (request, "ICANHAZ?")) {
            free (request);
            subtree = zstr_recv (snapshot);
        }
        else
            printf ("E: 错误的请求,正在退出……\n");
        if (subtree) {
            //  发送状态快照
            kvroute_t routing = { snapshot, identity, subtree };
            zhash_foreach (self->kvmap, s_send_single, &routing);
            //  发送终止消息,以及消息编号
            zclock_log ("I: 正在发送快照,版本号:%d", (int) self->sequence);
            zframe_send (&identity, snapshot, ZFRAME_MORE);
            kvmsg_t *kvmsg = kvmsg_new (self->sequence);
            kvmsg_set_key  (kvmsg, "KTHXBAI");
            kvmsg_set_body (kvmsg, (byte *) subtree, 0);
            kvmsg_send     (kvmsg, snapshot);
            kvmsg_destroy (&kvmsg);
            free (subtree);
        }
    }
    return 0;
}
//  每次发送一个快照键值对
static int
s_send_single (char *key, void *data, void *args)
{
    kvroute_t *kvroute = (kvroute_t *) args;
    kvmsg_t *kvmsg = (kvmsg_t *) data;
    if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg))
    &&  memcmp (kvroute->subtree,
                kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) {
        //  先发送接收方的地址
        zframe_send (&kvroute->identity,
            kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
        kvmsg_send (kvmsg, kvroute->socket);
    }
    return 0;
}
//  ---------------------------------------------------------------------
//  从客户端收集更新事件
//  如果我们是master,则将该事件写入kvmap对象;
//  如果我们是slave,则将其写入延迟队列
static int s_was_pending (clonesrv_t *self, kvmsg_t *kvmsg);
static int
s_collector (zloop_t *loop, void *collector, void *args)
{
    clonesrv_t *self = (clonesrv_t *) args;
    kvmsg_t *kvmsg = kvmsg_recv (collector);
    kvmsg_dump (kvmsg);
    if (kvmsg) {
        if (self->master) {
            kvmsg_set_sequence (kvmsg, ++self->sequence);
            kvmsg_send (kvmsg, self->publisher);
            int ttl = atoi (kvmsg_get_prop (kvmsg, "ttl"));
            if (ttl)
                kvmsg_set_prop (kvmsg, "ttl",
                    "%" PRId64, zclock_time () + ttl * 1000);
            kvmsg_store (&kvmsg, self->kvmap);
            zclock_log ("I: 正在发布更新事件:%d", (int) self->sequence);
        }
        else {
            //  如果我们已经从master中获得了该事件,则丢弃该消息
            if (s_was_pending (self, kvmsg))
                kvmsg_destroy (&kvmsg);
            else
                zlist_append (self->pending, kvmsg);
        }
    }
    return 0;
}
//  如果消息已在延迟队列中,则删除它并返回TRUE
static int
s_was_pending (clonesrv_t *self, kvmsg_t *kvmsg)
{
    kvmsg_t *held = (kvmsg_t *) zlist_first (self->pending);
    while (held) {
        if (memcmp (kvmsg_uuid (kvmsg),
                    kvmsg_uuid (held), sizeof (uuid_t)) == 0) {
            zlist_remove (self->pending, held);
            return TRUE;
        }
        held = (kvmsg_t *) zlist_next (self->pending);
    }
    return FALSE;
}
//  ---------------------------------------------------------------------
//  删除带有过期时间的瞬间值
static int s_flush_single (char *key, void *data, void *args);
static int
s_flush_ttl (zloop_t *loop, void *unused, void *args)
{
    clonesrv_t *self = (clonesrv_t *) args;
    zhash_foreach (self->kvmap, s_flush_single, args);
    return 0;
}
//  如果键值对过期,则进行删除操作,并广播该事件
static int
s_flush_single (char *key, void *data, void *args)
{
    clonesrv_t *self = (clonesrv_t *) args;
    kvmsg_t *kvmsg = (kvmsg_t *) data;
    int64_t ttl;
    sscanf (kvmsg_get_prop (kvmsg, "ttl"), "%" PRId64, &ttl);
    if (ttl && zclock_time () >= ttl) {
        kvmsg_set_sequence (kvmsg, ++self->sequence);
        kvmsg_set_body (kvmsg, (byte *) "", 0);
        kvmsg_send (kvmsg, self->publisher);
        kvmsg_store (&kvmsg, self->kvmap);
        zclock_log ("I: 正在发布删除事件:%d", (int) self->sequence);
    }
    return 0;
}
//  ---------------------------------------------------------------------
//  发送心跳
static int
s_send_hugz (zloop_t *loop, void *unused, void *args)
{
    clonesrv_t *self = (clonesrv_t *) args;
    kvmsg_t *kvmsg = kvmsg_new (self->sequence);
    kvmsg_set_key  (kvmsg, "HUGZ");
    kvmsg_set_body (kvmsg, (byte *) "", 0);
    kvmsg_send     (kvmsg, self->publisher);
    kvmsg_destroy (&kvmsg);
    return 0;
}
//  ---------------------------------------------------------------------
//  状态改变事件处理函数
//  我们将转变为master
//
//  备机先将延迟列表中的事件更新到自己的快照中,
//  并开始接收客户端发来的快照请求。
static int
s_new_master (zloop_t *loop, void *unused, void *args)
{
    clonesrv_t *self = (clonesrv_t *) args;
    self->master = TRUE;
    self->slave = FALSE;
    zloop_cancel (bstar_zloop (self->bstar), self->subscriber);
    //  应用延迟列表中的事件
    while (zlist_size (self->pending)) {
        kvmsg_t *kvmsg = (kvmsg_t *) zlist_pop (self->pending);
        kvmsg_set_sequence (kvmsg, ++self->sequence);
        kvmsg_send (kvmsg, self->publisher);
        kvmsg_store (&kvmsg, self->kvmap);
        zclock_log ("I: 正在发布延迟列表中的更新事件:%d", (int) self->sequence);
    }
    return 0;
}
//  ---------------------------------------------------------------------
//  正在切换为slave
static int
s_new_slave (zloop_t *loop, void *unused, void *args)
{
    clonesrv_t *self = (clonesrv_t *) args;
    zhash_destroy (&self->kvmap);
    self->master = FALSE;
    self->slave = TRUE;
    zloop_reader (bstar_zloop (self->bstar), self->subscriber,
                  s_subscriber, self);
    return 0;
}
//  ---------------------------------------------------------------------
//  从同伴主机(master)接收更新事件;
//  接收该类更新事件时,我们一定是slave。
static int
s_subscriber (zloop_t *loop, void *subscriber, void *args)
{
    clonesrv_t *self = (clonesrv_t *) args;
    //  获取快照,如果需要的话。
    if (self->kvmap == NULL) {
        self->kvmap = zhash_new ();
        void *snapshot = zsocket_new (self->ctx, ZMQ_DEALER);
        zsocket_connect (snapshot, "tcp://localhost:%d", self->peer);
        zclock_log ("I: 正在请求快照:tcp://localhost:%d",
                    self->peer);
        zstr_send (snapshot, "ICANHAZ?");
        while (TRUE) {
            kvmsg_t *kvmsg = kvmsg_recv (snapshot);
            if (!kvmsg)
                break;          //  中断
            if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
                self->sequence = kvmsg_sequence (kvmsg);
                kvmsg_destroy (&kvmsg);
                break;          //  完成
            }
            kvmsg_store (&kvmsg, self->kvmap);
        }
        zclock_log ("I: 收到快照,版本号:%d", (int) self->sequence);
        zsocket_destroy (self->ctx, snapshot);
    }
    //  查找并删除
    kvmsg_t *kvmsg = kvmsg_recv (subscriber);
    if (!kvmsg)
        return 0;
    if (strneq (kvmsg_key (kvmsg), "HUGZ")) {
        if (!s_was_pending (self, kvmsg)) {
            //  如果master的更新事件比客户端的事件早到,则将master的事件存入延迟列表,
            //  当收到客户端更新事件时会将其从列表中清除。
            zlist_append (self->pending, kvmsg_dup (kvmsg));
        }
        //  如果更新事件比kvmap版本高,则应用它
        if (kvmsg_sequence (kvmsg) > self->sequence) {
            self->sequence = kvmsg_sequence (kvmsg);
            kvmsg_store (&kvmsg, self->kvmap);
            zclock_log ("I: 收到更新事件:%d", (int) self->sequence);
        }
        else
            kvmsg_destroy (&kvmsg);
    }
    else
        kvmsg_destroy (&kvmsg);
    return 0;
}

这段程序只有几百行,但还是花了一些时间来进行调通的。这个模型中包含了故障恢复,瞬间值,子树等等。虽然我们前期设计得很完备,但要在多个套接字之间进行调试还是很困难的。以下是我的工作方式:

               1、由于使用了反应堆(bstar,建立在zloop之上),我们节省了大量的代码,让程序变得简洁明了。整个服务以一个线程运行,因此不会出现跨线程的问题。只需将结构指针(self)传递给所有的处理器即可。此外,使用发应堆后可以让代码更为模块化,易于重用。

               2、我们逐个模块进行调试,只有某个模块能够正常运行时才会进入下一步。由于使用了四五个套接字,因此调试的工作量是很大的。我直接将调试信息输出到了屏幕上,因为实在没有必要专门开一个调试器来工作。

               3、因为一直在使用valgrind工具进行测试,因此我能确定程序没有内存泄漏的问题。在C语言中,内存泄漏是我们非常关心的问题,因为没有什么垃圾回收机制可以帮你完成。正确地使用像kvmsg、czmq之类的抽象层可以很好地避免内存泄漏。

       这段程序肯定还会存在一些BUG,部分读者可能会帮助我调试和修复,我在此表示感谢。

       测试模型6时,先开启主机和备机,再打开一组客户端,顺序随意。随机地中止某个服务进程,如果程序设计得是正确的,那客户端获得的数据应该都是一致的。

相关文章
|
6月前
|
消息中间件 负载均衡 监控
【ZMQ PUB模式指南】深入探究ZeroMQ的PUB-SUB模式:C++编程实践、底层原理与最佳实践
【ZMQ PUB模式指南】深入探究ZeroMQ的PUB-SUB模式:C++编程实践、底层原理与最佳实践
1890 1
在FlashFXP中设置数据传输模式为PORT(主动模式)的两种方法
在FlashFXP中设置数据传输模式为PORT(主动模式)的两种方法
|
3月前
|
存储 缓存 定位技术
如果遇到网络延迟问题,有哪些方法可以快速解决以保证视频源同步?
如果遇到网络延迟问题,有哪些方法可以快速解决以保证视频源同步?
|
传感器 负载均衡 物联网
MQTT v5共享订阅是怎么回事?如何使用共享订阅提高消息订阅的灵活性和可伸缩性?
MQTT v5共享订阅是怎么回事?如何使用共享订阅提高消息订阅的灵活性和可伸缩性?
477 1
|
消息中间件 存储 Kafka
MQ 学习日志(六) 保证消息的可靠性传输
消息的可靠性传输 简述
115 0
|
消息中间件 网络协议 中间件
ZMQ请求应答模式之无中间件的可靠性--自由者模式
ZMQ请求应答模式之无中间件的可靠性--自由者模式
ZMQ请求应答模式之无中间件的可靠性--自由者模式
|
消息中间件 存储 负载均衡
浅谈分布式环境下WebSocket消息共享问题
浅谈分布式环境下WebSocket消息共享问题
11037 0
|
缓存 安全 程序员
ZMQ之脱机可靠性--巨人模式
ZMQ之脱机可靠性--巨人模式
ZMQ之脱机可靠性--巨人模式
|
负载均衡 监控 程序员
ZMQ之高可靠对称节点--双子星模式
ZMQ之高可靠对称节点--双子星模式
ZMQ之高可靠对称节点--双子星模式
|
算法 程序员 API
ZMQ之面向服务的可靠队列(管家模式)
ZMQ之面向服务的可靠队列(管家模式)
ZMQ之面向服务的可靠队列(管家模式)