[C#]I/O完成端口的类定义和测试实例

简介:
整理者:郑昀@UltraPower 
日期:2005-04-13 

从William Kennedy那里整理过来的,不同之处在于他自己定义了一个Overlapped,而我们这里直接使用 
System.Threading.NativeOverlapped:。 

附一段我以前的Win32下的IOCP文档,如果您了解IOCP也可以直接跳过看后面的C#测试示范: 

我们采用的是I/O Complete Port(以下简称IOCP)处理机制。

简单的讲,当服务应用程序初始化时,它应该先创建一个I/O CP。我们在请求到来后,将得到的数据打包用PostQueuedCompletionStatus发送到IOCP中。这时需要创建一些个线程(7个线程/CPU,再多就没有意义了)来处理发送到IOCP端口的消息。实现步骤大致如下:

1     先在主线程中调用CreateIoCompletionPort创建IOCP

CreateIoCompletionPort的前三个参数只在把设备同Complete Port相关联时才有用。

此时我们只需传递INVALID_HANDLE_VALUE,NULL0即可。

第四个参数告诉端口同时能运行的最多线程数,这里设置为0,表示默认为当前计算机的CPU数目。

2     我们的ThreadFun线程函数执行一些初始化之后,将进入一个循环,该循环会在服务进程终止时才结束。

在循环中,调用GetQueuedCompletionStatus,这样就把当前线程的ID放入一个等待线程队列中,I/O CP内核对象就总能知道哪个线程在等待处理完成的I/O请求。

如果在IDLE_THREAD_TIMEOUT规定的时间内I/O CP上还没有出现一个Completion Packet,则转入下一次循环。在这里我们设置的IDLE_THREAD_TIMEOUT1秒。

 

当端口的I/O完成队列中出现一项时,完成端口就唤醒等待线程队列中的这个线程,该线程将得到完成的I/O项中的信息:       传输的字节数、完成键和OVERLAPPED结构的地址。

 

在我们的程序中可以用智能指针或者BSTR或者int来接受这个OVERLAPPED结构的地址的值,从而得到消息;然后在这个线程中处理消息。

GetQueuedCompletionStatus的第一个参数hCompletionPort指出了要监视哪一个端口,这里我们传送先前从CreateIoCompletionPort返回的端口句柄。

 

需要注意的是:

第一,   线程池的数目是有限制的,和CPU数目有关系。

第二,   IOCP是一种较为完美的睡眠/唤醒 线程机制;线程当前没有任务要处理时,就进入睡眠状态,从而不占用CPU资源,直到被内核唤醒;

第三,   最近一次刚执行完的线程,下次任务来的时候还会唤醒它;所以有可能比较少被调用的线程以后被调用的几率也少。

 


测试代码: 
 
using  System; 
using  System.Threading;   //  Included for the Thread.Sleep call  
using  Continuum.Threading; 
using  System.Runtime.InteropServices; 
 
namespace  IOCPDemo 

    
//=============================================================================
    /// <summary> Sample class for the threading class </summary> 
    public class UtilThreadingSample 
    

        
//*****************************************************************************    
        /// <summary> Test Method </summary> 
        static void Main() 
        

            
// Create the MSSQL IOCP Thread Pool 
            IOCPThreadPool pThreadPool = new IOCPThreadPool(01020new IOCPThreadPool.USER_FUNCTION(IOCPThreadFunction)); 
       
            
//for(int i =1;i<10000;i++) 
            
                pThreadPool.PostEvent(
1234); 
            }
 
       
            Thread.Sleep(
100); 
       
            pThreadPool.Dispose(); 
        }
 
     
        
//******************************************************************** 
        /// <summary> Function to be called by the IOCP thread pool.  Called when 
        
///           a command is posted for processing by the SocketManager </summary> 
        
/// <param name="iValue"> The value provided by the thread posting the event </param>
 
        static public void IOCPThreadFunction(int iValue) 
        

            
