boost asio多线程

简介: boost asio多线程

Cnsumer.h

#pragma once
#include <boost/asio/io_service.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <network/EndpointDescription.h>
#include "IArchive.h"
#include "IPlayer.h"
#include "Convert.h"
#include <messages/Messages.h>  // for msg register
#include <Util/Log.h>
#include <thread>
#include <atomic>
//#include "CriticalSection.h"
#include "RdKafkaProducer.h"
#define MAX_THREAD 16
namespace sinftech {
namespace collector {
class Consumer : archive::IPlayer::Listener {
public:
  Consumer(
    archive::IArchive& archive,
  bool& bRun
    );
  ~Consumer();
  void start(const archive::Time& time, const uint8_t& speed);
  void stop();
  void setEndTime(const archive::Time& endTime);
  void ExtractData(const archive::ByteArray& data, const archive::Time& playTime, const int threadIndex);
  void RunThread();
  void JoinThread();
private:
  void onStarted() override;
  void onStopped() override;
  void onDataExtracted(archive::ByteArray&& data, const archive::Time& playTime) override;
private:
  bool& _bRun;
  archive::IArchive& _archive;
  archive::PlayerPtr _player;
  archive::Time _startTime;
  archive::Time _curTime;
  archive::Time _endTime;
  std::thread _threads[MAX_THREAD];
  std::unique_ptr<boost::asio::io_service> _services[MAX_THREAD];
  std::unique_ptr<boost::asio::io_service::work> _works[MAX_THREAD];
  const boost::posix_time::ptime _epoch;
  //CriticalSection _crs;
  //std::set<Target2> _vTargs;
  Target2* _targs[MAX_THREAD];
  const uint8_t S_CLASS_PB_LEVEL = 1;
  const uint32_t _maxSendTarg;
  std::atomic<int> _threadCount;
  std::atomic<int> _targetCount[MAX_THREAD];
  RdKafkaProducer kafkaInstance[MAX_THREAD];
};
}//namespace collector
}//namespace sinftech

Consumer.cpp

