C#高性能大容量SOCKET并发(三):接收、发送

简介: 原文:C#高性能大容量SOCKET并发(三):接收、发送 异步数据接收有可能收到的数据不是一个完整包,或者接收到的数据超过一个包的大小,因此我们需要把接收的数据进行缓存。
原文: C#高性能大容量SOCKET并发(三):接收、发送

异步数据接收有可能收到的数据不是一个完整包,或者接收到的数据超过一个包的大小,因此我们需要把接收的数据进行缓存。异步发送我们也需要把每个发送的包加入到一个队列,然后通过队列逐个发送出去,如果每个都实时发送,有可能造成上一个数据包未发送完成,这时再调用SendAsync会抛出异常,提示SocketAsyncEventArgs正在进行异步操作,因此我们需要建立接收缓存和发送缓存。

接收

通过Completed事件响应后调用AsyncSocketInvokeElement.ProcessReceive,在ProcessReceive中,我们把收到数据先写入一个缓存,然后进行分包,分包后压给包处理函数ProcessPacket,ProcessPacket函数然后调用ProcessCommand处理具体的命令,也是各个协议实现业务逻辑的地方,具体代码如下:

        public virtual bool ProcessReceive(byte[] buffer, int offset, int count) //接收异步事件返回的数据,用于对数据进行缓存和分包
        {
            m_activeDT = DateTime.UtcNow;
            DynamicBufferManager receiveBuffer = m_asyncSocketUserToken.ReceiveBuffer;

            receiveBuffer.WriteBuffer(buffer, offset, count);
            if (receiveBuffer.DataCount > sizeof(int))
            {
                //按照长度分包
                int packetLength = BitConverter.ToInt32(receiveBuffer.Buffer, 0); //获取包长度
                if (NetByteOrder)
                    packetLength = System.Net.IPAddress.NetworkToHostOrder(packetLength); //把网络字节顺序转为本地字节顺序


                if ((packetLength > 10 * 1024 * 1024) | (receiveBuffer.DataCount > 10 * 1024 * 1024)) //最大Buffer异常保护
                    return false;

                if ((receiveBuffer.DataCount - sizeof(int)) >= packetLength) //收到的数据达到包长度
                {
                    bool result = ProcessPacket(receiveBuffer.Buffer, sizeof(int), packetLength);
                    if (result)
                        receiveBuffer.Clear(packetLength + sizeof(int)); //从缓存中清理
                    return result;
                }
                else
                {
                    return true;
                }
            }
            else
            {
                return true;
            }
        }

        public virtual bool ProcessPacket(byte[] buffer, int offset, int count) //处理分完包后的数据,把命令和数据分开,并对命令进行解析
        {
            if (count < sizeof(int))
                return false;
            int commandLen = BitConverter.ToInt32(buffer, offset); //取出命令长度
            string tmpStr = Encoding.UTF8.GetString(buffer, offset + sizeof(int), commandLen);
            if (!m_incomingDataParser.DecodeProtocolText(tmpStr)) //解析命令
              return false;

            return ProcessCommand(buffer, offset + sizeof(int) + commandLen, count - sizeof(int) - commandLen); //处理命令
        }

        public virtual bool ProcessCommand(byte[] buffer, int offset, int count) //处理具体命令,子类从这个方法继承,buffer是收到的数据
        {
            return true;
        }

发送

通过Completed事件响应后调用AsyncSocketInvokeElement.SendCompleted,在SendCompleted中我们需要在队列中清除已发送的包,并检测是否还有剩余需要发送的数据包,如果有,则继续发送,具体实现如下:

        public virtual bool SendCompleted()
        {
            m_activeDT = DateTime.UtcNow;
            m_sendAsync = false;
            AsyncSendBufferManager asyncSendBufferManager = m_asyncSocketUserToken.SendBuffer;
            asyncSendBufferManager.ClearFirstPacket(); //清除已发送的包
            int offset = 0;
            int count = 0;
            if (asyncSendBufferManager.GetFirstPacket(ref offset, ref count))
            {
                m_sendAsync = true;
                return m_asyncSocketServer.SendAsyncEvent(m_asyncSocketUserToken.ConnectSocket, m_asyncSocketUserToken.SendEventArgs,
                    asyncSendBufferManager.DynamicBufferManager.Buffer, offset, count);
            }
            else
                return SendCallback();
        }

        //发送回调函数,用于连续下发数据
        public virtual bool SendCallback()
        {
            return true;
        }
