boost asio 多线程异步服务器历程

简介:

场景说明

    本例子支持多线程异步处理消息,针对每一个链接请求,创建线程处理稍后的指令,CSimpleSession::SessionThreadFunc是线程函数,async_read_some函数设置接收数据的回调函数ContinueRead,一般情况下,read_some函数未必能够完整的读取客户端发送的数据包,当然必须要指定明确的结束标志,双方必须规定好等接收完毕的时候,必须等待线程返回,因此在析构函数调用m_thread->join函数,等线程函数正常返回之后,关闭连接,如果没有等待线程返回,就直接关闭连接,会导致async_read_some函数抛出异常,目前暂时没有什么头绪


service.h


#ifndef QPIDPUSHMESSAGESERVICE_H

#define QPIDPUSHMESSAGESERVICE_H


#include <iostream>

#include <vector>

#include <fstream>

#include <boost/asio.hpp>

#include <boost/thread/thread.hpp>

#include <boost/bind.hpp>

#include <boost/shared_ptr.hpp>

#include <boost/function/function0.hpp>

#include <boost/enable_shared_from_this.hpp>

#include <boost/thread/mutex.hpp>


namespace qpid

{

class CSimpleSession : public boost::enable_shared_from_this<CSimpleSession>

{

public:

CSimpleSession(boost::asio::io_service &io_service) : m_socket(io_service)

{

m_bRunning = true;

PrepareForNextRecv();

}

~CSimpleSession()

{

m_bRunning = false;

m_thread->join();

m_socket.close();

}


void StartThread()

{

static boost::asio::ip::tcp::no_delay option(true);

m_socket.set_option(option);

m_thread.reset(new boost::thread(boost::bind(&CSimpleSession::SessionThreadFunc, this)));

}


void SessionThreadFunc()

{

while (m_bRunning)

{

if (m_bStartSetCallBackRead)

{

m_socket.async_read_some(boost::asio::buffer(m_szRecvBuffer),

boost::bind(&CSimpleSession::ContinueRead, shared_from_this(),

boost::asio::placeholders::error,

boost::asio::placeholders::bytes_transferred));

m_bStartSetCallBackRead = false;

}

boost::this_thread::sleep_for(boost::chrono::milliseconds(300));

}

m_bRunning = false;

}


boost::asio::ip::tcp::socket &GetSocket()

{

return m_socket;

}


bool GetCurThreadRunningStatus()

{

return m_bRunning;

}


void PrepareForNextRecv()

{

memset(m_szRecvBuffer, 0x00, 10240);

m_strMatch = "";

m_bStartSetCallBackRead = true;

}


private:


void ContinueRead(const boost::system::error_code &error, std::size_t bytes_transferred)

{

if (error)

{

m_bRunning = false;

return;

}


m_strMatch =  m_szRecvBuffer;

int nIndexOfContentLength = m_strMatch.find("Content-Length:", 0);

int indexOfEnd = m_strMatch.find("\r\n\r\n", 0);

if (nIndexOfContentLength  == -1)

{

m_bRunning = false;

return;

}

std::cout << m_strMatch << std::endl;

std::string strContextLen = m_strMatch.substr(nIndexOfContentLength + 15, indexOfEnd - nIndexOfContentLength - 15);

int nContextLen = atoi(strContextLen.c_str());

if (nContextLen < m_strMatch.length())

{

//handle

m_bRunning = false;

return;

}

m_socket.async_read_some(boost::asio::buffer((m_szRecvBuffer)),

boost::bind(&CSimpleSession::ContinueRead, shared_from_this(),

boost::asio::placeholders::error,

boost::asio::placeholders::bytes_transferred));

}


private:

boost::asio::ip::tcp::socket m_socket;

char m_szRecvBuffer[10240];

std::string m_strMatch;

bool m_bStartSetCallBackRead;

bool m_bRunning;

boost::shared_ptr<boost::thread> m_thread;

};


typedef boost::shared_ptr<CSimpleSession> CPtrSession;


class CSimpleServer

{

public:


CSimpleServer(boost::asio::io_service &io_service, boost::asio::ip::tcp::endpoint &endpoint)

:m_ioService(io_service), m_acceptor(io_service, endpoint)

{

CPtrSession newSession(new CSimpleSession(io_service));

m_vecSession.push_back(newSession);

m_acceptor.async_accept(newSession->GetSocket(),

boost::bind(&CSimpleServer::HandleAccept,

this,

newSession,

boost::asio::placeholders::error));

}


void HandleAccept(CPtrSession newSession, const boost::system::error_code &error)

{

if (error) return;


//如果Start函数进行了阻塞,只有处理完当前的连接,才会进行下一步处理连接

newSession->StartThread();

ClearHasEndConnection();

CPtrSession createNewSession(new CSimpleSession(m_ioService));

m_vecSession.push_back(createNewSession);

m_acceptor.async_accept(createNewSession->GetSocket(),

boost::bind(&CSimpleServer::HandleAccept,

this,

createNewSession,

boost::asio::placeholders::error));

}


void ClearHasEndConnection()

{

std::vector<CPtrSession>::iterator iter;

iter = m_vecSession.begin();

std::size_t count = m_vecSession.size();

std::cout << "session count:" << count << std::endl;

while (iter != m_vecSession.end())

{

if (!(*iter)->GetCurThreadRunningStatus())

{

iter->reset();

m_vecSession.erase(iter);

break;

}

iter++;

}

}


void run()

{

m_ioService.run();

}


private:

boost::asio::io_service &m_ioService;

std::vector<CPtrSession> m_vecSession;

boost::asio::ip::tcp::acceptor m_acceptor;

};


void StartListenThread();


int StartListenService();

}


