ZMQ之克隆模式的可靠性
克隆服务器的可靠性
克隆模型1至5相对比较简单,下面我们会探讨一个非常复杂的模型。可以发现,为了构建可靠的消息队列,我们需要花费非常多的精力。所以我们经常会问:有必要这么做吗?如果说你能够接受可靠性不够高的、或者说已经足够好的架构,那恭喜你,你在成本和收益之间找到了平衡。虽然我们会偶尔丢失一些消息,但从经济的角度来说还是合理的。不管怎样,下面我们就来介绍这个复杂的模型。
在模型3中,你会关闭和重启服务,这会导致数据的丢失。任何后续加入的客户端只能得到重启之后的那些数据,而非所有的。下面就让我们想办法让克隆模式能够承担服务器重启的故障。
以下列举我们需要处理的问题:
1、克隆服务器进程崩溃并自动或手工重启。进程丢失了所有数据,所以必须从别处进行恢复。
2、克隆服务器硬件故障,长时间不能恢复。客户端需要切换至另一个可用的服务端。
3、克隆服务器从网络上断开,如交换机发生故障等。它会在某个时点重连,但期间的数据就需要替代的服务器负责处理。
第一步我们需要增加一个服务器。我们可以使用第四章中提到的双子星模式,它是一个反应堆,而我们的程序经过整理后也是一个反应堆,因此可以互相协作。
我们需要保证更新事件在主服务器崩溃时仍能保留,最简单的机制就是同时发送给两台服务器。
备机就可以当做一台客户端来运行,像其他客户端一样从主机获取更新事件。同时它又能从客户端获取更新事件——虽然不应该以此更新数据,但可以先暂存起来。
所以,相较于模型5,模型6中引入了以下特性:
1、客户端发送更新事件改用PUB-SUB套接字,而非PUSH-PULL。原因是PUSH套接字会在没有接收方时阻塞,且会进行负载均衡——我们需要两台服务器都接收到消息。我们会在服务器端绑定SUB套接字,在客户端连接PUB套接字。
2、我们在服务器发送给客户端的更新事件中加入心跳,这样客户端可以知道主机是否已死,然后切换至备机。
3、我们使用双子星模式的bstar反应堆类来创建主机和备机。双子星模式中需要有一个“投票”套接字,来协助判定对方节点是否已死。这里我们使用快照请求来作为“投票”。
4、我们将为所有的更新事件添加UUID属性,它由客户端生成,服务端会将其发布给所有客户端。
5、备机将维护一个“待处理列表”,保存来自客户端、尚未由服务端发布的更新事件;或者反过来,来自服务端、尚未从客户端收到的更新事件。这个列表从旧到新排列,这样就能方便地从顶部删除消息。
我们可以为客户端设计一个有限状态机,它有三种状态:
1、客户端打开并连接了套接字,然后向服务端发送快照请求。为了避免消息风暴,它只会请求两次。
2、客户端等待快照应答,如果获得了则保存它;如果没有获得,则向第二个服务器发送请求。
3、客户端收到快照,便开始等待更新事件。如果在一定时间内没有收到服务端响应,则会连接第二个服务端。
客户端会一直循环下去,可能在程序刚启动时,部分客户端会试图连接主机,部分连接备机,相信双子星模式会很好地处理这一情况的。
我们可以将客户端状态图绘制出来:
故障恢复的步骤如下:
1、客户端检测到主机不再发送心跳,因此转而连接备机,并请求一份新的快照;
2、备机开始接收快照请求,并检测到主机死亡,于是开始作为主机运行;
3、备机将待处理列表中的更新事件写入自身状态中,然后开始处理快照请求。
当主机恢复连接时:
1、启动为slave状态,并作为克隆模式客户端连接备机;
2、同时,使用SUB套接字从客户端接收更新事件。
我们做两点假设:
1、至少有一台主机会继续运行。如果两台主机都崩溃了,那我们将丢失所有的服务端数据,无法恢复。
2、不同的客户端不会同时更新同一个键值对。客户端的更新事件会先后到达两个服务器,因此更新的顺序可能会不一致。单个客户端的更新事件到达两台服务器的顺序是相同的,所以不用担心。
下面是整体架构图:
开始编程之前,我们需要将客户端重构成一个可复用的类。在ZMQ中写异步类有时是为了练习如何写出优雅的代码,但这里我们确实是希望克隆模式可以成为一种易于使用的程序。上述架构的伸缩性来源于客户端的正确行为,因此有必要将其封装成一份API。要在客户端中进行故障恢复还是比较复杂的,试想一下自由者模式和克隆模式结合起来会是什么样的吧。
按照我的习惯,我会先写出一份API的列表,然后加以实现。让我们假想一个名为clone的API,在其基础之上编写克隆模式客户端API。将代码封装为API显然会提升代码的稳定性,就以模型5为例,客户端需要打开三个套接字,端点名称直接写在了代码里。我们可以创建这样一组API:
// 为每个套接字指定端点 clone_subscribe (clone, "tcp://localhost:5556"); clone_snapshot (clone, "tcp://localhost:5557"); clone_updates (clone, "tcp://localhost:5558"); // 由于有两个服务端,因此再执行一次 clone_subscribe (clone, "tcp://localhost:5566"); clone_snapshot (clone, "tcp://localhost:5567"); clone_updates (clone, "tcp://localhost:5568");
但这种写法还是比较啰嗦的,因为没有必要将API内部的一些设计暴露给编程人员。现在我们会使用三个套接字,而将来可能就会使用两个,或者四个。我们不可能让所有的应用程序都相应地修改吧?让我们把这些信息包装到API中:
// 指定主备服务器端点 clone_connect (clone, "tcp://localhost:5551"); clone_connect (clone, "tcp://localhost:5561");
这样一来代码就变得非常简洁,不过也会对现有代码的内部就够造成影响。我们需要从一个端点中推算出三个端点。一种方法是假设客户端和服务端使用三个连续的端点通信,并将这个规则写入协议;另一个方法是向服务器索取缺少的端点信息。我们使用第一种较为简单的方法:
1、服务器状态ROUTER在端点P;
2、服务器更新事件PUB在端点P + 1;
3、服务器更新事件SUB在端点P + 2。
clone类和第四章的flcliapi类很类似,由两部分组成:
1、一个在后台运行的异步克隆模式代理。该代理处理所有的I/O操作,实时地和服务器进行通信;
2、一个在前台应用程序中同步运行的clone类。当你创建了一个clone对象后,它会自动创建后台的clone线程;当你销毁clone对象,该后台线程也会被销毁。
前台的clone类会使用inproc管道和后台的代理进行通信。C语言中,czmq线程会自动为我们创建这个管道。这也是ZMQ多线程编程的常规方式。
如果没有ZMQ,这种异步的设计将很难处理高压工作,而ZMQ会让其变得简单。编写出来额代码会相对比较复杂。我们可以用反应堆的模式来编写,但这会进一步增加复杂度,且影响应用程序的使用。因此,我们的设计的API将更像是一个能够和服务器进行通信的键值表:
clone_t *clone_new (void); void clone_destroy (clone_t **self_p); void clone_connect (clone_t *self, char *address, char *service); void clone_set (clone_t *self, char *key, char *value); char *clone_get (clone_t *self, char *key);
下面就是克隆模式客户端模型6的代码,因为调用了API,所以非常简短:
clonecli6: Clone client, Model Six in C
// // 克隆模式 - 客户端 - 模型6 // // 直接编译,不建类库 #include "clone.c" #define SUBTREE "/client/" int main (void) { // 创建分布式哈希表 clone_t *clone = clone_new (); // 配置 clone_subtree (clone, SUBTREE); clone_connect (clone, "tcp://localhost", "5556"); clone_connect (clone, "tcp://localhost", "5566"); // 插入随机键值 while (!zctx_interrupted) { // 生成随机值 char key [255]; char value [10]; sprintf (key, "%s%d", SUBTREE, randof (10000)); sprintf (value, "%d", randof (1000000)); clone_set (clone, key, value, randof (30)); sleep (1); } clone_destroy (&clone); return 0; }
以下是clone类的实现:
clone: Clone class in C
/* ===================================================================== clone - client-side Clone Pattern class --------------------------------------------------------------------- Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com> Copyright other contributors as noted in the AUTHORS file. This file is part of the ZeroMQ Guide: http://zguide.zeromq.org This is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program. If not, see <http://www.gnu.org/licenses/>. ===================================================================== */ #include "clone.h" // 请求超时时间 #define GLOBAL_TIMEOUT 4000 // msecs // 判定服务器死亡的时间 #define SERVER_TTL 5000 // msecs // 服务器数量 #define SERVER_MAX 2 // ===================================================================== // 同步部分,在应用程序线程中工作 // --------------------------------------------------------------------- // 类结构 struct _clone_t { zctx_t *ctx; // 上下文 void *pipe; // 和后台代理间的通信套接字 }; // 该线程用于处理真正的clone类 static void clone_agent (void *args, zctx_t *ctx, void *pipe); // --------------------------------------------------------------------- // 构造函数 clone_t * clone_new (void) { clone_t *self; self = (clone_t *) zmalloc (sizeof (clone_t)); self->ctx = zctx_new (); self->pipe = zthread_fork (self->ctx, clone_agent, NULL); return self; } // --------------------------------------------------------------------- // 析构函数 void clone_destroy (clone_t **self_p) { assert (self_p); if (*self_p) { clone_t *self = *self_p; zctx_destroy (&self->ctx); free (self); *self_p = NULL; } } // --------------------------------------------------------------------- // 在链接之前指定快照和更新事件的子树 // 发送给后台代理的消息内容为[SUBTREE][subtree] void clone_subtree (clone_t *self, char *subtree) { assert (self); zmsg_t *msg = zmsg_new (); zmsg_addstr (msg, "SUBTREE"); zmsg_addstr (msg, subtree); zmsg_send (&msg, self->pipe); } // --------------------------------------------------------------------- // 连接至新的服务器端点 // 消息内容:[CONNECT][endpoint][service] void clone_connect (clone_t *self, char *address, char *service) { assert (self); zmsg_t *msg = zmsg_new (); zmsg_addstr (msg, "CONNECT"); zmsg_addstr (msg, address); zmsg_addstr (msg, service); zmsg_send (&msg, self->pipe); } // --------------------------------------------------------------------- // 设置新值 // 消息内容:[SET][key][value][ttl] void clone_set (clone_t *self, char *key, char *value, int ttl) { char ttlstr [10]; sprintf (ttlstr, "%d", ttl); assert (self); zmsg_t *msg = zmsg_new (); zmsg_addstr (msg, "SET"); zmsg_addstr (msg, key); zmsg_addstr (msg, value); zmsg_addstr (msg, ttlstr); zmsg_send (&msg, self->pipe); } // --------------------------------------------------------------------- // 取值 // 消息内容:[GET][key] // 如果没有clone可用,会返回NULL char * clone_get (clone_t *self, char *key) { assert (self); assert (key); zmsg_t *msg = zmsg_new (); zmsg_addstr (msg, "GET"); zmsg_addstr (msg, key); zmsg_send (&msg, self->pipe); zmsg_t *reply = zmsg_recv (self->pipe); if (reply) { char *value = zmsg_popstr (reply); zmsg_destroy (&reply); return value; } return NULL; } // ===================================================================== // 异步部分,在后台运行 // --------------------------------------------------------------------- // 单个服务端信息 typedef struct { char *address; // 服务端地址 int port; // 端口 void *snapshot; // 快照套接字 void *subscriber; // 接收更新事件的套接字 uint64_t expiry; // 服务器过期时间 uint requests; // 收到的快照请求数 } server_t; static server_t * server_new (zctx_t *ctx, char *address, int port, char *subtree) { server_t *self = (server_t *) zmalloc (sizeof (server_t)); zclock_log ("I: adding server %s:%d...", address, port); self->address = strdup (address); self->port = port; self->snapshot = zsocket_new (ctx, ZMQ_DEALER); zsocket_connect (self->snapshot, "%s:%d", address, port); self->subscriber = zsocket_new (ctx, ZMQ_SUB); zsocket_connect (self->subscriber, "%s:%d", address, port + 1); zsockopt_set_subscribe (self->subscriber, subtree); return self; } static void server_destroy (server_t **self_p) { assert (self_p); if (*self_p) { server_t *self = *self_p; free (self->address); free (self); *self_p = NULL; } } // --------------------------------------------------------------------- // 后台代理类 // 状态 #define STATE_INITIAL 0 // 连接之前 #define STATE_SYNCING 1 // 正在同步 #define STATE_ACTIVE 2 // 正在更新 typedef struct { zctx_t *ctx; // 上下文 void *pipe; // 与主线程通信的套接字 zhash_t *kvmap; // 键值表 char *subtree; // 子树 server_t *server [SERVER_MAX]; uint nbr_servers; // 范围:0 - SERVER_MAX uint state; // 当前状态 uint cur_server; // 当前master,0/1 int64_t sequence; // 键值对编号 void *publisher; // 发布更新事件的套接字 } agent_t; static agent_t * agent_new (zctx_t *ctx, void *pipe) { agent_t *self = (agent_t *) zmalloc (sizeof (agent_t)); self->ctx = ctx; self->pipe = pipe; self->kvmap = zhash_new (); self->subtree = strdup (""); self->state = STATE_INITIAL; self->publisher = zsocket_new (self->ctx, ZMQ_PUB); return self; } static void agent_destroy (agent_t **self_p) { assert (self_p); if (*self_p) { agent_t *self = *self_p; int server_nbr; for (server_nbr = 0; server_nbr < self->nbr_servers; server_nbr++) server_destroy (&self->server [server_nbr]); zhash_destroy (&self->kvmap); free (self->subtree); free (self); *self_p = NULL; } } // 若线程被中断则返回-1 static int agent_control_message (agent_t *self) { zmsg_t *msg = zmsg_recv (self->pipe); char *command = zmsg_popstr (msg); if (command == NULL) return -1; if (streq (command, "SUBTREE")) { free (self->subtree); self->subtree = zmsg_popstr (msg); } else if (streq (command, "CONNECT")) { char *address = zmsg_popstr (msg); char *service = zmsg_popstr (msg); if (self->nbr_servers < SERVER_MAX) { self->server [self->nbr_servers++] = server_new ( self->ctx, address, atoi (service), self->subtree); // 广播更新事件 zsocket_connect (self->publisher, "%s:%d", address, atoi (service) + 2); } else zclock_log ("E: too many servers (max. %d)", SERVER_MAX); free (address); free (service); } else if (streq (command, "SET")) { char *key = zmsg_popstr (msg); char *value = zmsg_popstr (msg); char *ttl = zmsg_popstr (msg); zhash_update (self->kvmap, key, (byte *) value); zhash_freefn (self->kvmap, key, free); // 向服务端发送键值对 kvmsg_t *kvmsg = kvmsg_new (0); kvmsg_set_key (kvmsg, key); kvmsg_set_uuid (kvmsg); kvmsg_fmt_body (kvmsg, "%s", value); kvmsg_set_prop (kvmsg, "ttl", ttl); kvmsg_send (kvmsg, self->publisher); kvmsg_destroy (&kvmsg); puts (key); free (ttl); free (key); // 键值对实际由哈希表对象控制 } else if (streq (command, "GET")) { char *key = zmsg_popstr (msg); char *value = zhash_lookup (self->kvmap, key); if (value) zstr_send (self->pipe, value); else zstr_send (self->pipe, ""); free (key); free (value); } free (command); zmsg_destroy (&msg); return 0; } // --------------------------------------------------------------------- // 异步的后台代理会维护一个服务端池,并处理来自应用程序的请求或应答。 static void clone_agent (void *args, zctx_t *ctx, void *pipe) { agent_t *self = agent_new (ctx, pipe); while (TRUE) { zmq_pollitem_t poll_set [] = { { pipe, 0, ZMQ_POLLIN, 0 }, { 0, 0, ZMQ_POLLIN, 0 } }; int poll_timer = -1; int poll_size = 2; server_t *server = self->server [self->cur_server]; switch (self->state) { case STATE_INITIAL: // 该状态下,如果有可用服务,会发送快照请求 if (self->nbr_servers > 0) { zclock_log ("I: 正在等待服务器 %s:%d...", server->address, server->port); if (server->requests < 2) { zstr_sendm (server->snapshot, "ICANHAZ?"); zstr_send (server->snapshot, self->subtree); server->requests++; } server->expiry = zclock_time () + SERVER_TTL; self->state = STATE_SYNCING; poll_set [1].socket = server->snapshot; } else poll_size = 1; break; case STATE_SYNCING: // 该状态下我们从服务器端接收快照内容,若失败则尝试其他服务器 poll_set [1].socket = server->snapshot; break; case STATE_ACTIVE: // 该状态下我们从服务器获取更新事件,失败则尝试其他服务器 poll_set [1].socket = server->subscriber; break; } if (server) { poll_timer = (server->expiry - zclock_time ()) * ZMQ_POLL_MSEC; if (poll_timer < 0) poll_timer = 0; } // ------------------------------------------------------------ // poll循环 int rc = zmq_poll (poll_set, poll_size, poll_timer); if (rc == -1) break; // 上下文已被关闭 if (poll_set [0].revents & ZMQ_POLLIN) { if (agent_control_message (self)) break; // 中断 } else if (poll_set [1].revents & ZMQ_POLLIN) { kvmsg_t *kvmsg = kvmsg_recv (poll_set [1].socket); if (!kvmsg) break; // 中断 // 任何服务端的消息将重置它的过期时间 server->expiry = zclock_time () + SERVER_TTL; if (self->state == STATE_SYNCING) { // 保存快照内容 server->requests = 0; if (streq (kvmsg_key (kvmsg), "KTHXBAI")) { self->sequence = kvmsg_sequence (kvmsg); self->state = STATE_ACTIVE; zclock_log ("I: received from %s:%d snapshot=%d", server->address, server->port, (int) self->sequence); kvmsg_destroy (&kvmsg); } else kvmsg_store (&kvmsg, self->kvmap); } else if (self->state == STATE_ACTIVE) { // 丢弃过期的更新事件 if (kvmsg_sequence (kvmsg) > self->sequence) { self->sequence = kvmsg_sequence (kvmsg); kvmsg_store (&kvmsg, self->kvmap); zclock_log ("I: received from %s:%d update=%d", server->address, server->port, (int) self->sequence); } else kvmsg_destroy (&kvmsg); } } else { // 服务端已死,尝试其他服务器 zclock_log ("I: 服务器 %s:%d 无响应", server->address, server->port); self->cur_server = (self->cur_server + 1) % self->nbr_servers; self->state = STATE_INITIAL; } } agent_destroy (&self); }
最后是克隆服务器的模型6代码:
clonesrv6: Clone server, Model Six in C
// // 克隆模式 - 服务端 - 模型6 // // 直接编译,不建类库 #include "bstar.c" #include "kvmsg.c" // bstar反应堆API static int s_snapshots (zloop_t *loop, void *socket, void *args); static int s_collector (zloop_t *loop, void *socket, void *args); static int s_flush_ttl (zloop_t *loop, void *socket, void *args); static int s_send_hugz (zloop_t *loop, void *socket, void *args); static int s_new_master (zloop_t *loop, void *unused, void *args); static int s_new_slave (zloop_t *loop, void *unused, void *args); static int s_subscriber (zloop_t *loop, void *socket, void *args); // 服务端属性 typedef struct { zctx_t *ctx; // 上下文 zhash_t *kvmap; // 存放键值对 bstar_t *bstar; // bstar反应堆核心 int64_t sequence; // 更新事件编号 int port; // 主端口 int peer; // 同伴端口 void *publisher; // 发布更新事件的端口 void *collector; // 接收客户端更新事件的端口 void *subscriber; // 接受同伴更新事件的端口 zlist_t *pending; // 延迟的更新事件 Bool primary; // 是否为主机 Bool master; // 是否为master Bool slave; // 是否为slave } clonesrv_t; int main (int argc, char *argv []) { clonesrv_t *self = (clonesrv_t *) zmalloc (sizeof (clonesrv_t)); if (argc == 2 && streq (argv [1], "-p")) { zclock_log ("I: 作为主机master运行,正在等待备机slave连接。"); self->bstar = bstar_new (BSTAR_PRIMARY, "tcp://*:5003", "tcp://localhost:5004"); bstar_voter (self->bstar, "tcp://*:5556", ZMQ_ROUTER, s_snapshots, self); self->port = 5556; self->peer = 5566; self->primary = TRUE; } else if (argc == 2 && streq (argv [1], "-b")) { zclock_log ("I: 作为备机slave运行,正在等待主机master连接。"); self->bstar = bstar_new (BSTAR_BACKUP, "tcp://*:5004", "tcp://localhost:5003"); bstar_voter (self->bstar, "tcp://*:5566", ZMQ_ROUTER, s_snapshots, self); self->port = 5566; self->peer = 5556; self->primary = FALSE; } else { printf ("Usage: clonesrv4 { -p | -b }\n"); free (self); exit (0); } // 主机将成为master if (self->primary) self->kvmap = zhash_new (); self->ctx = zctx_new (); self->pending = zlist_new (); bstar_set_verbose (self->bstar, TRUE); // 设置克隆服务端套接字 self->publisher = zsocket_new (self->ctx, ZMQ_PUB); self->collector = zsocket_new (self->ctx, ZMQ_SUB); zsocket_bind (self->publisher, "tcp://*:%d", self->port + 1); zsocket_bind (self->collector, "tcp://*:%d", self->port + 2); // 作为克隆客户端连接同伴 self->subscriber = zsocket_new (self->ctx, ZMQ_SUB); zsocket_connect (self->subscriber, "tcp://localhost:%d", self->peer + 1); // 注册状态事件处理器 bstar_new_master (self->bstar, s_new_master, self); bstar_new_slave (self->bstar, s_new_slave, self); // 注册bstar反应堆其他事件处理器 zloop_reader (bstar_zloop (self->bstar), self->collector, s_collector, self); zloop_timer (bstar_zloop (self->bstar), 1000, 0, s_flush_ttl, self); zloop_timer (bstar_zloop (self->bstar), 1000, 0, s_send_hugz, self); // 开启bstar反应堆 bstar_start (self->bstar); // 中断,终止。 while (zlist_size (self->pending)) { kvmsg_t *kvmsg = (kvmsg_t *) zlist_pop (self->pending); kvmsg_destroy (&kvmsg); } zlist_destroy (&self->pending); bstar_destroy (&self->bstar); zhash_destroy (&self->kvmap); zctx_destroy (&self->ctx); free (self); return 0; } // --------------------------------------------------------------------- // 发送快照内容 static int s_send_single (char *key, void *data, void *args); // 请求方信息 typedef struct { void *socket; // ROUTER套接字 zframe_t *identity; // 请求放标识 char *subtree; // 子树 } kvroute_t; static int s_snapshots (zloop_t *loop, void *snapshot, void *args) { clonesrv_t *self = (clonesrv_t *) args; zframe_t *identity = zframe_recv (snapshot); if (identity) { // 请求在消息的第二帧中 char *request = zstr_recv (snapshot); char *subtree = NULL; if (streq (request, "ICANHAZ?")) { free (request); subtree = zstr_recv (snapshot); } else printf ("E: 错误的请求,正在退出……\n"); if (subtree) { // 发送状态快照 kvroute_t routing = { snapshot, identity, subtree }; zhash_foreach (self->kvmap, s_send_single, &routing); // 发送终止消息,以及消息编号 zclock_log ("I: 正在发送快照,版本号:%d", (int) self->sequence); zframe_send (&identity, snapshot, ZFRAME_MORE); kvmsg_t *kvmsg = kvmsg_new (self->sequence); kvmsg_set_key (kvmsg, "KTHXBAI"); kvmsg_set_body (kvmsg, (byte *) subtree, 0); kvmsg_send (kvmsg, snapshot); kvmsg_destroy (&kvmsg); free (subtree); } } return 0; } // 每次发送一个快照键值对 static int s_send_single (char *key, void *data, void *args) { kvroute_t *kvroute = (kvroute_t *) args; kvmsg_t *kvmsg = (kvmsg_t *) data; if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg)) && memcmp (kvroute->subtree, kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) { // 先发送接收方的地址 zframe_send (&kvroute->identity, kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE); kvmsg_send (kvmsg, kvroute->socket); } return 0; } // --------------------------------------------------------------------- // 从客户端收集更新事件 // 如果我们是master,则将该事件写入kvmap对象; // 如果我们是slave,则将其写入延迟队列 static int s_was_pending (clonesrv_t *self, kvmsg_t *kvmsg); static int s_collector (zloop_t *loop, void *collector, void *args) { clonesrv_t *self = (clonesrv_t *) args; kvmsg_t *kvmsg = kvmsg_recv (collector); kvmsg_dump (kvmsg); if (kvmsg) { if (self->master) { kvmsg_set_sequence (kvmsg, ++self->sequence); kvmsg_send (kvmsg, self->publisher); int ttl = atoi (kvmsg_get_prop (kvmsg, "ttl")); if (ttl) kvmsg_set_prop (kvmsg, "ttl", "%" PRId64, zclock_time () + ttl * 1000); kvmsg_store (&kvmsg, self->kvmap); zclock_log ("I: 正在发布更新事件:%d", (int) self->sequence); } else { // 如果我们已经从master中获得了该事件,则丢弃该消息 if (s_was_pending (self, kvmsg)) kvmsg_destroy (&kvmsg); else zlist_append (self->pending, kvmsg); } } return 0; } // 如果消息已在延迟队列中,则删除它并返回TRUE static int s_was_pending (clonesrv_t *self, kvmsg_t *kvmsg) { kvmsg_t *held = (kvmsg_t *) zlist_first (self->pending); while (held) { if (memcmp (kvmsg_uuid (kvmsg), kvmsg_uuid (held), sizeof (uuid_t)) == 0) { zlist_remove (self->pending, held); return TRUE; } held = (kvmsg_t *) zlist_next (self->pending); } return FALSE; } // --------------------------------------------------------------------- // 删除带有过期时间的瞬间值 static int s_flush_single (char *key, void *data, void *args); static int s_flush_ttl (zloop_t *loop, void *unused, void *args) { clonesrv_t *self = (clonesrv_t *) args; zhash_foreach (self->kvmap, s_flush_single, args); return 0; } // 如果键值对过期,则进行删除操作,并广播该事件 static int s_flush_single (char *key, void *data, void *args) { clonesrv_t *self = (clonesrv_t *) args; kvmsg_t *kvmsg = (kvmsg_t *) data; int64_t ttl; sscanf (kvmsg_get_prop (kvmsg, "ttl"), "%" PRId64, &ttl); if (ttl && zclock_time () >= ttl) { kvmsg_set_sequence (kvmsg, ++self->sequence); kvmsg_set_body (kvmsg, (byte *) "", 0); kvmsg_send (kvmsg, self->publisher); kvmsg_store (&kvmsg, self->kvmap); zclock_log ("I: 正在发布删除事件:%d", (int) self->sequence); } return 0; } // --------------------------------------------------------------------- // 发送心跳 static int s_send_hugz (zloop_t *loop, void *unused, void *args) { clonesrv_t *self = (clonesrv_t *) args; kvmsg_t *kvmsg = kvmsg_new (self->sequence); kvmsg_set_key (kvmsg, "HUGZ"); kvmsg_set_body (kvmsg, (byte *) "", 0); kvmsg_send (kvmsg, self->publisher); kvmsg_destroy (&kvmsg); return 0; } // --------------------------------------------------------------------- // 状态改变事件处理函数 // 我们将转变为master // // 备机先将延迟列表中的事件更新到自己的快照中, // 并开始接收客户端发来的快照请求。 static int s_new_master (zloop_t *loop, void *unused, void *args) { clonesrv_t *self = (clonesrv_t *) args; self->master = TRUE; self->slave = FALSE; zloop_cancel (bstar_zloop (self->bstar), self->subscriber); // 应用延迟列表中的事件 while (zlist_size (self->pending)) { kvmsg_t *kvmsg = (kvmsg_t *) zlist_pop (self->pending); kvmsg_set_sequence (kvmsg, ++self->sequence); kvmsg_send (kvmsg, self->publisher); kvmsg_store (&kvmsg, self->kvmap); zclock_log ("I: 正在发布延迟列表中的更新事件:%d", (int) self->sequence); } return 0; } // --------------------------------------------------------------------- // 正在切换为slave static int s_new_slave (zloop_t *loop, void *unused, void *args) { clonesrv_t *self = (clonesrv_t *) args; zhash_destroy (&self->kvmap); self->master = FALSE; self->slave = TRUE; zloop_reader (bstar_zloop (self->bstar), self->subscriber, s_subscriber, self); return 0; } // --------------------------------------------------------------------- // 从同伴主机(master)接收更新事件; // 接收该类更新事件时,我们一定是slave。 static int s_subscriber (zloop_t *loop, void *subscriber, void *args) { clonesrv_t *self = (clonesrv_t *) args; // 获取快照,如果需要的话。 if (self->kvmap == NULL) { self->kvmap = zhash_new (); void *snapshot = zsocket_new (self->ctx, ZMQ_DEALER); zsocket_connect (snapshot, "tcp://localhost:%d", self->peer); zclock_log ("I: 正在请求快照:tcp://localhost:%d", self->peer); zstr_send (snapshot, "ICANHAZ?"); while (TRUE) { kvmsg_t *kvmsg = kvmsg_recv (snapshot); if (!kvmsg) break; // 中断 if (streq (kvmsg_key (kvmsg), "KTHXBAI")) { self->sequence = kvmsg_sequence (kvmsg); kvmsg_destroy (&kvmsg); break; // 完成 } kvmsg_store (&kvmsg, self->kvmap); } zclock_log ("I: 收到快照,版本号:%d", (int) self->sequence); zsocket_destroy (self->ctx, snapshot); } // 查找并删除 kvmsg_t *kvmsg = kvmsg_recv (subscriber); if (!kvmsg) return 0; if (strneq (kvmsg_key (kvmsg), "HUGZ")) { if (!s_was_pending (self, kvmsg)) { // 如果master的更新事件比客户端的事件早到,则将master的事件存入延迟列表, // 当收到客户端更新事件时会将其从列表中清除。 zlist_append (self->pending, kvmsg_dup (kvmsg)); } // 如果更新事件比kvmap版本高,则应用它 if (kvmsg_sequence (kvmsg) > self->sequence) { self->sequence = kvmsg_sequence (kvmsg); kvmsg_store (&kvmsg, self->kvmap); zclock_log ("I: 收到更新事件:%d", (int) self->sequence); } else kvmsg_destroy (&kvmsg); } else kvmsg_destroy (&kvmsg); return 0; }
这段程序只有几百行,但还是花了一些时间来进行调通的。这个模型中包含了故障恢复,瞬间值,子树等等。虽然我们前期设计得很完备,但要在多个套接字之间进行调试还是很困难的。以下是我的工作方式:
1、由于使用了反应堆(bstar,建立在zloop之上),我们节省了大量的代码,让程序变得简洁明了。整个服务以一个线程运行,因此不会出现跨线程的问题。只需将结构指针(self)传递给所有的处理器即可。此外,使用发应堆后可以让代码更为模块化,易于重用。
2、我们逐个模块进行调试,只有某个模块能够正常运行时才会进入下一步。由于使用了四五个套接字,因此调试的工作量是很大的。我直接将调试信息输出到了屏幕上,因为实在没有必要专门开一个调试器来工作。
3、因为一直在使用valgrind工具进行测试,因此我能确定程序没有内存泄漏的问题。在C语言中,内存泄漏是我们非常关心的问题,因为没有什么垃圾回收机制可以帮你完成。正确地使用像kvmsg、czmq之类的抽象层可以很好地避免内存泄漏。
这段程序肯定还会存在一些BUG,部分读者可能会帮助我调试和修复,我在此表示感谢。
测试模型6时,先开启主机和备机,再打开一组客户端,顺序随意。随机地中止某个服务进程,如果程序设计得是正确的,那客户端获得的数据应该都是一致的。