写在前面
最近项目中用到了 Boost 库，这里记录一下学习过程。
一、什么是boost库
Boost 是一个功能强大、构造精良、跨平台、代码开源且完全免费的 C++ 程序库，共包含 160 余个库/组件，涵盖字符串与文本处理、容器、迭代器、算法、图像处理、模板元编程、并发编程等多个领域。Boost 社区由 C++ 标准委员会成员发起倡议并建立，C++11 标准库中有三分之二来自 Boost，并且将来还会有更多的库进入 C++ 标准库，因此 Boost 是一个 C++ 的“准”标准库。Boost 支持现有的所有操作系统。
二、使用boost实现socket客户端与服务端
2.1 客户端代码
BoostClient.cpp
#include "BoostClient.h"

#include <cstddef>
#include <memory>
#include <string>
#include <utility>

/// @brief Creates the TCP socket on `service` and connects it synchronously to `ep`.
///
/// A connection failure is reported on stderr and leaves the client in the
/// "not connected" state; no exception is thrown because the error_code
/// overload of connect() is used.
///
/// @param service I/O service that will run the asynchronous handlers.
/// @param ep      Remote endpoint to connect to.
BoostClient::BoostClient(io_service& service, ip::tcp::endpoint ep)
    : bConnected(false),
      m_ios(service),
      pSocket(std::make_shared<ip::tcp::socket>(service))
{
    boost::system::error_code ec;
    pSocket->connect(ep, ec);
    if (ec)
    {
        std::cerr << "Error connecting to server: " << ec.message() << std::endl;
        return;
    }
    bConnected = true;
}

/// @brief Starts an asynchronous read and re-arms itself after every successful read.
///
/// The handler captures `this`, so the client object must outlive the
/// io_service run loop that executes the handler.
void BoostClient::AsyncRecvMessage()
{
    pSocket->async_read_some(buffer(data_),
        [this](const boost::system::error_code &ec, std::size_t bytes)
        {
            if (!ec)
            {
                std::cout << "recv size:" << bytes << std::endl;
                // Print exactly the bytes received: the buffer is not guaranteed to be
                // NUL-terminated when all 1024 bytes are filled.
                std::cout << "recv data:" << std::string(data_.data(), bytes) << std::endl;
                AsyncRecvMessage();
                return;
            }

            // Any read failure ends the receive loop, so the connection is no longer usable.
            bConnected = false;
            if (ec == error::eof)
            {
                // The peer closed the connection.
                std::cout << "disconnected !" << std::endl;
            }
            else
            {
                std::cout << "read error: " << ec.message() << std::endl;
            }
        });
}

/// @brief Queues `message` for asynchronous transmission.
///
/// Empty messages and calls made while disconnected are ignored.
///
/// @param message Payload to send; ownership is moved into the pending operation.
void BoostClient::AsyncSendMessage(std::string message)
{
    if (message.empty() || !bConnected)
    {
        return;
    }

    // async_write only stores a view of the buffer, so the bytes must stay alive until
    // the completion handler runs. The handler co-owns the payload for that reason.
    auto payload = std::make_shared<const std::string>(std::move(message));
    async_write(*pSocket, buffer(*payload),
        [this, payload](const boost::system::error_code &ec, std::size_t /*written_bytes*/)
        {
            if (ec)
            {
                std::cout << "send error:" << ec.message() << std::endl;
                bConnected = false;
            }
        });
}
BoostClient.h
#include <iostream> #include <array> #include <boost/asio.hpp> using namespace boost::asio; class BoostClient { public: BoostClient(io_service& service, ip::tcp::endpoint ep); void AsyncRecvMessage(); void AsyncSendMessage(std::string message); private: bool bConnected; // io_service &ios; io_service &m_ios; std::shared_ptr<ip::tcp::socket> pSocket; std::array<char, 1024> data_; };
main.cpp
#include <iostream> #include "BoostClient.h" #define DEFAULT_SERVER_IP "127.0.0.1" #define DEFAULT_SERVER_PORT 8888 int main(int argc, char **argv) { ip::tcp::endpoint ep(ip::address_v4::from_string(DEFAULT_SERVER_IP), DEFAULT_SERVER_PORT); io_service service; BoostClient client(service, ep); client.AsyncSendMessage("hello, server this is message."); client.AsyncRecvMessage(); service.run(); return 0; }
CMakeLists.txt
cmake_minimum_required(VERSION 3.0.0)
project(boost_client)

# Locate Boost. REQUIRED makes find_package() abort the configure step when Boost or any
# listed component is missing, so no separate Boost_FOUND check is needed afterwards.
# NOTE(review): libboost_locale.a is linked below but "locale" is not in COMPONENTS - confirm
# whether it should be requested here or dropped from the link line.
find_package(Boost 1.77.0 COMPONENTS
    context
    thread
    date_time
    program_options
    filesystem
    system
    coroutine
    log_setup
    log
    REQUIRED
)

