分布式各系统时间统一程序

简介: 分布式各系统时间统一程序

1、背景

使用场景是在一个大型分布式系统下,对时间有一个较高的水平要求。因为需要矫正每台运行服务的主机时间。


2、Cristian’s algorithm 算法(克里斯蒂安算法)

Cristian's algorithm 算法是一种时钟同步算法, 用于通过客户端进程与时间服务器同步时间。此算法适用于低延迟网络, 其中往返时间与准确性相比, 它是短的, 而易于冗余的分布式系统/应用程序与该算法不能并存。这里的往返时间是指请求开始到相应响应结束之间的持续时间。

3、实现思路

令T_0 为客户端获取时钟时间请求的本地时间。

令T_1 为客户端收到服务端回执的包的本地时间。

3.1、步骤:

1、首先客户端发送时钟时间请求包;

2、服务端收到立即给该客户端回执一个服务器本地时间的包;

3、客户端收到服务器发来的回执包,进行误差计算。

3.2、公式

公式:

其中:

T_0 为客户端获取时钟时间请求的本地时间。

T_1 为客户端收到服务端回执的包的本地时间。

T_server为服务器返回的时钟时间

T_client 同步时钟时间

T_1 - T_0指网络和服务器将请求传输到服务器, 处理请求并将响应返回给客户端进程所花费的总时间;这里假设网络延迟T_0 和T_1大致相等。


客户端的时间最多与服务端时间差(T_1 - T_0)/2。


因此,我们以t作为最大容忍误差。


4、具体代码

4.1、构建时间戳

既然传输的包中需要有本地时间。所以我们需要构建一个适合自己的时间戳,当然你也可以用C++标准库自带的。

struct DateTime
{
  int year;
  int month;
  int day;
  int hour;
  int min;
  int sec;
  int milliSec;
};

为了后续对时间进行处理运算,这里提供了一些便捷的方法。

例如 自定义时间戳转QDatetime、自定义时间戳转SYSTEMTIME等等

  QDateTime DateTime::ToQDateTime() const
  {
    QString str = QString("%1-%2-%3 %4:%5:%6 %7")
      .arg(year).arg(month).arg(day).arg(hour).arg(min).arg(sec).arg(milliSec);
    QDateTime time = QDateTime::fromString(str, "yyyy-M-d h:m:s z");
    return time;
  }
  SYSTEMTIME DateTime::ToSystmTime() const
  {
    SYSTEMTIME time;
    time.wYear = year;
    time.wMonth = month;
    time.wDay = day;
    time.wHour = hour;
    time.wMinute = min;
    time.wSecond = sec;
    time.wMilliseconds = milliSec;
    return time;
  }
  void DateTimeFrom::QDateTime(const QDateTime& time)
  {
    year = time.date().year();
    month = time.date().month();
    day = time.date().day();
    hour = time.time().hour();
    min = time.time().minute();
    sec = time.time().second();
    milliSec = time.time().msec();
  }
  DateTime DateTime::operator -(const DateTime& other)
  {
    QDateTime t1 = this->ToQDateTime();
    QDateTime t2 = other.ToQDateTime();
    qint64 d1 = t1.toMSecsSinceEpoch();
    qint64 d2 = t2.toMSecsSinceEpoch();
    QDateTime interval = QDateTime::fromMSecsSinceEpoch(abs(t1.toMSecsSinceEpoch() - t2.toMSecsSinceEpoch()));
    DateTime time;
    time.FromQDateTime(interval);
    time.hour -= 8;
    return time;
  }
  DateTime DateTime::operator +(const DateTime& other)
  {
    QDateTime t1 = this->ToQDateTime();
    QDateTime t2 = other.ToQDateTime();
    QDateTime interval = QDateTime::fromMSecsSinceEpoch(t1.toMSecsSinceEpoch() + t2.toMSecsSinceEpoch());
    DateTime time;
    time.FromQDateTime(interval);
    return time;
  }
  DateTime& DateTime::operator +=(const DateTime& other)
  {
    QDateTime t1 = this->ToQDateTime();
    QDateTime t2 = other.ToQDateTime();
    QDateTime interval = QDateTime::fromMSecsSinceEpoch(t1.toMSecsSinceEpoch() + t2.toMSecsSinceEpoch());
    FromQDateTime(interval);
    return *this;
  }
};


