剖析KCP以及KCP在游戏中是如何使用的

本文涉及的产品
资源编排,不限时长
无影云电脑企业版,4核8GB 120小时 1个月
无影云电脑个人版,1个月黄金款+200核时
简介: 剖析KCP以及KCP在游戏中是如何使用的


       亲爱的各位读者你们好,由于前段时间忙于部分项目的重构和优化,未能及时更新文章,不少读者催更,哈哈,我还是很开心能抽出时间给大家再来分享下kcp的相关技术内幕,以及之前完善自己的网络库增加了KCP的客户端服务器收发支持(结尾会分享封装的客户端服务器C++源码)。

KCP概述

        对于游戏开发,尤其是MOBA游戏,或者全球唯一服架构类型的游戏,对于网络的要求比一般游戏要高。

       大多数游戏公司使用的网络无非就是TCP,UDP。我们都知道 TCP 协议具有重传机制,也就是说,如果发送方认为发生了丢包现象,就重发这些数据包。很显然,我们需要一个方法来猜测是否发生了丢包。最简单的想法就是,接收方每收到一个包,就向发送方返回一个 ACK,表示自己已经收到了这段数据,反过来,如果发送方一段时间内没有收到 ACK,就知道很可能是数据包丢失了,紧接着就重发该数据包,直到收到 ACK 为止。因为即使是超时了,这个数据包也可能并没有丢,它只是绕了一条远路,来的很晚而已。毕竟 TCP 协议是位于传输层的协议,不可能明确知道数据链路层和物理层发生了什么。但这并不妨碍我们的超时重传机制,因为接收方会自动忽略重复的包。

       超时和重传的概念其实就是这么简单,但内部的细节却是很多,我们最先想到的一个问题就是,到底多长时间才能算超时呢?

超时是怎么确定的?

假设我直接把超时时间设成一个固定值200ms,我们的电脑和很多服务器都有交互,这些服务器位于天南海北,国内国外,延迟差异巨大,打个比方:

假设你访问某国外网站,延迟有 130 ms,这就麻烦了,正常的数据包都可能被认为是超时,导致大量数据包被重发,可以想象,重发的数据包也很容易被误判为超时。。。雪崩效应的感觉

       所以设置固定值是很不可靠的,我们要根据网络延迟,动态调整超时时间,延迟越大,超时时间越长。

因此很多公司都使用了基于UDP的可靠传输KCP

分析KCP的实现

为了提高udp可靠性,KCP是在udp协议上封装一层可靠性传输机制(类似tcp的ACK机制、重传机制、序号机制、重排机制、窗口机制),就做到了兼具tcp的安全性(流量控制和拥塞控制等)和udp的实时性,并且具备一定的灵活性(超时重传、ack等),所以KCP并不是网络协议,而是一种业务逻辑。

关于KCP的实现C,C++源码大家可以直接看下韦神的 :

GitHub - skywind3000/kcp: KCP - A Fast and Reliable ARQ Protocol

代码逻辑写的很优雅,通过几个简单的函数调用就可以实现可靠性传输,对于使用者不需要关心丢包,重传,粘包的问题。