# Directory-scoped settings must come BEFORE add_executable(): link_directories() only
# applies to targets created after it is called.
include_directories(${Boost_INCLUDE_DIRS})
link_directories(${Boost_LIBRARY_DIRS})

add_executable(boost_client
    main.cpp
    BoostClient.cpp
)

target_link_libraries(${PROJECT_NAME}
    pthread
    libboost_thread.a
    libboost_filesystem.a
    libboost_log_setup.a
    libboost_log.a
    libboost_locale.a
    libboost_coroutine.a
    libboost_context.a
)
2.2 服务端代码
boostServer.h
#include <memory> #include <iostream> #include <array> #include <vector> #include <queue> #include <thread> #include <mutex> #include <atomic> #include <boost/asio.hpp> #include <boost/thread.hpp> using namespace boost::asio; typedef std::function<void(void)> Callback_Close; typedef std::function<void(std::string)> Callback_SendMessage; // 接收客户端发送来的消息 class Session_Recv : public std::enable_shared_from_this<Session_Recv> { public: Session_Recv(ip::tcp::socket socket); void SetClose(Callback_Close pClose); void SetMsg_Ntf(Callback_SendMessage pMsg); void async_read(); bool GetState(); private: ip::tcp::socket socket_; std::array<char, 1024> data_; std::atomic<bool> connected_; Callback_Close ptr_close_; Callback_SendMessage ptr_message_; }; // 向客户端发送消息 class Session_Send : public std::enable_shared_from_this<Session_Send> { public: Session_Send(ip::tcp::socket socket); void SetClose(Callback_Close pClose); void async_send(std::string message); bool GetState(); private: ip::tcp::socket socket_; std::atomic<bool> connected_; Callback_Close ptr_close_; }; // 服务器 接收port_recv端口发来的数据 向port_send端口发送数据 class BoostServer { public: BoostServer(boost::asio::io_context &ctx, short port_recv, short port_send, std::string ip_recv = "", std::string ip_send = ""); void Close_Session_Recv(); void Close_Session_Send(); void GetContent(std::string content); private: void Async_Accept_Recv(); void Async_Accept_Send(); void SendMessage(); private: std::shared_ptr<std::thread> ptr_Thread_Msg; std::shared_ptr<boost::asio::ip::tcp::acceptor> Acceptor_Recv_; std::shared_ptr<boost::asio::ip::tcp::acceptor> Acceptor_Send_; std::shared_ptr<ip::tcp::endpoint> ptr_Recv_ep; std::shared_ptr<ip::tcp::endpoint> ptr_Send_ep; std::vector<std::shared_ptr<Session_Recv>> clients_recv; std::vector<std::shared_ptr<Session_Send>> clients_send; std::queue<std::string> Array_messages; std::atomic<bool> bStopSend; std::mutex mtx; };
boostServer.cpp
#include "boostServer.h"

#include <algorithm>
#include <chrono>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>

/// @brief Wraps an accepted socket; the session starts in the connected state.
Session_Recv::Session_Recv(ip::tcp::socket socket)
    : socket_(std::move(socket)),
      connected_(true),
      ptr_close_(nullptr),
      ptr_message_(nullptr)
{
}

/// @brief Registers the callback fired when the connection ends.
void Session_Recv::SetClose(Callback_Close pClose)
{
    ptr_close_ = std::move(pClose);
}

/// @brief Registers the callback that receives each chunk of data read.
void Session_Recv::SetMsg_Ntf(Callback_SendMessage pMsg)
{
    ptr_message_ = std::move(pMsg);
}

/// @brief Starts an asynchronous read and re-arms itself after every successful read.
///
/// The completion handler fires as soon as some data is available. It holds a
/// shared_ptr to the session so the object stays alive while the read is pending.
void Session_Recv::async_read()
{
    auto self(shared_from_this());
    socket_.async_read_some(buffer(data_),
        [this, self](const boost::system::error_code &ec, std::size_t bytes)
        {
            if (!ec)
            {
                if (ptr_message_)
                {
                    // Only the first `bytes` bytes of the buffer are valid for this read.
                    ptr_message_(std::string(data_.data(), bytes));
                }
                std::cout << "size:" << bytes << std::endl;
                async_read();
                return;
            }

            if (ec == error::eof)
            {
                // The peer closed the connection.
                std::cout << "read disconnect:" << ec.message() << std::endl;
            }
            else
            {
                std::cout << "read error:" << ec.message() << std::endl;
            }

            // The read loop stops on every error, so the session is finished either way;
            // flag it and let the owner drop it instead of keeping a dead session around.
            connected_ = false;
            if (ptr_close_)
            {
                ptr_close_();
            }
        });
}

