在很多情况下,连接的一端需要一直感知连接的状态,如果连接无效了,应用程序可能需要报错,或者重新发起连接等。
TCP 有一个保持活跃的机制叫做 Keep-Alive。这个机制的原理是这样的:
定义一个时间段,在这个时间段内,如果没有任何连接相关的活动,TCP 保活机制会开始作用,每隔一个时间间隔,发送一个探测报文,该探测报文包含的数据非常少,如果连续几个探测报文都没有得到响应,则认为当前的 TCP 连接已经死亡,系统内核将错误信息通知给上层应用程序。
上述的可定义变量,分别被称为保活时间、保活时间间隔和保活探测次数。在 Linux 系统中,这些变量分别对应 sysctl 变量net.ipv4.tcp_keepalive_time、net.ipv4.tcp_keepalive_intvl、 net.ipv4.tcp_keepalve_probes,默认设置是 7200 秒(2 小时)、75 秒和 9 次探测。
如果开启了 TCP 保活,需要考虑以下几种情况:
第一种,对端程序是正常工作的。当 TCP 保活的探测报文发送给对端, 对端会正常响应,这样 TCP 保活时间会被重置,等待下一个 TCP 保活时间的到来。
第二种,对端程序崩溃并重启。当 TCP 保活的探测报文发送给对端后,对端是可以响应的,但由于没有该连接的有效信息,会产生一个 RST 报文,这样很快就会发现 TCP 连接已经被重置。
第三种,是对端程序崩溃,或对端由于其他原因导致报文不可达。当 TCP 保活的探测报文发送给对端后,石沉大海,没有响应,连续几次,达到保活探测次数后,TCP 会报告该 TCP 连接已经死亡。
TCP 保活机制默认是关闭的,当我们选择打开时,可以分别在连接的两个方向上开启,也可以单独在一个方向上开启。如果开启服务器端到客户端的检测,就可以在客户端非正常断连的情况下清除在服务器端保留的“脏数据”;而开启客户端到服务器端的检测,就可以在服务器无响应的情况下,重新发起连接。
如果使用 TCP 自身的 keep-Alive 机制,在 Linux 系统中,最少需要经过 2 小时 11 分 15 秒才可以发现一个“死亡”连接。这个时间是怎么计算出来的呢?其实是通过 2 小时,加上 75 秒乘以 9 的总和。实际上,对很多对时延要求敏感的系统中,这个时间间隔是不可接受的。
我们可以通过在应用程序中模拟 TCP Keep-Alive 机制,来完成在应用层的连接探活。
我们可以设计一个 PING-PONG 的机制,需要保活的一方,比如客户端,在保活时间达到后,发起对连接的 PING 操作,如果服务器端对 PING 操作有回应,则重新设置保活时间,否则对探测次数进行计数,如果最终探测次数达到了保活探测次数预先设置的值之后,则认为连接已经无效。
这里有两个比较关键的点:第一个是需要使用定时器,这可以通过使用 I/O 复用自身的机制来实现;第二个是需要设计一个 PING-PONG 的协议。
我们的程序是客户端来发起保活,为此定义了一个消息对象。你可以看到这个消息对象,这个消息对象是一个结构体,前 4 个字节标识了消息类型,为了简单,这里设计了MSG_PING、MSG_PONG、MSG_TYPE 1和MSG_TYPE 2四种消息类型。
typedef struct { u_int32_t type; char data[1024]; } messageObject; #define MSG_PING 1 #define MSG_PONG 2 #define MSG_TYPE1 11 #define MSG_TYPE2 21
客户端完全模拟 TCP Keep-Alive 的机制,在保活时间达到后,探活次数增加 1,同时向服务器端发送 PING 格式的消息,此后以预设的保活时间间隔,不断地向服务器端发送 PING 格式的消息。如果能收到服务器端的应答,则结束保活,将保活时间置为 0。
#include "lib/common.h" #include "message_objecte.h" #define MAXLINE 4096 #define KEEP_ALIVE_TIME 10 #define KEEP_ALIVE_INTERVAL 3 #define KEEP_ALIVE_PROBETIMES 3 int main(int argc, char **argv) { if (argc != 2) { error(1, 0, "usage: tcpclient <IPaddress>"); } int socket_fd; socket_fd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in server_addr; bzero(&server_addr, sizeof(server_addr)); server_addr.sin_family = AF_INET; server_addr.sin_port = htons(SERV_PORT); inet_pton(AF_INET, argv[1], &server_addr.sin_addr); socklen_t server_len = sizeof(server_addr); int connect_rt = connect(socket_fd, (struct sockaddr *) &server_addr, server_len); if (connect_rt < 0) { error(1, errno, "connect failed "); } char recv_line[MAXLINE + 1]; int n; fd_set readmask; fd_set allreads; struct timeval tv; int heartbeats = 0; tv.tv_sec = KEEP_ALIVE_TIME; tv.tv_usec = 0; messageObject messageObject; FD_ZERO(&allreads); FD_SET(socket_fd, &allreads); for (;;) { readmask = allreads; int rc = select(socket_fd + 1, &readmask, NULL, NULL, &tv); if (rc < 0) { error(1, errno, "select failed"); } if (rc == 0) { if (++heartbeats > KEEP_ALIVE_PROBETIMES) { error(1, 0, "connection dead\n"); } printf("sending heartbeat #%d\n", heartbeats); messageObject.type = htonl(MSG_PING); rc = send(socket_fd, (char *) &messageObject, sizeof(messageObject), 0); if (rc < 0) { error(1, errno, "send failure"); } tv.tv_sec = KEEP_ALIVE_INTERVAL; continue; } if (FD_ISSET(socket_fd, &readmask)) { n = read(socket_fd, recv_line, MAXLINE); if (n < 0) { error(1, errno, "read error"); } else if (n == 0) { error(1, 0, "server terminated \n"); } printf("received heartbeat, make heartbeats to 0 \n"); heartbeats = 0; tv.tv_sec = KEEP_ALIVE_TIME; } } }
服务器端的程序接受一个参数,这个参数设置的比较大,可以模拟连接没有响应的情况。服务器端程序在接收到客户端发送来的各种消息后,进行处理,其中如果发现是 PING 类型的消息,在休眠一段时间后回复一个 PONG 消息,告诉客户端:“嗯,我还活着。”当然,如果这个休眠时间很长的话,那么客户端就无法快速知道服务器端是否存活,这是我们模拟连接无响应的一个手段而已,实际情况下,应该是系统崩溃,或者网络异常。
#include "lib/common.h" #include "message_objecte.h" static int count; int main(int argc, char **argv) { if (argc != 2) { error(1, 0, "usage: tcpsever <sleepingtime>"); } int sleepingTime = atoi(argv[1]); int listenfd; listenfd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in server_addr; bzero(&server_addr, sizeof(server_addr)); server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = htonl(INADDR_ANY); server_addr.sin_port = htons(SERV_PORT); int rt1 = bind(listenfd, (struct sockaddr *) &server_addr, sizeof(server_addr)); if (rt1 < 0) { error(1, errno, "bind failed "); } int rt2 = listen(listenfd, LISTENQ); if (rt2 < 0) { error(1, errno, "listen failed "); } int connfd; struct sockaddr_in client_addr; socklen_t client_len = sizeof(client_addr); if ((connfd = accept(listenfd, (struct sockaddr *) &client_addr, &client_len)) < 0) { error(1, errno, "bind failed "); } messageObject message; count = 0; for (;;) { int n = read(connfd, (char *) &message, sizeof(messageObject)); if (n < 0) { error(1, errno, "error read"); } else if (n == 0) { error(1, 0, "client closed \n"); } printf("received %d bytes\n", n); count++; switch (ntohl(message.type)) { case MSG_TYPE1 : printf("process MSG_TYPE1 \n"); break; case MSG_TYPE2 : printf("process MSG_TYPE2 \n"); break; case MSG_PING: { messageObject pong_message; pong_message.type = MSG_PONG; sleep(sleepingTime); ssize_t rc = send(connfd, (char *) &pong_message, sizeof(pong_message), 0); if (rc < 0) error(1, errno, "send failure"); break; } default : error(1, 0, "unknown message type (%d)\n", ntohl(message.type)); } } }
一般来说,这种机制的建立依赖于系统定时器,以及恰当的应用层报文协议。比如,使用心跳包就是这样一种保持 Keep Alive 的机制。