4.2、定义数据包

由于我们只需要一个时间戳并且我们需要标识一下该包是请求还是回执包,以及自身的IP信息。所以这个包将很简单。state的范围:1-请求 2-回执

struct ClockPag
{
  char client[40]{ 0 };
  int state = 0; 
  DateTime time;
};


4.3、客户端实现

头文件 ClockClient.h

#pragma once
#include "DateTime.h"
#include <QUdpsocket>
#include <QNetworkDatagram>
#include <QTimer>
extern int g_step;
constexpr int gc_port = 10023;
constexpr int gc_recvPort = 10024;
class ClockClient : public QObject
{
  Q_OBJECT
public:
  ClockClient();
  void Start(int step = g_step * 1000);
  void Stop();
private:
  bool CorrectionTime();
  void SendLocaltime();
  QString GetLocalIp();
  bool InitNetWork();
public:
  DWORD AppOfSelfCheck(LPCTSTR name);
  HANDLE GetProcessHandleByID(DWORD nID);
  std::vector<DWORD> GetProcessIDByname(const char * name);
private slots:
  void onTimerServer();
private:
  QString m_localIp;
  qint32 m_localPort;
  ClockPag m_clockPag;
  QUdpSocket m_send;
  QUdpSocket m_rece;
  DateTime m_oldTime;
  DateTime m_currTime;
  DateTime m_serviceTime;
  QTimer m_timer;
  QList<QHostAddress> ipAddressesList;
};


源文件 ClockClient.cpp

#include "ClockClient.h"
#include <iostream>
#include <minwindef.h>
#include <QList>
#include <QNetworkInterface>
extern int g_step;
ClockClient::ClockClient()
  : m_localIp{ GetLocalIp() }
  , m_localPort{ gc_port }
{
  bool ret = connect(&m_timer, &QTimer::timeout, this, &ClockClient::SendLocaltime);
  if (false == ret)
  {
    std::cout << "绑定失败!" << std::endl;
    exit(EXIT_FAILURE);
  }
  if (false == InitNetWork())
  {
    std::cout << "网络初始化失败!" << std::endl;
    exit(EXIT_FAILURE);
  }
}
bool ClockClient::InitNetWork()
{
  bool ret = m_rece.bind(gc_recvPort, QUdpSocket::ShareAddress | QUdpSocket::ReuseAddressHint);
  if (false == ret)
  {
    return false;
  }
  ret = connect(&m_rece, &QUdpSocket::readyRead, this, &ClockClient::onTimerServer);
  return ret;
}
void ClockClient::Start(int step)
{
  m_timer.start(step); // step:10s
}
void ClockClient::Stop()
{
  m_timer.stop();
}
void ClockClient::onTimerServer()
{
  while (true == m_rece.hasPendingDatagrams())
  {
    QByteArray ba = m_rece.receiveDatagram().data();
    char* buffer = new char[ba.size()];
    memcpy(buffer, ba.data(), ba.size());
    memset(m_clockPag.client, 0, sizeof(m_clockPag.client));
    m_clockPag = *reinterpret_cast<ClockPag*>(buffer);
    QString ip = m_rece.receiveDatagram().senderAddress().toString();
    //ip = ip.mid(7);
    if (m_localIp != QString(m_clockPag.client)
      || ip == m_localIp || m_clockPag.state != 2)
    {
      delete[] buffer;
      buffer = nullptr;
      continue;
    }
    m_clockPag.state = 3;
    m_serviceTime = m_clockPag.time;//服务器时间
    m_currTime.FromQDateTime(QDateTime::currentDateTime()); // 本地时间
    delete[] buffer;
    buffer = nullptr;
    std::cout << "设置状态:" << CorrectionTime() << std::endl;
  }
}
bool ClockClient::CorrectionTime()
{
  DateTime rtt = m_currTime - m_oldTime;
  rtt.hour += 8;
  qint64 eee = rtt.ToQDateTime().toMSecsSinceEpoch();
  qint64 rtt2 = rtt.ToQDateTime().toMSecsSinceEpoch() / 2; // 平均延时
  std::cout << "校验周期(毫秒)" << rtt2 << std::endl;
  DateTime midTime;
  midTime.FromQDateTime(QDateTime::fromMSecsSinceEpoch(rtt2));
  DateTime localTime = m_serviceTime + midTime;
  //if(m_serviceTime.ToQDateTime().toMSecsSinceEpoch() >)
  if (localTime.ToQDateTime().toMSecsSinceEpoch() > m_currTime.ToQDateTime().toMSecsSinceEpoch())
  {
    DateTime tmp = localTime - m_currTime;
    tmp.hour += 8;
    std::cout << "本地误差(毫秒)" << tmp.ToQDateTime().toMSecsSinceEpoch() << std::endl;
  }
  else
  {
    DateTime tmp = m_currTime - localTime;
    tmp.hour += 8;
    std::cout << "本地误差(毫秒)" << tmp.ToQDateTime().toMSecsSinceEpoch() << std::endl;
  }
  SYSTEMTIME myTime = localTime.ToSystmTime();
  return SetLocalTime(&myTime);
}
void ClockClient::SendLocaltime()
{
  DateTime currTime{};
  currTime.FromQDateTime(QDateTime::currentDateTime());
  m_clockPag.time = currTime;
  memset(m_clockPag.client, 0, sizeof(m_clockPag.client));
  char* ptr = const_cast<char*>(m_localIp.toStdString().c_str());
  std::string str = m_localIp.toLocal8Bit().toStdString();
  m_clockPag.state = 1;
  for (int i = 0; i < m_localIp.size(); ++i)
  {
    m_clockPag.client[i] = m_localIp[i].toLatin1();
  }
  int len = m_send.writeDatagram(reinterpret_cast<char*>(&m_clockPag), sizeof(m_clockPag),
    QHostAddress::Broadcast, m_localPort);
  if (len != sizeof(m_clockPag))
  {
    return ;
  }
  m_oldTime = currTime;
  return ;
}
QString ClockClient::GetLocalIp()
{
  QString ipAddress;
  foreach(QNetworkInterface netInterface, QNetworkInterface::allInterfaces())
  {
    QList<QNetworkAddressEntry>  entryList = netInterface.addressEntries();
    foreach(QNetworkAddressEntry entry, entryList)
    {
      if (entry.ip().protocol() == QAbstractSocket::IPv4Protocol && entry.ip() != QHostAddress::LocalHost)
      {
        QString LocalIP = entry.ip().toString();
        if (LocalIP.startsWith("192.", Qt::CaseSensitive))
        {
          ipAddress = entry.ip().toString();
        }
      }
    }
  }
  return ipAddress;
}

