C#高性能大容量SOCKET并发(十):SocketAsyncEventArgs线程模型

简介: 原文:C#高性能大容量SOCKET并发(十):SocketAsyncEventArgs线程模型 线程模型 SocketAsyncEventArgs编程模式不支持设置同时工作线程个数,使用的NET的IO线程,由NET底层提供,这点和直接使用完成端口API编程不同。
原文: C#高性能大容量SOCKET并发(十):SocketAsyncEventArgs线程模型

线程模型

SocketAsyncEventArgs编程模式不支持设置同时工作线程个数,使用的NET的IO线程,由NET底层提供,这点和直接使用完成端口API编程不同。NET底层IO线程也是每个异步事件都是由不同的线程返回到Completed事件,因此在Completed事件需要对用户对象进行加锁,避免同一个用户对象同时触发两个Completed事件。

        void IO_Completed(object sender, SocketAsyncEventArgs asyncEventArgs)
        {
            AsyncSocketUserToken userToken = asyncEventArgs.UserToken as AsyncSocketUserToken;
            userToken.ActiveDateTime = DateTime.Now;
            try
            {                
                lock (userToken) //避免同一个userToken同时有多个线程操作
                {
                    if (asyncEventArgs.LastOperation == SocketAsyncOperation.Receive)
                        ProcessReceive(asyncEventArgs);
                    else if (asyncEventArgs.LastOperation == SocketAsyncOperation.Send)
                        ProcessSend(asyncEventArgs);
                    else
                        throw new ArgumentException("The last operation completed on the socket was not a receive or send");
                }   
            }
            catch (Exception E)
            {
                Program.Logger.ErrorFormat("IO_Completed {0} error, message: {1}", userToken.ConnectSocket, E.Message);
                Program.Logger.Error(E.StackTrace);
            }                     
        }
使用ProceXP可以看到服务端在比较忙的时候,服务的线程会越多。在Completed事件加锁有好处是后续逻辑处理都是串行的,可以不用考虑线程同步。还有一个地方需要注意的是断开超时连接,由于超时连接会调用Shutdown函数来强行中断SOCKET,因此在守护线程调用时,也需要锁住userToken对象。

        public void DaemonThreadStart()
        {
            while (m_thread.IsAlive)
            {
                AsyncSocketUserToken[] userTokenArray = null;
                m_asyncSocketServer.AsyncSocketUserTokenList.CopyList(ref userTokenArray);
                for (int i = 0; i < userTokenArray.Length; i++)
                {
                    if (!m_thread.IsAlive)
                        break;
                    try
                    {
                        if ((DateTime.Now - userTokenArray[i].ActiveDateTime).Milliseconds > m_asyncSocketServer.SocketTimeOutMS) //超时Socket断开
                        {
                            lock (userTokenArray[i])
                            {
                                m_asyncSocketServer.CloseClientSocket(userTokenArray[i]);
                            }
                        }
                    }                    
                    catch (Exception E)
                    {
                        Program.Logger.ErrorFormat("Daemon thread check timeout socket error, message: {0}", E.Message);
                        Program.Logger.Error(E.StackTrace);
                    }
                }

                for (int i = 0; i < 60 * 1000 / 10; i++) //每分钟检测一次
                {
                    if (!m_thread.IsAlive)
                        break;
                    Thread.Sleep(10);
                }
            }
        }
在CloseClientSocket方法中,对m_asyncSocketUserTokenPool和m_asyncSocketUserTokenList进行处理的时候都有加锁,代码如下:

        public void CloseClientSocket(AsyncSocketUserToken userToken)
        {
            if (userToken.ConnectSocket == null)
                return;
            string socketInfo = string.Format("Local Address: {0} Remote Address: {1}", userToken.ConnectSocket.LocalEndPoint,
                userToken.ConnectSocket.RemoteEndPoint);
            Program.Logger.InfoFormat("Client connection disconnected. {0}", socketInfo);
            try
            {
                userToken.ConnectSocket.Shutdown(SocketShutdown.Both);
            }
            catch (Exception E) 
            {
                Program.Logger.ErrorFormat("CloseClientSocket Disconnect client {0} error, message: {1}", socketInfo, E.Message);
            }
            userToken.ConnectSocket.Close();
            userToken.ConnectSocket = null; //释放引用,并清理缓存,包括释放协议对象等资源

            m_maxNumberAcceptedClients.Release();
            m_asyncSocketUserTokenPool.Push(userToken);
            m_asyncSocketUserTokenList.Remove(userToken);
        }

        public void Push(AsyncSocketUserToken item)
        {
            if (item == null)
            {
                throw new ArgumentException("Items added to a AsyncSocketUserToken cannot be null");
            }
            lock (m_pool)
            {
                m_pool.Push(item);
            }
        }

        public void Remove(AsyncSocketUserToken userToken)
        {
            lock (m_list)
            {
                m_list.Remove(userToken);
            }
        }

