采用管道进行通讯的例子

简介: 作者:肖波    用Remoting做进程间通讯,效率较低,于是做了一个采用管道技术进行进程间通讯的例子,在1.8G 双核计算机上每秒钟可以发送180M数据。下面给出源码       Server端的管道类using System;using System.

作者:肖波
    用Remoting做进程间通讯,效率较低,于是做了一个采用管道技术进行进程间通讯的例子,在1.8G 双核计算机上每秒钟可以发送180M数据。下面给出源码
   
    Server端的管道类

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using Pipe.Win32;

namespace Pipe.Server
{
    public delegate void ReceiveMessageFunc(System.IO.MemoryStream m);
    public delegate void ReceiveMessageErrorFunc(Exception e);

    public class PipeServer : IDisposable
    {
        enum State
        {
            Idle = 0,
            Begining = 1,
            Reading = 2,
        }

        String m_PipeName;
        uint m_Handle;
        uint m_BufferSize;
        State m_State = State.Idle;

        const ulong SYNC_HEAD = 0xf8c7a1ca13db307e;
        const uint NMPWAIT_USE_DEFAULT_WAIT = 0x00000000;
        const int DEFAULT_BUFFER_SIZE = 1024;

        ReceiveMessageFunc m_ReceiveMessage;
        ReceiveMessageErrorFunc m_ReceiveMessageError;

        public ReceiveMessageFunc OnReceiveMessage
        {
            get
            {
                return m_ReceiveMessage;
            }

            set
            {
                m_ReceiveMessage = value;
            }
        }

        public ReceiveMessageErrorFunc OnReceiveMessageError
        {
            get
            {
                return m_ReceiveMessageError;
            }

            set
            {
                m_ReceiveMessageError = value;
            }
        }

        public String PipeName
        {
            get
            {
                return m_PipeName;
            }
        }

        public uint BufferSize
        {
            get
            {
                return m_BufferSize;
            }
        }

        public String PipeUri
        {
            get
            {
                return @"\\.\pipe\" + m_PipeName;
            }
        }

        private bool IsSyncHead(byte[] buf, uint len, out int msgLen)
        {
            msgLen = 0;

            if (len != 12)
            {
                return false;
            }

            if (SYNC_HEAD != BitConverter.ToUInt64(buf, 0))
            {
                return false;
            }

            msgLen = BitConverter.ToInt32(buf, sizeof(ulong));

            if (msgLen < 0)
            {
                return false;
            }

            return true;
        }

        private void ProcessMessage(System.IO.MemoryStream m)
        {
            if (OnReceiveMessage != null)
            {
                m.Position = 0;
                OnReceiveMessage(m);
            }
        }

        private void ThreadProc()
        {
        }

        public PipeServer(String pipeName)
        {
            m_PipeName = pipeName;
            m_BufferSize = DEFAULT_BUFFER_SIZE;
        }

        public PipeServer(String pipeName, uint bufferSize)
        {
            m_PipeName = pipeName;
            m_BufferSize = bufferSize;
        }


