boost中asio网络库多线程并发处理实现,以及asio在多线程模型中线程的调度情况和线程安全。

简介: 1、实现多线程方法: 其实就是多个线程同时调用io_service::run         for (int i = 0; i != m_nThreads; ++i)        {            boost::shared_ptr pTh(new boost::thread(   ...

1、实现多线程方法:

其实就是多个线程同时调用io_service::run

        for (int i = 0; i != m_nThreads; ++i)
        {
            boost::shared_ptr<boost::thread> pTh(new boost::thread(
                boost::bind(&boost::asio::io_service::run,&m_ioService)));
            m_listThread.push_back(pTh);
        }

2、多线程调度情况:

asio规定:只能在调用io_service::run的线程中才能调用事件完成处理器。

注:事件完成处理器就是你async_accept、async_write等注册的句柄,类似于回调的东西。

单线程:

如果只有一个线程调用io_service::run,根据asio的规定,事件完成处理器也只能在这个线程中执行。也就是说,你所有代码都在同一个线程中运行,因此变量的访问是安全的。

多线程:

如果有多个线程同时调用io_service::run以实现多线程并发处理。对于asio来说,这些线程都是平等的,没有主次之分。如果你投递的一个请求比如async_write完成时,asio将随机的激活调用io_service::run的线程。并在这个线程中调用事件完成处理器(async_write当时注册的句柄)。如果你的代码耗时较长,这个时候你投递的另一个async_write请求完成时,asio将不等待你的代码处理完成,它将在另外的一个调用io_service::run线程中,调用async_write当时注册的句柄。也就是说,你注册的事件完成处理器有可能同时在多个线程中调用。

当然你可以使用 boost::asio::io_service::strand让完成事件处理器的调用,在同一时间只有一个, 比如下面的的代码:

  socket_.async_read_some(boost::asio::buffer(buffer_),
      strand_.wrap(
        boost::bind(&connection::handle_read, shared_from_this(),
          boost::asio::placeholders::error,
          boost::asio::placeholders::bytes_transferred)));

...

boost::asio::io_service::strand strand_;

 

此时async_read_som完成后掉用handle_read时,必须等待其它handle_read调用完成时才能被执行(async_read_som引起的handle_read调用)。

      多线程调用时,还有一个重要的问题,那就是无序化。比如说,你短时间内投递多个async_write,那么完成处理器的调用并不是按照你投递async_write的顺序调用的。asio第一次调用完成事件处理器,有可能是第二次async_write返回的结果,也有可能是第3次的。使用strand也是这样的。strand只是保证同一时间只运行一个完成处理器,但它并不保证顺序。

 

代码测试:

服务器:

将下面的代码编译以后,使用cmd命令提示符下传人参数<IP> <port> <threads>调用

比如:test.exe 0.0.0.0 3005 10   

客服端 使用windows自带的telnet

cmd命令提示符:

telnet 127.0.0.1 3005

 

原理:客户端连接成功后,同一时间调用100次boost::asio::async_write给客户端发送数据,并且在完成事件处理器中打印调用序号,和线程ID。

核心代码:

    void start()
    {
        for (int i = 0; i != 100; ++i)
        {
            boost::shared_ptr<string> pStr(new string);
            *pStr = boost::lexical_cast<string>(boost::this_thread::get_id());
            *pStr += "\r\n";
            boost::asio::async_write(m_nSocket,boost::asio::buffer(*pStr),
                boost::bind(&CMyTcpConnection::HandleWrite,shared_from_this(),
                 boost::asio::placeholders::error,
                 boost::asio::placeholders::bytes_transferred,
                 pStr,i)
                );
        }
    }

//去掉 boost::mutex::scoped_lock lk(m_ioMutex); 效果更明显。

    void HandleWrite(const boost::system::error_code& error
        ,std::size_t bytes_transferred
        ,boost::shared_ptr<string> pStr,int nIndex)
    {
        if (!error)
        {
            boost::mutex::scoped_lock lk(m_ioMutex);
            cout << "发送序号=" << nIndex << ",线程id=" << boost::this_thread::get_id() << endl;
        }
        else
        {
            cout << "连接断开" << endl;
        }
    }

 

完整代码:

#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <string>
#include <iostream>


using std::cout;
using std::endl;
using std::string;
using boost::asio::ip::tcp;


class CMyTcpConnection
    : public boost::enable_shared_from_this<CMyTcpConnection>
{
public:
    CMyTcpConnection(boost::asio::io_service &ser)
        :m_nSocket(ser)
    {
    }
    typedef boost::shared_ptr<CMyTcpConnection> CPMyTcpCon;


    static CPMyTcpCon CreateNew(boost::asio::io_service& io_service)
    {
        return CPMyTcpCon(new CMyTcpConnection(io_service));
    }


   
public:
    void start()
    {
        for (int i = 0; i != 100; ++i)
        {
            boost::shared_ptr<string> pStr(new string);
            *pStr = boost::lexical_cast<string>(boost::this_thread::get_id());
            *pStr += "\r\n";
            boost::asio::async_write(m_nSocket,boost::asio::buffer(*pStr),
                boost::bind(&CMyTcpConnection::HandleWrite,shared_from_this(),
                 boost::asio::placeholders::error,
                 boost::asio::placeholders::bytes_transferred,
                 pStr,i)
                );
        }
    }
    tcp::socket& socket()
    {
        return m_nSocket;
    }
private:
    void HandleWrite(const boost::system::error_code& error
        ,std::size_t bytes_transferred
        ,boost::shared_ptr<string> pStr,int nIndex)
    {
        if (!error)
        {
            boost::mutex::scoped_lock lk(m_ioMutex);
            cout << "发送序号=" << nIndex << ",线程id=" << boost::this_thread::get_id() << endl;
        }
        else
        {
            cout << "连接断开" << endl;
        }
    }
private:
    tcp::socket m_nSocket;
    boost::mutex m_ioMutex;
};


class CMyService
    : private boost::noncopyable
{
public:
    CMyService(string const &strIP,string const &strPort,int nThreads)
        :m_tcpAcceptor(m_ioService)
        ,m_nThreads(nThreads)
    {
        tcp::resolver resolver(m_ioService);
        tcp::resolver::query query(strIP,strPort);
        tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
        boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);
        m_tcpAcceptor.open(endpoint.protocol());
        m_tcpAcceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
        m_tcpAcceptor.bind(endpoint);
        m_tcpAcceptor.listen();


        StartAccept();
    }
    ~CMyService(){Stop();}
public:
    void Stop() 
    { 
        m_ioService.stop();
        for (std::vector<boost::shared_ptr<boost::thread>>::const_iterator it = m_listThread.cbegin();
            it != m_listThread.cend(); ++ it)
        {
            (*it)->join();
        }
    }
    void Start()
    {
        for (int i = 0; i != m_nThreads; ++i)
        {
            boost::shared_ptr<boost::thread> pTh(new boost::thread(
                boost::bind(&boost::asio::io_service::run,&m_ioService)));
            m_listThread.push_back(pTh);
        }
    }
private:
    void HandleAccept(const boost::system::error_code& error
        ,boost::shared_ptr<CMyTcpConnection> newConnect)
    {
        if (!error)
        {
            newConnect->start();
        }
        StartAccept();
    }


    void StartAccept()
    {
        CMyTcpConnection::CPMyTcpCon newConnect = CMyTcpConnection::CreateNew(m_tcpAcceptor.get_io_service());
        m_tcpAcceptor.async_accept(newConnect->socket(),
            boost::bind(&CMyService::HandleAccept, this,
            boost::asio::placeholders::error,newConnect));
    }
private:
    boost::asio::io_service m_ioService;
    boost::asio::ip::tcp::acceptor m_tcpAcceptor;
    std::vector<boost::shared_ptr<boost::thread>> m_listThread;
    std::size_t m_nThreads;
};


int main(int argc, char* argv[])
{
    try
    {
        if (argc != 4)
        {
            std::cerr << "<IP> <port> <threads>\n";
            return 1;
        }
        int nThreads = boost::lexical_cast<int>(argv[3]);
        CMyService mySer(argv[1],argv[2],nThreads);
        mySer.Start();
        getchar();
        mySer.Stop();
    }
    catch (std::exception& e)
    {
        std::cerr << "Exception: " << e.what() << "\n";
    }
    return 0;
}

 

 

测试发现和上面的理论是一致的,发送序号是乱的,线程ID也不是同一个。

 

asio多线程中线程的合理个数:

作为服务器,在不考虑省电的情况下,应该尽可能的使用cpu。也就是说,为了让cpu都忙起来,你的线程个数应该大于等于你电脑的cpu核心数(一个核心运行一个线程)。具体的值没有最优方案,大多数人使用cpu核心数*2 + 2的这种方案,但它不一定适合你的情况。

asio在windows xp等系统中的实现:

asio在windows下使用完成端口,如果你投递的请求没有完成,那么这些线程都在等待GetQueuedCompletionStatus的返回,也就是等待内核对象,此时线程是不占用cpu时间的。

目录
相关文章
|
3月前
|
安全 Java
线程安全的艺术:确保并发程序的正确性
在多线程环境中,确保线程安全是编程中的一个核心挑战。线程安全问题可能导致数据不一致、程序崩溃甚至安全漏洞。本文将分享如何确保线程安全,探讨不同的技术策略和最佳实践。
69 6
|
3月前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
103 8
|
3月前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
3月前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
2月前
|
SQL 安全 网络安全
网络安全与信息安全:知识分享####
【10月更文挑战第21天】 随着数字化时代的快速发展,网络安全和信息安全已成为个人和企业不可忽视的关键问题。本文将探讨网络安全漏洞、加密技术以及安全意识的重要性,并提供一些实用的建议,帮助读者提高自身的网络安全防护能力。 ####
90 17
|
2月前
|
存储 SQL 安全
网络安全与信息安全:关于网络安全漏洞、加密技术、安全意识等方面的知识分享
随着互联网的普及,网络安全问题日益突出。本文将介绍网络安全的重要性,分析常见的网络安全漏洞及其危害,探讨加密技术在保障网络安全中的作用,并强调提高安全意识的必要性。通过本文的学习,读者将了解网络安全的基本概念和应对策略,提升个人和组织的网络安全防护能力。
|
2月前
|
SQL 安全 网络安全
网络安全与信息安全:关于网络安全漏洞、加密技术、安全意识等方面的知识分享
随着互联网的普及,网络安全问题日益突出。本文将从网络安全漏洞、加密技术和安全意识三个方面进行探讨,旨在提高读者对网络安全的认识和防范能力。通过分析常见的网络安全漏洞,介绍加密技术的基本原理和应用,以及强调安全意识的重要性,帮助读者更好地保护自己的网络信息安全。
64 10
|
2月前
|
SQL 安全 网络安全
网络安全与信息安全:关于网络安全漏洞、加密技术、安全意识等方面的知识分享
在数字化时代,网络安全和信息安全已成为我们生活中不可或缺的一部分。本文将介绍网络安全漏洞、加密技术和安全意识等方面的内容,并提供一些实用的代码示例。通过阅读本文,您将了解到如何保护自己的网络安全,以及如何提高自己的信息安全意识。
75 10
|
2月前
|
存储 监控 安全
云计算与网络安全:云服务、网络安全、信息安全等技术领域的融合与挑战
本文将探讨云计算与网络安全之间的关系,以及它们在云服务、网络安全和信息安全等技术领域中的融合与挑战。我们将分析云计算的优势和风险,以及如何通过网络安全措施来保护数据和应用程序。我们还将讨论如何确保云服务的可用性和可靠性,以及如何处理网络攻击和数据泄露等问题。最后,我们将提供一些关于如何在云计算环境中实现网络安全的建议和最佳实践。
|
2月前
|
监控 安全 网络安全
网络安全与信息安全:漏洞、加密与意识的交织
在数字时代的浪潮中,网络安全与信息安全成为维护数据完整性、保密性和可用性的关键。本文深入探讨了网络安全中的漏洞概念、加密技术的应用以及提升安全意识的重要性。通过实际案例分析,揭示了网络攻击的常见模式和防御策略,强调了教育和技术并重的安全理念。旨在为读者提供一套全面的网络安全知识框架,从而在日益复杂的网络环境中保护个人和组织的资产安全。