IOCP Windows Server 示例

简介: 基于 IOCP 的 Windows 服务器示例

1.环境

Qt5.9.6

vs2017

2.代码

  这段代码是一个基于 Windows 平台的 C++ 程序,实现了一个基于 IOCP(I/O Completion Port,输入/输出完成端口)的简单服务器,用于接受客户端连接并处理数据收发。

  • send_data_handle:解析接收到的 JSON 数据,并生成要发送给客户端的 JSON 应答。
  • PostAcceptEx:发起一个异步接受连接的操作。
  • workerThread:工作线程函数,处理完成端口上的 I/O 完成包。
  • InitServer:初始化服务器,包括启动 WinSock、创建监听套接字、绑定端口、创建完成端口等操作。
//main.cpp
#include "CodeServer.h"
#include <QtWidgets/QApplication>
#include <QJsonDocument>
#include <QJsonObject>
#pragma comment(lib,"ws2_32.lib")
#include <WinSock2.h>
#include <MSWSock.h>
#include <cstring>
#include <iostream>
// Size in bytes of the data buffer embedded in every OverlappedPerIO.
#define BUFFER_SIZE 1024
// Number of worker threads main() creates to service the completion port.
#define THREAD_COUNT 2
// Number of AcceptEx requests main() posts at startup.
#define START_POST_ACCEPTEX 2
// TCP port the listening socket is bound to.
#define PORT 8989
// Forward declaration: workerThread calls this before its definition appears.
void send_data_handle(char* receive_buffer, char * buffer, ULONG &buffersize);
// Kind of overlapped operation a per-I/O context is currently used for.
// workerThread switches on this value to decide how to handle a completion.
enum class IO_OP_TYPE {
  IO_ACCEPT = 0,      // an AcceptEx request
  IO_SEND = 1,        // a WSASend request
  IO_RECV = 2,        // a WSARecv request
  IO_CONNECT = 3,
  IO_DISCONNECT = 4,
};
// Handles shared by the server: the listening socket and the I/O completion
// port. Both default to their "invalid" values so that an instance which was
// never (successfully) initialized cannot be mistaken for a live one.
struct ServerParams {
  SOCKET listenSocket = INVALID_SOCKET;  // listening socket
  HANDLE completionPort = NULL;          // I/O completion port handle
};
// Argument bundle handed to each worker thread. Both pointers are non-owning
// and default to nullptr so an unset field is detectable instead of garbage.
struct ThreadParams {
  CodeServer* codeServer = nullptr;      // window that receives log text
  ServerParams* serverParams = nullptr;  // shared socket / completion port handles
};
// Per-operation context passed to the overlapped WinSock calls.
// `overlapped` must stay the first member: the code casts between
// OVERLAPPED* and OverlappedPerIO* and relies on both having the same address.
struct OverlappedPerIO {
  OVERLAPPED overlapped;     // passed to AcceptEx / WSARecv / WSASend
  SOCKET socket;             // socket this operation belongs to
  WSABUF wsaBuf;             // pointer/length pair describing the data to transfer
  IO_OP_TYPE type;           // operation currently in flight
  char buffer[BUFFER_SIZE];  // storage that wsaBuf.buf points into
};
// Pointer alias used by workerThread.
using LPOverlappedPerIO = OverlappedPerIO*;
// Posts one asynchronous AcceptEx request on `listenSocket`.
//
// A fresh overlapped socket and a zeroed OverlappedPerIO context (type
// IO_ACCEPT) are created for the request. When the request is started
// successfully, ownership of both passes to whoever handles the completion.
// When it cannot be started, both are released here and the error code is
// printed; previously a persistent error made this function spin forever.
void PostAcceptEx(SOCKET listenSocket) {
  SOCKET sock = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
  if (sock == INVALID_SOCKET) {
    return;
  }

  // Plain `new` reports failure by throwing, never by returning nullptr,
  // so there is nothing to null-check here.
  OverlappedPerIO* overlp = new OverlappedPerIO;
  ZeroMemory(overlp, sizeof(OverlappedPerIO));
  overlp->socket = sock;
  overlp->wsaBuf.buf = overlp->buffer;
  overlp->wsaBuf.len = BUFFER_SIZE;
  overlp->type = IO_OP_TYPE::IO_ACCEPT;

  DWORD dwByteRecv = 0;
  for (;;) {
    // Receive length 0: complete as soon as a connection arrives; the buffer
    // only has to hold the two address blocks.
    const BOOL started = AcceptEx(listenSocket,
      sock,
      overlp->wsaBuf.buf,
      0,
      sizeof(SOCKADDR_IN) + 16,
      sizeof(SOCKADDR_IN) + 16,
      &dwByteRecv,
      &overlp->overlapped);
    if (started) {
      return;  // completed immediately
    }
    const int err = WSAGetLastError();
    if (err == WSA_IO_PENDING) {
      return;  // request is in flight
    }
    std::cout << err << std::endl;
    if (err != WSAECONNRESET) {
      break;   // not a transient error: give up instead of looping forever
    }
    // WSAECONNRESET: the peer went away before the accept; try again.
  }

  // The request never started, so no completion will ever reference these.
  closesocket(sock);
  delete overlp;
}
DWORD WINAPI workerThread(LPVOID lpParam) {
  ServerParams* pms = (ServerParams*)((ThreadParams*)lpParam)->serverParams;
  HANDLE completionPort = pms->completionPort;
  SOCKET listenSocket = pms->listenSocket;
  DWORD bytesTrans;
  ULONG_PTR comletionKey;
  LPOverlappedPerIO overlp;
  int ret;
  while (true) {
    BOOL result = GetQueuedCompletionStatus(
      completionPort,
      &bytesTrans,
      &comletionKey,
      (LPOVERLAPPED*)&overlp,
      INFINITE);
    if (!result) {
      if ((GetLastError() == WAIT_TIMEOUT) || (GetLastError() == ERROR_NETNAME_DELETED)) {
        std::cout << "socket disconnection:" << overlp->socket << std::endl;
        ((ThreadParams*)lpParam)->codeServer->appendText("Client disconnection:" + QString::number(overlp->socket));
        closesocket(overlp->socket);
        delete overlp;
        continue;
      }
      std::cout << "GetQueuedCompletionStatus failed" << std::endl;
      return 0;
    }
    switch (overlp->type) {
    case IO_OP_TYPE::IO_ACCEPT:
    {
      PostAcceptEx(listenSocket);
      std::cout << "happed IO_ACCEPT:" << bytesTrans << std::endl;
      ((ThreadParams*)lpParam)->codeServer->appendText(u8"Client establishes connection:"+ QString::number(overlp->socket));
      setsockopt(overlp->socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&(listenSocket), sizeof(SOCKET));
      ZeroMemory(overlp->buffer, BUFFER_SIZE);
      overlp->type = IO_OP_TYPE::IO_RECV;
      overlp->wsaBuf.buf = overlp->buffer;
      overlp->wsaBuf.len = BUFFER_SIZE;
      CreateIoCompletionPort((HANDLE)overlp->socket, completionPort, NULL, 0);
      DWORD dwRecv = 0, dwFlag = 0;
      ret = WSARecv(overlp->socket, &overlp->wsaBuf, 1, &dwRecv, &dwFlag, &(overlp->overlapped), 0);
      if (ret == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) {
        std::cout << "WSARecv failed:" << WSAGetLastError() << std::endl;
      }
    }
    break;
    case IO_OP_TYPE::IO_RECV:
    {
      std::cout << "happed IO_RECV:" << bytesTrans << std::endl;
      if (bytesTrans == 0) {
        std::cout << "socket disconnection:" << overlp->socket << std::endl;
        ((ThreadParams*)lpParam)->codeServer->appendText(u8"Client disconnection:" + QString::number(overlp->socket));
        closesocket(overlp->socket);
        delete overlp;
        continue;
      }
      ((ThreadParams*)lpParam)->codeServer->appendText(u8" Recved data(Client "+ QString::number(overlp->socket) + "):\n" + QString::fromUtf8(overlp->buffer));
      std::cout << "recved data:" << overlp->buffer << std::endl;
      ZeroMemory(&overlp->overlapped, sizeof(OVERLAPPED));
      overlp->type = IO_OP_TYPE::IO_SEND;
      //overlp->wsaBuf.buf = (char *)"response from server\n";
      //overlp->wsaBuf.len = strlen("response from server\n");
      send_data_handle(overlp->buffer, overlp->wsaBuf.buf, overlp->wsaBuf.len);
      DWORD dwSend = 0;
      ret = WSASend(overlp->socket, &overlp->wsaBuf, 1, &dwSend, 0, &(overlp->overlapped), 0);
      if (ret == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) {
        std::cout << "WSARecv failed:" << WSAGetLastError() << std::endl;
      }
    }
    break;
    case IO_OP_TYPE::IO_SEND:
    {
      std::cout << "happed IO_SEND:" << bytesTrans << std::endl;
      if (bytesTrans == 0) {
        std::cout << "socket disconnection:" << overlp->socket << std::endl;
        ((ThreadParams*)lpParam)->codeServer->appendText(u8"Client disconnection:" + QString::number(overlp->socket));
        closesocket(overlp->socket);
        delete overlp;
        continue;
      }
      ZeroMemory(overlp->buffer, BUFFER_SIZE);
      overlp->type = IO_OP_TYPE::IO_RECV;
      overlp->wsaBuf.buf = overlp->buffer;
      overlp->wsaBuf.len = BUFFER_SIZE;
      DWORD dwRecv = 0, dwFlag = 0;
      ret = WSARecv(overlp->socket, &overlp->wsaBuf, 1, &dwRecv, &dwFlag, &(overlp->overlapped), 0);
      if (ret == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) {
        std::cout << "WSARecv failed:" << WSAGetLastError() << std::endl;
      }
    }
    break;
    }
  }
  return 0;
}
void send_data_handle(char* receive_buffer,char * buffer,ULONG &buffersize) {
  QByteArray receive_byteArray(receive_buffer);
  QJsonDocument receive_jsonDoc = QJsonDocument::fromJson(receive_byteArray);
  // 检查JSON是否有效
  if (receive_jsonDoc.isNull()) {
    // JSON无效
    std::cout << "Failed to parse JSON."<<std::endl;
  }
  QJsonObject receive_jsonObject = receive_jsonDoc.object();
  QJsonObject jsonObject;
  if (receive_jsonObject["cmd"].toString() == "add")
  {
    jsonObject["cmd"] = "addreply";
    jsonObject["status"] = "true";
  }
  // 将QJsonObject转换为QJsonDocument
  QJsonDocument jsonDoc(jsonObject);
  // 将QJsonDocument转换为字节流
  QByteArray byteArray = jsonDoc.toJson();
  buffersize = byteArray.size();
  // 创建一个char数组来存储字节流,并复制数据
  //char* buffer = new char[bufferSize];
  memcpy(buffer, byteArray.constData(), buffersize);
   
}
// Initializes the server, rolling back every completed step on failure.
//
// Steps: start WinSock 2.2, create an overlapped listening socket, bind it to
// INADDR_ANY:PORT, start listening, create the completion port and associate
// the listening socket with it.
//
// pms: receives the listening socket and completion port on success; on
//      failure both fields are left at INVALID_SOCKET / NULL.
// Returns 0 on success, the WSAStartup error code if WinSock could not be
// started, and -1 for any later failure.
int InitServer(ServerParams& pms) {
  pms.listenSocket = INVALID_SOCKET;
  pms.completionPort = NULL;

  WSADATA wsaData;
  const int startupResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
  if (startupResult != 0) {
    // WinSock was never started, so there is nothing to clean up.
    return startupResult;
  }

  pms.listenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
  if (pms.listenSocket != INVALID_SOCKET) {
    // Bind address and port; zero-initialize so the padding bytes are defined.
    sockaddr_in address = {};
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);

    if (bind(pms.listenSocket, (const sockaddr*)&address, sizeof(address)) == 0 &&
        listen(pms.listenSocket, SOMAXCONN) == 0) {
      pms.completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
      if (pms.completionPort != NULL) {
        // AcceptEx completions on the listening socket are delivered here.
        if (CreateIoCompletionPort((HANDLE)pms.listenSocket,
                                   pms.completionPort,
                                   0,
                                   0) != NULL) {
          return 0;
        }
        CloseHandle(pms.completionPort);
        pms.completionPort = NULL;
      }
    }

    closesocket(pms.listenSocket);
    pms.listenSocket = INVALID_SOCKET;
  }

  WSACleanup();
  return -1;
}
int main(int argc, char *argv[])
{
    QApplication a(argc, argv);
  CodeServer w;
  w.setPortLabelText(PORT);
  ServerParams pms;
  int ret;
  ret = InitServer(pms);
  if (ret != 0) {
    std::cout << "InitServer Error" << std::endl;
    w.appendText("InitServer Error");
    //return 1;
  }
  else
  {
    w.appendText("InitServer Finish");
  }
  ThreadParams threadParams;
  threadParams.codeServer = &w;
  threadParams.serverParams = &pms;
  for (int i = 0; i < THREAD_COUNT; i++) {
    CreateThread(NULL, 0, workerThread, &threadParams, 0, NULL);
  }
  for (int i = 0; i < START_POST_ACCEPTEX; i++) {
    PostAcceptEx(pms.listenSocket);
  }
    w.show();
    return a.exec();
}
//CodeServer.h
#pragma once
#include <QtWidgets/QMainWindow>
#include "ui_CodeServer.h"
#include <QScrollBar>
#include <QDateTime>
// Forward declaration of the UI class used by CodeServer.
namespace Ui {
  class CodeServerClass;
}
// Main window of the server: shows the listening port and a read-only log.
class CodeServer : public QMainWindow
{
    Q_OBJECT
public:
    // `explicit` prevents a QWidget* from silently converting to a CodeServer.
    explicit CodeServer(QWidget *parent = nullptr);
    ~CodeServer();
  // Appends `str` to the log view, prefixed with the current date and time.
  void appendText(QString str);
  // Shows `port` in the port label.
  void setPortLabelText(int port);
signals:
  // Signal carrying one string of output data.
  void outputDataReceived(const QString& data);
private:
    Ui::CodeServerClass *ui;  // widgets of the main window
};
//CodeServer.cpp
#include "CodeServer.h"
// Builds the UI, makes the log view read-only and keeps it scrolled to the
// newest line. Also routes outputDataReceived to the log view: the connection
// uses this window as its context object, so text emitted from another thread
// is delivered through the event loop of the window's own thread.
CodeServer::CodeServer(QWidget *parent)
    : QMainWindow(parent),
  ui(new Ui::CodeServerClass)
{
  ui->setupUi(this);
  ui->textEdit->setReadOnly(true);

  QScrollBar* logScrollBar = ui->textEdit->verticalScrollBar();
  logScrollBar->setValue(logScrollBar->maximum());
  // Follow text changes so the view stays scrolled to the end.
  QObject::connect(ui->textEdit, &QTextEdit::textChanged, [logScrollBar]() {
    logScrollBar->setValue(logScrollBar->maximum());
  });

  // The only place that touches the log widget when text is appended.
  connect(this, &CodeServer::outputDataReceived, this, [this](const QString& line) {
    ui->textEdit->append(line);
  });
}