        public void Listen()
        {
            while (true)
            {
                try
                {

                    m_Handle = NTKernel.CreateNamedPipe(PipeUri, (uint)FileAccess.PIPE_ACCESS_DUPLEX,
                        (uint)PipeMode.PIPE_READMODE_MESSAGE | (uint)PipeMode.PIPE_TYPE_MESSAGE | (uint)PipeMode.PIPE_WAIT,
                        NTKernel.PIPE_UNLIMITED_INSTANCES, m_BufferSize, m_BufferSize, NMPWAIT_USE_DEFAULT_WAIT, new SecurityAttributes());

                    if (m_Handle == NTKernel.INVAILD_HANDLE)
                    {
                        throw new Exception(String.Format("CreateNamedPipe fail, err={0}", NTKernel.GetLastError()));
                    }

                    if (!NTKernel.ConnectNamedPipe(m_Handle, IntPtr.Zero))
                    {
                        uint err = NTKernel.GetLastError();
                        NTKernel.CloseHandle(m_Handle);
                        throw new Exception(String.Format("ConnectNamedPipe fail, err={0}", err));
                    }

                    byte[] buf = new byte[m_BufferSize];

                    uint relSize = 0;
                    int msgLen = 0;
                    int offset = 0;
                    System.IO.MemoryStream m = new System.IO.MemoryStream();

                    while (NTKernel.ReadFile(m_Handle, buf, m_BufferSize, out relSize, IntPtr.Zero))
                    {
                        switch (m_State)
                        {
                            case State.Idle:
                                if (IsSyncHead(buf, relSize, out msgLen))
                                {
                                    m_State = State.Begining;
                                }

                                break;
                            case State.Begining:
                                offset = 0;
                                m = new System.IO.MemoryStream();
                                m.Write(buf, 0, (int)relSize);
                                offset += (int)relSize;
                                if (offset >= msgLen)
                                {
                                    m_State = State.Idle;

                                    if (offset == msgLen)
                                    {
                                        ProcessMessage(m);
                                    }
                                    else
                                    {
                                        if (OnReceiveMessageError != null)
                                        {
                                            OnReceiveMessageError(new Exception("Message overflow!"));
                                        }
                                    }

                                }
                                else
                                {
                                    m_State = State.Reading;
                                }

                                break;
                            case State.Reading:
                                m.Write(buf, 0, (int)relSize);
                                offset += (int)relSize;
                                if (offset >= msgLen)
                                {
                                    m_State = State.Idle;

                                    if (offset == msgLen)
                                    {
                                        ProcessMessage(m);
                                    }
                                    else
                                    {
                                        if (OnReceiveMessageError != null)
                                        {
                                            OnReceiveMessageError(new Exception("Message overflow!"));
                                        }
                                    }
                                }

                                break;

                        }


                    }


                    NTKernel.DisconnectNamedPipe(m_Handle);
                    NTKernel.CloseHandle(m_Handle);
                    System.Threading.Thread.Sleep(10);
                }
                catch (Exception e)
                {
                    if (OnReceiveMessageError != null)
                    {
                        OnReceiveMessageError(e);
                    }
                }
            }
        }

        public void Dispose()
        {
            lock (this)
            {
                if (m_Handle != NTKernel.INVAILD_HANDLE)
                {
                    NTKernel.CloseHandle(m_Handle);
                    m_Handle = NTKernel.INVAILD_HANDLE;
                }
            }
        }

        ~PipeServer()
        {
            Dispose();
        }
    }

}


Client 端的管道类

using System;
using System.Collections.Generic;
using System.Text;
using System.Diagnostics;
using Pipe.Win32;

namespace Pipe.Client
{
    public class PipeClient : IDisposable
    {
        String m_PipeName;
        String m_ComputerName;
        uint m_Handle;
        uint m_BufferSize;
        const ulong SYNC_HEAD = 0xf8c7a1ca13db307e;
        byte[] m_SendBuf;

        Propertys

        private void Connect()
        {
            int file_not_find_times = 0;

            while (true)
            {
                m_Handle = NTKernel.CreateFile(PipeUri, (uint)FileAccess.GENERIC_READ | (uint)FileAccess.GENERIC_WRITE,
                    0, new SecurityAttributes(), (uint)CreateMode.OPEN_EXISTING, 0, 0);

                if (m_Handle == NTKernel.INVAILD_HANDLE)
                {
                    uint err = NTKernel.GetLastError();

                    if (err == NTKernel.ERROR_FILE_NOT_FOUND)
                    {
                        if (file_not_find_times++ < 2000)
                        {
                            System.Threading.Thread.Sleep(20);
                            continue;
                        }
                    }

                    if (err == NTKernel.ERROR_PIPE_BUSY)
                    {
                        NTKernel.WaitNamedPipeA(PipeUri, 20);
                        continue;
                    }
                    else
                    {
                        throw new Exception(String.Format("Create File for pipe fail, err={0}", NTKernel.GetLastError()));
                    }
                }

                break;
            }

        }

