采用管道进行通讯的例子

简介:

作者:肖波

用Remoting做进程间通讯,效率较低,于是做了一个采用管道技术进行进程间通讯的例子,在1.8G 双核计算机上每秒钟可以发送180M数据。下面给出源码

Server端的管道类

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using Pipe.Win32;

namespace Pipe.Server
{

public delegate void ReceiveMessageFunc(System.IO.MemoryStream m);
public delegate void ReceiveMessageErrorFunc(Exception e);

public class PipeServer : IDisposable
{
    enum State
    {
        Idle = 0,
        Begining = 1,
        Reading = 2,
    }

    String m_PipeName;
    uint m_Handle;
    uint m_BufferSize;
    State m_State = State.Idle;

    const ulong SYNC_HEAD = 0xf8c7a1ca13db307e;
    const uint NMPWAIT_USE_DEFAULT_WAIT = 0x00000000;
    const int DEFAULT_BUFFER_SIZE = 1024;

    ReceiveMessageFunc m_ReceiveMessage;
    ReceiveMessageErrorFunc m_ReceiveMessageError;

    public ReceiveMessageFunc OnReceiveMessage
    {
        get
        {
            return m_ReceiveMessage;
        }

        set
        {
            m_ReceiveMessage = value;
        }
    }

    public ReceiveMessageErrorFunc OnReceiveMessageError
    {
        get
        {
            return m_ReceiveMessageError;
        }

        set
        {
            m_ReceiveMessageError = value;
        }
    }

    public String PipeName
    {
        get
        {
            return m_PipeName;
        }
    }

    public uint BufferSize
    {
        get
        {
            return m_BufferSize;
        }
    }

    public String PipeUri
    {
        get
        {
            return @"\\.\pipe\" + m_PipeName;
        }
    }

    private bool IsSyncHead(byte[] buf, uint len, out int msgLen)
    {
        msgLen = 0;

        if (len != 12)
        {
            return false;
        }

        if (SYNC_HEAD != BitConverter.ToUInt64(buf, 0))
        {
            return false;
        }

        msgLen = BitConverter.ToInt32(buf, sizeof(ulong));

        if (msgLen < 0)
        {
            return false;
        }

        return true;
    }

    private void ProcessMessage(System.IO.MemoryStream m)
    {
        if (OnReceiveMessage != null)
        {
            m.Position = 0;
            OnReceiveMessage(m);
        }
    }

    private void ThreadProc()
    {
    }

    public PipeServer(String pipeName)
    {
        m_PipeName = pipeName;
        m_BufferSize = DEFAULT_BUFFER_SIZE;
    }