/// @brief Returns the current value of the connected flag.
bool Session_Recv::GetState()
{
    return connected_;
}

/// @brief Wraps an accepted socket; the session starts in the connected state.
Session_Send::Session_Send(ip::tcp::socket socket)
    : socket_(std::move(socket)),
      connected_(true),
      ptr_close_(nullptr)
{
}

/// @brief Registers the callback fired when a send fails.
void Session_Send::SetClose(Callback_Close pClose)
{
    ptr_close_ = std::move(pClose);
}

/// @brief Writes `message` to the client asynchronously.
///
/// Empty messages and calls made after a failure are ignored.
///
/// NOTE(review): this is called from the BoostServer worker thread and may start a new
/// write while a previous one on the same socket is still in flight; confirm whether
/// writes need to be serialized (for example through a strand or a per-session queue).
///
/// @param message Payload to send; ownership is moved into the pending operation.
void Session_Send::async_send(std::string message)
{
    if (message.empty() || !connected_)
    {
        return;
    }

    auto self(shared_from_this());
    // async_write only stores a view of the buffer, so the bytes must outlive this call.
    // The completion handler co-owns the payload until the write has finished.
    auto payload = std::make_shared<const std::string>(std::move(message));
    async_write(socket_, buffer(*payload),
        [this, self, payload](const boost::system::error_code &ec, std::size_t /*written_bytes*/)
        {
            if (!ec)
            {
                return;
            }
            std::cout << "send error:" << ec.message() << std::endl;
            connected_ = false;
            if (ptr_close_)
            {
                ptr_close_();
            }
        });
}

/// @brief Returns the current value of the connected flag.
bool Session_Send::GetState()
{
    return connected_;
}

/// @brief Opens both acceptors, starts accepting, then launches the sender thread.
///
/// @param ctx       I/O context that runs the accept/read/write handlers.
/// @param port_recv Port on which data-producing clients connect.
/// @param port_send Port on which data-consuming clients connect.
/// @param ip_recv   Local address for the receive acceptor; empty means any IPv4 address.
/// @param ip_send   Local address for the send acceptor; empty means any IPv4 address.
BoostServer::BoostServer(boost::asio::io_context &ctx, short port_recv, short port_send, std::string ip_recv, std::string ip_send)
{
    // Set before the worker starts so the flag is never written from two threads at startup.
    bStopSend = false;

    const auto recv_port = static_cast<unsigned short>(port_recv);
    const auto send_port = static_cast<unsigned short>(port_send);

    if (ip_recv.empty())
    {
        ptr_Recv_ep = std::make_shared<ip::tcp::endpoint>(ip::tcp::v4(), recv_port);
    }
    else
    {
        ptr_Recv_ep = std::make_shared<ip::tcp::endpoint>(ip::make_address(ip_recv), recv_port);
    }
    Acceptor_Recv_ = std::make_shared<ip::tcp::acceptor>(ctx, *ptr_Recv_ep);

    if (ip_send.empty())
    {
        ptr_Send_ep = std::make_shared<ip::tcp::endpoint>(ip::tcp::v4(), send_port);
    }
    else
    {
        ptr_Send_ep = std::make_shared<ip::tcp::endpoint>(ip::make_address(ip_send), send_port);
    }
    Acceptor_Send_ = std::make_shared<ip::tcp::acceptor>(ctx, *ptr_Send_ep);

    clients_recv.clear();
    clients_send.clear();

    Async_Accept_Recv();
    Async_Accept_Send();

    // Start the worker last: everything above can throw (bad address, port in use), and
    // unwinding with a joinable std::thread already created would call std::terminate.
    ptr_Thread_Msg = std::make_shared<std::thread>(&BoostServer::SendMessage, this);
}

/// @brief Accepts the next data-producing client and re-arms the accept.
void BoostServer::Async_Accept_Recv()
{
    Acceptor_Recv_->async_accept(
        [this](boost::system::error_code ec, ip::tcp::socket socket)
        {
            if (!ec)
            {
                auto session = std::make_shared<Session_Recv>(std::move(socket));
                session->SetClose(std::bind(&BoostServer::Close_Session_Recv, this));
                session->SetMsg_Ntf(std::bind(&BoostServer::GetContent, this, std::placeholders::_1));
                clients_recv.push_back(session);
                session->async_read();
            }
            Async_Accept_Recv();
        });
}

/// @brief Accepts the next data-consuming client and re-arms the accept.
void BoostServer::Async_Accept_Send()
{
    Acceptor_Send_->async_accept(
        [this](boost::system::error_code ec, ip::tcp::socket socket)
        {
            if (!ec)
            {
                auto session = std::make_shared<Session_Send>(std::move(socket));
                session->SetClose(std::bind(&BoostServer::Close_Session_Send, this));
                // The sender thread reads clients_send, so every mutation happens under mtx.
                std::lock_guard<std::mutex> lock(mtx);
                clients_send.push_back(session);
            }
            Async_Accept_Send();
        });
}

