skynet对半关闭状态的支持

简介: TCP四次挥手中,半关闭状态是否需要处理取决于具体应用场景。半关闭是指连接的一端关闭读或写通道,另一端仍可继续传输数据。在游戏服务器等场景中,需关注半关闭以确保数据完整发送。Java的Netty和Skynet框架对此有解决方案。Skynet通过reactor模型和epoll机制实现半关闭支持,确保在关闭写端前发送完剩余数据。测试表明,正确处理半关闭可避免数据丢失,提升连接关闭的可靠性。

TCP四次挥手中的半关闭状态是否需要解决,依赖于使用场景,大多数场景不解决也不会有影响,但有些场景(特别是游戏服务器)还是需要关注这个半关闭状态的。所谓半关闭,就是只关闭读端或写端;半关闭状态是接收到FIN包并返回ACK包时 到 发送FIN包前的状态。

对半关闭状态进行了解决的有JAVA的netty、skynet开源框架。

大多数网络连接程序在read=0时即调用close()关闭TCP连接;但是,在read=0到调用close()之间,可能还有很多数据需要发送(send),如果read=0时即调用close()那么对端就收不到这些数据了。

对TCP三次握手和四次挥手不了解的,可以查阅前面的文章。

二、TCP四次挥手流程

需要四次挥手的原因:希望将FIN包控制权交给应用程序去处理,以便处理断开连接的善后工作,即对端可能还有数据要发送。

四次挥手流程:

  1. 服务器收到FIN包,自动回复ACK包,进入CLOSE_WAIT状态。
  2. 此时TCP协议栈会为FIN包插入一个文件描述符EOF到内核态网络读缓冲区。
  3. 应用程序通过read=0来感知这个FIN包。
  4. 如果此时应用程序有数据要发送,则发送数据后调用关闭连接操作(发送FIN包到对端)。
  5. 双端关闭都需要一个FIN和一个ACK流程。

三、发送FIN包的场景

  1. 关闭读端:调用shutdown(fd,SHUT_RD)。
  2. 关闭写端:调用shutdown(fd,SHUT_WR);半关闭连接,仍旧接收数据,继而等待对方关闭。
  3. 关闭双端:调用close(fd) 或者shoutdown(fd,SHUT_RDWR)。
  4. 关闭进程:内核协议栈会自动发送FIN包。

如果客户端想进入半关闭状态,需要关闭写端,服务端会把自己的读端关闭,从而进入半关闭状态。

如果客户端是关闭读端或者把读写端都关闭了(调用close(fd) 或者shoutdown(fd,SHUT_RDWR))或者是关闭了进程,那么会进入快速关闭流程:

接受到对端回复的FIN包之后回复RST包,直接进入CLOSE状态,而没有TIME_WAIT状态,这是解决有大量TIME_WAIT现象的常用方法

思考:四次挥手中,为什么存在TIME_WAIT状态?因为ACK不会重传,防止没有LAST_ACK或LAST_ACK丢失,对端没有收到LAST_ACK而一直重发FIN包。一直重发已经不存在的socket,可能会对新建立的连接造成干扰。

四、skynet 网络封装支持半关闭状态

skynet 网络层支持半关闭状态。skynet 采用 reactor 网络编程模型;reactor 网络模型是一种异步事件模型。

通过 epoll_ctl 设置 struct epoll_eventdata.ptr =(struct socket *)ud; 来完成 fd 与 actor绑定。skynet 通过 socket.start(fd, func)来完成 actor 与 fd 的绑定。

4.1、连接的建立

客户端与服务端建立连接处理:建立成功的标识是 listenfd,有读事件触发。服务端与第三方服务建立连接:建立成功的标识是 connect 返回的fd,有可写事件触发。

4.2、连接断开

skynet 网络层支持半关闭状态。

(1)读写端都关闭,epoll的事件EPOLLHUP 读写段都关闭:(socket_epoll.h)

代码语言:C

代码运行次数:0

自动换行运行

AI代码解释