在有些性能要求更高的系统,特别是在一些C++写的完成端口中,会使用原子操作来代替锁,这样做的好处是不用进行系统内核和用户态切换,性能会高。不过技术比较偏门,不易维护,而且实际表现需要进行多方面测试,这类优化更建议优化业务逻辑,并尽量减少内存分配和释放。


DEMO下载地址:http://download.csdn.net/detail/sqldebug_fan/7467745
免责声明:此代码只是为了演示C#完成端口编程,仅用于学习和研究,切勿用于商业用途。水平有限,C#也属于初学,错误在所难免,欢迎指正和指导。邮箱地址:fansheng_hx@163.com。

目录
相关文章
|
4月前
|
C# 开发者
C# 一分钟浅谈:Socket 编程基础
【10月更文挑战第7天】本文介绍了Socket编程的基础知识、基本操作及常见问题,通过C#代码示例详细展示了服务器端和客户端的Socket通信过程,包括创建、绑定、监听、连接、数据收发及关闭等步骤,帮助开发者掌握Socket编程的核心技术和注意事项。
146 3
C# 一分钟浅谈:Socket 编程基础
|
1月前
|
网络协议 C# 开发工具
C#中简单Socket编程
1. 先运行服务器代码。服务器将开始监听指定的IP和端口,等待客户端连接。 1. 然后运行客户端代码。客户端将连接到服务器并发送消息。 1. 服务器接收到消息后,将回应客户端,并在控制台上显示接收到的消息。 1. 客户端接收到服务器的回应消息,并在控制台上显示。
102 15
|
4月前
|
消息中间件 网络协议 C#
C#使用Socket实现分布式事件总线,不依赖第三方MQ
`CodeWF.EventBus.Socket` 是一个轻量级的、基于Socket的分布式事件总线系统,旨在简化分布式架构中的事件通信。它允许进程之间通过发布/订阅模式进行通信,无需依赖外部消息队列服务。
C#使用Socket实现分布式事件总线,不依赖第三方MQ
|
4月前
|
并行计算 JavaScript 前端开发
单线程模型
【10月更文挑战第15天】
|
4月前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
42 1
|
5月前
|
消息中间件 存储 NoSQL
剖析 Redis List 消息队列的三种消费线程模型
Redis 列表(List)是一种简单的字符串列表,它的底层实现是一个双向链表。 生产环境,很多公司都将 Redis 列表应用于轻量级消息队列 。这篇文章,我们聊聊如何使用 List 命令实现消息队列的功能以及剖析消费者线程模型 。
131 20
剖析 Redis List 消息队列的三种消费线程模型
|
4月前
|
缓存 负载均衡 Java
c++写高性能的任务流线程池(万字详解!)
本文介绍了一种高性能的任务流线程池设计,涵盖多种优化机制。首先介绍了Work Steal机制,通过任务偷窃提高资源利用率。接着讨论了优先级任务,使不同优先级的任务得到合理调度。然后提出了缓存机制,通过环形缓存队列提升程序负载能力。Local Thread机制则通过预先创建线程减少创建和销毁线程的开销。Lock Free机制进一步减少了锁的竞争。容量动态调整机制根据任务负载动态调整线程数量。批量处理机制提高了任务处理效率。此外,还介绍了负载均衡、避免等待、预测优化、减少复制等策略。最后,任务组的设计便于管理和复用多任务。整体设计旨在提升线程池的性能和稳定性。
110 5
|
4月前
|
SQL 传感器 开发框架
今天我们聊聊C#的并发和并行
今天我们聊聊C#的并发和并行
94 1
|
4月前
|
NoSQL Redis 数据库
Redis单线程模型 redis 为什么是单线程?为什么 redis 单线程效率还能那么高,速度还能特别快
本文解释了Redis为什么采用单线程模型,以及为什么Redis单线程模型的效率和速度依然可以非常高,主要原因包括Redis操作主要访问内存、核心操作简单、单线程避免了线程竞争开销,以及使用了IO多路复用机制epoll。
76 0
Redis单线程模型 redis 为什么是单线程?为什么 redis 单线程效率还能那么高,速度还能特别快
|
5月前
|
存储 消息中间件 NoSQL
Redis的单线程设计之谜:高性能与简洁并存
Redis的单线程设计之谜:高性能与简洁并存
63 1