// Releases the UI object allocated in the constructor.
CodeServer::~CodeServer()
{
  delete ui;
}

// Appends `str` to the log view, prefixed with the current date and time.
// Callable from any thread: the text is handed over through the
// outputDataReceived signal instead of touching the widget directly, because
// widgets may only be accessed from the thread that owns them.
void CodeServer::appendText(QString str)
{
  // Take the timestamp now so it reflects when the event happened, not when
  // the log line is eventually displayed.
  const QString timeStamp = QDateTime::currentDateTime().toString("yyyy-MM-dd hh:mm:ss");
  emit outputDataReceived(timeStamp + ":" + str);
}

// Shows `port` in the port label.
void CodeServer::setPortLabelText(int port)
{
  ui->label_server_port->setText(QString::number(port));
}

3.主要流程

  • 在 main 函数中,首先初始化服务器,然后创建多个工作线程,并预先调用 PostAcceptEx 投递若干个异步接受连接的请求;每个工作线程都会在完成端口上等待 I/O 完成包的到来。
  • 当有新连接到来时,workerThread 会先再次调用 PostAcceptEx 补充一个新的异步接受请求,然后在该连接上依次处理数据的接收和发送。
相关文章
|
6月前
|
移动开发 监控 网络协议
linux如何查看websocket的连接
linux如何查看websocket的连接
488 0
|
6月前
|
Shell Linux C语言
【Shell 命令集合 网络通讯 】Linux 建立串行连接 cu命令 使用指南
【Shell 命令集合 网络通讯 】Linux 建立串行连接 cu命令 使用指南
55 0
|
6月前
|
监控 Shell Linux
【Shell 命令集合 网络通讯 】Linux 发送和接收传真 efax命令 使用指南
【Shell 命令集合 网络通讯 】Linux 发送和接收传真 efax命令 使用指南
86 0
|
Linux PHP Windows
|
消息中间件 存储 API
事件,消息,消息处理函数,第一个图形界面程序(附带官方解释链接)
事件,消息,消息处理函数,第一个图形界面程序(附带官方解释链接)
|
Linux 测试技术 iOS开发
【实测】windows下进程的创建和终止-python3
【实测】windows下进程的创建和终止-python3
|
网络协议 Windows
通用异步 Windows Socket TCP 客户端组件的设计与实现
编写 Windows Socket TCP 客户端其实并不困难,Windows 提供了6种 I/O 通信模型供大家选择。但本座看过很多客户端程序都把 Socket 通信和业务逻辑混在一起,剪不断理还乱。
1085 0
|
缓存 网络协议 Windows
基于 IOCP 的通用异步 Windows Socket TCP 高性能服务端组件的设计与实现
设计概述   服务端通信组件的设计是一项非常严谨的工作,其中性能、伸缩性和稳定性是必须考虑的硬性质量指标,若要把组件设计为通用组件提供给多种已知或未知的上层应用使用,则设计的难度更会大大增加,通用性、可用性和灵活性必须考虑在内。
1419 0