try 
            

                Console.WriteLine(
"Value: {0}", iValue.ToString()); 
                Thread.Sleep(
3000); 
            }
 
       
            
catch (Exception pException) 
            

                Console.WriteLine(pException.Message); 
            }
 
        }
 
    }
 
 
}
 


类代码: 
using  System; 
using  System.Threading; 
using  System.Runtime.InteropServices; 
 
namespace  IOCPThreading 

    [StructLayout(LayoutKind.Sequential, CharSet
=CharSet.Auto)] 
 
    
public sealed class IOCPThreadPool 
    

        [DllImport(
"Kernel32", CharSet=CharSet.Auto)] 
        
private unsafe static extern UInt32 CreateIoCompletionPort(UInt32 hFile, UInt32 hExistingCompletionPort, UInt32* puiCompletionKey, UInt32 uiNumberOfConcurrentThreads); 
 
        [DllImport(
"Kernel32", CharSet=CharSet.Auto)] 
        
private unsafe static extern Boolean CloseHandle(UInt32 hObject); 
 
        [DllImport(
"Kernel32", CharSet=CharSet.Auto)] 
        
private unsafe static extern Boolean PostQueuedCompletionStatus(UInt32 hCompletionPort, UInt32 uiSizeOfArgument, UInt32* puiUserArg, System.Threading.NativeOverlapped* pOverlapped); 
 
        [DllImport(
"Kernel32", CharSet=CharSet.Auto)] 
        
private unsafe static extern Boolean GetQueuedCompletionStatus(UInt32 hCompletionPort, UInt32* pSizeOfArgument, UInt32* puiUserArg, System.Threading.NativeOverlapped** ppOverlapped, UInt32 uiMilliseconds); 
 
        
private const UInt32 INVALID_HANDLE_VALUE = 0xffffffff
        
private const UInt32 INIFINITE = 0xffffffff
        
private const Int32 SHUTDOWN_IOCPTHREAD = 0x7fffffff
        
public delegate void USER_FUNCTION(int iValue); 
        
private UInt32 m_hHandle; 
        
private UInt32 GetHandle get return m_hHandle; } set { m_hHandle = value; } } 
 
        
private Int32 m_uiMaxConcurrency; 
 
        
private Int32 GetMaxConcurrency get return m_uiMaxConcurrency; } set { m_uiMaxConcurrency = value; } } 
 
 
        
private Int32 m_iMinThreadsInPool; 
 
        
private Int32 GetMinThreadsInPool get return m_iMinThreadsInPool; } set { m_iMinThreadsInPool = value; } } 
 
        
private Int32 m_iMaxThreadsInPool; 
 
        
private Int32 GetMaxThreadsInPool get return m_iMaxThreadsInPool; } set { m_iMaxThreadsInPool = value; } } 
 
 
        
private Object m_pCriticalSection; 
 
        
private Object GetCriticalSection get return m_pCriticalSection; } set { m_pCriticalSection = value; } } 
 
 
        
private USER_FUNCTION m_pfnUserFunction; 
 
        
private USER_FUNCTION GetUserFunction get return m_pfnUserFunction; } set { m_pfnUserFunction = value; } } 
 
 
        
private Boolean m_bDisposeFlag; 
 
        
/// <summary> SimType: Flag to indicate if the class is disposing </summary> 
 
        
private Boolean IsDisposed get return m_bDisposeFlag; } set { m_bDisposeFlag = value; } } 
 
        
private Int32 m_iCurThreadsInPool; 
 
        
/// <summary> SimType: The current number of threads in the thread pool </summary> 
 
        
public Int32 GetCurThreadsInPool get return m_iCurThreadsInPool; } set { m_iCurThreadsInPool = value; } } 
 
        
/// <summary> SimType: Increment current number of threads in the thread pool </summary> 
 
        
private Int32 IncCurThreadsInPool() return Interlocked.Increment(ref m_iCurThreadsInPool); } 
 
        
/// <summary> SimType: Decrement current number of threads in the thread pool </summary> 
 
        
private Int32 DecCurThreadsInPool() return Interlocked.Decrement(ref m_iCurThreadsInPool); } 
 
 
        