static int 
sp_wait(int efd, struct event *e, int max) {
  struct epoll_event ev[max];
  int n = epoll_wait(efd , ev, max, -1);
  int i;
  for (i=0;i<n;i++) {
    e[i].s = ev[i].data.ptr;
    unsigned flag = ev[i].events;
    e[i].write = (flag & EPOLLOUT) != 0;
    e[i].read = (flag & EPOLLIN) != 0;
    e[i].error = (flag & EPOLLERR) != 0;
    e[i].eof = (flag & EPOLLHUP) != 0;
  }
  return n;
}

(socket_server.c)

代码语言:C

代码运行次数:0

自动换行运行

AI代码解释

// 处理:直接关闭并向 actor 返回事件 SOCKET_CLOSE
if (e->eof) {
  // For epoll (at least), FIN packets are exchanged both ways.
  // See: https://stackoverflow.com/questions/52976152/tcp-when-is-epollhup-generated
  int halfclose = halfclose_read(s);
  force_close(ss, s, &l, result);
   // 如果前面因为关闭读端已经发送SOCKET_CLOSE,在这里避免重复 SOCKET_CLOSE
  if (!halfclose) {
    return SOCKET_CLOSE;
  }
}

(2)检测读端关闭:(socket_server.c)

代码语言:C

代码运行次数:0

自动换行运行

AI代码解释

// return -1 (ignore) when error
static int
forward_message_tcp(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message * result) {
  int sz = s->p.size;
  char * buffer = MALLOC(sz);
  int n = (int)read(s->fd, buffer, sz);
  if (n<0) {
    FREE(buffer);
    switch(errno) {
    case EINTR:
    case AGAIN_WOULDBLOCK:
      break;
    default:
      return report_error(s, result, strerror(errno));
    }
    return -1;
  }
  if (n==0) {
    FREE(buffer);
    if (s->closing) { // 如果该连接的 socket 已经关闭
      // Rare case : if s->closing is true, reading event is disable, and SOCKET_CLOSE is raised.
      if (nomore_sending_data(s)) {
        force_close(ss,s,l,result);
      }
      return -1;
    }
    int t = ATOM_LOAD(&s->type);
    if (t == SOCKET_TYPE_HALFCLOSE_READ) {  // 如果已经处理过读端关闭
      // Rare case : Already shutdown read.
      return -1;
    }
    if (t == SOCKET_TYPE_HALFCLOSE_WRITE) { // 如果之前已经处理过写端关闭,则直接 close
      // Remote shutdown read (write error) before.
      force_close(ss,s,l,result);
    } else {
      close_read(ss, s, result);
    }
    return SOCKET_CLOSE;
  }
  if (halfclose_read(s)) {
    // discard recv data (Rare case : if socket is HALFCLOSE_READ, reading event is disable.)
    FREE(buffer);
    return -1;
  }
  stat_read(ss,s,n);
  result->opaque = s->opaque;
  result->id = s->id;
  result->ud = n;
  result->data = buffer;
  if (n == sz) {
    s->p.size *= 2;
    return SOCKET_MORE;
  } else if (sz > MIN_READ_BUFFER && n*2 < sz) {
    s->p.size /= 2;
  }
  return SOCKET_DATA;
}

(3)检测写端关闭:write < 0 && errno == EPIPE,能检测对端读端关闭。(aly.rilamp.com22)

代码语言:C

代码运行次数:0

自动换行运行

AI代码解释

static int
send_list_tcp(struct socket_server *ss, struct socket *s, struct wb_list *list, struct socket_lock *l, struct socket_message *result) {
  while (list->head) {
    struct write_buffer * tmp = list->head;
    for (;;) {
      ssize_t sz = write(s->fd, tmp->ptr, tmp->sz);
      if (sz < 0) {
        switch(errno) {
        case EINTR:
          continue;
        case AGAIN_WOULDBLOCK:
          return -1;
        }
        return close_write(ss, s, l, result);
      }
      stat_write(ss,s,(int)sz);
      s->wb_size -= sz;
      if (sz != tmp->sz) {
        tmp->ptr += sz;
        tmp->sz -= sz;
        return -1;
      }
      break;
    }
    list->head = tmp->next;
    write_buffer_free(ss,tmp);
  }
  list->tail = NULL;
  return -1;
}