在AsyncSocketInvokeElement中提供函数给子类发送数据,业务逻辑是把当前数据包写入缓存,并检测当前是否正在发送包,如果正在发送,则等待回调,如果没有正在发送的数据包,则投递发送请求。

        public bool DoSendResult()
        {
            string commandText = m_outgoingDataAssembler.GetProtocolText();
            byte[] bufferUTF8 = Encoding.UTF8.GetBytes(commandText);
            int totalLength = sizeof(int) + bufferUTF8.Length; //获取总大小
            AsyncSendBufferManager asyncSendBufferManager = m_asyncSocketUserToken.SendBuffer;
            asyncSendBufferManager.StartPacket();
            asyncSendBufferManager.DynamicBufferManager.WriteInt(totalLength, false); //写入总大小
            asyncSendBufferManager.DynamicBufferManager.WriteInt(bufferUTF8.Length, false); //写入命令大小
            asyncSendBufferManager.DynamicBufferManager.WriteBuffer(bufferUTF8); //写入命令内容
            asyncSendBufferManager.EndPacket();

            bool result = true;
            if (!m_sendAsync)
            {
                int packetOffset = 0;
                int packetCount = 0;
                if (asyncSendBufferManager.GetFirstPacket(ref packetOffset, ref packetCount))
                {
                    m_sendAsync = true;
                    result = m_asyncSocketServer.SendAsyncEvent(m_asyncSocketUserToken.ConnectSocket, m_asyncSocketUserToken.SendEventArgs, 
                        asyncSendBufferManager.DynamicBufferManager.Buffer, packetOffset, packetCount);
                }                
            }
            return result;
        }

        public bool DoSendResult(byte[] buffer, int offset, int count)
        {
            string commandText = m_outgoingDataAssembler.GetProtocolText();
            byte[] bufferUTF8 = Encoding.UTF8.GetBytes(commandText);
            int totalLength = sizeof(int) + bufferUTF8.Length + count; //获取总大小
            AsyncSendBufferManager asyncSendBufferManager = m_asyncSocketUserToken.SendBuffer;
            asyncSendBufferManager.StartPacket();
            asyncSendBufferManager.DynamicBufferManager.WriteInt(totalLength, false); //写入总大小
            asyncSendBufferManager.DynamicBufferManager.WriteInt(bufferUTF8.Length, false); //写入命令大小
            asyncSendBufferManager.DynamicBufferManager.WriteBuffer(bufferUTF8); //写入命令内容
            asyncSendBufferManager.DynamicBufferManager.WriteBuffer(buffer, offset, count); //写入二进制数据
            asyncSendBufferManager.EndPacket();

            bool result = true;
            if (!m_sendAsync)
            {
                int packetOffset = 0;
                int packetCount = 0;
                if (asyncSendBufferManager.GetFirstPacket(ref packetOffset, ref packetCount))
                {
                    m_sendAsync = true;
                    result = m_asyncSocketServer.SendAsyncEvent(m_asyncSocketUserToken.ConnectSocket, m_asyncSocketUserToken.SendEventArgs, 
                        asyncSendBufferManager.DynamicBufferManager.Buffer, packetOffset, packetCount);
                }
            }
            return result;
        }

        public bool DoSendBuffer(byte[] buffer, int offset, int count)
        {
            AsyncSendBufferManager asyncSendBufferManager = m_asyncSocketUserToken.SendBuffer;
            asyncSendBufferManager.StartPacket();
            asyncSendBufferManager.DynamicBufferManager.WriteBuffer(buffer, offset, count);
            asyncSendBufferManager.EndPacket();

            bool result = true;
            if (!m_sendAsync)
            {
                int packetOffset = 0;
                int packetCount = 0;
                if (asyncSendBufferManager.GetFirstPacket(ref packetOffset, ref packetCount))
                {
                    m_sendAsync = true;
                    result = m_asyncSocketServer.SendAsyncEvent(m_asyncSocketUserToken.ConnectSocket, m_asyncSocketUserToken.SendEventArgs, 
                        asyncSendBufferManager.DynamicBufferManager.Buffer, packetOffset, packetCount);
                }
            }
            return result;
        }

DEMO下载地址: http://download.csdn.net/detail/sqldebug_fan/7467745
免责声明:此代码只是为了演示C#完成端口编程,仅用于学习和研究,切勿用于商业用途。水平有限,C#也属于初学,错误在所难免,欢迎指正和指导。邮箱地址:fansheng_hx@163.com。