private Int32 m_iActThreadsInPool; 
 
        
/// <summary> SimType: The current number of active threads in the thread pool </summary> 
 
        
public Int32 GetActThreadsInPool get return m_iActThreadsInPool; } set { m_iActThreadsInPool = value; } } 
 
        
/// <summary> SimType: Increment current number of active threads in the thread pool </summary> 
 
        
private Int32 IncActThreadsInPool() return Interlocked.Increment(ref m_iActThreadsInPool); } 
 
        
/// <summary> SimType: Decrement current number of active threads in the thread pool </summary> 
 
        
private Int32 DecActThreadsInPool() return Interlocked.Decrement(ref m_iActThreadsInPool); } 
 
 
        
private Int32 m_iCurWorkInPool; 
 
        
/// <summary> SimType: The current number of Work posted in the thread pool </summary> 
 
        
public Int32 GetCurWorkInPool get return m_iCurWorkInPool; } set { m_iCurWorkInPool = value; } } 
 
        
/// <summary> SimType: Increment current number of Work posted in the thread pool </summary> 
 
        
private Int32 IncCurWorkInPool() return Interlocked.Increment(ref m_iCurWorkInPool); } 
 
        
/// <summary> SimType: Decrement current number of Work posted in the thread pool </summary> 
 
        
private Int32 DecCurWorkInPool() return Interlocked.Decrement(ref m_iCurWorkInPool); } 
 
        
public IOCPThreadPool(Int32 iMaxConcurrency, Int32 iMinThreadsInPool, Int32 iMaxThreadsInPool, USER_FUNCTION pfnUserFunction) 
        

            
try 
            

                
// Set initial class state 
 
                GetMaxConcurrency   
= iMaxConcurrency; 
 
                GetMinThreadsInPool 
= iMinThreadsInPool; 
 
                GetMaxThreadsInPool 
= iMaxThreadsInPool; 
 
                GetUserFunction     
= pfnUserFunction; 
 
 
                
// Init the thread counters 
 
                GetCurThreadsInPool 
= 0
 
                GetActThreadsInPool 
= 0
 
                GetCurWorkInPool    
= 0
 
 
                
// Initialize the Monitor Object 
 
                GetCriticalSection 
= new Object(); 
 
 
                
// Set the disposing flag to false 
 
                IsDisposed 
= false
 
 
                
unsafe 
                

 
                    
// Create an IO Completion Port for Thread Pool use 
                    GetHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0null, (UInt32) GetMaxConcurrency); 
 
                }
 
 
 
                
// Test to make sure the IO Completion Port was created 
 
                
if (GetHandle == 0
 
                    
throw new Exception("Unable To Create IO Completion Port"); 
 
 
                
// Allocate and start the Minimum number of threads specified 
 
                Int32 iStartingCount 
= GetCurThreadsInPool; 
 
         
 
                ThreadStart tsThread 
= new ThreadStart(IOCPFunction); 
 
                
for (Int32 iThread = 0; iThread < GetMinThreadsInPool; ++iThread) 
                

 
                    
// Create a thread and start it 
 
                    Thread thThread 
= new Thread(tsThread); 
 
                    thThread.Name 
= "IOCP " + thThread.GetHashCode(); 
 
                    thThread.Start(); 
 
 
                    
// Increment the thread pool count 
 
                    IncCurThreadsInPool(); 
 
                }
 
 
            }
 
 
 
            
catch 
            

 
                
throw new Exception("Unhandled Exception"); 
 
            }
 
 
        }
 
 
        
~IOCPThreadPool() 
        

 
            
if (!IsDisposed) 
 
                Dispose(); 
 
        }
 
 
        
public void Dispose() 
        

 
            