这里我简单介绍下kcp里的几个名词:

    • 用户数据:应用层发送的数据,比如一张图片2KB的数据。
    • MTU:最大传输单元,即每次发送的最大数据。
    • RTO:Retransmission TimeOut,重传超时时间。
    • cwnd:ongestion window,拥塞窗口,表示发送方可发送多少个KCP数据包。与接收方窗口有关,与网络状况(拥塞控制)有关,与发送窗口大小有关。
    • rwnd:receiver window,接收方窗口大小,表示接收方还可接收多少个KCP数据包。
    • snd_queue:待发送KCP数据包队列。
    • snd_nxt:下一个即将发送的kcp数据包序列号。
    • snd_una:下一个待确认的序列号

    • TCP的特点是可靠传输(累积确认、超时重传、选择确认)、流量控制(滑动窗口)
    • 、拥塞控制(慢开始、拥塞避免、快重传、快恢复)、面向连接。KCP对这些参数
    • 基本都可配,也没用建立/关闭连接的过程。

    kcp协议头部

    image.gif编辑

    conv:连接号。UDP是无连接的,conv用于表示来自于哪个客户端,是对连接的一种替代,使用conv的好处,其实我相信很多文章里都没有写,这里我从商业项目使用的经验可以告诉读者,我们的游戏用户玩游戏的过程中经常会出现由4G,5G,wifi之间进行自由切换,而且也存在基站之间的信号切换,导致客户端的ip地址也可能发生变化,如果没有一个ID来唯一标识,那么在移动网络发生切换过程中触发了断线重连,这就导致每次切换,玩家都要走这个流程和逻辑,你觉得玩家体验还能好吗?因此我们一般使用conv来记录这个id对应哪个客户端,即使客户端网络环境发生变化,也不需要断线重连,每次KCP包的协议头部都携带conv,这样我们就知道当前发送包的是哪个客户端,当然有一些服务器的做法是将KCP的conv和TCP协议的用户进行一一映射绑定,在TCP和KCP之间自由切换,在这里mark下,后边我会介绍这种用法是怎么实现的。

    cmd:命令字。如,IKCP_CMD_ACK确认命令,IKCP_CMD_WASK接收窗口大小询问命令,IKCP_CMD_WINS接收窗口大小告知命令。

    frg:分片,用户数据可能会被分成多个KCP包,发送出去。

    wnd:接收窗口大小,发送方的发送窗口不能超过接收方给出的数值。

    ts:时间戳。

    sn:序列号。

    una:下一个可接收的序列号。其实就是确认号,类似tcp的ack。

    len:数据长度。

    data:用户数据。

    KCP流程

    注意收发数据时,数据要先放到缓存重排然后再进行收,如下是kcp数据接收和发送的过程简图:

    image.gif编辑

           接收数据时,kcp会先把数据放到recv_buf中进行缓存,注意recv_buf中的数据是已经按sn排好序的,但是不一定完整连续,因为传输过程中可能有丢失,recv_queue是从recv_buf中拷贝出的连续完整的数据分片。用户调用ikcp_recv()时是从recv_que中读取数据。还要注意一点,ikcp_recv中的len一定要是整个数据的长度,要一次性读完,这和recvfrom的size是一样的。

    与接收过程类似,只不过顺序相反,用户调用ikcp_send()只是将数据放到了send_buf中,不过会自动进行数据分片,使得数据大小不超过mtu。然后,kcp会通过流量控制和拥塞控制将数据放到send_buf中再进行发送。

    注意区分一下,不断调用ikcp_send(),数据会累积在send_queue中,如果对方没有接收,对方的数据则会累积在对方的recv_buf中。

    说了这么多,其实kcp底层调用的,还是recvfrom()和sendto(),这一点不要忘记。

    kcp接收和确认机制

    KCP的接收过程是将UDP收到的数据进行解包,重新组装顺序的、可靠的数据后交付给用户。

    kcp_input输入UDP收到的数据包。kcp包对前面的24个字节进行解压,包括conv、 frg、 cmd、 wnd、 ts、 sn、 una、 len。根据una,会删除snd_buf中,所有una之前的kcp数据包,因为这些数据包接收者已经确认。根据wnd更新接收端接收窗口大小。根据不同的命令字进行分别处理,这块我就不赘述了,韦神的代码写的很详细,直接看kcp_input函数。

    KCP函数使用

    创建 KCP对象:

    // 初始化 kcp对象,conv为一个表示会话编号的整数,和tcp的 conv一样,通信双

    // 方需保证 conv相同,相互的数据包才能够被认可,user是一个给回调函数的指针

    ikcpcb *kcp = ikcp_create(conv, user);

    设置传输回调函数(如UDP的send函数):

    // KCP的下层协议输出函数,KCP需要发送数据时会调用它

    // buf/len 表示缓存和长度

    // user指针为 kcp对象创建时传入的值,用于区别多个 KCP对象

    int udp_output(const char *buf, int len, ikcpcb *kcp, void *user)

    {

    ….

    }

    // 设置回调函数

    kcp->output = udp_output;

    循环调用 update:

    // 以一定频率调用 ikcp_update来更新 kcp状态,并且传入当前时钟(毫秒单位)

    // 如 10ms调用一次,或用 ikcp_check确定下次调用 update的时间不必每次调用

    ikcp_update(kcp, millisec);

    输入一个应用层数据包(如UDP收到的数据包):

    //

    收到一个下层数据包(比如UDP包)时需要调用:ikcp_input(kcp,received_udp_packet,received_udp_size);

    处理了下层协议的输出/输入后 KCP协议就可以正常工作了,使用 ikcp_send 来向

    远端发送数据。而另一端使用 ikcp_recv(kcp, ptr, size)来接收数据

    image.gif编辑

    UDP收到的包,不断通过kcp_input喂给KCP,KCP会对这部分数据(KCP协议数据)进行解包,重新封装成应用层用户数据,应用层通过kcp_recv获取。应用层通过kcp_send发送数据,KCP会把用户数据拆分kcp数据包,通过kcp_output,以UDP(send)的方式发送。

    KCP的客户端/服务器封装

    这里所提供的代码是我个人的C++网络库框架部分代码,其实KCP也是去年为了支持某项目才加入的网络模块,这里我给大家分享出来,如果有需要相关cpp实现代码,可以关注我公众号,私信发给我请备注说明( kcp源码 ):

    我在这里封装了一个KcpChannel 类,主要是负责kcp对象的创建,接收udp的输入,输出数据并回调服务器和客户端的发送接口send_kcp_callback 。

    KcpChannel.h

    #pragma once
    //#include "inet_addr.h"
    #include "os.h"
    #include "inet_addr.h"
    #include <unordered_map> 
    #include "ikcp.h"
    #include <log.h>
    #include <string>
    #include "config.h"
    #include <util.h>
    #include <functional>
    #include "event_handler.h"
    #define KCP_PKG_LIMIT 100000 
    typedef std::function<int(const char*, int, sockaddr_in&)> send_kcp_callback;
    inline uint32_t iclock()
    {
        return (uint32_t)(utils::Util::GetCurrentTimeMsec() & 0xfffffffful);
    }
    class KcpChannel {
    public: 
        KcpChannel(uint32_t conv, send_kcp_callback cb, sockaddr_in remote);
        KcpChannel(uint32_t conv, send_kcp_callback cb, INET_Addr remote);
        static int kcp_output(const char* buf, int len, ikcpcb* kcp, void* ptr);
        bool init_kcp();
        ~KcpChannel();
        void reset();
        int input(const char* buffer, const int len);    
        void flush();    
        bool get_data(const char* recv_buf, int recv_len, char* out_buf, int& out_len);
        bool get_data(char*& out_buf, int& out_len);    
        void update_remote(sockaddr_in remote);
        void update(uint64_t current_clock);
        bool try_send();
        int  send(const char* send_buffer, int len);
    public:
        uint32_t m_conv;
        ikcpcb* m_kcp;
        Event_Handler*   m_kcp_handler;
        sockaddr_in      m_remote;
        IUINT32          m_next_kcp_update_ = 0;
        send_kcp_callback m_send_callback; 
    };

    image.gif

    KcpChannel.cpp的相关实现如下:

    #include "kcp_channel.h"
    #include "log.h"  
    #include "config.h"
    KcpChannel::KcpChannel(uint32_t conv, send_kcp_callback cb, sockaddr_in remote) :m_conv(conv), m_send_callback(cb), m_remote(remote)
    {
        init_kcp();
    }
    KcpChannel::KcpChannel(uint32_t conv, send_kcp_callback cb, INET_Addr remote) : m_conv(conv), m_send_callback(cb)
    {
        m_remote.sin_family = AF_INET;
        m_remote.sin_addr.s_addr = remote.get_addr();
        m_remote.sin_port = remote.get_port();
        init_kcp();
    }
    int KcpChannel::kcp_output(const char* buf, int len, ikcpcb* kcp, void* ptr)
    {
        assert(NULL != ptr);
        KcpChannel* ch = (KcpChannel*)(ptr);
        string s;
        s.assign(buf, len);
        ch->m_send_callback(buf, len, ch->m_remote);
        return 0;
    }
    bool KcpChannel::init_kcp() {
        m_kcp = ikcp_create(m_conv, (void*)this);
        assert(NULL != m_kcp);
        m_kcp->output = &KcpChannel::kcp_output;
        //ikcp_setmtu(m_kcp, 128);
        ikcp_wndsize(m_kcp, 512, 512);
        // 启动快速模式
        // 第二个参数 nodelay-启用以后若干常规加速将启动
        // 第三个参数 interval为内部处理时钟,默认设置为 10ms
        // 第四个参数 resend为快速重传指标,设置为2
        // 第五个参数 为是否禁用常规流控,这里禁止
         //普通模式: 
        //ikcp_nodelay(m_kcp, 0, 40, 0, 0);
        ikcp_nodelay(m_kcp, 1, 10, 2, 1);
        ikcp_nodelay(m_kcp, 1, 10, 2, 1);
        m_kcp->rx_minrto = 10; //超时重传最小时间 10ms
        m_kcp->fastresend = 2;
        return 0;
    }
    KcpChannel::~KcpChannel() {
        ikcp_update(m_kcp, iclock());
        ikcp_flush(m_kcp);
        ikcp_release(m_kcp);
    }
    void KcpChannel::reset() {
        ikcp_update(m_kcp, iclock());
        ikcp_flush(m_kcp);
        ikcp_release(m_kcp);
        init_kcp();
    }
    int KcpChannel::input(const char* buffer, const int len)
    {
        int send_ret = ikcp_input(m_kcp, buffer, len);
        if (send_ret == 0) {
            update(iclock());
            flush();
        }
        return 0;
    }
    void KcpChannel::flush()
    {
        IUINT32 cwnd = min(m_kcp->snd_wnd, m_kcp->rmt_wnd);
        if (m_kcp->nocwnd == 0) {
            cwnd = min(m_kcp->cwnd, cwnd);
        }
        if (ikcp_waitsnd(m_kcp) > cwnd)
            ikcp_flush(m_kcp);
    }
    bool KcpChannel::get_data(const char* recv_buf, int recv_len, char* out_buf, int& out_len)
    {
        out_len = ikcp_recv(m_kcp, out_buf, RECV_BUFFER_LENGTH);
        if (out_len <= 0) {
            LOG(WARN)("SOCK_Dgram::handle_input, kcp recv error. packet len[%d]", out_len);
            return false;
        }
        return true;
    }
    bool KcpChannel::get_data(char*& out_buf, int& out_len)
    {
        int peeksize = ikcp_peeksize(m_kcp);
        if (peeksize <= 0) {
            return false;
        }
        out_buf = new char[peeksize];
        out_len = ikcp_recv(m_kcp, out_buf, RECV_BUFFER_LENGTH);
        if (out_len <= 0) {
            LOG(WARN)("SOCK_Dgram::handle_input, kcp recv error. packet len[%d]", out_len);
            return false;
        }
        return true;
    }
    void KcpChannel::update_remote(sockaddr_in remote)
    {
        bool update = false;
        if (remote.sin_addr.s_addr != m_remote.sin_addr.s_addr) {
            m_remote.sin_addr.s_addr = remote.sin_addr.s_addr;
            update = true;
        }
        if (remote.sin_port != m_remote.sin_port) {
            m_remote.sin_port = remote.sin_port;
            update = true;
        }
        if (update) {
            reset();
        }
    }
    void KcpChannel::update(uint64_t current_clock)
    {
        IUINT32 current = current_clock & 0xfffffffflu;
        if (current >= m_next_kcp_update_)
        {
            ikcp_update(m_kcp, current);
            m_next_kcp_update_ = ikcp_check(m_kcp, current);
        }
    }
    bool KcpChannel::try_send() {
        if (ikcp_waitsnd(m_kcp) > (int)(m_kcp->snd_wnd * 2)) {
            LOG(ERROR)("SOCK_KCP_Client::send_data error, kcp nsnd_buf is full, %d - %d", m_kcp->nsnd_buf + m_kcp->nsnd_que, m_kcp->snd_wnd * 2);
            return false;
        }
        return true;
    }
    int  KcpChannel::send(const char* send_buffer, int len)
    {
        return ikcp_send(m_kcp, send_buffer, len);
    }

    image.gif

    kcp客户端的源码比较简单,这里我只罗列kcp服务器端的源码头文件和cpp文件:

    #pragma once
    #include "net_event.h"
    #include "event_handler.h"
    #include "inet_addr.h"
    #include "net_manager.h"
    #include "block_buffer.h"
    #include "ikcp.h"
    #include "kcp_channel.h"
    #include "os.h"
    class Net_Manager;
    //! @class SOCK_KCP
    //! @brief kcp通道处理类
    class SOCK_KCP_Server : public Event_Handler
    {
    public:
      //! 构造函数
      //! @param net_manager 网络管理器
      SOCK_KCP_Server(Net_Manager* net_manager, Packet_Splitter* packet_splitter, const INET_Addr& local_addr, void* pUserData);
      //! 析构函数
      virtual ~SOCK_KCP_Server();
    public:
      //! 创建kcp通道
      //! @param local_addr upd本地绑定地址
      //! @param handler 该连接的事件处理函数指针
      //! @return 结果, 0:成功, -1失败
      int create_kcp_server(int netbufsize);
      //! 获取通道id
      //! @return 通道id
      virtual uint32_t get_id();
      //! 获取socket句柄
      //! @return socket句柄
      virtual SOCKET get_handle();
      //! 处理读
      //! @return 处理结果 0:处理正常, -1: 连接被关闭, -2:连接异常
      virtual int handle_input();
      //! 处理写
      //! @return 处理结果 0:处理正常, -1: 连接被关闭, -2:连接异常
      virtual int handle_output();
      //! 连接异常
      virtual int handle_exception();
      //! 连接关闭
      virtual int handle_close();
      //! 超时
      virtual int handle_timeout();
      //! 提交发送任务
      //! @param send_task 待发送的任务
      virtual int post_packet(Net_Packet* send_packet);  
      int send(const char* buf, int len, sockaddr_in& remote_addr);
      KcpChannel* get_session(ISTDUINT32& conv);
      bool  add_session(ISTDUINT32& conv, KcpChannel* sock_kcp);
      void  remove_session(ISTDUINT32& conv); 
      void update();
    private:
      //! 通道id
      uint32_t m_id;
      //! socket句柄
      SOCKET m_socket;
      //! 本地监听地址
      INET_Addr m_local_addr; 
      //! 网络管理器
      Net_Manager* m_net_manager; 
      //! 接受缓存
      Block_Buffer_T<RECV_BUFFER_LENGTH> m_recv_buffer; 
      //! 该连接的事件处理函数指针
      void* m_UserData; 
      std::unordered_map<ISTDUINT32, KcpChannel*> m_sessions;
    };

    image.gif

    #include "sock_kcp_server.h" 
    #include "net_manager.h"
    #include "log.h"
    #include "ikcp.h"
    #include <sstream>
    #include <util.h>
    #define KCP_SERVER_TIMEOUT 100
    SOCK_KCP_Server::SOCK_KCP_Server(Net_Manager* net_manager, Packet_Splitter* packet_splitter, const INET_Addr& local_addr,void* pUserData)
    {
        m_id = net_manager->m_id_manager.acquire(ID_UDP);
        assert(m_id != 0);
        m_socket = INVALID_SOCKET;
        m_net_manager = net_manager;
        m_UserData = pUserData; 
        m_local_addr = local_addr; 
        timeout = utils::Util::GetCurrentTimeMsec() + KCP_SERVER_TIMEOUT;
    } 
    SOCK_KCP_Server::~SOCK_KCP_Server()
    {
        if (0 != m_id) {
            m_net_manager->m_id_manager.release(m_id);
        } 
        if (INVALID_SOCKET != m_socket) {
            close(m_socket);
            m_socket = INVALID_SOCKET;
        }
        for (auto it : m_sessions) {
            if (it.second != NULL) {
                delete it.second;
                it.second = NULL;
            }
        }
        m_sessions.clear();
    }
    KcpChannel* SOCK_KCP_Server::get_session(ISTDUINT32& conv)
    {
        auto it = m_sessions.find(conv);
        if (it != m_sessions.end()) {
            return it->second;
        }
        return NULL;
    }
    void  SOCK_KCP_Server::remove_session(ISTDUINT32& conv)
    {
        auto it = m_sessions.find(conv);
        if (it != m_sessions.end()) {
            delete it->second;
            it->second = NULL;
            m_sessions.erase(it);
        }
    }
    bool SOCK_KCP_Server::add_session(ISTDUINT32& conv, KcpChannel* sock_kcp)
    {
        auto it = m_sessions.find(conv);
        if (it != m_sessions.end()) {
            return false;
        }
        m_sessions.insert(std::make_pair(conv, sock_kcp));
        return true;
    }
    void SOCK_KCP_Server::update()
    {
        auto i_c = iclock();
        for (auto it = m_sessions.begin(); it != m_sessions.end(); it++)
        {
            it->second->update(i_c);
            it->second->flush();
        }
    }
    int SOCK_KCP_Server::create_kcp_server(int netbufsize)
    { 
        m_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
        if (INVALID_SOCKET == m_socket) {
            LOG(ERROR)("SOCK_KCP_Server::create_tcp_server error, socket error, errno:%d", error_no());
            return -1;
        }
        int nret = set_socket_nonblocking(m_socket);
        if (nret != 0)
        {
            LOG(INFO)("SOCK_KCP_Server::set_socket_nonblocking error, socket error, errno:%d", error_no());
            return -1;
        }
        //设置网络底层收发缓冲区长度
        nret = set_socket_bufsize(m_socket, true, netbufsize);
        int nsize = 0;
        nret = get_socket_bufsize(m_socket, true, nsize);
        if (nsize != netbufsize)
        {
            LOG(WARN)("setsockopt so_rcvbuf failed.to set size %d orgin size %d.", netbufsize, nsize);
        }
        nret = set_socket_bufsize(m_socket, false, netbufsize);
        nsize = 0;
        nret = get_socket_bufsize(m_socket, false, nsize);
        if (nsize != netbufsize)
        {
            LOG(WARN)("setsockopt so_sndbuf failed.to set size %d orgin size %d.", netbufsize, nsize);
        }
        int val = 1;
        setsockopt(m_socket, SOL_SOCKET, SO_REUSEADDR, (char*)&val, sizeof(val));
        sockaddr_in local;
        local.sin_family = AF_INET;
        local.sin_addr.s_addr = m_local_addr.get_addr();
        local.sin_port = m_local_addr.get_port();
        int rc = ::bind(m_socket, (sockaddr*)&local, sizeof(local));
        if (0 != rc) {
            int err = error_no();
            LOG(ERROR)("SOCK_KCP_Server::create_tcp_server error, bind error, errno:%d", err);
            close(m_socket);
            m_socket = INVALID_SOCKET;
            return -1;
        } 
        LOG(INFO)("create new udp server:%d, %d\n", local.sin_addr.s_addr, local.sin_port);
        return 0;
    }  
    uint32_t SOCK_KCP_Server::get_id()
    {
        return m_id;
    }
    SOCKET SOCK_KCP_Server::get_handle()
    {
        return m_socket;
    }
    int SOCK_KCP_Server::handle_input()
    {
        int r = 0; 
        sockaddr_in from;
        socklen_t fromlen = sizeof(from);
        int datasize = 0;
        //一般kcp的mtu是1400字节 IKCP_MTU_DEF 
        // 用户获得收到的数据 
        uint32_t curclock = iclock();
        while (true) { 
            sockaddr_in from;
            socklen_t fromlen = sizeof(from);
            int rc = recvfrom(m_socket, m_recv_buffer.wr_ptr(), m_recv_buffer.space_length(), 0, (sockaddr*)&from, &fromlen);
            if (rc > 0)
            { 
                m_recv_buffer.wr_ptr(rc);
                auto conv = ikcp_getconv(m_recv_buffer.rd_ptr());
                auto kcp_session = get_session(conv);
                if (NULL == kcp_session) {
                    kcp_session = new KcpChannel(conv, std::bind(&SOCK_KCP_Server::send, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3), from);//INET_Addr remote_addr;
                    add_session(conv, kcp_session); 
                }
                else {
                    kcp_session->update_remote(from);//防止IP,端口地址发生变化,比如切换网络环境  
                } 
                int packet_len = 0;
                int send_ret = kcp_session->input(m_recv_buffer.rd_ptr(), m_recv_buffer.data_length());
                if (send_ret == 0) {
                    curclock = iclock();
                    kcp_session->flush();
                    kcp_session->update(curclock);                
                } 
                m_recv_buffer.rd_ptr(m_recv_buffer.data_length());
                if ((m_recv_buffer.data_length() == 0) || (m_recv_buffer.space_length() == 0))
                {
                    m_recv_buffer.recycle();
                }
                while (true) {
                    char*  packet_buffer = NULL;
                    bool ret = kcp_session->get_data(packet_buffer, packet_len);
                    if (ret == false) {
                        break;
                    }
                    Net_Event* net_event = new Net_Event;
                    net_event->net_event_type = TYPE_DATA;
                    net_event->id = m_id;
                    net_event->new_id = 0;
                    net_event->local_addr = m_local_addr;
                    net_event->UserData = m_UserData;
                    net_event->remote_addr.set_addr(from.sin_addr.s_addr);
                    net_event->remote_addr.set_port(from.sin_port);
                    net_event->packet.write_data(packet_buffer, packet_len);
                    net_event->packet.remote_addr = net_event->remote_addr;
                    net_event->packet.movetobegin();
                    net_event->packet.m_cmd.id = conv;//需要kcp上层再返回给网络层
                    m_net_manager->put_event(net_event); 
                    delete packet_buffer;
                    packet_buffer = NULL;
                    kcp_session->update(iclock());
                    kcp_session->flush(); 
                }
            }
            else if (rc == 0)
            {
                // close
                LOG(WARN)("SOCK_KCP_Server::handle_input, socket close by peer");
                return 0;//-1;
            }
            else
            {
                if (NETMANAGER_EAGAIN == error_no()) {
                    return 0;
                }
                else {
                    // exception
                    LOG(WARN)("SOCK_KCP_Server::handle_input error, recvfrom error, errno:%d", error_no()); 
                    return 0;//-2;
                }
            }
        }
        if (datasize > 0)
        {
            update();        
        }  
        return 0;
    }
    int SOCK_KCP_Server::handle_output()
    {
        // 如果当前正在发送的任务非空则先发送 
        return 0;
    }
    int SOCK_KCP_Server::handle_exception()
    {
        if (INVALID_SOCKET != m_socket) {
            close(m_socket);
            m_socket = INVALID_SOCKET;
        }
        Net_Event* net_event = new Net_Event;
        net_event->net_event_type = TYPE_EXCEPTION;
        net_event->id = m_id;
        net_event->new_id = 0;
        //net_event->local_addr = m_local_addr;
        net_event->UserData = m_UserData;
        m_net_manager->put_event(net_event);
        return 0;
    }
    int SOCK_KCP_Server::handle_close()
    {
        if (INVALID_SOCKET != m_socket) {
            close(m_socket);
            m_socket = INVALID_SOCKET;
        }
        if (notify_close)
        {
            return 0;
        }
        Net_Event* net_event = new Net_Event;
        net_event->net_event_type = TYPE_CLOSE;
        net_event->id = m_id;
        net_event->new_id = 0;
        net_event->local_addr = m_local_addr; 
       // net_event->local_addr = m_local_addr;
        net_event->UserData = m_UserData;
        m_net_manager->put_event(net_event);
        return 0;
    }
    int SOCK_KCP_Server::handle_timeout()
    { 
        timeout = utils::Util::GetCurrentTimeMsec() + KCP_SERVER_TIMEOUT;
        auto i_c = iclock();
        for (auto it = m_sessions.begin(); it != m_sessions.end(); it++)
        {
            it->second->update(i_c);
            it->second->flush();
            while (true) {
                int packet_len = 0;
                char* out_buf = NULL;
                bool ret = it->second->get_data(out_buf, packet_len);
                if (ret == false) {
                    break;
                }
                Net_Event* net_event = new Net_Event;
                net_event->net_event_type = TYPE_DATA;
                net_event->id = m_id;
                net_event->new_id = 0;
                net_event->local_addr = m_local_addr;
                net_event->UserData = m_UserData;
                net_event->packet.write_data(out_buf, packet_len);
                net_event->packet.remote_addr = net_event->remote_addr;
                net_event->packet.movetobegin();
                net_event->packet.m_cmd.id = it->second->m_conv;//需要kcp上层再返回给网络层
                m_net_manager->put_event(net_event);
            }       
        }
        return -1;
    }
    int SOCK_KCP_Server::post_packet(Net_Packet* send_packet)
    {
        // sync if too many send package
        auto clock = iclock();
        ISTDUINT32 conv = send_packet->m_cmd.id;
        if (send_packet->m_cmd.cmd == Net_Packet::NET_DELETE) {
            LOG(INFO)("SOCK_KCP_Server::delete kcp conv");
            remove_session(conv);
            return 0;
        }
        auto kcp_session = get_session(conv);
        if (kcp_session == NULL) {
            LOG(ERROR)("SOCK_KCP_Server::send_data error");
            delete send_packet;
            return 0;
        }
        if (!kcp_session->try_send()) {
            //放到缓冲队列中 TODO,算了还是丢弃掉吧
            LOG(ERROR)("SOCK_KCP_Server::post_packet error, start drop package");
            delete send_packet;
            return 0;
        }
        int data_len = 0;
        char* send_buffer = send_packet->get_rptr(data_len);
        if (kcp_session->send((const char*)send_buffer, data_len) < 0) {
            LOG(ERROR)("SOCK_KCP_Server::send_data error");
            delete send_packet;
            return 0;
        } 
        kcp_session->update(clock);
        kcp_session->flush();
        delete send_packet;
        // 监控可写事件
        int rc = reactor()->register_handler(this, MASK_WRITE);
        if (0 != rc) {
            // 设置reactor失败, 认为socket异常
            LOG(ERROR)("SOCK_KCP_Server::send_data error, register_handler error");
            reactor()->notify_close(m_id);
            return 0;
        } 
        return 0;
    }  
    int SOCK_KCP_Server::send(const char* buf, int len, sockaddr_in& remote_addr)
    { 
        socklen_t tolen = sizeof(remote_addr);
        //TODO 如果当前有发送的则推送到队列中。否则直接发送
        const char* pBuf = buf;
        int curdatalen = len;
        int rc = 0;
        while (curdatalen > 0)
        {
            rc = sendto(m_socket, pBuf, curdatalen, 0, (sockaddr*)&remote_addr, tolen);
            if (rc > 0) {
                curdatalen -= rc;
                //m_current_send_length += rc;
                pBuf += rc;
            }
            else {
                if (NETMANAGER_EAGAIN == error_no()) {
                    return false;
                }
                else {
                    // exception
                    int nErrCode = error_no();
                    LOG(WARN)("SOCK_KCP_Server::handle_output error, sendto error, errno:%d", nErrCode);
                    return -1;
                }
            }
        }  
        return 0;
    }

    image.gif


    相关实践学习
    通过Ingress进行灰度发布
    本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
    容器应用与集群管理
    欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
    相关文章
    |
    4月前
    |
    网络协议 C语言
    C语言 网络编程(十一)TCP通信创建流程---服务端
    在服务器流程中,新增了绑定IP地址与端口号、建立监听队列及接受连接并创建新文件描述符等步骤。`bind`函数用于绑定IP地址与端口,`listen`函数建立监听队列并设置监听状态,`accept`函数则接受连接请求并创建新的文件描述符用于数据传输。套接字状态包括关闭(CLOSED)、同步发送(SYN-SENT)、同步接收(SYN-RECEIVE)和已建立连接(ESTABLISHED)。示例代码展示了TCP服务端程序如何初始化socket、绑定地址、监听连接请求以及接收和发送数据。
    |
    4月前
    |
    网络协议 C语言
    C语言 网络编程(十二)TCP通信创建-粘包
    TCP通信中的“粘包”现象指的是由于协议特性,发送方的数据包被拆分并在接收方按序组装,导致多个数据包粘连或单个数据包分割。为避免粘包,可采用定长数据包或先传送数据长度再传送数据的方式。示例代码展示了通过在发送前添加数据长度信息,并在接收时先读取长度后读取数据的具体实现方法。此方案适用于长度不固定的数据传输场景。
    |
    4月前
    |
    网络协议 C语言
    C语言 网络编程(十)TCP通信创建流程---客户端
    在TCP通信中,客户端需通过一系列步骤与服务器建立连接并进行数据传输。首先使用 `socket()` 函数创建一个流式套接字,然后通过 `connect()` 函数连接服务器。连接成功后,可以使用 `send()` 和 `recv()` 函数进行数据发送和接收。最后展示了一个完整的客户端示例代码,实现了与服务器的通信过程。
    |
    7月前
    |
    网络协议 Java Go
    技术笔记:KCP协议学习
    技术笔记:KCP协议学习
    133 0
    |
    网络协议 数据安全/隐私保护 网络架构
    Netty实战(十五)UDP广播事件(一)UDP简介和示例程序
    用户数据报协议(UDP)上,它通常用在性能至关重要并且能够容忍一定的数据包丢失的情况下使用
    510 0
    |
    数据库 容器
    Netty实战(十六)UDP广播事件(二)编写广播者和监视器
    Netty 提供了大量的类来支持 UDP 应用程序的编写
    167 0
    |
    存储 缓存 Java
    golang channel的创建、接受和发送原理讲透
    golang channel的创建、接受和发送原理讲透
    |
    网络协议
    TCP 通信并发服务器详解(附有案例代码)
    TCP 通信并发服务器详解(附有案例代码)
    |
    网络协议
    TCP 通信流程详解(附有案例代码)
    TCP 通信流程详解(附有案例代码)