[C#] I/O Completion Port: Class Definition and Test Example


Compiled by: Zheng Yun @ UltraPower
Date: 2005-04-13

Adapted from William Kennedy's code. The difference is that he defined his own Overlapped structure, whereas here we use System.Threading.NativeOverlapped directly.

Below is an excerpt from my earlier notes on IOCP under Win32. If you already know IOCP, you can skip ahead to the C# test example.

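We use the I/O Completion Port mechanism (IOCP for short).

Put simply, a service application should create an I/O completion port when it initializes. When a request arrives, we package the received data and post it to the IOCP with PostQueuedCompletionStatus. A number of worker threads must also be created (7 threads per CPU; more than that brings no benefit) to handle the messages posted to the port. The implementation steps are roughly as follows: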

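1. In the main thread, call CreateIoCompletionPort to create the IOCP.

The first three parameters of CreateIoCompletionPort matter only when a device is being associated with a completion port.

Here we simply pass INVALID_HANDLE_VALUE, NULL, and 0.

The fourth parameter tells the port the maximum number of threads that may run concurrently. Setting it to 0 means it defaults to the number of CPUs on the machine.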

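2. After some initialization, our ThreadFun thread function enters a loop that ends only when the service process terminates.

Inside the loop it calls GetQueuedCompletionStatus. This puts the current thread's ID into a queue of waiting threads, so the IOCP kernel object always knows which threads are waiting to handle completed I/O requests.

If no completion packet appears on the port within IDLE_THREAD_TIMEOUT, the thread moves on to the next iteration of the loop. Here IDLE_THREAD_TIMEOUT is set to 1 second.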

 

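When an entry appears in the port's I/O completion queue, the completion port wakes a thread from the waiting-thread queue. That thread receives the information carried by the completed I/O entry: the number of bytes transferred, the completion key, and the address of the OVERLAPPED structure.

In our program the value of that OVERLAPPED address can be received as a smart pointer, a BSTR, or an int, which gives us the message. The message is then processed on this thread.

The first parameter of GetQueuedCompletionStatus, hCompletionPort, identifies which port to monitor. Here we pass the port handle returned earlier by CreateIoCompletionPort.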
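The two steps above can be tried in isolation with a short C# sketch. It assumes Windows and uses P/Invoke declarations written only for this illustration (the class IocpWaitSketch and the method PostAndDequeue are hypothetical names, not part of the class listed later). Unlike that class, it declares handles as IntPtr and the completion key as UIntPtr so the calls are also valid in a 64-bit process.

```csharp
using System;
using System.ComponentModel;
using System.Runtime.InteropServices;

public static class IocpWaitSketch
{
    // The 1-second IDLE_THREAD_TIMEOUT described in the text, in milliseconds.
    private const uint IdleThreadTimeoutMs = 1000;

    // INVALID_HANDLE_VALUE is (HANDLE)-1 in the Win32 headers.
    private static readonly IntPtr InvalidHandleValue = new IntPtr(-1);

    [DllImport("kernel32.dll", SetLastError = true)]
    private static extern IntPtr CreateIoCompletionPort(
        IntPtr fileHandle, IntPtr existingCompletionPort,
        UIntPtr completionKey, uint numberOfConcurrentThreads);

    [DllImport("kernel32.dll", SetLastError = true)]
    [return: MarshalAs(UnmanagedType.Bool)]
    private static extern bool PostQueuedCompletionStatus(
        IntPtr completionPort, uint numberOfBytes,
        UIntPtr completionKey, IntPtr overlapped);

    [DllImport("kernel32.dll", SetLastError = true)]
    [return: MarshalAs(UnmanagedType.Bool)]
    private static extern bool GetQueuedCompletionStatus(
        IntPtr completionPort, out uint numberOfBytes,
        out UIntPtr completionKey, out IntPtr overlapped, uint milliseconds);

    [DllImport("kernel32.dll", SetLastError = true)]
    [return: MarshalAs(UnmanagedType.Bool)]
    private static extern bool CloseHandle(IntPtr handle);

    // Posts one packet whose completion key carries `value`, then dequeues it.
    // Returns the dequeued key, or null on timeout or on non-Windows systems.
    public static uint? PostAndDequeue(uint value)
    {
        if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
            return null; // kernel32 is only available on Windows

        // Step 1: create a port that is not associated with any device.
        // Passing 0 as the last argument defaults the concurrency to the CPU count.
        IntPtr port = CreateIoCompletionPort(InvalidHandleValue, IntPtr.Zero, UIntPtr.Zero, 0);
        if (port == IntPtr.Zero)
            throw new Win32Exception(Marshal.GetLastWin32Error());

        try
        {
            if (!PostQueuedCompletionStatus(port, 0, new UIntPtr(value), IntPtr.Zero))
                throw new Win32Exception(Marshal.GetLastWin32Error());

            // Step 2: wait for a packet. A real worker would call this in a loop
            // and simply iterate again when the call times out and returns false.
            uint bytes;
            UIntPtr key;
            IntPtr overlapped;
            bool ok = GetQueuedCompletionStatus(port, out bytes, out key, out overlapped, IdleThreadTimeoutMs);
            return ok ? (uint?)key.ToUInt32() : null;
        }
        finally
        {
            CloseHandle(port);
        }
    }
}
```

Calling IocpWaitSketch.PostAndDequeue(1234) on Windows should hand back the same key that was posted, which is the mechanism the thread pool class below uses to pass an int to its worker threads.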

 

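Points to note:

First, the number of threads in the pool is limited and is related to the number of CPUs.

Second, IOCP is a fairly complete sleep/wake mechanism for threads: a thread with nothing to process goes to sleep and consumes no CPU until the kernel wakes it.

Third, the thread that finished most recently is the one woken when the next task arrives (waiting threads are released in LIFO order), so threads that are seldom called are likely to be called even less often later.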
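As a small illustration of the first point, the pool limits can be derived from the processor count instead of being hard-coded. This is only a sketch: PoolSizingSketch and the threadsPerCpu parameter are hypothetical names introduced here, and the right multiplier depends on the workload.

```csharp
using System;

public static class PoolSizingSketch
{
    // Returns a worker-thread cap proportional to the number of processors.
    // threadsPerCpu is a tuning knob assumed for this sketch.
    public static int MaxWorkerThreads(int threadsPerCpu)
    {
        if (threadsPerCpu < 1)
            throw new ArgumentOutOfRangeException(nameof(threadsPerCpu));

        return Environment.ProcessorCount * threadsPerCpu;
    }
}
```

A caller could pass PoolSizingSketch.MaxWorkerThreads(7) as the maximum thread count of a pool to follow the 7-threads-per-CPU rule of thumb mentioned earlier.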

 


Test code:
using System;
using System.Threading;  // Included for the Thread.Sleep call
using IOCPThreading;     // Namespace that declares IOCPThreadPool
using System.Runtime.InteropServices;

namespace IOCPDemo
{
    //=============================================================================
    /// <summary> Sample class for the threading class </summary>
    public class UtilThreadingSample
    {
        //*****************************************************************************
        /// <summary> Test Method </summary>
        static void Main()
        {
            // Create the MSSQL IOCP Thread Pool
            IOCPThreadPool pThreadPool = new IOCPThreadPool(0, 10, 20, new IOCPThreadPool.USER_FUNCTION(IOCPThreadFunction));

            //for(int i =1;i<10000;i++)
            {
                pThreadPool.PostEvent(1234);
            }

            Thread.Sleep(100);

            pThreadPool.Dispose();
        }

        //********************************************************************
        /// <summary> Function to be called by the IOCP thread pool.  Called when
        ///           a command is posted for processing by the SocketManager </summary>
        /// <param name="iValue"> The value provided by the thread posting the event </param>
        static public void IOCPThreadFunction(int iValue)
        {
            try
            {
                Console.WriteLine("Value: {0}", iValue.ToString());
                Thread.Sleep(3000);
            }
            catch (Exception pException)
            {
                Console.WriteLine(pException.Message);
            }
        }
    }
}


Class code:
using System;
using System.Threading;
using System.Runtime.InteropServices;

namespace IOCPThreading
{
    [StructLayout(LayoutKind.Sequential, CharSet=CharSet.Auto)]
    public sealed class IOCPThreadPool
    {
        // NOTE: handles and the completion key are declared as UInt32 here, which is
        // only correct in a 32-bit process; a 64-bit build would need IntPtr/UIntPtr.
        [DllImport("Kernel32", CharSet=CharSet.Auto)]
        private unsafe static extern UInt32 CreateIoCompletionPort(UInt32 hFile, UInt32 hExistingCompletionPort, UInt32* puiCompletionKey, UInt32 uiNumberOfConcurrentThreads);

        [DllImport("Kernel32", CharSet=CharSet.Auto)]
        private unsafe static extern Boolean CloseHandle(UInt32 hObject);

        [DllImport("Kernel32", CharSet=CharSet.Auto)]
        private unsafe static extern Boolean PostQueuedCompletionStatus(UInt32 hCompletionPort, UInt32 uiSizeOfArgument, UInt32* puiUserArg, System.Threading.NativeOverlapped* pOverlapped);

        [DllImport("Kernel32", CharSet=CharSet.Auto)]
        private unsafe static extern Boolean GetQueuedCompletionStatus(UInt32 hCompletionPort, UInt32* pSizeOfArgument, UInt32* puiUserArg, System.Threading.NativeOverlapped** ppOverlapped, UInt32 uiMilliseconds);

        private const UInt32 INVALID_HANDLE_VALUE = 0xffffffff;
        private const UInt32 INIFINITE = 0xffffffff;
        private const Int32 SHUTDOWN_IOCPTHREAD = 0x7fffffff;
        public delegate void USER_FUNCTION(int iValue);

        private UInt32 m_hHandle;
        private UInt32 GetHandle { get { return m_hHandle; } set { m_hHandle = value; } }

        private Int32 m_uiMaxConcurrency;
        private Int32 GetMaxConcurrency { get { return m_uiMaxConcurrency; } set { m_uiMaxConcurrency = value; } }

        private Int32 m_iMinThreadsInPool;
        private Int32 GetMinThreadsInPool { get { return m_iMinThreadsInPool; } set { m_iMinThreadsInPool = value; } }

        private Int32 m_iMaxThreadsInPool;
        private Int32 GetMaxThreadsInPool { get { return m_iMaxThreadsInPool; } set { m_iMaxThreadsInPool = value; } }

        private Object m_pCriticalSection;
        private Object GetCriticalSection { get { return m_pCriticalSection; } set { m_pCriticalSection = value; } }

        private USER_FUNCTION m_pfnUserFunction;
        private USER_FUNCTION GetUserFunction { get { return m_pfnUserFunction; } set { m_pfnUserFunction = value; } }

        private Boolean m_bDisposeFlag;
        /// <summary> SimType: Flag to indicate if the class is disposing </summary>
        private Boolean IsDisposed { get { return m_bDisposeFlag; } set { m_bDisposeFlag = value; } }

        private Int32 m_iCurThreadsInPool;
        /// <summary> SimType: The current number of threads in the thread pool </summary>
        public Int32 GetCurThreadsInPool { get { return m_iCurThreadsInPool; } set { m_iCurThreadsInPool = value; } }
        /// <summary> SimType: Increment current number of threads in the thread pool </summary>
        private Int32 IncCurThreadsInPool() { return Interlocked.Increment(ref m_iCurThreadsInPool); }
        /// <summary> SimType: Decrement current number of threads in the thread pool </summary>
        private Int32 DecCurThreadsInPool() { return Interlocked.Decrement(ref m_iCurThreadsInPool); }

        private Int32 m_iActThreadsInPool;
        /// <summary> SimType: The current number of active threads in the thread pool </summary>
        public Int32 GetActThreadsInPool { get { return m_iActThreadsInPool; } set { m_iActThreadsInPool = value; } }
        /// <summary> SimType: Increment current number of active threads in the thread pool </summary>
        private Int32 IncActThreadsInPool() { return Interlocked.Increment(ref m_iActThreadsInPool); }
        /// <summary> SimType: Decrement current number of active threads in the thread pool </summary>
        private Int32 DecActThreadsInPool() { return Interlocked.Decrement(ref m_iActThreadsInPool); }

        private Int32 m_iCurWorkInPool;
        /// <summary> SimType: The current number of Work posted in the thread pool </summary>
        public Int32 GetCurWorkInPool { get { return m_iCurWorkInPool; } set { m_iCurWorkInPool = value; } }
        /// <summary> SimType: Increment current number of Work posted in the thread pool </summary>
        private Int32 IncCurWorkInPool() { return Interlocked.Increment(ref m_iCurWorkInPool); }
        /// <summary> SimType: Decrement current number of Work posted in the thread pool </summary>
        private Int32 DecCurWorkInPool() { return Interlocked.Decrement(ref m_iCurWorkInPool); }

        public IOCPThreadPool(Int32 iMaxConcurrency, Int32 iMinThreadsInPool, Int32 iMaxThreadsInPool, USER_FUNCTION pfnUserFunction)
        {
            try
            {
                // Set initial class state
                GetMaxConcurrency   = iMaxConcurrency;
                GetMinThreadsInPool = iMinThreadsInPool;
                GetMaxThreadsInPool = iMaxThreadsInPool;
                GetUserFunction     = pfnUserFunction;

                // Init the thread counters
                GetCurThreadsInPool = 0;
                GetActThreadsInPool = 0;
                GetCurWorkInPool    = 0;

                // Initialize the Monitor Object
                GetCriticalSection = new Object();

                // Set the disposing flag to false
                IsDisposed = false;

                unsafe
                {
                    // Create an IO Completion Port for Thread Pool use
                    GetHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, null, (UInt32) GetMaxConcurrency);
                }

                // Test to make sure the IO Completion Port was created
                if (GetHandle == 0)
                    throw new Exception("Unable To Create IO Completion Port");

                // Allocate and start the Minimum number of threads specified
                Int32 iStartingCount = GetCurThreadsInPool;

                ThreadStart tsThread = new ThreadStart(IOCPFunction);
                for (Int32 iThread = 0; iThread < GetMinThreadsInPool; ++iThread)
                {
                    // Create a thread and start it
                    Thread thThread = new Thread(tsThread);
                    thThread.Name = "IOCP " + thThread.GetHashCode();
                    thThread.Start();

                    // Increment the thread pool count
                    IncCurThreadsInPool();
                }
            }
            catch
            {
                throw new Exception("Unhandled Exception");
            }
        }
 
        ~IOCPThreadPool()
        {
            if (!IsDisposed)
                Dispose();
        }

        public void Dispose()
        {
            try
            {
                // Flag that we are disposing this object
                IsDisposed = true;

                // Get the current number of threads in the pool
                Int32 iCurThreadsInPool = GetCurThreadsInPool;

                // Shutdown all thread in the pool
                for (Int32 iThread = 0; iThread < iCurThreadsInPool; ++iThread)
                {
                    unsafe
                    {
                        bool bret = PostQueuedCompletionStatus(GetHandle, 4, (UInt32*) SHUTDOWN_IOCPTHREAD, null);
                    }
                }

                // Wait here until all the threads are gone
                while (GetCurThreadsInPool != 0) Thread.Sleep(100);

                unsafe
                {
                    // Close the IOCP Handle
                    CloseHandle(GetHandle);
                }
            }
            catch
            {
            }
        }
 
        private void IOCPFunction()
        {
            UInt32 uiNumberOfBytes;
            Int32  iValue;

            try
            {
                while (true)
                {
                    unsafe
                    {
                        System.Threading.NativeOverlapped* pOv;

                        // Wait for an event
                        GetQueuedCompletionStatus(GetHandle, &uiNumberOfBytes, (UInt32*) &iValue, &pOv, INIFINITE);
                    }

                    // Decrement the number of events in queue
                    DecCurWorkInPool();

                    // Was this thread told to shutdown
                    if (iValue == SHUTDOWN_IOCPTHREAD)
                        break;

                    // Increment the number of active threads
                    IncActThreadsInPool();

                    try
                    {
                        // Call the user function
                        GetUserFunction(iValue);
                    }
                    catch (Exception)
                    {
                        // Rethrow without resetting the stack trace
                        throw;
                    }

                    // Get a lock
                    Monitor.Enter(GetCriticalSection);

                    try
                    {
                        // If we have less than max threads currently in the pool
                        if (GetCurThreadsInPool < GetMaxThreadsInPool)
                        {
                            // Should we add a new thread to the pool
                            if (GetActThreadsInPool == GetCurThreadsInPool)
                            {
                                if (IsDisposed == false)
                                {
                                    // Create a thread and start it
                                    ThreadStart tsThread = new ThreadStart(IOCPFunction);
                                    Thread thThread = new Thread(tsThread);
                                    thThread.Name = "IOCP " + thThread.GetHashCode();
                                    thThread.Start();

                                    // Increment the thread pool count
                                    IncCurThreadsInPool();
                                }
                            }
                        }
                    }
                    catch
                    {
                    }

                    // Release the lock
                    Monitor.Exit(GetCriticalSection);

                    // Decrement the number of active threads
                    DecActThreadsInPool();
                }
            }
            catch (Exception ex)
            {
                string str = ex.Message;
            }

            // Decrement the thread pool count
            DecCurThreadsInPool();
        }
 
        //public void PostEvent(Int32 iValue
        public void PostEvent(int iValue)
        {
            try
            {
                // Only add work if we are not disposing
                if (IsDisposed == false)
                {
                    unsafe
                    {
                        // Post an event into the IOCP Thread Pool
                        PostQueuedCompletionStatus(GetHandle, 4, (UInt32*) iValue, null);
                    }

                    // Increment the number of item of work
                    IncCurWorkInPool();

                    // Get a lock
                    Monitor.Enter(GetCriticalSection);

                    try
                    {
                        // If we have less than max threads currently in the pool
                        if (GetCurThreadsInPool < GetMaxThreadsInPool)
                        {
                            // Should we add a new thread to the pool
                            if (GetActThreadsInPool == GetCurThreadsInPool)
                            {
                                if (IsDisposed == false)
                                {
                                    // Create a thread and start it
                                    ThreadStart tsThread = new ThreadStart(IOCPFunction);
                                    Thread thThread = new Thread(tsThread);
                                    thThread.Name = "IOCP " + thThread.GetHashCode();
                                    thThread.Start();

                                    // Increment the thread pool count
                                    IncCurThreadsInPool();
                                }
                            }
                        }
                    }
                    catch
                    {
                    }

                    // Release the lock
                    Monitor.Exit(GetCriticalSection);
                }
            }
            catch (Exception)
            {
                // Rethrow without resetting the stack trace
                throw;
            }
            catch
            {
                throw new Exception("Unhandled Exception");
            }
        }
   
        public void PostEvent()
        {
            try
            {
                // Only add work if we are not disposing
                if (IsDisposed == false)
                {
                    unsafe
                    {
                        // Post an event into the IOCP Thread Pool
                        PostQueuedCompletionStatus(GetHandle, 0, null, null);
                    }

                    // Increment the number of item of work
                    IncCurWorkInPool();

                    // Get a lock
                    Monitor.Enter(GetCriticalSection);

                    try
                    {
                        // If we have less than max threads currently in the pool
                        if (GetCurThreadsInPool < GetMaxThreadsInPool)
                        {
                            // Should we add a new thread to the pool
                            if (GetActThreadsInPool == GetCurThreadsInPool)
                            {
                                if (IsDisposed == false)
                                {
                                    // Create a thread and start it
                                    ThreadStart tsThread = new ThreadStart(IOCPFunction);
                                    Thread thThread = new Thread(tsThread);
                                    thThread.Name = "IOCP " + thThread.GetHashCode();
                                    thThread.Start();

                                    // Increment the thread pool count
                                    IncCurThreadsInPool();
                                }
                            }
                        }
                    }
                    catch
                    {
                    }

                    // Release the lock
                    Monitor.Exit(GetCriticalSection);
                }
            }
            catch
            {
                throw new Exception("Unhandled Exception");
            }
        }
 
    }
}