try 
            

 
                
// Flag that we are disposing this object 
 
                IsDisposed 
= true
 
 
                
// Get the current number of threads in the pool 
 
                Int32 iCurThreadsInPool 
= GetCurThreadsInPool; 
 
 
                
// Shutdown all thread in the pool 
 
                
for (Int32 iThread = 0; iThread < iCurThreadsInPool; ++iThread) 
                

                    
unsafe 
                    

 
                        
bool bret = PostQueuedCompletionStatus(GetHandle, 4, (UInt32*) SHUTDOWN_IOCPTHREAD, null); 
 
                    }
 
 
                }
 
 
 
                
// Wait here until all the threads are gone 
 
                
while (GetCurThreadsInPool != 0) Thread.Sleep(100); 
 
 
                
unsafe 
                

 
                    
// Close the IOCP Handle 
                    CloseHandle(GetHandle); 
 
                }
 
 
            }
 
 
            
catch 
            

 
            }
 
 
        }
 
        
private void IOCPFunction() 
        

            UInt32 uiNumberOfBytes; 
 
            Int32  iValue; 
 
            
try 
            

                
while (true
                

 
                    
unsafe 
                    

 
                        System.Threading.NativeOverlapped
* pOv; 
 
 
                        
// Wait for an event 
 
                        GetQueuedCompletionStatus(GetHandle, 
&uiNumberOfBytes, (UInt32*&iValue, &pOv, INIFINITE); 
                    }
 
 
                    
// Decrement the number of events in queue 
 
                    DecCurWorkInPool(); 
 
 
                    
// Was this thread told to shutdown 
 
                    
if (iValue == SHUTDOWN_IOCPTHREAD) 
 
                        
break
 
 
                    
// Increment the number of active threads 
 
                    IncActThreadsInPool(); 
 
 
                    
try 
                    

                        
// Call the user function 
                        GetUserFunction(iValue); 
 
                    }
 
 
                    
catch(Exception ex) 
                    

                        
throw ex; 
                    }
 
 
 
                    
// Get a lock 
 
                    Monitor.Enter(GetCriticalSection); 
 
 
                    
try 
                    

 
                        
// If we have less than max threads currently in the pool 
 
                        
if (GetCurThreadsInPool < GetMaxThreadsInPool) 
                        

 
                            
// Should we add a new thread to the pool 
 
                            
if (GetActThreadsInPool == GetCurThreadsInPool) 
                            

 
                                
if (IsDisposed == false
                                

 
                                    
// Create a thread and start it 
 
                                    ThreadStart tsThread 
= new ThreadStart(IOCPFunction); 
 
                                    Thread thThread 
= new Thread(tsThread); 
 
                                    thThread.Name 
= "IOCP " + thThread.GetHashCode(); 
 
                                    thThread.Start(); 
 
 
                                    
// Increment the thread pool count 
 
                                    IncCurThreadsInPool(); 
 
                                }
 
 
                            }
 
 
                        }
 
 
                    }
 
 
                    
catch 
                    

 
                    }
 
 
 
                    
// Relase the lock 
 
                    Monitor.Exit(GetCriticalSection); 
 
 
                    
// Increment the number of active threads 
 
                    DecActThreadsInPool(); 
 
                }
 
 
            }
 
 
 
            
catch(Exception ex) 
            

                
string str=ex.Message; 
 
            }
 
 
 
            
// Decrement the thread pool count 
 
            DecCurThreadsInPool(); 
 
        }
 
 
        
//public void PostEvent(Int32 iValue 
        public void PostEvent(int iValue) 
        

 
            
try 
            

 
                
// Only add work if we are not disposing 
 
                
if (IsDisposed == false
                

 
                    
unsafe 
                    

 
                        
// Post an event into the IOCP Thread Pool 
 
                        PostQueuedCompletionStatus(GetHandle, 
4, (UInt32*) iValue, null); 
 
                    }
 
 
 
                    