主函数 main.cpp

#include "ClockClient.h"
#include <iostream>
#include <string>
#include <string.h>
#include "Windows.h"
#include <tchar.h>
#include <comdef.h>
#include "tlhelp32.h"
int g_step = 20;
int main(int argc, char* argv[])
{
  QCoreApplication a(argc, argv);
  ClockClient client;
  client.Start();
  return a.exec();
}


4.3、服务端实现

头文件 ClockServer.h

#pragma once
#include "DateTime.h"
#include <QUdpsocket>
#include <QNetworkDatagram>
constexpr int gc_port = 10023;
constexpr int gc_recvPort = 10024;
class ClockServer : public QObject
{
  Q_OBJECT
public:
  ClockServer();
  HANDLE GetProcessHandleByID(DWORD nID);
  std::vector<DWORD> GetProcessIDByname(const char * name);
  void AppOfSelfCheck(const char * name);
private:
  bool InitNetWork();
  void onCheckTime();
private:
  qint32 m_localPort;
  QUdpSocket m_send;
  QUdpSocket m_rece;
};


源文件 ClockServer.cpp

#include "ClockServer.h"
#include <iostream>
ClockServer::ClockServer()
  :m_localPort{gc_port}
{
  bool ret = InitNetWork();
  if (false == ret)
  {
    std::cout << "网络初始化失败" << std::endl;
    exit(EXIT_FAILURE);
  }
}
bool ClockServer::InitNetWork()
{
  bool ret = m_rece.bind(/*QHostAddress(m_localIp),*/ m_localPort);
  if (false == ret)return ret;
  ret = connect(&m_rece, &QUdpSocket::readyRead, this, &ClockServer::onCheckTime);
  m_rece.setSocketOption(QAbstractSocket::MulticastLoopbackOption, 0);
  return ret;
}
void ClockServer::onCheckTime()
{
  while (true == m_rece.hasPendingDatagrams())
  {
    QByteArray ba = m_rece.receiveDatagram().data();
    char* buffer = new char[ba.size()];
    if(nullptr == buffer)continue;
    memcpy(buffer, ba.data(), ba.size());
    ClockPag data;
    data = *reinterpret_cast<ClockPag*>(buffer);
    delete[] buffer;
    buffer = nullptr;
    if (data.state != 1)
    {
      continue;
    }
    int port = gc_port;
    DateTime currTime{};
    currTime.FromQDateTime(QDateTime::currentDateTime());
    data.state = 2;
    data.time = currTime;
    int len = m_send.writeDatagram(reinterpret_cast<char*>(&data),
      sizeof(data),
      QHostAddress::Broadcast, gc_recvPort);
    qDebug() << u8"收到 ip:" << QString(data.client) << u8"的时钟校验包";
  }
}


