Handling multiple pending socket read and write operations

The following source was built using Visual Studio 6.0 SP5 and Visual Studio .Net. You need to have a version of the Microsoft Platform SDK installed.

Note that the debug builds of the code waste a lot of CPU cycles due to the debug trace output. It's only worth profiling the release builds.

JBSocketServer6.zip - a large packet echo server with multiple pending reads and writes - 111 Kb

JBSocketShutdown.zip - an off switch for the servers - 42 Kb

Overview

"How do you handle the problem of multiple pending WSARecv() calls?" is a common question on the Winsock newsgroups. It seems that everyone knows that it's often a good idea to have more than one outstanding read waiting on a socket, and everyone's equally aware that sometimes code doesn't work right when you do that. This article explains the potential problems with multiple pending receives and provides a solution within the reusable server framework that we've been developing over the last few articles.

That's out of order

There is a subtle issue when using IO completion ports with multiple threads. Although operations using the IO completion port will always complete in the order that they were submitted, thread scheduling issues may mean that the actual work associated with the completion is processed in an undefined order. For example, if we were to submit three WSARecv requests on a socket then they are guaranteed to complete in the order that we submitted them. However, if we have two threads servicing the IO completion port, two of the completions could be processed simultaneously. If the thread processing the 'first' WSARecv completion is interrupted, the second may be completely processed before the first. This is even more likely to occur on machines with multiple processors where the two threads may really be executing simultaneously, but it's possible on single processor boxes too. As always, this is the kind of subtle problem that probably won't show its face until you release the software to production...

The example above is easy to avoid: simply don't have multiple WSARecv requests outstanding on a single socket. This is what we have done so far in the example servers developed in the previous articles. However, this reduces performance; it's always more efficient to have a receive pending when the data actually arrives on the wire than it is to post a receive after the data has already arrived. Having multiple WSARecv calls outstanding ensures that there's always a call pending. What's more, the problem isn't limited to having multiple WSARecvs. In our server framework we marshal all socket IO calls from the user's threads into our IO thread pool using the IO completion port. This means that there is the potential for a user thread to issue multiple consecutive writes to the socket and for them to be executed in an undefined order.

In our example servers so far, code like this:

   void CSocketServer::ReadCompleted(
      Socket *pSocket,
      CIOBuffer *pBuffer)
   {
      // do stuff...

      pSocket->Write(pBuffer);     // echo the command
      pSocket->Write(pResponse1);  // send part 1 response
      pSocket->Write(pResponse2);  // send part 2 response
   }

is potentially unsafe because the writes don't occur synchronously on the user's thread; they are posted to the IO completion port and occur in the IO thread pool.

Preserving the order of IO completion operations is relatively straightforward. As you'll remember, the overlapped structure passed to all calls that use IO completion ports represents 'per call' data. We can, and do, extend the overlapped structure to include our own 'per call' data by using the CIOBuffer class. If we add a sequence number to the CIOBuffer we can set the sequence number to the 'next' value in the user's thread and then make sure we process the buffers in order in the IO thread pool. This concept applies to any IO completion port operation, and each distinct operation requires its own sequence number. For our server framework that means that our Socket class must now maintain independent sequence numbers for read and write requests.

The sequence number management code inside the Socket's Write method could be something like this:

   pBuffer->SetSequenceNumber(m_writeSequenceNumber++);

To ensure that the sequence numbers actually represent the order in which the operations are submitted, the setting of the sequence number and the submission of the operation must be a single atomic operation. For our socket writes this isn't a problem as we only guarantee the order of writes that are performed on a single thread. For socket reads we need to ensure that the allocation of the sequence number and the call to WSARecv() occur without another thread having a chance to perform a read at the same time. This involves using a critical section to lock access to the socket during the sequence number allocation and WSARecv() call. Failure to lock in this area can lead to the actual order in which the WSARecv() calls are made failing to match the order of the sequence numbers allocated.
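
The locking described above might be sketched as follows. This is a minimal, portable illustration rather than the framework's code: std::mutex stands in for the critical section, the submit callback stands in for the WSARecv() call, and the ReadSequencer class and its PostRead() method are hypothetical names invented for this example.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical helper: allocates a read sequence number and submits the read
// while holding one lock, so submission order always matches sequence order.
class ReadSequencer
{
public:
   // 'submit' is an assumption for this sketch; it stands in for WSARecv().
   explicit ReadSequencer(std::function<void(std::uint32_t)> submit)
      : m_submit(std::move(submit)),
        m_next(0)
   {
      assert(m_submit);
   }

   void PostRead()
   {
      std::lock_guard<std::mutex> lock(m_lock);

      const std::uint32_t sequence = m_next++;   // allocate the number...

      m_submit(sequence);                        // ...and submit under the same lock
   }

private:
   std::function<void(std::uint32_t)> m_submit;
   std::mutex m_lock;
   std::uint32_t m_next;
};
```

Because the lock is held across both steps, a second thread calling PostRead() cannot slip its submission in between another thread's allocation and submission. If the lock covered only the increment, two threads could allocate 0 and 1 and then submit in the order 1, 0.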

Orderly processing

The code to ensure that the IO completions are handled in order is a little more complex. For each distinct IO operation we need to keep track of the next sequence number that we can process. When a call to GetQueuedCompletionStatus() returns we need to compare the sequence number in the request with the next sequence number that we can process. If these numbers match then we can process the request. If they don't then the request cannot be processed at this time. If an IO operation cannot be processed it should be stored for later processing. The storage of the out of sequence request needs to be keyed on the sequence number. When an IO thread finds that it can't process the current request it should add the current request to the store and see if there's a request in the store that can be processed. When a request is processed the last thing that the IO thread should do is atomically increment the value representing the next sequence number to process and check to see if there's an IO request in the store that can be processed.

The above strategy handles the situation where multiple IO requests complete concurrently. Only one thread can be processing an IO request that meets the criteria of being the next one to process, all other threads will simply add their requests to the store. When the thread that's processing a request finishes processing it can check to see if there are other requests in the store that can now be processed. If a thread needs to store its IO request then it can do so and then check for a request that can be processed in an atomic operation.

It's actually more complex to read about than it is to look at. The code to process an operation in order might look like this:

   pBuffer = pSocket->m_outOfSequenceWrites.GetNext(pBuffer);

   while(pBuffer)
   {
      DWORD dwFlags = 0;
      DWORD dwSendNumBytes = 0;

      if (SOCKET_ERROR == ::WSASend(
         pSocket->m_socket,
         pBuffer->GetWSABUF(),
         1,
         &dwSendNumBytes,
         dwFlags,
         pBuffer,
         NULL))
      {
         // handle errors etc.
      }

      pBuffer = pSocket->m_outOfSequenceWrites.ProcessAndGetNext();
   }

The store itself needs to map sequence numbers to CIOBuffers. The obvious choice of data structure is a std::map<> though your performance requirements and profiling may dictate a different choice. GetNext() takes a buffer, compares its sequence number with the next one we can process and either returns the buffer or adds the buffer to the map and then checks the map to see if the first buffer in the map is the one we can process. Remember that the map stores its elements in order of their keys and that we're using the sequence number as the key, so m_list.begin() refers to the element in the map with the lowest sequence number. If this function returns null then we're still waiting for the 'next' buffer to arrive.

   CIOBuffer *CIOBuffer::InOrderBufferList::GetNext(
      CIOBuffer *pBuffer)
   {
      CCriticalSection::Owner lock(m_criticalSection);

      if (m_next == pBuffer->GetSequenceNumber())
      {
         // this is the buffer we're waiting for, process it now
         return pBuffer;
      }

      // out of sequence, store it keyed on its sequence number
      BufferSequence::value_type value(pBuffer->GetSequenceNumber(), pBuffer);

      std::pair<BufferSequence::iterator, bool> result = m_list.insert(value);

      if (result.second == false)
      {
         // handle error, element already in map
      }

      CIOBuffer *pNext = 0;

      // the map is ordered by key, so begin() has the lowest sequence number
      BufferSequence::iterator it = m_list.begin();

      if (it != m_list.end())
      {
         if (it->first == m_next)
         {
            pNext = it->second;

            m_list.erase(it);
         }
      }

      return pNext;
   }

After processing a buffer the thread can check to see if there's another buffer that it can handle. It needs to increase the last processed value and perform the check atomically, hence the locking.

   CIOBuffer *CIOBuffer::InOrderBufferList::ProcessAndGetNext()
   {
      CCriticalSection::Owner lock(m_criticalSection);

      ::InterlockedIncrement(&m_next);

      CIOBuffer *pNext = 0;

      BufferSequence::iterator it;

      it = m_list.begin();

      if (it != m_list.end())
      {
         if (it->first == m_next)
         {
            pNext = it->second;

            m_list.erase(it);
         }
      }

      return pNext;
   }
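
To see the two methods working together, here is a self-contained sketch in standard C++. It is an illustration under stated assumptions, not the framework's code: std::mutex replaces CCriticalSection, and the Buffer struct, the InOrderList class and the ProcessInArrivalOrder() function are hypothetical stand-ins for CIOBuffer, InOrderBufferList and the IO thread's processing loop.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for CIOBuffer: a sequence number and a payload.
struct Buffer
{
   unsigned long sequence;
   std::string data;
};

class InOrderList
{
public:
   InOrderList() : m_next(0) {}

   // Returns pBuffer if it is next in sequence. Otherwise stores it and
   // returns the stored buffer that is next in sequence, or null.
   Buffer *GetNext(Buffer *pBuffer)
   {
      assert(pBuffer);
      std::lock_guard<std::mutex> lock(m_lock);

      if (pBuffer->sequence == m_next)
      {
         return pBuffer;
      }

      m_list.insert(std::make_pair(pBuffer->sequence, pBuffer));

      return TakeIfNext();
   }

   // Call after processing a buffer: advances the counter and returns the
   // stored buffer that has now become processable, or null.
   Buffer *ProcessAndGetNext()
   {
      std::lock_guard<std::mutex> lock(m_lock);

      ++m_next;

      return TakeIfNext();
   }

private:
   Buffer *TakeIfNext()   // caller must hold m_lock
   {
      std::map<unsigned long, Buffer *>::iterator it = m_list.begin();

      if (it == m_list.end() || it->first != m_next)
      {
         return 0;
      }

      Buffer *pNext = it->second;
      m_list.erase(it);
      return pNext;
   }

   std::mutex m_lock;
   unsigned long m_next;
   std::map<unsigned long, Buffer *> m_list;
};

// Feeds buffers in arrival order and returns the payloads in the order in
// which they were actually processed.
inline std::vector<std::string> ProcessInArrivalOrder(std::vector<Buffer> &arrivals)
{
   InOrderList list;
   std::vector<std::string> processed;

   for (std::size_t i = 0; i < arrivals.size(); ++i)
   {
      Buffer *pBuffer = list.GetNext(&arrivals[i]);

      while (pBuffer)
      {
         processed.push_back(pBuffer->data);   // the real loop calls WSASend() here
         pBuffer = list.ProcessAndGetNext();
      }
   }

   return processed;
}
```

If buffers with sequence numbers 2, 0 and 1 arrive in that order, the first is stored, the second is processed at once, and the third is processed and then releases the stored buffer, so the payloads come out in sequence order.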

Handling reads

If the CIOBuffer used by every write that occurs contains a sequence number then similar code could be used to ensure that completed read requests are processed in the correct order. However, there's little point in this code being placed in the server framework as different users of the framework may require different functionality. The CSocketServer derived class could use the CIOBuffer::InOrderBufferList class to maintain processing order or it could simply dispatch the read completions to another IO completion port to pass them across to a business logic thread pool. In this case it's the code in the business logic thread pool that actually processes the data and the order should be maintained there. It may even need to do both: ensuring packet order in the CSocketServer class itself so that it can successfully break the byte stream into messages, and then dispatching the messages to the business logic thread pool and ensuring that these complete messages are also processed in the correct order.

Locking granularity

Each Socket must now keep track of independent read and write sequence numbers and maintain a map of out of sequence write requests. Manipulation of the map and the associated next sequence number counter must be protected, and we use a critical section to do so. Be aware that allocating a critical section for each Socket connection is potentially resource intensive. Instead we could choose to trade locking granularity for lower resource usage. The CSocketServer class already has a critical section that it uses to protect its lists of Sockets; we could pass a reference to this critical section to each Socket rather than have them create their own. The problem with doing this is that we serialise every Socket's map access. The work performed inside the critical section is small, but a better solution might be to create a critical section for every X sockets, where X is a value that is determined by profiling your application.
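
One way to share a lock between groups of sockets is sketched below. This is only an illustration of the idea, not code from the framework: std::mutex stands in for the critical section, and SharedLockPool, SocketState and AllocateWriteSequence() are hypothetical names. The number of locks is fixed up front, so with N sockets roughly N / lockCount sockets share each lock.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical pool of locks shared between sockets.
class SharedLockPool
{
public:
   explicit SharedLockPool(std::size_t lockCount)
      : m_locks(lockCount)
   {
      assert(lockCount > 0);
   }

   // Sockets with the same (socketIndex % lockCount) share a lock.
   std::mutex &LockFor(std::size_t socketIndex)
   {
      return m_locks[socketIndex % m_locks.size()];
   }

   std::size_t Size() const
   {
      return m_locks.size();
   }

private:
   std::vector<std::mutex> m_locks;
};

// Hypothetical per socket state that the shared lock protects.
struct SocketState
{
   std::size_t index;
   unsigned long nextWriteSequence;
};

// Allocates the socket's next write sequence number under its shared lock.
inline unsigned long AllocateWriteSequence(SharedLockPool &pool, SocketState &socket)
{
   std::lock_guard<std::mutex> lock(pool.LockFor(socket.index));

   return socket.nextWriteSequence++;
}
```

A pool with a single lock behaves like the shared CSocketServer critical section, and a pool with as many locks as sockets behaves like one critical section per socket; profiling decides where between the two to settle.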

Only paying for what you use

Including sequence numbers in all buffers used for sending and receiving and ensuring the writes are processed in order adds a little overhead to the work of the IO threads. If you are sure that your server doesn't require this functionality, perhaps because you know that due to your protocol design there will only ever be a single read or write request pending, you can opt not to include this functionality by passing false as the useSequenceNumbers flag in the CSocketServer's constructor. Enabling read or write sequence numbers independently is left as an exercise for the reader.

An example

To demonstrate the concept of ensuring the ordering of multiple reads and writes we've come up with a rather contrived example. The packet echo server that we developed in the previous article has been changed as follows:

It now does its work in a business logic thread pool so that we can demonstrate maintaining receive order in both the socket server and the business logic thread pool when the socket server's worker thread isn't doing the processing itself.

It works with larger packets; we use a two byte packet header rather than a one byte header. This two byte header represents the length of the packet using the following format: packetLength = byte1 + (byte2 * 256). The length of the packet includes the two byte header.

When a client initially connects the server posts a configurable number of reads. As each read completes it posts a new read so that it maintains the number of outstanding reads.

It processes the reads in order, and due to the fact that we now have multiple reads outstanding, CSocketServer::ProcessDataStream() has changed so that when more data is required we don't simply reissue a read to read more data into the same buffer.

It echoes the packet back to the client in pieces by issuing multiple write requests.
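
The two byte header described in the list above, and the job of breaking an in-order byte stream into complete packets, can be sketched like this. It is a simplified illustration and not the server's CSocketServer::ProcessDataStream(); the names DecodePacketLength(), MakePacket() and ExtractPackets() are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

typedef std::vector<unsigned char> Bytes;

const std::size_t kHeaderSize = 2;

// packetLength = byte1 + (byte2 * 256); the length includes the header.
inline std::size_t DecodePacketLength(unsigned char byte1, unsigned char byte2)
{
   return static_cast<std::size_t>(byte1) + (static_cast<std::size_t>(byte2) * 256);
}

// Builds a packet: the two byte length header followed by the payload.
inline Bytes MakePacket(const Bytes &payload)
{
   const std::size_t length = payload.size() + kHeaderSize;

   assert(length <= 65535);   // the largest length two bytes can represent

   Bytes packet;
   packet.push_back(static_cast<unsigned char>(length % 256));
   packet.push_back(static_cast<unsigned char>(length / 256));
   packet.insert(packet.end(), payload.begin(), payload.end());
   return packet;
}

// Removes every complete packet from the front of 'stream' and returns them.
// Incomplete trailing data stays in 'stream' until more bytes arrive.
inline std::vector<Bytes> ExtractPackets(Bytes &stream)
{
   std::vector<Bytes> packets;
   std::size_t offset = 0;

   while (stream.size() - offset >= kHeaderSize)
   {
      const std::size_t length =
         DecodePacketLength(stream[offset], stream[offset + 1]);

      if (length < kHeaderSize)
      {
         break;   // malformed header; a real server would drop the connection
      }

      if (stream.size() - offset < length)
      {
         break;   // wait for the rest of the packet
      }

      packets.push_back(
         Bytes(stream.begin() + offset, stream.begin() + offset + length));

      offset += length;
   }

   stream.erase(stream.begin(), stream.begin() + offset);
   return packets;
}
```

This only works if the bytes are appended to the stream in the order that the reads were issued, which is why the read completions must be sequenced before the stream is split into packets.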

The large packet echo server is available for download here in SocketServer6.zip. Testing using telnet is possible, though more complex; you may find it easier to use the test harness that we develop here to test it. As with the previous examples, the server runs until a named event is set and then shuts down. The very simple Server Shutdown program, available here, provides an off switch for the server.

Although both the server and thread pool classes are configurable as to whether they use sequence numbers to maintain packet order these settings can only be set in one way for the server to actually work in the way that the test harness expects. All packet ordering flags must be set to true. The purpose of the flags is so that you can turn off the various sequencing required and see the effect on the test. It's not intended that the server can run reliably in any other configuration.

CThreadPool pool(
   5,                    // initial number of threads to create
   5,                    // minimum number of threads to keep in the pool
   10,                   // maximum number of threads in the pool
   5,                    // maximum number of "dormant" threads
   5000,                 // pool maintenance period (millis)
   100,                  // dispatch timeout (millis)
   10000,                // dispatch timeout for when pool is at max threads
   20,                   // (1) number of reads to post
   true,                 // (2) maintain packet order with sequence numbers
   true);                // (3) echo packets with multiple writes

pool.Start();

CSocketServer server(
   INADDR_ANY,           // address to listen on
   5001,                 // port to listen on
   10,                   // max number of sockets to keep in the pool
   10,                   // max number of buffers to keep in the pool
   1024,                 // buffer size
   pool,
   65536,                // max message size
   true,                 // (4) maintain read packet order with sequence numbers
   true,                 // (5) maintain write packet order with sequence numbers
   true);                // (6) issue a new read before we've completely processed
                         // this one

The configuration flags can be adjusted to witness the following effects:

The configuration shown above ensures that packets into the read completion method are processed in sequence - this maintains the validity of the incoming packet data; packets into the worker thread are maintained in sequence - this maintains the order of echoing the actual packets; and the sequence of write calls is maintained - this maintains the validity of the outgoing packet data.

If the number of reads to post (1) is reduced to 1 then there is no need to maintain the read completion sequencing ((4) can be set to false) as long as the read completion method doesn't issue another read until it has completed the processing of the current one ((6) should be set to false).

If the business logic thread pool doesn't attempt to maintain packet ordering ((2) set to false) then the test will likely report sequence number mismatches - as the packets are echoed out of sequence, and response != message errors as multiple threads in the business logic thread pool attempt to write fragments of the message to the socket in an unsynchronised manner and thus interleave sections of different messages.

If (2) is left set to false but (3) is also set to false then the test will only fail with sequence number mismatches, as the threads are now echoing their packets in a single write so the data in the individual packets can't be corrupted by being interleaved with sections of other packets.

Unfortunately it's currently impossible to simply turn off write packet ordering at the socket server (option (5)) as doing so also turns off read packet sequencing and so makes it impossible to get valid data to the business logic thread so that it can echo it and we can witness the writes being performed out of sequence. If you're interested in seeing this in action then you can hack at the socket server code.

Revision history

15th July 2002 - Initial revision.

12th August 2002 - Removed the race condition in socket closure - Thanks to David McConnell for pointing this out. Derived class can receive connection reset and connection error notifications. Socket provides a means to determine if send/receive are connected. Dispatch to the thread pool now uses shared enums rather than hard coded constant values. General code cleaning and lint issues. Adjusted the code and article so that each socket has its own critical section and the resource utilisation optimisation is suggested, rather than imposed. Fixed a bug whereby the critical section that is used to protect the per socket data was owned by the worker thread rather than the per socket data.

Other articles in the series

A reusable socket server class

Business logic processing in a socket server

Speeding up socket server connections with AcceptEx

Handling multiple pending socket read and write operations

Testing socket servers with C# and .Net

A high performance TCP/IP socket server COM component for VB
