Cnsumer.h
#pragma once #include <boost/asio/io_service.hpp> #include <boost/asio/deadline_timer.hpp> #include <network/EndpointDescription.h> #include "IArchive.h" #include "IPlayer.h" #include "Convert.h" #include <messages/Messages.h> // for msg register #include <Util/Log.h> #include <thread> #include <atomic> //#include "CriticalSection.h" #include "RdKafkaProducer.h" #define MAX_THREAD 16 namespace sinftech { namespace collector { class Consumer : archive::IPlayer::Listener { public: Consumer( archive::IArchive& archive, bool& bRun ); ~Consumer(); void start(const archive::Time& time, const uint8_t& speed); void stop(); void setEndTime(const archive::Time& endTime); void ExtractData(const archive::ByteArray& data, const archive::Time& playTime, const int threadIndex); void RunThread(); void JoinThread(); private: void onStarted() override; void onStopped() override; void onDataExtracted(archive::ByteArray&& data, const archive::Time& playTime) override; private: bool& _bRun; archive::IArchive& _archive; archive::PlayerPtr _player; archive::Time _startTime; archive::Time _curTime; archive::Time _endTime; std::thread _threads[MAX_THREAD]; std::unique_ptr<boost::asio::io_service> _services[MAX_THREAD]; std::unique_ptr<boost::asio::io_service::work> _works[MAX_THREAD]; const boost::posix_time::ptime _epoch; //CriticalSection _crs; //std::set<Target2> _vTargs; Target2* _targs[MAX_THREAD]; const uint8_t S_CLASS_PB_LEVEL = 1; const uint32_t _maxSendTarg; std::atomic<int> _threadCount; std::atomic<int> _targetCount[MAX_THREAD]; RdKafkaProducer kafkaInstance[MAX_THREAD]; }; }//namespace collector }//namespace sinftech
Consumer.cpp
#include "stdafx.h" #include "Consumer.h" #include <thread> #include <memory> #include <boost/bind.hpp> #include <boost/asio/placeholders.hpp> #include "Factory.h" #include "Resource.h" #include "UniquePtrCast.h" #include "PlaySerialization.h" #include <boost/date_time/posix_time/posix_time.hpp> #include <zlib/zlib.h> #include "RdKafkaProducer.h" #include <json/json.h> namespace sinftech { namespace collector { namespace pt = boost::posix_time; Consumer::Consumer(archive::IArchive& archive, bool& bRun) : _archive(archive) , _player(archive::create_player(archive, *this)) , _bRun(bRun) , _threadCount(0) , _epoch(boost::gregorian::date(1970, 1, 1)) , _maxSendTarg(YamlConfig::GetInstance().maxTargets) { for (int i = 0;i < MAX_THREAD;i++) { _targs[i] = new Target2[_maxSendTarg]; memset(_targs[i], 0, sizeof(Target2) * _maxSendTarg); _targetCount[i] = 0; _services[i] = std::make_unique<boost::asio::io_service>(); _works[i] = std::make_unique<boost::asio::io_service::work>(*_services[i]); } RunThread(); } Consumer::~Consumer() { stop(); for (const auto& service : _services) service->stop(); JoinThread(); _player.release(); } void Consumer::RunThread() { for (int i = 0;i < MAX_THREAD;i++) { _threads[i] = std::thread([this, i] { try { _services[i]->run(); } catch (const std::exception& e) { util::log(std::string("playback timer exception: ") + e.what()); throw; } catch (...) { util::log(std::string("playback timer exception: unknown")); throw; } }); } } void Consumer::JoinThread() { for (int i = 0;i < MAX_THREAD;i++) { if (_threads[i].joinable()) { _threads[i].join(); } } } void Consumer::setEndTime(const archive::Time& endTime) { _endTime = endTime; _player->setEndTime(endTime); } void Consumer::start(const archive::Time& time, const uint8_t& speed) { //stop(); auto offsets = _player->timeToOffset(time); if (!offsets.empty()) { _startTime = time; archive::Offset offset = offsets.back(); _player->start(offset, (archive::SpeedCoefficient&)speed); } } void Consumer::stop() { _player->stop(); _player.release(); } void Consumer::onStarted() { _bRun = true; util::log("onStarted, playbackTime:" + boost::posix_time::to_simple_string(_startTime)); } void Consumer::onStopped() { _bRun = false; util::log("onStopped, archive file read finish! playbackTime:" + boost::posix_time::to_simple_string(_curTime)); // _curTime not-a-date-time表示中断!!! //PostMessageW(AfxGetApp()->GetMainWnd()->GetSafeHwnd(), COMMAND_A, COMMAND_A, 0); } std::string GetStringTime(long long timestamp) { auto milli = timestamp/* + (long long)8 * 60 * 60 * 1000*/; auto mTime = std::chrono::milliseconds(milli); auto tp = std::chrono::time_point<std::chrono::system_clock, std::chrono::milliseconds>(mTime); auto tt = std::chrono::system_clock::to_time_t(tp); std::tm *now = std::gmtime(&tt); std::string tm = std::to_string(now->tm_year + 1900) + "-"; tm = tm + std::to_string(now->tm_mon + 1) + "-"; tm = tm + std::to_string(now->tm_mday) + " "; tm = tm + std::to_string(now->tm_hour) + ":"; tm = tm + std::to_string(now->tm_min) + ":"; tm = tm + std::to_string(now->tm_sec); return tm; } void Consumer::onDataExtracted(archive::ByteArray&& data, const archive::Time& playTime) { _curTime = playTime; _services[_threadCount]->post(std::bind(&Consumer::ExtractData, this, data, playTime, _threadCount.load())); _threadCount = ++_threadCount % 4; } void Consumer::ExtractData(const archive::ByteArray& data, const archive::Time& playTime, const int threadIndex) { auto result = deserialize(data); if (!result.message) return; if (typeid(*result.message) == typeid(msg::display::RefreshTargetDescription)) { msg::display::RefreshTargetDescription* message = reinterpret_cast<msg::display::RefreshTargetDescription*>(&(*result.message)); Json::Value root; root["pos"]["displayId"] = message->displayId; root["pos"]["mmsi"] = message->mmsi; root["pos"]["id_r"] = message->radarId; root["pos"]["state"] = message->predicted ? 2 : 1; root["pos"]["quality"] = message->quality; root["pos"]["period"] = message->period; root["pos"]["geoPnt"]["latitude"] = message->position.latitude(); root["pos"]["geoPnt"]["longitude"] = message->position.longitude(); root["pos"]["course"] = message->course; root["pos"]["speed"] = message->speed; root["pos"]["heading"] = message->heading; root["pos"]["len"] = message->length; root["pos"]["wid"] = message->width; root["pos"]["shiptype"] = message->shipType; root["pos"]["flags"] = message->flags; root["pos"]["s_class"] = Convert::_fromShipClass(message->shipClass); root["pos"]["m_mmsi"] = message->mothershipMmsi; root["pos"]["imo"] = message->imo; root["pos"]["warning"] = (int)geo::AlarmLevel(message->warningType); root["pos"]["id_zone"] = message->warningZoneId; root["pos"]["w_time"] = message->warningTime; root["pos"]["w_distance"] = message->warningDistance; root["pos"]["id"] = message->uniqueId; root["pos"]["fleetId"] = message->fleetId; root["pos"]["rec_course"] = message->recomendedCourse; root["pos"]["rec_speed"] = message->recomendedSpeed; root["pos"]["fairwayFlag"] = Convert::_fromFairwayBinding(message->fairwayBinding); root["pos"]["Vessel_Name"] = Convert::ws2s(message->vesselName); root["pos"]["Vendor_ID"] = message->vendorId; root["pos"]["Call_Sign"] = message->callSign; root["pos"]["comment"] = Convert::ws2s(message->comment); root["pos"]["fairway"] = Convert::ws2s(message->fairway); root["sost"] = message->predicted ? 2 : 1; root["id"] = message->uniqueId; root["lastTm"] = (playTime - _epoch).total_milliseconds(); std::string strJson = root.toStyledString(); RdKafkaProducer::GetInstance().PushToKafka(strJson.data(), strJson.size(), KAFUKA_TOPIC::UNION_TARGET_TOPIC); } if (typeid(*result.message) == typeid(msg::display::RefreshTargetCoordinates)) { msg::display::RefreshTargetCoordinates* message = reinterpret_cast<msg::display::RefreshTargetCoordinates*>(&(*result.message)); Json::Value root; root["pos"]["displayId"] = message->displayId; root["pos"]["geoPnt"]["latitude"] = message->position.latitude(); root["pos"]["geoPnt"]["longitude"] = message->position.longitude(); root["pos"]["course"] = message->course; root["pos"]["speed"] = message->speed; root["pos"]["heading"] = message->heading; root["pos"]["mmsi"] = message->mmsi; root["pos"]["id_r"] = message->radarId; root["pos"]["len"] = message->length; root["pos"]["period"] = message->period; root["pos"]["id"] = message->uniqueId; root["pos"]["s_class"] = Convert::_fromShipClass(message->shipClass); root["sost"] = message->predicted ? 2 : 1; root["lastTm"] = (playTime - _epoch).total_milliseconds(); root["id"] = message->uniqueId; std::string strJson = root.toStyledString(); RdKafkaProducer::GetInstance().PushToKafka(strJson.data(), strJson.size(), KAFUKA_TOPIC::UNION_TARGET_TOPIC); //std::ofstream ofs("data2.txt", std::ios::app); //ofs << "displayId:" << message->displayId << ", mmsi:" << message->mmsi // << ", lat:" << message->position.latitude() // << ", lon:" << message->position.longitude() // << ", course:" << message->course // << ", heading:" << message->heading // << ", speed:" << message->speed // << ", time:" << boost::posix_time::to_simple_string(playTime) // << std::endl; //std::ofstream ofs("data2.txt", std::ios::app); //ofs << "displayId:" << message->displayId << ", boost time:" << boost::posix_time::to_simple_string(playTime) << ", stl time:" << GetStringTime(targ.lastTm) << ", count1:" << ++count1 << std::endl; } } }//namespace collector }//namespace sinftech