1.环境
Qt5.9.6
vs2017
2.代码
这段代码是一个简单的基于 Windows 平台的 C++ 程序,实现了一个基于 IOCP(Input/Output Completion Port)的简单的服务器,用于接受客户端的连接,并处理收发数据。
- send_data_handle:处理接收到的数据,并准备要发送的数据。
- PostAcceptEx:发起一个异步接受连接的操作。
- workerThread:工作线程函数,处理完成端口上的 I/O 完成包。
- InitServer:初始化服务器,包括启动 WinSock、创建监听套接字、绑定端口、创建完成端口等操作。
//main.cpp #include "CodeServer.h" #include <QtWidgets/QApplication> #include <QJsonDocument> #include <QJsonObject> #pragma comment(lib,"ws2_32.lib") #include <WinSock2.h> #include <MSWSock.h> #include <cstring> #include <iostream> #define BUFFER_SIZE 1024 #define THREAD_COUNT 2 #define START_POST_ACCEPTEX 2 #define PORT 8989 void send_data_handle(char* receive_buffer, char * buffer, ULONG &buffersize); enum class IO_OP_TYPE { IO_ACCEPT, // accept IO_SEND, IO_RECV, IO_CONNECT, IO_DISCONNECT, }; struct ServerParams { SOCKET listenSocket; HANDLE completionPort; }; struct ThreadParams { CodeServer* codeServer; ServerParams* serverParams; }; typedef struct OverlappedPerIO { OVERLAPPED overlapped; SOCKET socket; WSABUF wsaBuf; IO_OP_TYPE type; char buffer[BUFFER_SIZE]; } *LPOverlappedPerIO; void PostAcceptEx(SOCKET listenSocket) { SOCKET sock = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); if (sock == INVALID_SOCKET) { return; } OverlappedPerIO* overlp = new OverlappedPerIO; if (overlp == nullptr) { closesocket(sock); return; } ZeroMemory(overlp, sizeof(OverlappedPerIO)); overlp->socket = sock; overlp->wsaBuf.buf = overlp->buffer; overlp->wsaBuf.len = BUFFER_SIZE; overlp->type = IO_OP_TYPE::IO_ACCEPT; DWORD dwByteRecv = 0; while (false == AcceptEx(listenSocket, sock, overlp->wsaBuf.buf, 0, sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, &dwByteRecv, (LPOVERLAPPED)overlp)) { if (WSAGetLastError() == WSA_IO_PENDING) { break; } std::cout << WSAGetLastError() << std::endl; } } DWORD WINAPI workerThread(LPVOID lpParam) { ServerParams* pms = (ServerParams*)((ThreadParams*)lpParam)->serverParams; HANDLE completionPort = pms->completionPort; SOCKET listenSocket = pms->listenSocket; DWORD bytesTrans; ULONG_PTR comletionKey; LPOverlappedPerIO overlp; int ret; while (true) { BOOL result = GetQueuedCompletionStatus( completionPort, &bytesTrans, &comletionKey, (LPOVERLAPPED*)&overlp, INFINITE); if (!result) { if ((GetLastError() == WAIT_TIMEOUT) || (GetLastError() == ERROR_NETNAME_DELETED)) { std::cout << "socket disconnection:" << overlp->socket << std::endl; ((ThreadParams*)lpParam)->codeServer->appendText("Client disconnection:" + QString::number(overlp->socket)); closesocket(overlp->socket); delete overlp; continue; } std::cout << "GetQueuedCompletionStatus failed" << std::endl; return 0; } switch (overlp->type) { case IO_OP_TYPE::IO_ACCEPT: { PostAcceptEx(listenSocket); std::cout << "happed IO_ACCEPT:" << bytesTrans << std::endl; ((ThreadParams*)lpParam)->codeServer->appendText(u8"Client establishes connection:"+ QString::number(overlp->socket)); setsockopt(overlp->socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&(listenSocket), sizeof(SOCKET)); ZeroMemory(overlp->buffer, BUFFER_SIZE); overlp->type = IO_OP_TYPE::IO_RECV; overlp->wsaBuf.buf = overlp->buffer; overlp->wsaBuf.len = BUFFER_SIZE; CreateIoCompletionPort((HANDLE)overlp->socket, completionPort, NULL, 0); DWORD dwRecv = 0, dwFlag = 0; ret = WSARecv(overlp->socket, &overlp->wsaBuf, 1, &dwRecv, &dwFlag, &(overlp->overlapped), 0); if (ret == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) { std::cout << "WSARecv failed:" << WSAGetLastError() << std::endl; } } break; case IO_OP_TYPE::IO_RECV: { std::cout << "happed IO_RECV:" << bytesTrans << std::endl; if (bytesTrans == 0) { std::cout << "socket disconnection:" << overlp->socket << std::endl; ((ThreadParams*)lpParam)->codeServer->appendText(u8"Client disconnection:" + QString::number(overlp->socket)); closesocket(overlp->socket); delete overlp; continue; } ((ThreadParams*)lpParam)->codeServer->appendText(u8" Recved data(Client "+ QString::number(overlp->socket) + "):\n" + QString::fromUtf8(overlp->buffer)); std::cout << "recved data:" << overlp->buffer << std::endl; ZeroMemory(&overlp->overlapped, sizeof(OVERLAPPED)); overlp->type = IO_OP_TYPE::IO_SEND; //overlp->wsaBuf.buf = (char *)"response from server\n"; //overlp->wsaBuf.len = strlen("response from server\n"); send_data_handle(overlp->buffer, overlp->wsaBuf.buf, overlp->wsaBuf.len); DWORD dwSend = 0; ret = WSASend(overlp->socket, &overlp->wsaBuf, 1, &dwSend, 0, &(overlp->overlapped), 0); if (ret == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) { std::cout << "WSARecv failed:" << WSAGetLastError() << std::endl; } } break; case IO_OP_TYPE::IO_SEND: { std::cout << "happed IO_SEND:" << bytesTrans << std::endl; if (bytesTrans == 0) { std::cout << "socket disconnection:" << overlp->socket << std::endl; ((ThreadParams*)lpParam)->codeServer->appendText(u8"Client disconnection:" + QString::number(overlp->socket)); closesocket(overlp->socket); delete overlp; continue; } ZeroMemory(overlp->buffer, BUFFER_SIZE); overlp->type = IO_OP_TYPE::IO_RECV; overlp->wsaBuf.buf = overlp->buffer; overlp->wsaBuf.len = BUFFER_SIZE; DWORD dwRecv = 0, dwFlag = 0; ret = WSARecv(overlp->socket, &overlp->wsaBuf, 1, &dwRecv, &dwFlag, &(overlp->overlapped), 0); if (ret == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) { std::cout << "WSARecv failed:" << WSAGetLastError() << std::endl; } } break; } } return 0; } void send_data_handle(char* receive_buffer,char * buffer,ULONG &buffersize) { QByteArray receive_byteArray(receive_buffer); QJsonDocument receive_jsonDoc = QJsonDocument::fromJson(receive_byteArray); // 检查JSON是否有效 if (receive_jsonDoc.isNull()) { // JSON无效 std::cout << "Failed to parse JSON."<<std::endl; } QJsonObject receive_jsonObject = receive_jsonDoc.object(); QJsonObject jsonObject; if (receive_jsonObject["cmd"].toString() == "add") { jsonObject["cmd"] = "addreply"; jsonObject["status"] = "true"; } // 将QJsonObject转换为QJsonDocument QJsonDocument jsonDoc(jsonObject); // 将QJsonDocument转换为字节流 QByteArray byteArray = jsonDoc.toJson(); buffersize = byteArray.size(); // 创建一个char数组来存储字节流,并复制数据 //char* buffer = new char[bufferSize]; memcpy(buffer, byteArray.constData(), buffersize); } // 初始化 回滚式 int InitServer(ServerParams& pms) { WSADATA wsaData; int ret; ret = WSAStartup(MAKEWORD(2, 2), &wsaData); if (ret == 0) { pms.listenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); if (pms.listenSocket != INVALID_SOCKET) { // 绑定地址和端口 sockaddr_in address; address.sin_family = AF_INET; address.sin_addr.s_addr = INADDR_ANY; address.sin_port = htons(PORT); ret = bind(pms.listenSocket, (const sockaddr*)& address, sizeof(address)); if (ret == 0) { ret = listen(pms.listenSocket, SOMAXCONN); if (ret == 0) { pms.completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); if (pms.completionPort != NULL) { if (NULL != CreateIoCompletionPort((HANDLE)pms.listenSocket, pms.completionPort, NULL, 0)) { return 0; } CloseHandle(pms.completionPort); } } } } closesocket(pms.listenSocket); } WSACleanup(); if (ret == 0) ret = -1; return ret; } int main(int argc, char *argv[]) { QApplication a(argc, argv); CodeServer w; w.setPortLabelText(PORT); ServerParams pms; int ret; ret = InitServer(pms); if (ret != 0) { std::cout << "InitServer Error" << std::endl; w.appendText("InitServer Error"); //return 1; } else { w.appendText("InitServer Finish"); } ThreadParams threadParams; threadParams.codeServer = &w; threadParams.serverParams = &pms; for (int i = 0; i < THREAD_COUNT; i++) { CreateThread(NULL, 0, workerThread, &threadParams, 0, NULL); } for (int i = 0; i < START_POST_ACCEPTEX; i++) { PostAcceptEx(pms.listenSocket); } w.show(); return a.exec(); }
//CodeServer.h #pragma once #include <QtWidgets/QMainWindow> #include "ui_CodeServer.h" #include <QScrollBar> #include <QDateTime> namespace Ui { class CodeServerClass; } class CodeServer : public QMainWindow { Q_OBJECT public: CodeServer(QWidget *parent = nullptr); ~CodeServer(); void appendText(QString str); void setPortLabelText(int port); signals: void outputDataReceived(const QString& data); private: Ui::CodeServerClass *ui; };
//CodeServer.cpp #include "CodeServer.h" CodeServer::CodeServer(QWidget *parent) : QMainWindow(parent), ui(new Ui::CodeServerClass) { ui->setupUi(this); ui->textEdit->setReadOnly(true); QScrollBar* verticalScrollBar = ui->textEdit->verticalScrollBar(); verticalScrollBar->setValue(verticalScrollBar->maximum()); // 监听文本内容变化的信号,保持滚动到最后 QObject::connect(ui->textEdit, &QTextEdit::textChanged, [=]() { verticalScrollBar->setValue(verticalScrollBar->maximum()); }); } CodeServer::~CodeServer() {} void CodeServer::appendText(QString str) { QDateTime currentDateTime = QDateTime::currentDateTime(); QString dateTimeString = currentDateTime.toString("yyyy-MM-dd hh:mm:ss"); ui->textEdit->append(dateTimeString + ":" + str); } void CodeServer::setPortLabelText(int port) { ui->label_server_port->setText(QString::number(port)); }
3.主要流程
- 在 main 函数中,首先初始化服务器,然后创建多个工作线程,每个工作线程都会在完成端口上等待 I/O 完成包的到来。
- 当有新连接到来时,会调用 PostAcceptEx 发起异步接受连接的操作,然后在 workerThread 中处理连接的接收和发送数据的操作。