    public PipeServer(String pipeName, uint bufferSize)
    {
        m_PipeName = pipeName;
        m_BufferSize = bufferSize;
    }
    public void Listen()
    {
        while (true)
        {
            try
            {

                m_Handle = NTKernel.CreateNamedPipe(PipeUri, (uint)FileAccess.PIPE_ACCESS_DUPLEX,
                    (uint)PipeMode.PIPE_READMODE_MESSAGE | (uint)PipeMode.PIPE_TYPE_MESSAGE | (uint)PipeMode.PIPE_WAIT,
                    NTKernel.PIPE_UNLIMITED_INSTANCES, m_BufferSize, m_BufferSize, NMPWAIT_USE_DEFAULT_WAIT, new SecurityAttributes());

                if (m_Handle == NTKernel.INVAILD_HANDLE)
                {
                    throw new Exception(String.Format("CreateNamedPipe fail, err={0}", NTKernel.GetLastError()));
                }

                if (!NTKernel.ConnectNamedPipe(m_Handle, IntPtr.Zero))
                {
                    uint err = NTKernel.GetLastError();
                    NTKernel.CloseHandle(m_Handle);
                    throw new Exception(String.Format("ConnectNamedPipe fail, err={0}", err));
                }

                byte[] buf = new byte[m_BufferSize];

                uint relSize = 0;
                int msgLen = 0;
                int offset = 0;
                System.IO.MemoryStream m = new System.IO.MemoryStream();

                while (NTKernel.ReadFile(m_Handle, buf, m_BufferSize, out relSize, IntPtr.Zero))
                {
                    switch (m_State)
                    {
                        case State.Idle:
                            if (IsSyncHead(buf, relSize, out msgLen))
                            {
                                m_State = State.Begining;
                            }

                            break;
                        case State.Begining:
                            offset = 0;
                            m = new System.IO.MemoryStream();
                            m.Write(buf, 0, (int)relSize);
                            offset += (int)relSize;
                            if (offset >= msgLen)
                            {
                                m_State = State.Idle;

                                if (offset == msgLen)
                                {
                                    ProcessMessage(m);
                                }
                                else
                                {
                                    if (OnReceiveMessageError != null)
                                    {
                                        OnReceiveMessageError(new Exception("Message overflow!"));
                                    }
                                }

                            }
                            else
                            {
                                m_State = State.Reading;
                            }

                            break;
                        case State.Reading:
                            m.Write(buf, 0, (int)relSize);
                            offset += (int)relSize;
                            if (offset >= msgLen)
                            {
                                m_State = State.Idle;

                                if (offset == msgLen)
                                {
                                    ProcessMessage(m);
                                }
                                else
                                {
                                    if (OnReceiveMessageError != null)
                                    {
                                        OnReceiveMessageError(new Exception("Message overflow!"));
                                    }
                                }
                            }

                            break;

                    }
                }
                NTKernel.DisconnectNamedPipe(m_Handle);
                NTKernel.CloseHandle(m_Handle);
                System.Threading.Thread.Sleep(10);
            }
            catch (Exception e)
            {
                if (OnReceiveMessageError != null)
                {
                    OnReceiveMessageError(e);
                }
            }
        }
    }

    public void Dispose()
    {
        lock (this)
        {
            if (m_Handle != NTKernel.INVAILD_HANDLE)
            {
                NTKernel.CloseHandle(m_Handle);
                m_Handle = NTKernel.INVAILD_HANDLE;
            }
        }
    }

    ~PipeServer()
    {
        Dispose();
    }
}

}

Client 端的管道类

using System;
using System.Collections.Generic;
using System.Text;
using System.Diagnostics;
using Pipe.Win32;

namespace Pipe.Client
{

public class PipeClient : IDisposable
{
    String m_PipeName;
    String m_ComputerName;
    uint m_Handle;
    uint m_BufferSize;
    const ulong SYNC_HEAD = 0xf8c7a1ca13db307e;
    byte[] m_SendBuf;

    Propertys

    private void Connect()
    {
        int file_not_find_times = 0;

        while (true)
        {
            m_Handle = NTKernel.CreateFile(PipeUri, (uint)FileAccess.GENERIC_READ | (uint)FileAccess.GENERIC_WRITE,
                0, new SecurityAttributes(), (uint)CreateMode.OPEN_EXISTING, 0, 0);

            if (m_Handle == NTKernel.INVAILD_HANDLE)
            {
                uint err = NTKernel.GetLastError();

                if (err == NTKernel.ERROR_FILE_NOT_FOUND)
                {
                    if (file_not_find_times++ < 2000)
                    {
                        System.Threading.Thread.Sleep(20);
                        continue;
                    }
                }

                if (err == NTKernel.ERROR_PIPE_BUSY)
                {
                    NTKernel.WaitNamedPipeA(PipeUri, 20);
                    continue;
                }
                else
                {
                    throw new Exception(String.Format("Create File for pipe fail, err={0}", NTKernel.GetLastError()));
                }
            }

            break;
        }

    }

    private void WriteBuf(byte[] buf)
    {
        uint relSize;

        if (!NTKernel.WriteFile(m_Handle, buf, (uint)buf.Length, out relSize, IntPtr.Zero))
        {
            throw new Exception(String.Format("Send message to pipe fail, err={0}", NTKernel.GetLastError()));
        }
    }

    public void Close()
    {
        lock (this)
        {
            if (m_Handle != NTKernel.INVAILD_HANDLE)
            {
                bool ret = NTKernel.CloseHandle(m_Handle);
                m_Handle = NTKernel.INVAILD_HANDLE;
            }
        }
    }