        private void WriteBuf(byte[] buf)
        {
            uint relSize;

            if (!NTKernel.WriteFile(m_Handle, buf, (uint)buf.Length, out relSize, IntPtr.Zero))
            {
                throw new Exception(String.Format("Send message to pipe fail, err={0}", NTKernel.GetLastError()));
            }
        }

        public void Close()
        {
            lock (this)
            {
                if (m_Handle != NTKernel.INVAILD_HANDLE)
                {
                    bool ret = NTKernel.CloseHandle(m_Handle);
                    m_Handle = NTKernel.INVAILD_HANDLE;
                }
            }
        }

        public PipeClient(String pipeName, uint bufferSize)
        {
            m_PipeName = pipeName;
            m_BufferSize = bufferSize;
            m_Handle = NTKernel.INVAILD_HANDLE;
            m_SendBuf = new byte[bufferSize];
        }

        public void Dispose()
        {
            Close();
        }

        public void Send(byte[] buf)
        {
            if (m_Handle == NTKernel.INVAILD_HANDLE)
            {
                Connect();
            }

            //Build Message Head
            byte[] syncHead = BitConverter.GetBytes(SYNC_HEAD);
            byte[] length = BitConverter.GetBytes(buf.Length);
            byte[] lengthBuf = new byte[syncHead.Length + length.Length];
            
            syncHead.CopyTo(lengthBuf, 0);
            
            for (int i = syncHead.Length; i < lengthBuf.Length; i++)
            {
                lengthBuf[i] = length[i - syncHead.Length];
            }

            WriteBuf(lengthBuf);

            //write content
            if (buf.Length < m_BufferSize)
            {
                WriteBuf(buf);
            }
            else
            {
                //the length of buf lardge than m_BufferSize

                int offset = 0;

                int len = Math.Min((int)m_BufferSize, buf.Length - offset);

                byte[] sendbuf;

                while (len > 0)
                {
                    if (len == m_BufferSize)
                    {
                        sendbuf = m_SendBuf;
                    }
                    else
                    {
                        sendbuf = new byte[len];
                    }

                    System.IO.MemoryStream m = new System.IO.MemoryStream(sendbuf);
                    m.Write(buf, offset, len);
                    m.Close();
                    offset += len;
                    len = Math.Min((int)m_BufferSize, buf.Length - offset);
                    WriteBuf(sendbuf);
                }
            }
        }

        ~PipeClient()
        {
            Dispose();
        }
    }
}


NTKernel.cs 
这个程序文件Client 和 Server 都要,封装了相应的API函数

using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.InteropServices;


namespace Pipe.Win32
{

    Data Structures

    public class NTKernel
    {
        public const uint PIPE_UNLIMITED_INSTANCES = 255;
        public const uint INVAILD_HANDLE = 0xFFFFFFFF;
        public const uint ERROR_FILE_NOT_FOUND = 2;
        public const uint ERROR_PIPE_BUSY = 231;

        internal const uint INFINITE = 0xFFFFFFFF;


        [DllImport("kernel32", EntryPoint = "GetLastError", SetLastError = true, CharSet = CharSet.Unicode)]
        public static extern uint GetLastError();

        [DllImport("kernel32.dll", SetLastError = true)]
        public static extern uint CreateNamedPipe(string lpName, uint dwOpenMode,
           uint dwPipeMode, uint nMaxInstances, uint nOutBufferSize, uint nInBufferSize,
           uint nDefaultTimeOut, SecurityAttributes lpSecurityAttributes);

        [DllImport("kernel32.dll")]
        public static extern bool ConnectNamedPipe(uint hNamedPipe,
           IntPtr lpOverlapped);

        [DllImport("kernel32.dll")]
        public static extern bool DisconnectNamedPipe(uint hNamedPipe);
        