4.3、消息到达

单线程读。读策略:

代码语言:C

代码运行次数:0

自动换行运行

AI代码解释

int n = read(fd, buf, sz);

sz 初始值为 64,根据从网络接收数据情况,动态调整 sz 的大小。(socket_server.c)

代码语言:C

代码运行次数:0

自动换行运行

AI代码解释

// return -1 (ignore) when error
static int
forward_message_tcp(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message * result) {
  int sz = s->p.size;
  char * buffer = MALLOC(sz);
  int n = (int)read(s->fd, buffer, sz);
  if (n<0) {
    FREE(buffer);
    switch(errno) {
    case EINTR:
    case AGAIN_WOULDBLOCK:
      break;
    default:
      return report_error(s, result, strerror(errno));
    }
    return -1;
  }
  if (n==0) {
    FREE(buffer);
    if (s->closing) {
      // Rare case : if s->closing is true, reading event is disable, and SOCKET_CLOSE is raised.
      if (nomore_sending_data(s)) {
        force_close(ss,s,l,result);
      }
      return -1;
    }
    int t = ATOM_LOAD(&s->type);
    if (t == SOCKET_TYPE_HALFCLOSE_READ) {
      // Rare case : Already shutdown read.
      return -1;
    }
    if (t == SOCKET_TYPE_HALFCLOSE_WRITE) {
      // Remote shutdown read (write error) before.
      force_close(ss,s,l,result);
    } else {
      close_read(ss, s, result);
    }
    return SOCKET_CLOSE;
  }
  if (halfclose_read(s)) {
    // discard recv data (Rare case : if socket is HALFCLOSE_READ, reading event is disable.)
    FREE(buffer);
    return -1;
  }
  stat_read(ss,s,n);
  result->opaque = s->opaque;
  result->id = s->id;
  result->ud = n;
  result->data = buffer;
  if (n == sz) {
    s->p.size *= 2;
    return SOCKET_MORE;
  } else if (sz > MIN_READ_BUFFER && n*2 < sz) {
    s->p.size /= 2;
  }
  return SOCKET_DATA;
}

4.4、消息发送完毕

多线程写。同一个 fd 可以在不同的 actor 中发送数据。skynet底层通过加锁确保数据正确发送到 socket 的写缓冲区。

代码语言:aly.texasref.com22

代码运行次数:0

自动换行运行

AI代码解释

int 
socket_server_send(struct socket_server *ss, struct socket_sendbuffer *buf) {
  int id = buf->id;
  struct socket * s = &ss->slot[HASH_ID(id)];
  if (socket_invalid(s, id) || s->closing) {
    free_buffer(ss, buf);
    return -1;
  }
  struct socket_lock l;
  socket_lock_init(s, &l);
  
  // 确保能在fd上写数据,连接可用状态,检测 socket 线程是否在对该fd操作
  if (can_direct_write(s,id) && socket_trylock(&l)) {
    // may be we can send directly, double check
    if (can_direct_write(s,id)) { // 双检测
      // send directly
      struct send_object so;
      send_object_init_from_sendbuffer(ss, &so, buf);
      ssize_t n;
      if (s->protocol == PROTOCOL_TCP) {
        // 尝试在 work 线程直接写,如果n >0,则写成功
        n = write(s->fd, so.buffer, so.sz);
      } else {
        union sockaddr_all sa;
        socklen_t sasz = udp_socket_address(s, s->p.udp_address, &sa);
        if (sasz == 0) {
          skynet_error(NULL, "socket-server : set udp (%d) address first.", id);
          socket_unlock(&l);
          so.free_func((void *)buf->buffer);
          return -1;
        }
        n = sendto(s->fd, so.buffer, so.sz, 0, &sa.s, sasz);
      }
      if (n<0) {
        // ignore error, let socket thread try again
         // 如果失败,不要在 work 线程中处理异常,所有网络异常在 socket 线程中处理
        n = 0;
      }
      stat_write(ss,s,n);
      if (n == so.sz) {
        // write done
        socket_unlock(&l);
        so.free_func((void *)buf->buffer);
        return 0;
      }
      // write failed, put buffer into s->dw_* , and let socket thread send it. see send_buffer()
      s->dw_buffer = clone_buffer(buf, &s->dw_size);
      s->dw_offset = n;
      socket_unlock(&l);
      struct request_package request;
      request.u.send.id = id;
      request.u.send.sz = 0;
      request.u.send.buffer = NULL;
       // 如果写失败,可能写缓冲区满,或被中断打断,直接交由 socket 线程去重试。
       // 这里通过 pipe 来与 socket 线程通信。
      // let socket thread enable write event
      send_request(ss, &request, 'W', sizeof(request.u.send));
      return 0;
    }
    socket_unlock(&l);
  }
  inc_sending_ref(s, id);
  struct request_package request;
  request.u.send.id = id;
  request.u.send.buffer = clone_buffer(buf, &request.u.send.sz);
  send_request(ss, &request, 'D', sizeof(request.u.send));
  return 0;
}