    public PipeClient(String pipeName, uint bufferSize)
    {
        m_PipeName = pipeName;
        m_BufferSize = bufferSize;
        m_Handle = NTKernel.INVAILD_HANDLE;
        m_SendBuf = new byte[bufferSize];
    }

    public void Dispose()
    {
        Close();
    }

    public void Send(byte[] buf)
    {
        if (m_Handle == NTKernel.INVAILD_HANDLE)
        {
            Connect();
        }

        //Build Message Head
        byte[] syncHead = BitConverter.GetBytes(SYNC_HEAD);
        byte[] length = BitConverter.GetBytes(buf.Length);
        byte[] lengthBuf = new byte[syncHead.Length + length.Length];
        
        syncHead.CopyTo(lengthBuf, 0);
        
        for (int i = syncHead.Length; i < lengthBuf.Length; i++)
        {
            lengthBuf[i] = length[i - syncHead.Length];
        }

        WriteBuf(lengthBuf);

        //write content
        if (buf.Length < m_BufferSize)
        {
            WriteBuf(buf);
        }
        else
        {
            //the length of buf lardge than m_BufferSize

            int offset = 0;

            int len = Math.Min((int)m_BufferSize, buf.Length - offset);

            byte[] sendbuf;

            while (len > 0)
            {
                if (len == m_BufferSize)
                {
                    sendbuf = m_SendBuf;
                }
                else
                {
                    sendbuf = new byte[len];
                }

                System.IO.MemoryStream m = new System.IO.MemoryStream(sendbuf);
                m.Write(buf, offset, len);
                m.Close();
                offset += len;
                len = Math.Min((int)m_BufferSize, buf.Length - offset);
                WriteBuf(sendbuf);
            }
        }
    }

    ~PipeClient()
    {
        Dispose();
    }
}

}

NTKernel.cs
这个程序文件Client 和 Server 都要,封装了相应的API函数

using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.InteropServices;

namespace Pipe.Win32
{

Data Structures

public class NTKernel
{
    public const uint PIPE_UNLIMITED_INSTANCES = 255;
    public const uint INVAILD_HANDLE = 0xFFFFFFFF;
    public const uint ERROR_FILE_NOT_FOUND = 2;
    public const uint ERROR_PIPE_BUSY = 231;

    internal const uint INFINITE = 0xFFFFFFFF;
    [DllImport("kernel32", EntryPoint = "GetLastError", SetLastError = true, CharSet = CharSet.Unicode)]
    public static extern uint GetLastError();

    [DllImport("kernel32.dll", SetLastError = true)]
    public static extern uint CreateNamedPipe(string lpName, uint dwOpenMode,
       uint dwPipeMode, uint nMaxInstances, uint nOutBufferSize, uint nInBufferSize,
       uint nDefaultTimeOut, SecurityAttributes lpSecurityAttributes);

    [DllImport("kernel32.dll")]
    public static extern bool ConnectNamedPipe(uint hNamedPipe,
       IntPtr lpOverlapped);

    [DllImport("kernel32.dll")]
    public static extern bool DisconnectNamedPipe(uint hNamedPipe);
    
    [DllImport("kernel32.dll", SetLastError=true)] 
    public static extern int WaitNamedPipeA (string lpNamedPipeName, int nTimeOut);

    [DllImport("kernel32.dll")]
    public static extern bool ReadFile(uint hFile, byte[] lpBuffer,
       uint nNumberOfBytesToRead, out uint lpNumberOfBytesRead, IntPtr lpOverlapped);

    [DllImport("kernel32.dll")]
    public static extern bool WriteFile(uint hFile, byte[] lpBuffer,
       uint nNumberOfBytesToWrite, out uint lpNumberOfBytesWritten,
       IntPtr lpOverlapped);

    [DllImport("kernel32.dll", SetLastError = true)]
    public static extern bool CloseHandle(uint hHandle);

    [DllImport("kernel32.dll", CharSet = CharSet.Auto, CallingConvention = CallingConvention.StdCall, SetLastError = true)]
    public static extern uint CreateFile(
          string lpFileName,
          uint dwDesiredAccess,
          uint dwShareMode,
          SecurityAttributes lpSecurityAttributes,
          uint dwCreationDisposition,
          uint dwFlagsAndAttributes,
          int hTemplateFile
          );