        [DllImport("kernel32.dll", SetLastError=true)] 
        public static extern int WaitNamedPipeA (string lpNamedPipeName, int nTimeOut);

        [DllImport("kernel32.dll")]
        public static extern bool ReadFile(uint hFile, byte[] lpBuffer,
           uint nNumberOfBytesToRead, out uint lpNumberOfBytesRead, IntPtr lpOverlapped);

        [DllImport("kernel32.dll")]
        public static extern bool WriteFile(uint hFile, byte[] lpBuffer,
           uint nNumberOfBytesToWrite, out uint lpNumberOfBytesWritten,
           IntPtr lpOverlapped);

        [DllImport("kernel32.dll", SetLastError = true)]
        public static extern bool CloseHandle(uint hHandle);

        [DllImport("kernel32.dll", CharSet = CharSet.Auto, CallingConvention = CallingConvention.StdCall, SetLastError = true)]
        public static extern uint CreateFile(
              string lpFileName,
              uint dwDesiredAccess,
              uint dwShareMode,
              SecurityAttributes lpSecurityAttributes,
              uint dwCreationDisposition,
              uint dwFlagsAndAttributes,
              int hTemplateFile
              );

        Mutex


        Semaphore

        Event

    }

    class Mutex : IDisposable
    {
        IntPtr m_Handle;

        public Mutex(SecurityAttributes lpEventAttributes, bool bInitialOwner, string lpName)
        {
            m_Handle = NTKernel.CreateMutex(lpEventAttributes, bInitialOwner, lpName);

            if (m_Handle == IntPtr.Zero)
            {
                uint err = NTKernel.GetLastError();
                throw new Exception(String.Format("Create Event fail, error={0}",
                    err));
            }
        }

        public Mutex(bool bInitialOwner, string lpName)
        {
            m_Handle = NTKernel.CreateMutex(null, bInitialOwner, lpName);

            if (m_Handle == IntPtr.Zero)
            {
                uint err = NTKernel.GetLastError();
                throw new Exception(String.Format("Create Event fail, error={0}",
                    err));
            }
        }

        public bool WaitOne(uint dwMilliseconds)
        {
            WaitForState waitForState = (WaitForState)NTKernel.WaitForSingleObject((uint)m_Handle, dwMilliseconds);

            if (waitForState == WaitForState.WAIT_OBJECT_0)
            {
                return true;
            }
            else if (waitForState == WaitForState.WAIT_TIMEOUT)
            {
                return false;
            }
            else
            {
                throw new System.Threading.AbandonedMutexException();
            }


        }

        public bool WaitOne()
        {
            return WaitOne(NTKernel.INFINITE);
        }

        public void ReleaseMutex()
        {
            NTKernel.ReleaseMutex(m_Handle);
        }

        public void Close()
        {
            lock (this)
            {
                if (m_Handle != IntPtr.Zero)
                {
                    if (NTKernel.CloseHandle((uint)m_Handle))
                    {
                        m_Handle = IntPtr.Zero;
                    }
                }
            }
        }

        ~Mutex()
        {
            Dispose();
        }

        IDisposable Members

    }

    public class Event : IDisposable
    {
        IntPtr m_Handle;

        public Event()
        {
        }

        public Event(SecurityAttributes lpEventAttributes, bool bManualReset, bool bInitialState, string lpName)
        {
            m_Handle = NTKernel.CreateEvent(lpEventAttributes, bManualReset, bInitialState, lpName);

            if (m_Handle == IntPtr.Zero)
            {
                uint err = NTKernel.GetLastError();
                throw new Exception(String.Format("Create Event fail, error={0}",
                    err));
            }
        }

        public bool Open(EventAccess dwDesiredAccess, bool bInheritHandle, string lpName)
        {
            m_Handle = NTKernel.OpenEvent((uint)dwDesiredAccess, bInheritHandle, lpName);

            if (m_Handle == IntPtr.Zero)
            {
                return false;
            }
            else
            {
                return true;
            }
        }