// Increment the number of item of work 
 
                    IncCurWorkInPool(); 
 
 
                    
// Get a lock 
 
                    Monitor.Enter(GetCriticalSection); 
 
 
                    
try 
                    

 
                        
// If we have less than max threads currently in the pool 
 
                        
if (GetCurThreadsInPool < GetMaxThreadsInPool) 
                        

 
                            
// Should we add a new thread to the pool 
 
                            
if (GetActThreadsInPool == GetCurThreadsInPool) 
                            

 
                                
if (IsDisposed == false
                                

 
                                    
// Create a thread and start it 
 
                                    ThreadStart tsThread 
= new ThreadStart(IOCPFunction); 
 
                                    Thread thThread 
= new Thread(tsThread); 
 
                                    thThread.Name 
= "IOCP " + thThread.GetHashCode(); 
 
                                    thThread.Start(); 
 
 
                                    
// Increment the thread pool count 
 
                                    IncCurThreadsInPool(); 
 
                                }
 
 
                            }
 
 
                        }
 
 
                    }
 
 
 
                    
catch 
                    

 
                    }
 
 
 
                    
// Release the lock 
 
                    Monitor.Exit(GetCriticalSection); 
 
                }
 
 
            }
 
 
 
            
catch (Exception e) 
            

 
                
throw e; 
 
            }
 
 
 
            
catch 
            

 
                
throw new Exception("Unhandled Exception"); 
 
            }
 
 
        }
   
 
        
public void PostEvent() 
        

 
            
try 
            

 
                
// Only add work if we are not disposing 
 
                
if (IsDisposed == false
                

 
                    
unsafe 
                    

 
                        
// Post an event into the IOCP Thread Pool 
 
                        PostQueuedCompletionStatus(GetHandle, 
0nullnull); 
 
                    }
 
 
 
                    
// Increment the number of item of work 
 
                    IncCurWorkInPool(); 
 
 
                    
// Get a lock 
 
                    Monitor.Enter(GetCriticalSection); 
 
 
                    
try 
 
                    

 
                        
// If we have less than max threads currently in the pool 
 
                        
if (GetCurThreadsInPool < GetMaxThreadsInPool) 
 
                        

 
                            
// Should we add a new thread to the pool 
 
                            
if (GetActThreadsInPool == GetCurThreadsInPool) 
 
                            

 
                                
if (IsDisposed == false
 
                                

 
                                    
// Create a thread and start it 
 
                                    ThreadStart tsThread 
= new ThreadStart(IOCPFunction); 
 
                                    Thread thThread 
= new Thread(tsThread); 
 
                                    thThread.Name 
= "IOCP " + thThread.GetHashCode(); 
 
                                    thThread.Start(); 
 
 
                                    
// Increment the thread pool count 
 
                                    IncCurThreadsInPool(); 
 
                                }
 
 
                            }
 
 
                        }
 
 
                    }
 
 
 
                    
catch 
 
                    

 
                    }
 
 
 
                    
// Release the lock 
 
                    Monitor.Exit(GetCriticalSection); 
 
                }
 
 
            }
 
 
            
catch 
 
            

 
                
throw new Exception("Unhandled Exception"); 
 
            }
 
 
        }
 
 
    }
 
 
}
 