    Mutex
    Semaphore

    Event

}

class Mutex : IDisposable
{
    IntPtr m_Handle;

    public Mutex(SecurityAttributes lpEventAttributes, bool bInitialOwner, string lpName)
    {
        m_Handle = NTKernel.CreateMutex(lpEventAttributes, bInitialOwner, lpName);

        if (m_Handle == IntPtr.Zero)
        {
            uint err = NTKernel.GetLastError();
            throw new Exception(String.Format("Create Event fail, error={0}",
                err));
        }
    }

    public Mutex(bool bInitialOwner, string lpName)
    {
        m_Handle = NTKernel.CreateMutex(null, bInitialOwner, lpName);

        if (m_Handle == IntPtr.Zero)
        {
            uint err = NTKernel.GetLastError();
            throw new Exception(String.Format("Create Event fail, error={0}",
                err));
        }
    }

    public bool WaitOne(uint dwMilliseconds)
    {
        WaitForState waitForState = (WaitForState)NTKernel.WaitForSingleObject((uint)m_Handle, dwMilliseconds);

        if (waitForState == WaitForState.WAIT_OBJECT_0)
        {
            return true;
        }
        else if (waitForState == WaitForState.WAIT_TIMEOUT)
        {
            return false;
        }
        else
        {
            throw new System.Threading.AbandonedMutexException();
        }
    }

    public bool WaitOne()
    {
        return WaitOne(NTKernel.INFINITE);
    }

    public void ReleaseMutex()
    {
        NTKernel.ReleaseMutex(m_Handle);
    }

    public void Close()
    {
        lock (this)
        {
            if (m_Handle != IntPtr.Zero)
            {
                if (NTKernel.CloseHandle((uint)m_Handle))
                {
                    m_Handle = IntPtr.Zero;
                }
            }
        }
    }

    ~Mutex()
    {
        Dispose();
    }

    IDisposable Members

}

public class Event : IDisposable
{
    IntPtr m_Handle;

    public Event()
    {
    }

    public Event(SecurityAttributes lpEventAttributes, bool bManualReset, bool bInitialState, string lpName)
    {
        m_Handle = NTKernel.CreateEvent(lpEventAttributes, bManualReset, bInitialState, lpName);

        if (m_Handle == IntPtr.Zero)
        {
            uint err = NTKernel.GetLastError();
            throw new Exception(String.Format("Create Event fail, error={0}",
                err));
        }
    }

    public bool Open(EventAccess dwDesiredAccess, bool bInheritHandle, string lpName)
    {
        m_Handle = NTKernel.OpenEvent((uint)dwDesiredAccess, bInheritHandle, lpName);

        if (m_Handle == IntPtr.Zero)
        {
            return false;
        }
        else
        {
            return true;
        }
    }

    public WaitForState WaitFor(uint dwMilliseconds)
    {
        return (WaitForState)NTKernel.WaitForSingleObject((uint)m_Handle, dwMilliseconds);
    }

    public WaitForState WaitFor()
    {
        return WaitFor(NTKernel.INFINITE);
    }

    public void SetEvent()
    {
        NTKernel.SetEvent(m_Handle);
    }

    public void Release()
    {
        NTKernel.ResetEvent(m_Handle);
    }

    public void Close()
    {
        lock (this)
        {
            if (m_Handle != IntPtr.Zero)
            {
                if (NTKernel.CloseHandle((uint)m_Handle))
                {
                    m_Handle = IntPtr.Zero;
                }
            }
        }
    }

    ~Event()
    {
        Dispose();
    }

    IDisposable Members
}

}

客户端调用

        byte[] buf = new byte[10240];
        Pipe.Client.PipeClient client = new Pipe.Client.PipeClient("test", 102400);

        for (int i = 0; i < 10000; i++)
        {
            try
            {
                client.Send(buf);
            }
            catch (Exception e)
            {
                Console.WriteLine(e.Message);
            }
            finally
            {
            }
        }