#include "stdafx.h"
#include "Consumer.h"
#include <thread>
#include <memory>
#include <boost/bind.hpp>
#include <boost/asio/placeholders.hpp>
#include "Factory.h"
#include "Resource.h"
#include "UniquePtrCast.h"
#include "PlaySerialization.h"
#include <boost/date_time/posix_time/posix_time.hpp>
#include <zlib/zlib.h>
#include "RdKafkaProducer.h"
#include <json/json.h>
namespace sinftech {
namespace collector {
namespace pt = boost::posix_time;
Consumer::Consumer(archive::IArchive& archive, bool& bRun)
  : _archive(archive)
  , _player(archive::create_player(archive, *this))
  , _bRun(bRun)
  , _threadCount(0)
  , _epoch(boost::gregorian::date(1970, 1, 1))
  , _maxSendTarg(YamlConfig::GetInstance().maxTargets)
{
  for (int i = 0;i < MAX_THREAD;i++) {
    _targs[i] = new Target2[_maxSendTarg];
    memset(_targs[i], 0, sizeof(Target2) * _maxSendTarg);
    _targetCount[i] = 0;
    _services[i] = std::make_unique<boost::asio::io_service>();
    _works[i] = std::make_unique<boost::asio::io_service::work>(*_services[i]);
  }
  RunThread();
}
Consumer::~Consumer() {
  stop();
  for (const auto& service : _services)
    service->stop();
  JoinThread();
  _player.release();
}
void Consumer::RunThread() {
  for (int i = 0;i < MAX_THREAD;i++)
  {
    _threads[i] = std::thread([this, i] {
      try {
        _services[i]->run();
      }
      catch (const std::exception& e) {
        util::log(std::string("playback timer exception: ") + e.what());
        throw;
      }
      catch (...) {
        util::log(std::string("playback timer exception: unknown"));
        throw;
      }
    });
  }
}
void Consumer::JoinThread() {
  for (int i = 0;i < MAX_THREAD;i++)
  {
    if (_threads[i].joinable()) {
      _threads[i].join();
    }
  }
}
void Consumer::setEndTime(const archive::Time& endTime)
{
  _endTime = endTime;
  _player->setEndTime(endTime);
}
void Consumer::start(const archive::Time& time, const uint8_t& speed) {
  //stop();
  auto offsets = _player->timeToOffset(time);
  if (!offsets.empty()) {
  _startTime = time;
    archive::Offset offset = offsets.back();
    _player->start(offset, (archive::SpeedCoefficient&)speed);
  }
}
void Consumer::stop() {
  _player->stop();
  _player.release();
}
void Consumer::onStarted() {
  _bRun = true;
  util::log("onStarted, playbackTime:" + boost::posix_time::to_simple_string(_startTime));
}
void Consumer::onStopped() {
  _bRun = false;
  util::log("onStopped, archive file read finish! playbackTime:" + boost::posix_time::to_simple_string(_curTime));    // _curTime not-a-date-time表示中断!!!
  //PostMessageW(AfxGetApp()->GetMainWnd()->GetSafeHwnd(), COMMAND_A, COMMAND_A, 0);
}
std::string GetStringTime(long long timestamp)
{
  auto milli = timestamp/* + (long long)8 * 60 * 60 * 1000*/;
  auto mTime = std::chrono::milliseconds(milli);
  auto tp = std::chrono::time_point<std::chrono::system_clock, std::chrono::milliseconds>(mTime);
  auto tt = std::chrono::system_clock::to_time_t(tp);
  std::tm *now = std::gmtime(&tt);
  std::string tm = std::to_string(now->tm_year + 1900) + "-";
  tm = tm + std::to_string(now->tm_mon + 1) + "-";
  tm = tm + std::to_string(now->tm_mday) + " ";
  tm = tm + std::to_string(now->tm_hour) + ":";
  tm = tm + std::to_string(now->tm_min) + ":";
  tm = tm + std::to_string(now->tm_sec);
  return tm;
}
void Consumer::onDataExtracted(archive::ByteArray&& data, const archive::Time& playTime) {
  _curTime = playTime;
  _services[_threadCount]->post(std::bind(&Consumer::ExtractData, this, data, playTime, _threadCount.load()));
  _threadCount = ++_threadCount % 4;
}
void Consumer::ExtractData(const archive::ByteArray& data, const archive::Time& playTime, const int threadIndex) {
  auto result = deserialize(data);
  if (!result.message)
    return;
  if (typeid(*result.message) == typeid(msg::display::RefreshTargetDescription))
  {
    msg::display::RefreshTargetDescription* message = reinterpret_cast<msg::display::RefreshTargetDescription*>(&(*result.message));
    Json::Value root;
    root["pos"]["displayId"] = message->displayId;
    root["pos"]["mmsi"] = message->mmsi;
    root["pos"]["id_r"] = message->radarId;
    root["pos"]["state"] = message->predicted ? 2 : 1;
    root["pos"]["quality"] = message->quality;
    root["pos"]["period"] = message->period;
    root["pos"]["geoPnt"]["latitude"] = message->position.latitude();
    root["pos"]["geoPnt"]["longitude"] = message->position.longitude();
    root["pos"]["course"] = message->course;
    root["pos"]["speed"] = message->speed;
    root["pos"]["heading"] = message->heading;
    root["pos"]["len"] = message->length;
    root["pos"]["wid"] = message->width;
    root["pos"]["shiptype"] = message->shipType;
    root["pos"]["flags"] = message->flags;
    root["pos"]["s_class"] = Convert::_fromShipClass(message->shipClass);
    root["pos"]["m_mmsi"] = message->mothershipMmsi;
    root["pos"]["imo"] = message->imo;
    root["pos"]["warning"] = (int)geo::AlarmLevel(message->warningType);
    root["pos"]["id_zone"] = message->warningZoneId;
    root["pos"]["w_time"] = message->warningTime;
    root["pos"]["w_distance"] = message->warningDistance;
    root["pos"]["id"] = message->uniqueId;
    root["pos"]["fleetId"] = message->fleetId;
    root["pos"]["rec_course"] = message->recomendedCourse;
    root["pos"]["rec_speed"] = message->recomendedSpeed;
    root["pos"]["fairwayFlag"] = Convert::_fromFairwayBinding(message->fairwayBinding);
    root["pos"]["Vessel_Name"] = Convert::ws2s(message->vesselName);
    root["pos"]["Vendor_ID"] = message->vendorId;
    root["pos"]["Call_Sign"] = message->callSign;
    root["pos"]["comment"] = Convert::ws2s(message->comment);
    root["pos"]["fairway"] = Convert::ws2s(message->fairway);
    root["sost"] = message->predicted ? 2 : 1;
    root["id"] = message->uniqueId;
    root["lastTm"] = (playTime - _epoch).total_milliseconds();
    std::string strJson = root.toStyledString();
    RdKafkaProducer::GetInstance().PushToKafka(strJson.data(), strJson.size(), KAFUKA_TOPIC::UNION_TARGET_TOPIC);
  }
  if (typeid(*result.message) == typeid(msg::display::RefreshTargetCoordinates))
  {
    msg::display::RefreshTargetCoordinates* message = reinterpret_cast<msg::display::RefreshTargetCoordinates*>(&(*result.message));
    Json::Value root;
    root["pos"]["displayId"] = message->displayId;
    root["pos"]["geoPnt"]["latitude"] = message->position.latitude();
    root["pos"]["geoPnt"]["longitude"] = message->position.longitude();
    root["pos"]["course"] = message->course;
    root["pos"]["speed"] = message->speed;
    root["pos"]["heading"] = message->heading;
    root["pos"]["mmsi"] = message->mmsi;
    root["pos"]["id_r"] = message->radarId;
    root["pos"]["len"] = message->length;
    root["pos"]["period"] = message->period;
    root["pos"]["id"] = message->uniqueId;
    root["pos"]["s_class"] = Convert::_fromShipClass(message->shipClass);
    root["sost"] = message->predicted ? 2 : 1;
    root["lastTm"] = (playTime - _epoch).total_milliseconds();
    root["id"] = message->uniqueId;
    std::string strJson = root.toStyledString();
    RdKafkaProducer::GetInstance().PushToKafka(strJson.data(), strJson.size(), KAFUKA_TOPIC::UNION_TARGET_TOPIC);
    //std::ofstream ofs("data2.txt", std::ios::app);
    //ofs << "displayId:" << message->displayId << ", mmsi:" << message->mmsi
    // << ", lat:" << message->position.latitude()
    // << ", lon:" << message->position.longitude()
    // << ", course:" << message->course
    // << ", heading:" << message->heading
    // << ", speed:" << message->speed
    // << ", time:" << boost::posix_time::to_simple_string(playTime)
    // << std::endl;
    //std::ofstream ofs("data2.txt", std::ios::app);
    //ofs << "displayId:" << message->displayId << ", boost time:" << boost::posix_time::to_simple_string(playTime) << ", stl time:" << GetStringTime(targ.lastTm) << ", count1:" << ++count1 << std::endl;
  }
}
}//namespace collector
}//namespace sinftech
相关文章
|
3月前
|
C++
boost asio异步和stl异步的简单对比
boost asio异步和stl异步的简单对比
|
5月前
19.10 Boost Asio 同步文件传输
在原生套接字编程中我们介绍了利用文件长度来控制文件传输的方法,本节我们将采用另一种传输方式,我们通过判断字符串是否包含`goodbye lyshark`关键词来验证文件是否传输结束了,当然了这种传输方式明显没有根据长度传输严谨,但使用这种方式也存在一个有点,那就是无需确定文件长度,因为无需读入文件所以在传输速度上要快一些,尤其是面对大文件时。服务端代码如下所示,在代码中我们分别封装实现`recv_remote_file`该函数用于将远程特定目录下的文件拉取到本地目录下,而`send_local_file`函数则用于将一个本地文件传输到对端主机上,这两个函数都接收三个参数,分别是套接字句柄,本地
55 0
19.10 Boost Asio 同步文件传输
|
5月前
19.3 Boost Asio 多线程通信
多线程服务依赖于两个通用函数,首先`boost::bind`提供了一个高效的、简单的方法来创建函数对象和函数对象适配器,它的主要功能是提供了一种将函数和它的参数绑定到一起的方法,这种方法可以将具有参数的成员函数、普通函数以及函数对象转化为不带参数的函数对象。当参数绑定后则下一步就需要使用多线程功能,Boost库中提供了`boost::thread`库,`boost::thread`可以用于创建线程、启动线程、等待线程执行结束以及线程间通信等多种操,有了这两个关键库那么我们只需要`accept.accept(*sock)`等待套接字上线,当有套接字上线后则自动创建`MyThread`子线程,
47 0
19.3 Boost Asio 多线程通信
|
8月前
|
存储 设计模式 Java
4.7 C++ Boost 多线程并发库
C++语言并没有对多线程与网络的良好支持,虽然新的C++标准加入了基本的`thread`库,但是对于并发编程的支持仍然很基础,Boost库提供了数个用于实现高并发与网络相关的开发库这让我们在开发跨平台并发网络应用时能够像Java等语言一样高效开发。 thread库为C++增加了多线程处理能力,其主要提供了清晰的,互斥量,线程,条件变量等,可以很容易的实现多线程应用开发,而且该库是可跨平台的,并且支持`POSIX`和`Windows`线程。
104 0
|
3月前
|
网络协议
boost asio异步小析
boost asio异步小析
|
5月前
|
容器
19.12 Boost Asio 获取远程进程
远程进程遍历功能实现原理与远程目录传输完全一致,唯一的区别在于远程进程枚举中使用`EnumProcess`函数枚举当前系统下所有活动进程,枚举结束后函数返回一个`PROCESSENTRY32`类型的容器,其中的每一个成员都是一个进程信息,只需要对该容器进行动态遍历即可得到所有的远程主机列表。
27 0
19.12 Boost Asio 获取远程进程
|
5月前
|
存储 网络协议 API
19.0 Boost 基于ASIO网络编程技术
Boost ASIO库是一个基于C++语言的开源网络编程库,该库提供了成熟、高效、跨平台的网络API接口,并同时支持同步与异步两种模式,ASIO库提供了多重I/O对象、异步定时器、可执行队列、信号操作和协程等支持,使得开发者可以轻松地编写可扩展的高性能网络应用程序,同时保持代码简洁、易于维护。在学习`ASIO`库之前,我们先来实现一个简单的地址解析功能,Boost库中提供了`ip::tcp::resolver`对象,该对象可用于解析给定主机名和端口号的`IP`地址,学会使用这个对象即可实现对特定主机域名地址的解析功能,如下封装实现了`GetDNSAddress`该函数传入一个域名,并输出该域名
64 0
19.0 Boost 基于ASIO网络编程技术
|
网络协议 编译器 消息中间件