目录
相关文章
|
7月前
|
存储 Python
Python网络编程基础(Socket编程) UDP 发送和接收数据
【4月更文挑战第10天】对于UDP客户端而言,发送数据是一个相对简单的过程。首先,你需要构建一个要发送的数据报,这通常是一个字节串(bytes)。然后,你可以调用socket对象的`sendto`方法,将数据报发送到指定的服务器地址和端口。
|
7月前
|
数据处理 C# C++
如何使用C#和C++结构体实现Socket通信
如何使用C#和C++结构体实现Socket通信
321 0
|
7月前
|
网络协议 Python
python中socket客户端发送和接收数据
【4月更文挑战第7天】本教程聚焦TCP客户端数据发送与接收。使用Python的`socket`模块,通过`send()`发送字节串至服务器,如`client_socket.send(message_bytes)`;用`recv()`接收数据,如`received_data = client_socket.recv(buffer_size)`。异常处理确保网络错误时程序健壮性,例如`try-except`捕获`socket.error`。理解和掌握这些基础操作对于构建稳定的TCP客户端至关重要。
1455 1
|
2月前
|
C# 开发者
C# 一分钟浅谈:Socket 编程基础
【10月更文挑战第7天】本文介绍了Socket编程的基础知识、基本操作及常见问题,通过C#代码示例详细展示了服务器端和客户端的Socket通信过程,包括创建、绑定、监听、连接、数据收发及关闭等步骤,帮助开发者掌握Socket编程的核心技术和注意事项。
102 3
C# 一分钟浅谈:Socket 编程基础
|
2月前
|
消息中间件 网络协议 C#
C#使用Socket实现分布式事件总线,不依赖第三方MQ
`CodeWF.EventBus.Socket` 是一个轻量级的、基于Socket的分布式事件总线系统,旨在简化分布式架构中的事件通信。它允许进程之间通过发布/订阅模式进行通信,无需依赖外部消息队列服务。
C#使用Socket实现分布式事件总线,不依赖第三方MQ
|
2月前
|
SQL 传感器 开发框架
今天我们聊聊C#的并发和并行
今天我们聊聊C#的并发和并行
61 1
|
6月前
|
Java
Java Socket编程与多线程:提升客户端-服务器通信的并发性能
【6月更文挑战第21天】Java网络编程中,Socket结合多线程提升并发性能,服务器对每个客户端连接启动新线程处理,如示例所示,实现每个客户端的独立操作。多线程利用多核处理器能力,避免串行等待,提升响应速度。防止死锁需减少共享资源,统一锁定顺序,使用超时和重试策略。使用synchronized、ReentrantLock等维持数据一致性。多线程带来性能提升的同时,也伴随复杂性和挑战。
107 0
|
4月前
|
网络协议 数据格式 Python
python Socket无限发送接收数据方式
Socket是指套接字,是对网络中不同主机上的应用进程之间进行双向通信的端点的一种抽象。 一个套接字就是网络上进程通信的一端,提供了应用层进程利用网络协议交换数据的机制。
|
4月前
|
开发者 C# C++
揭秘:如何轻松驾驭Uno Platform,用C#和XAML打造跨平台神器——一步步打造你的高性能WebAssembly应用!
【8月更文挑战第31天】Uno Platform 是一个跨平台应用程序框架,支持使用 C# 和 XAML 创建多平台应用,包括 Web。通过编译为 WebAssembly,Uno Platform 可实现在 Web 上运行高性能、接近原生体验的应用。本文介绍如何构建高效的 WebAssembly 应用:首先确保安装最新版本的 Visual Studio 或 VS Code 并配置 Uno Platform 开发环境;接着创建新的 Uno Platform 项目;然后通过安装工具链并使用 Uno WebAssembly CLI 编译应用;最后添加示例代码并测试应用。
124 0
|
4月前
|
开发者 Apache 程序员
揭秘Apache Wicket:页面生命周期背后的神秘力量!
【8月更文挑战第31天】李工是一位热爱Web开发的程序员,近日在技术博客上分享了他对Apache Wicket框架的学习心得,特别是页面生命周期的理解。他认为掌握Wicket页面生命周期对于开发富交互式Web应用至关重要。他通过一个简单的计数器应用示例,详细解释了Wicket的组件化设计理念以及页面和组件在生命周期中的变化。
49 0