服务器调用

    static bool begin = true;
    static System.Diagnostics.Stopwatch watch = new System.Diagnostics.Stopwatch();
    static int count = 0;
    static object lockObj = new object();

    static void ReceiveMessage(System.IO.MemoryStream m)
    {
        //Console.WriteLine(msg.Event);

        lock (lockObj)
        {

            if (begin)
            {
                watch.Start();
                begin = false;
            }

            count++;

            if (count == 10000)
            {
                watch.Stop();
                float len = m.Length;

                Console.WriteLine(String.Format("{0} MB", (len * 10000 * 1000 / watch.ElapsedMilliseconds) / (1024 * 1024)));
                Console.WriteLine(String.Format("{0} ms", watch.ElapsedMilliseconds));
           
            }
        }
    }

    static void ReceiveMessageError(Exception e)
    {
        Console.WriteLine(e.Message);
    }

    static void Main(string[] args)
    {
        Pipe.Server.PipeServer server = new Pipe.Server.PipeServer("Test", 102400);
        server.OnReceiveMessage = ReceiveMessage;
        server.OnReceiveMessageError = ReceiveMessageError;
        server.Listen();
    }

源码下载位置

目录
相关文章
|
5月前
|
设计模式 监控 网络协议
socket通信处于网络协议那一层和两种接收发送消息方式
socket通信处于网络协议那一层和两种接收发送消息方式
70 2
|
5月前
|
监控 安全
管道的三种使用方案中,唯一正确而安全的使用方法
管道的三种使用方案中,唯一正确而安全的使用方法
15 0
|
5月前
socket长连接的用处与模块图
socket长连接的用处与模块图
39 0
|
5月前
|
消息中间件 Linux
网络编程之 进程间的通信之管道的使用
如何使用管道是进程间通信的关键 博主先声明一下,关于处理进程创建以及销毁的方法。 “子进程究竟何时终止????调用waitpid函数后还要无休止的等待子进程终止吗???”,这显然会是一个问题。因为父进程往往与子进程一样繁忙,因此我们不能只调用waitpid函数来等待子进程终止。那么我们应该怎么办呢???
53 0
 网络编程之 进程间的通信之管道的使用
|
5月前
UDP通信程序练习(实现模拟聊天室)
UDP通信程序练习(实现模拟聊天室)
86 0
|
Unix 数据处理 Python
怎么还蹦出来个 “ 数据管道 ”
怎么还蹦出来个 “ 数据管道 ”
|
开发者
驱动开发:基于事件同步的反向通信
在之前的文章中`LyShark`一直都在教大家如何让驱动程序与应用层进行`正向通信`,而在某些时候我们不仅仅只需要正向通信,也需要反向通信,例如杀毒软件如果驱动程序拦截到恶意操作则必须将这个请求动态的转发到应用层以此来通知用户,而这种通信方式的实现有多种,通常可以使用创建Socket套接字的方式实现,亦或者使用本章所介绍的通过`事件同步`的方法实现反向通信。
188 0
|
JSON JavaScript 小程序
【小程序】组件通信
【小程序】组件通信
203 0
【小程序】组件通信
|
存储 测试技术 C语言
西门子S7-200 SMART自由口通信参数如何设置?如何编写发送程序和接收程序?如何测试?
西门子S7-200 SMART除了支持以太网通信,还可以通过CPU上或信号板上的RS485接口实现串口通信。支持的串口协议,包括自由口协议、USS协议、MODBUS协议和PPI协议。STEP7 Micro/WIN SMART编程软件安装时自动集成串口通信所需要的功能块和子程序。
西门子S7-200 SMART自由口通信参数如何设置?如何编写发送程序和接收程序?如何测试?
|
Windows
驱动开发:通过PIPE管道与内核层通信
在本人前一篇博文`《驱动开发:通过ReadFile与内核层通信》`详细介绍了如何使用应用层`ReadFile`系列函数实现内核通信,本篇将继续延申这个知识点,介绍利用`PIPE`命名管道实现应用层与内核层之间的多次通信方法。
337 0
驱动开发:通过PIPE管道与内核层通信