注意:在 work 线程中调用的。如果work线程写失败,可能写缓冲区满,或被中断打断,直接交由 socket 线程去重试。这里通过 pipe 来与 socket 线程通信。

五、测试skynet对半关闭的支持

main.lua的代码实现:

代码语言:Lua

自动换行

AI代码解释

local skynet = require "skynet"
local socket = require "skynet.socket"
skynet.start(function ()
    local listenfd = socket.listen("0.0.0.0", 8888)
    socket.start(listenfd, function (clientfd, addr)
        print("receive a client:", clientfd, addr)
        socket.start(clientfd)
        while true do
            local data = socket.readline(clientfd, "\n")
            if not data then -- read = 0
                -- for i=1,10 do
                    socket.write(clientfd, "FINAL\n")
                -- end
                socket.close(clientfd)
                print("closed", clientfd)
                return
            end
            if data == "quit" then
                print("recv quit", clientfd)
                socket.close(clientfd)
                return
            else
                print("S recv:", data)
                socket.write(clientfd, "=> "..data.."\n")
            end
        end
    end)
    
end)

client.lua的代码实现:

代码语言:Lua

自动换行

AI代码解释

package.cpath = "./skynet/luaclib/?.so"
package.path = "./skynet/lualib/?.lua;./?.lua"
if _VERSION ~= "Lua 5.4" then
  error "Use lua 5.4"
end
local socket = require "client.socket"
local fd = assert(socket.connect("127.0.0.1", 8888))
local function send_package(pack)
  socket.send(fd, pack .. "\n")
end
local function unpack_package(text)
  local size = #text
  if size < 1 then
    return nil, text
  end
    local pos = text:find("\n", 1, true)
  if not pos then
    return nil, text
  end
  return text:sub(1,pos-1), text:sub(pos+1)
end
local function recv_package(last)
  local result
  result, last = unpack_package(last)
  if result then
    return result, last
  end
  local r = socket.recv(fd)
  if not r then
    return nil, last
  end
  if r == "" then
    error "Server closed"
  end
  return unpack_package(last .. r)
end
local last = ""
local function dispatch_package()
  while true do
    local v
    v, last = recv_package(last)
    if not v then
      break
    end
        print(v)
  end
end
while true do
  dispatch_package()
  local cmd = socket.readstdin()
  if cmd then
    if cmd == "quit" then
      send_package("quit")
        elseif cmd == "shut_r" then
            -- send_package("shut_r")
            socket.shutdown(fd, "r")
            -- send_package("shut_r")
        elseif cmd == "shut_w" then
            -- send_package("shut_w")
            send_package("shut_w1")
            send_package("shut_w2")
            send_package("shut_w3")
            send_package("shut_w4")
            socket.shutdown(fd, "w")
    else
      send_package(cmd)
    end
  else
    socket.usleep(100)
  end
end

config设置

代码语言:Lua

自动换行

AI代码解释