目录
相关文章
|
4月前
|
开发框架 .NET C#
C#|.net core 基础 - 删除字符串最后一个字符的七大类N种实现方式
【10月更文挑战第9天】在 C#/.NET Core 中,有多种方法可以删除字符串的最后一个字符,包括使用 `Substring` 方法、`Remove` 方法、`ToCharArray` 与 `Array.Copy`、`StringBuilder`、正则表达式、循环遍历字符数组以及使用 LINQ 的 `SkipLast` 方法。
127 8
|
2月前
|
开发框架 .NET Java
C#集合数据去重的5种方式及其性能对比测试分析
C#集合数据去重的5种方式及其性能对比测试分析
38 11
|
2月前
|
开发框架 .NET Java
C#集合数据去重的5种方式及其性能对比测试分析
C#集合数据去重的5种方式及其性能对比测试分析
55 10
|
2月前
|
存储 安全 编译器
学懂C#编程:属性(Property)的概念定义及使用详解
通过深入理解和使用C#的属性,可以编写更清晰、简洁和高效的代码,为开发高质量的应用程序奠定基础。
116 12
|
4月前
|
测试技术 C# 数据库
C# 单元测试框架 NUnit 一分钟浅谈
【10月更文挑战第17天】单元测试是软件开发中重要的质量保证手段,NUnit 是一个广泛使用的 .NET 单元测试框架。本文从基础到进阶介绍了 NUnit 的使用方法,包括安装、基本用法、参数化测试、异步测试等,并探讨了常见问题和易错点,旨在帮助开发者有效利用单元测试提高代码质量和开发效率。
210 64
|
2月前
|
算法 Java 测试技术
Benchmark.NET:让 C# 测试程序性能变得既酷又简单
Benchmark.NET是一款专为 .NET 平台设计的性能基准测试框架,它可以帮助你测量代码的执行时间、内存使用情况等性能指标。它就像是你代码的 "健身教练",帮助你找到瓶颈,优化性能,让你的应用跑得更快、更稳!希望这个小教程能让你在追求高性能的路上越走越远,享受编程带来的无限乐趣!
155 13
|
3月前
|
数据采集 自然语言处理 数据库
深入体验阿里云通义灵码:测试与实例展示
阿里云通义灵码是一款强大的代码生成工具,支持自然语言描述需求,快速生成高质量代码。它在测试、代码质量和用户体验方面表现出色,能够高效地生成 Python 和 Java 等语言的代码,助力开发者提升开发效率和代码质量。无论是新手还是资深开发者,都能从中受益匪浅。
深入体验阿里云通义灵码:测试与实例展示
|
3月前
|
传感器 人工智能 物联网
C 语言在计算机科学中尤其在硬件交互方面占据重要地位。本文探讨了 C 语言与硬件交互的主要方法,包括直接访问硬件寄存器、中断处理、I/O 端口操作、内存映射 I/O 和设备驱动程序开发
C 语言在计算机科学中尤其在硬件交互方面占据重要地位。本文探讨了 C 语言与硬件交互的主要方法,包括直接访问硬件寄存器、中断处理、I/O 端口操作、内存映射 I/O 和设备驱动程序开发,以及面临的挑战和未来趋势,旨在帮助读者深入了解并掌握这些关键技术。
88 6
|
4月前
|
机器学习/深度学习 JSON 算法
实例分割笔记(一): 使用YOLOv5-Seg对图像进行分割检测完整版(从自定义数据集到测试验证的完整流程)
本文详细介绍了使用YOLOv5-Seg模型进行图像分割的完整流程,包括图像分割的基础知识、YOLOv5-Seg模型的特点、环境搭建、数据集准备、模型训练、验证、测试以及评价指标。通过实例代码,指导读者从自定义数据集开始,直至模型的测试验证,适合深度学习领域的研究者和开发者参考。
1493 3
实例分割笔记(一): 使用YOLOv5-Seg对图像进行分割检测完整版(从自定义数据集到测试验证的完整流程)
|
3月前
|
缓存 监控 数据挖掘
C# 一分钟浅谈:性能测试与压力测试
【10月更文挑战第20天】本文介绍了性能测试和压力测试的基础概念、目的、方法及常见问题与解决策略。性能测试关注系统在正常条件下的响应时间和资源利用率,而压力测试则在超出正常条件的情况下测试系统的极限和潜在瓶颈。文章通过具体的C#代码示例,详细探讨了忽视预热阶段、不合理测试数据和缺乏详细监控等常见问题及其解决方案,并提供了如何避免这些问题的建议。
86 7

热门文章

最新文章