#endif


service.cpp


#include <boost/thread/thread.hpp>

#include "service.h"


void qpid::StartListenThread()

{

boost::asio::io_service ioService;

boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::address::from_string("192.168.0.34"), 7003);


qpid::CSimpleServer s(ioService, endpoint);

s.run();

}


int qpid::StartListenService()

{

boost::thread serviceThread(&StartListenThread);

serviceThread.detach();

return 0;

}






     本文转自fengyuzaitu 51CTO博客,原文链接:http://blog.51cto.com/fengyuzaitu/1953528,如需转载请自行联系原作者

相关文章
|
1月前
|
编解码 数据安全/隐私保护 计算机视觉
Opencv学习笔记(十):同步和异步(多线程)操作打开海康摄像头
如何使用OpenCV进行同步和异步操作来打开海康摄像头,并提供了相关的代码示例。
78 1
Opencv学习笔记(十):同步和异步(多线程)操作打开海康摄像头
|
2月前
|
安全 Java 调度
Java编程时多线程操作单核服务器可以不加锁吗?
Java编程时多线程操作单核服务器可以不加锁吗?
44 2
|
1月前
|
网络协议 Unix Linux
一个.NET开源、快速、低延迟的异步套接字服务器和客户端库
一个.NET开源、快速、低延迟的异步套接字服务器和客户端库
|
1月前
|
安全 调度 C#
STA模型、同步上下文和多线程、异步调度
【10月更文挑战第19天】本文介绍了 STA 模型、同步上下文和多线程、异步调度的概念及其优缺点。STA 模型适用于单线程环境,确保资源访问的顺序性;同步上下文和多线程提高了程序的并发性和响应性,但增加了复杂性;异步调度提升了程序的响应性和资源利用率,但也带来了编程复杂性和错误处理的挑战。选择合适的模型需根据具体应用场景和需求进行权衡。
|
1月前
|
网络协议 安全 Java
难懂,误点!将多线程技术应用于Python的异步事件循环
难懂,误点!将多线程技术应用于Python的异步事件循环
61 0
|
2月前
|
设计模式 缓存 Java
谷粒商城笔记+踩坑(14)——异步和线程池
初始化线程的4种方式、线程池详解、异步编排 CompletableFuture
谷粒商城笔记+踩坑(14)——异步和线程池
|
3月前
|
Java 数据库
异步&线程池 CompletableFuture 异步编排 实战应用 【终结篇】
这篇文章通过一个电商商品详情页的实战案例,展示了如何使用`CompletableFuture`进行异步编排,以解决在不同数据库表中查询商品信息的问题,并提供了详细的代码实现和遇到问题(如图片未显示)的解决方案。
异步&线程池 CompletableFuture 异步编排 实战应用 【终结篇】
|
3月前
|
Java
Java使用FileInputStream&&FileOutputStream模拟客户端向服务器端上传文件(单线程)
Java使用FileInputStream&&FileOutputStream模拟客户端向服务器端上传文件(单线程)
83 1
|
3月前
|
Java
异步&线程池 CompletableFuture 异步编排 【下篇】
这篇文章深入探讨了Java中的`CompletableFuture`类,解释了如何创建异步操作、使用计算完成时的回调方法、异常处理、串行化方法、任务组合以及多任务组合的使用方式,并通过代码示例展示了各种场景下的应用。
异步&线程池 CompletableFuture 异步编排 【下篇】
|
3月前
|
C语言
【C语言】多线程服务器
【C语言】多线程服务器
30 0

热门文章

最新文章