        public WaitForState WaitFor(uint dwMilliseconds)
        {
            return (WaitForState)NTKernel.WaitForSingleObject((uint)m_Handle, dwMilliseconds);
        }

        public WaitForState WaitFor()
        {
            return WaitFor(NTKernel.INFINITE);
        }

        public void SetEvent()
        {
            NTKernel.SetEvent(m_Handle);
        }

        public void Release()
        {
            NTKernel.ResetEvent(m_Handle);
        }

        public void Close()
        {
            lock (this)
            {
                if (m_Handle != IntPtr.Zero)
                {
                    if (NTKernel.CloseHandle((uint)m_Handle))
                    {
                        m_Handle = IntPtr.Zero;
                    }
                }
            }
        }

        ~Event()
        {
            Dispose();
        }

        IDisposable Members
    }
}


客户端调用

            byte[] buf = new byte[10240];
            Pipe.Client.PipeClient client = new Pipe.Client.PipeClient("test", 102400);

            for (int i = 0; i < 10000; i++)
             {
                try
                {
                    client.Send(buf);
                }
                catch (Exception e)
                {
                    Console.WriteLine(e.Message);
                }
                finally
                {
                }
            }


服务器调用

        static bool begin = true;
        static System.Diagnostics.Stopwatch watch = new System.Diagnostics.Stopwatch();
        static int count = 0;
        static object lockObj = new object();

        static void ReceiveMessage(System.IO.MemoryStream m)
         {
            //Console.WriteLine(msg.Event);

            lock (lockObj)
            {

                if (begin)
                {
                    watch.Start();
                    begin = false;
                }

                count++;

                if (count == 10000)
                {
                    watch.Stop();
                    float len = m.Length;

                    Console.WriteLine(String.Format("{0} MB", (len * 10000 * 1000 / watch.ElapsedMilliseconds) / (1024 * 1024)));
                    Console.WriteLine(String.Format("{0} ms", watch.ElapsedMilliseconds));
               
                }
            }
        }

        static void ReceiveMessageError(Exception e)
        {
            Console.WriteLine(e.Message);
        }

        static void Main(string[] args)
        {
            Pipe.Server.PipeServer server = new Pipe.Server.PipeServer("Test", 102400);
            server.OnReceiveMessage = ReceiveMessage;
            server.OnReceiveMessageError = ReceiveMessageError;
            server.Listen();


        }


源码下载位置

目录
相关文章
|
6月前
|
设计模式 监控 网络协议
socket通信处于网络协议那一层和两种接收发送消息方式
socket通信处于网络协议那一层和两种接收发送消息方式
79 2
|
6月前
|
消息中间件 Linux
网络编程之 进程间的通信之管道的使用
如何使用管道是进程间通信的关键 博主先声明一下,关于处理进程创建以及销毁的方法。 “子进程究竟何时终止????调用waitpid函数后还要无休止的等待子进程终止吗???”,这显然会是一个问题。因为父进程往往与子进程一样繁忙,因此我们不能只调用waitpid函数来等待子进程终止。那么我们应该怎么办呢???
60 0
 网络编程之 进程间的通信之管道的使用
|
6月前
|
消息中间件
进程间的通信方式有哪些
进程间的通信方式有哪些
|
Unix 数据处理 Python
怎么还蹦出来个 “ 数据管道 ”
怎么还蹦出来个 “ 数据管道 ”
|
消息中间件 缓存 网络协议
进程间的通讯方式(三)
进程间的通讯方式
107 0
|
缓存 关系型数据库 MySQL
进程间的通讯方式(一)
进程间的通讯方式
76 0
|
消息中间件 存储 大数据
进程间的通讯方式(二)
进程间的通讯方式
103 0
Linux进程通信——管道(下)
Linux进程通信——管道(下)
|
Unix Linux C++
Linux进程通信——管道(上)
Linux进程通信——管道(上)
|
消息中间件 存储
进程间的通信方式 8种
进程间的通信方式 8种
121 0