主程序mian.cpp

#include <QtCore/QCoreApplication>
#include "ClockServer.h"
int main(int argc, char* argv[])
{
  QCoreApplication a(argc, argv);
  ClockServer server;
  return a.exec();
}


说明

这里我删减了一些不重要代码,原本都是纯后台的。所以我懒得再改代码截效果了。请读者自便。有技术问题请联系我。

目录
相关文章
|
安全 网络安全 数据安全/隐私保护
【计算机网络】网络安全 : 计算机网络安全威胁 ( 四种网络攻击类型 | 主动攻击与被动攻击 | 分布式拒绝服务攻击 DDos | 恶意程序 | 计算机网络安全目标)
【计算机网络】网络安全 : 计算机网络安全威胁 ( 四种网络攻击类型 | 主动攻击与被动攻击 | 分布式拒绝服务攻击 DDos | 恶意程序 | 计算机网络安全目标)
919 0
|
11月前
|
机器人 API
分布式各系统时间统一程序
分布式各系统时间统一程序
51 0
|
11月前
|
算法
分布式各系统时间统一程序
使用场景是在一个大型分布式系统下,对时间有一个较高的水平要求。因为需要矫正每台运行服务的主机时间。
65 0
|
12月前
zookeeper实现分布式应用系统服务器上下线动态感知程序、监听机制与守护线程
zookeeper实现分布式应用系统服务器上下线动态感知程序、监听机制与守护线程
68 0
|
资源调度 Java 程序员
Flink分布式程序的异常处理
Flink分布式程序的异常处理
Flink分布式程序的异常处理
|
消息中间件 存储 Java
Apache Thrift在分布式程序中的应用
Apache Thrift在分布式程序中的应用
Apache Thrift在分布式程序中的应用
|
弹性计算 运维 负载均衡
服务器的“服务器”,enginx之于分布式部署的创新意义:使任何服务器程序秒变集群
本文关键字:nginx,元服务器,单机集群,分布式集群,集群引擎
471 0
服务器的“服务器”,enginx之于分布式部署的创新意义:使任何服务器程序秒变集群
|
编解码 中间件 存储
使用.NET Core搭建分布式音频效果处理服务(二)创建基于FFMpeg的Web程序
准备工作: 1:Net Core 2.1 为何要用2.1,因为在macOS 10.13上一个奇怪的问题,请看另一篇博文介绍 2:FFmpeg 版本无所谓,最新即可,安装教程网上很多,当然也可以使用docker进行ffmpeg的部署,下载地址 http://ffmpeg.org/download.html 3:Rider 2018.1.2 在多平台上,比微软爸爸的VS好用,在window上肯定不如VS,当然你喜欢VSCODE也可以,毕竟JET的全家桶更好吃,哈哈。
1869 0
|
存储 缓存 数据库
使用.NET Core搭建分布式音频效果处理服务(三)完成音频合成效果处理程序
上一节我们已经介绍了FFmpeg在Net Core中的简单应用,这一节我们将根据之前的功能需求和解决方案,进行项目的详细设计工作。   画个流程图 先阐述一下流程,如下图: 整个流程其实非常简单,客户端(无论桌面软件、还是原生APP、还是HTML网页)通过一个统一的接口进行调用,我们这里定义这个接口名称叫AudioSynthesisSync吧,为何名称后面还要加个同步(也是命名规范),目前来说这个接口就属于同步的,异步方式后续再一一解答。
1390 0