thread=4
logger=nil
harbor=0
start="main" -- 启动第一个服务
lua_path="./skynet/lualib/?.lua;".."./skynet/lualib/?/init.lua;".."./lualib/?.lua;"
luaservice="./skynet/service/?.lua;./app/?.lua"
lualoader="./skynet/lualib/loader.lua"
cpath="./skynet/cservice/?.so"
lua_cpath="./skynet/luaclib/?.so"

Makefile

代码语言:Bash

自动换行

AI代码解释

SKYNET_PATH?=./skynet
all:
  cd $(SKYNET_PATH) && $(MAKE) PLAT='linux'
clean:
  cd $(SKYNET_PATH) && $(MAKE) clean

项目结构:

代码语言:Bash

自动换行

AI代码解释

-- mytest
----+ skynet 
----| app
-------- main.lua
-------- client.lua
----+ config
----+ Makefile

编译和运行:

代码语言:Bash

自动换行

AI代码解释

make
./skynet/skynet config

启动client.lua的方式:

代码语言:Bash

自动换行

AI代码解释

./skynet/3rd/lua/lua ./app/client.lua

5.1、测试直接关闭进程

启动client.lua,然后CTL+C直接关闭进程。

代码语言:Bash

自动换行

AI代码解释

./skynet/3rd/lua/lua ./app/client.lua
  1. 此时内核协议栈会收到一个FIN包,内核协议栈的读缓冲区插入EOF,并触发读事件。
  2. skynet开始读数据,读到EOF(即read()==0);开始关闭读事件和读端。
  3. 然后进入main.lua的while循环,尝试发送消息;但是消息是无法发送出去了,因为客户端进程已经关闭,资源已经被释放,协议栈会把其丢弃。
  4. 注意,读数据、写数据、业务逻辑都在线程池执行,socket网络线程只负责监听触发事件;socket网络线程和线程池通过pipeline消息交互,epoll可以监听这个消息,然后注册读、写事件。

5.2、测试关闭读端

启动client.lua,输入shut_r命令。

代码语言:Bash

自动换行

AI代码解释

$ ./skynet/3rd/lua/lua ./app/client.lua 
shut_r
  1. 关闭读端,会走RST流程,立即进入CLOSE状态,服务端会报出“Connection reset by peer”错误。
  2. 服务端关闭写端,但仍然能接受到客户端发送的数据。
  3. 接受完数据后才调用关闭读端,结束连接。

代码语言:Bash

自动换行

AI代码解释

receive a client:       4       127.0.0.1:35384
S recv: shut_r
S recv: shut_r2
closed  4
[:00000008] socket: error on 4 Connection reset by peer

5.2、测试关闭写端

启动client.lua,输入shut_w命令。

代码语言:Bash

自动换行

AI代码解释

$ ./skynet/3rd/lua/lua ./app/client.lua 
shut_w
  1. 关闭写端,服务器会关闭读端。
  2. 在完全关闭连接之前,服务器还可以发送数据到客户端。

server端:

代码语言:Bash

自动换行

AI代码解释

receive a client:       5       127.0.0.1:35486
S recv: shut_w1
S recv: shut_w2
S recv: shut_w3
S recv: shut_w4
closed  5

client端:

代码语言:Bash

自动换行

AI代码解释

shut_w
SHUTDOWN 3 1
=> shut_w1
=> shut_w2
=> shut_w3
=> shut_w4
FINAL

六、总结

ACK包是不会重发的,如果ACK包丢失了,那么发送端在一定时间内接受不到数据就会重发FIN包,一般三次FIN包都没有收到ACK就会直接进入close。