/// @brief Removes every receiving session that is no longer connected and logs the counts.
void BoostServer::Close_Session_Recv()
{
    // Erase-remove: erasing inside a hand-written loop and then reusing the same iterator
    // is undefined behaviour because erase() invalidates it.
    clients_recv.erase(
        std::remove_if(clients_recv.begin(), clients_recv.end(),
                       [](const std::shared_ptr<Session_Recv> &session) { return !session->GetState(); }),
        clients_recv.end());

    std::cout << "Send_Data_Clients: " << clients_recv.size() << ", Recv_Data_Clients: " << clients_send.size() << std::endl;
}

/// @brief Removes every sending session that is no longer connected and logs the counts.
void BoostServer::Close_Session_Send()
{
    {
        // The sender thread reads clients_send, so every mutation happens under mtx.
        std::lock_guard<std::mutex> lock(mtx);
        clients_send.erase(
            std::remove_if(clients_send.begin(), clients_send.end(),
                           [](const std::shared_ptr<Session_Send> &session) { return !session->GetState(); }),
            clients_send.end());
    }

    std::cout << "Send_Data_Clients: " << clients_recv.size() << ", Recv_Data_Clients: " << clients_send.size() << std::endl;
}

/// @brief Queues `content` for the sender thread; empty content is ignored.
void BoostServer::GetContent(std::string content)
{
    if (content.empty())
    {
        return;
    }
    // Block for the (very short) critical section instead of try_lock(): a failed
    // try_lock() would silently discard the message.
    std::lock_guard<std::mutex> lock(mtx);
    Array_messages.push(std::move(content));
}

/// @brief Worker loop: pops queued messages and forwards each one to every sending session.
///
/// Runs on the thread created in the constructor until bStopSend becomes true.
/// Sleeps 200 ms whenever the queue is empty.
void BoostServer::SendMessage()
{
    while (!bStopSend)
    {
        std::string message;
        std::vector<std::shared_ptr<Session_Send>> targets;
        {
            std::lock_guard<std::mutex> lock(mtx);
            if (!Array_messages.empty())
            {
                message = std::move(Array_messages.front());
                Array_messages.pop();
                // Snapshot the sessions so the sends happen outside the lock and the I/O
                // thread can keep adding and removing sessions meanwhile.
                targets = clients_send;
            }
        }

        // GetContent() never queues empty strings, so empty here means "queue was empty".
        if (message.empty())
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
            continue;
        }

        for (const auto &client : targets)
        {
            std::cout << "send message" << std::endl;
            client->async_send(message);
        }
    }
}
main.cpp
#include "boostServer.h"

#define DEFAULT_SERVER_IP "127.0.0.1"
#define DEFAULT_SERVER_PORT1 8888
#define DEFAULT_SERVER_PORT2 9999

// Server entry point: bind both acceptors on the default address (PORT1 is passed as the
// receive port, PORT2 as the send port) and run the I/O context on the main thread.
int main(int argc, char **argv)
{
    boost::asio::io_context io;

    BoostServer relay(io,
                      DEFAULT_SERVER_PORT1,
                      DEFAULT_SERVER_PORT2,
                      DEFAULT_SERVER_IP,
                      DEFAULT_SERVER_IP);

    // Blocks while handlers are pending.
    io.run();
    return 0;
}
CMakeLists.txt
cmake_minimum_required(VERSION 3.0.0)
project(boostServer VERSION 0.1.0)

# Packaging metadata mirrors the project name and version.
set(CPACK_PROJECT_NAME ${PROJECT_NAME})
set(CPACK_PROJECT_VERSION ${PROJECT_VERSION})

# Boost check: locate Boost and the listed components.
find_package(Boost 1.77.0 COMPONENTS
    context
    thread
    date_time
    program_options
    filesystem
    system
    coroutine
    log_setup
    log
    REQUIRED
)

if(NOT Boost_FOUND)
    message("check boost lib failed!")
else()
    # Directory-scoped: applies to the target declared below.
    include_directories(${Boost_INCLUDE_DIRS})
    link_directories(${Boost_LIBRARY_DIRS})
endif()

add_executable(boostServer
    main.cpp
    boostServer.h
    boostServer.cpp
)

# Static Boost archives plus pthread.
target_link_libraries(${PROJECT_NAME}
    pthread
    libboost_thread.a
    libboost_filesystem.a
    libboost_log_setup.a
    libboost_log.a
    libboost_locale.a
    libboost_coroutine.a
    libboost_context.a
)