相关文章
|
8月前
|
人工智能 缓存 安全
你还是没有理解CAS
在高并发场景下,使用 `count++` 统计商品浏览次数可能导致计数丢失。本文介绍了如何使用 CAS(Compare and Swap)实现无锁的原子操作来解决该问题。CAS 通过比较内存值与期望值,确保更新操作的原子性,避免了线程竞争带来的数据错误。文章详细解析了 CAS 的工作机制、优势与局限性,并结合 Java 示例展示了其底层实现与实际应用,如高性能计数器、无锁栈和缓存更新策略。此外,还探讨了 CAS 可能引发的 ABA 问题及其解决方案,如版本号机制。最后,通过性能对比分析,帮助开发者根据场景合理选择并发控制方式。
193 0
|
8月前
|
人工智能 算法 C++
浅谈 KMP
KMP算法是一种高效的字符串匹配算法,由Knuth、Morris和Pratt提出。它通过预处理模式串构建next数组,利用匹配失败时的信息减少重复比较,从而提升匹配效率。其时间复杂度为O(m+n),适用于大规模文本匹配场景。
539 0
|
8月前
|
存储 人工智能 Shell
Lua与C语言接口编程实战指南:打造高性能、灵活的程序
本文深入介绍了 Lua 与 C 语言的交互机制,重点分析了 Lua 作为胶水语言在嵌入式系统、游戏开发(如 Skynet、OpenResty)中的应用。内容涵盖 Lua 环境搭建、虚拟栈管理、C 与 Lua 的相互调用、闭包、Userdata 和注册表的使用等核心技术,并结合代码示例讲解了如何在实际项目中实现 Lua 与 C 的高效交互,适合希望掌握 Lua 扩展与嵌入开发的工程师参考学习。
986 0
|
8月前
|
人工智能 编译器 C语言
C语言模拟面向对象三大特性与C++实现对比
C语言通过结构体和函数指针模拟面向对象特性,实现封装、继承和多态,而C++则通过原生语法支持。两者在实现原理上有相似之处,但C++在语法、编译期检查和内存管理方面更具优势,提高了代码的安全性和开发效率。
128 0
|
8月前
|
存储 人工智能 自然语言处理
深入理解 UpValue 和闭包
Lua 函数为第一类值,支持词法定界与闭包特性。函数可作为参数传递、返回值返回,且能访问外部变量(UpValue)。通过闭包机制,函数可携带其所需环境,实现灵活编程。
214 0
|
消息中间件 存储 大数据
深入理解操作系统中的进程间通信(IPC)机制
本文旨在探讨操作系统中进程间通信(IPC)的核心机制与其重要性。通过对不同IPC手段如管道、信号、消息队列及共享内存等的详细解析,揭示它们如何高效地促进进程间的信息交换与同步。文章不仅阐述各种IPC技术的实现原理,还探讨了它们在实际系统应用中的场景与优化策略,为系统开发者提供全面而深入的理解。
|
缓存 负载均衡 监控
数据库多实例的负载均衡技术深入
【10月更文挑战第23天】数据库多实例负载均衡技术是确保数据库系统高效运行的重要手段。通过合理选择负载均衡策略、实时监控实例状态、不断优化调整,能够实现资源的最优分配和系统性能的提升。在实际应用中,需要根据具体情况灵活运用各种负载均衡技术,并结合其他相关技术,以满足不断变化的业务需求。
|
5月前
|
缓存 监控 供应链
实战解析:阿里巴巴国际站 alibaba.item_get 商品详情数据API接口
本文详解阿里巴巴国际站alibaba.item_get API,涵盖OAuth2.0认证、签名生成、商品数据获取与标准化解析,提供Python实战代码,助力跨境电商实现智能选品、价格监控与供应链优化。
|
消息中间件 Web App开发 数据采集
微服务业务监控和行为分析怎么做?试试日志埋点
互联网公司一般都会有专门的数据团队对公司的一些业务指标负责;为了拿到这些基本的业务指标,一般也要工程团队去配合做一些数据采集工作,于是埋点诞生了。
1657 0
|
机器学习/深度学习 存储 小程序
OSS 实践篇-OSS API 鉴权剖析
出现 signature 一般出现客户端自签名调 API 的操作中, signature 的计算稍微复杂点,建议最好用 SDK 来替代计算的过程和多样性。如果业务强需求,先要读懂如果计算 signature。
7442 0
OSS 实践篇-OSS API 鉴权剖析

热门文章

最新文章

下一篇
开通oss服务