DataX教程(09)- DataX是如何做到限速的?

简介: DataX教程(09)- DataX是如何做到限速的?

01 引言


通过前面的博文,我们对DataX有了一定的深入了解了:


  • 《DataX教程(01)- 入门》
  • 《DataX教程(02)- IDEA运行DataX完整流程(填完所有的坑)》
  • 《DataX教程(03)- 源码解读(超详细版)
  • 《DataX教程(04)- 配置完整解读》
  • 《DataX教程(05)- DataX Web项目实践》
  • 《DataX教程(06)- DataX调优》
  • 《DataX教程(07)- 图解DataX任务分配及执行流程》
  • 《DataX教程(08)- 监控与汇报》


随着对DataX深入学习,我提出了一个疑问,究竟DataX是如何做到限速的?本文来讲解下。


02 逆向定位代码


我们知道是在core.json文件里面的speed方法里面限速DataX的,可以通过record记录数和byte字节数来限速:

3c7f672b284c40bd84ed63fe7294b351.png


这个配置在CoreConstant类里面定义了:

82637dcfb35741b9a24e2beb78609bce.png


选中常量,右键使用IDEA的Find Usages…可以看到有两个地方调用了这个值:

619b15a9b128436fa8b0184bcd52ed11.png


接下来,看看这两个配置在Channel类如何实现限速的。


03 Channel类里实现限速


从下图,可以看到在Channel初始化时,顺带初始化了限速的记录数(recordSpeed)以及字节数(byteSpeed) ,接下来Control+F看看recordSpeed在哪里调用了。

adecc747142c47e3b2c0120cd9ec4f16.png


可以看到在statPush方法里面用到了:

ce37d84103864df1a9bfc17a7e72699e.png


statPush整个流程的描述:


  1. 判断byteSpeed(bps)和recordSpeed(tps)是否都大于0?如果不是,则退出;
  2. 根据当前的byteSpeed和设定的byteSpeed对比,求出睡眠时间(公式:currentByteSpeed * interval / this.byteSpeed- interval;)
  3. 根据当前的recordSpeed和设定的recordSpeed对比,求出睡眠时间(公式:currentRecordSpeed * interval / this.recordSpeed - interval;)
  4. 取休眠时间最大值;
  5. Thread.sleep(sleepTime)来休眠


下面贴上statPush的完整代码:

private void statPush(long recordSize, long byteSize) {
      currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_RECORDS,
              recordSize);
      currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_BYTES,
              byteSize);
      //在读的时候进行统计waitCounter即可,因为写(pull)的时候可能正在阻塞,但读的时候已经能读到这个阻塞的counter数
      currentCommunication.setLongCounter(CommunicationTool.WAIT_READER_TIME, waitReaderTime);
      currentCommunication.setLongCounter(CommunicationTool.WAIT_WRITER_TIME, waitWriterTime);
      boolean isChannelByteSpeedLimit = (this.byteSpeed > 0);
      boolean isChannelRecordSpeedLimit = (this.recordSpeed > 0);
      if (!isChannelByteSpeedLimit && !isChannelRecordSpeedLimit) {
          return;
      }
      long lastTimestamp = lastCommunication.getTimestamp();
      long nowTimestamp = System.currentTimeMillis();
      long interval = nowTimestamp - lastTimestamp;
      if (interval - this.flowControlInterval >= 0) {
          long byteLimitSleepTime = 0;
          long recordLimitSleepTime = 0;
          if (isChannelByteSpeedLimit) {
              long currentByteSpeed = (CommunicationTool.getTotalReadBytes(currentCommunication) -
                      CommunicationTool.getTotalReadBytes(lastCommunication)) * 1000 / interval;
              if (currentByteSpeed > this.byteSpeed) {
                  // 计算根据byteLimit得到的休眠时间
                  byteLimitSleepTime = currentByteSpeed * interval / this.byteSpeed
                          - interval;
              }
          }
          if (isChannelRecordSpeedLimit) {
              long currentRecordSpeed = (CommunicationTool.getTotalReadRecords(currentCommunication) -
                      CommunicationTool.getTotalReadRecords(lastCommunication)) * 1000 / interval;
              if (currentRecordSpeed > this.recordSpeed) {
                  // 计算根据recordLimit得到的休眠时间
                  recordLimitSleepTime = currentRecordSpeed * interval / this.recordSpeed
                          - interval;
              }
          }
          // 休眠时间取较大值
          long sleepTime = byteLimitSleepTime < recordLimitSleepTime ?
                  recordLimitSleepTime : byteLimitSleepTime;
          if (sleepTime > 0) {
              try {
                  Thread.sleep(sleepTime);
              } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
              }
          }
          lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_BYTES,
                  currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES));
          lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_BYTES,
                  currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_BYTES));
          lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_RECORDS,
                  currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS));
          lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_RECORDS,
                  currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_RECORDS));
          lastCommunication.setTimestamp(nowTimestamp);
      }
  }


04 文末


通过阅读本文可以知道DataX的限速原理了,如有疑问的童鞋,欢迎留言,本文完!

目录
相关文章
|
SQL 存储 关系型数据库
DataX - 全量数据同步工具(2)
DataX - 全量数据同步工具
|
SQL 数据可视化 关系型数据库
DataX教程(05)- DataX Web项目实践
DataX教程(05)- DataX Web项目实践
4955 0
DataX教程(05)- DataX Web项目实践
|
数据采集 分布式计算 监控
DataX教程(03)- 源码解读(超详细版)
DataX教程(03)- 源码解读(超详细版)
3815 0
DataX教程(03)- 源码解读(超详细版)
|
调度 DataX 容器
DataX教程(07)- 图解DataX任务分配及执行流程
DataX教程(07)- 图解DataX任务分配及执行流程
1510 0
DataX教程(07)- 图解DataX任务分配及执行流程
|
关系型数据库 MySQL 调度
DataX教程(05)- DataX Web项目实践
DataX教程(05)- DataX Web项目实践
2730 0
|
监控 调度 DataX
DataX教程(08)- 监控与汇报
DataX教程(08)- 监控与汇报
1319 0
DataX教程(08)- 监控与汇报
|
分布式计算 Oracle NoSQL
DataX教程(01)- 入门
DataX教程(01)- 入门
9124 0
DataX教程(01)- 入门
|
12月前
|
SQL 数据采集 BI
Quick 引擎-抽取性能提升
本文介绍了一种通过并发抽取方案提升 Quick BI 数据抽取性能的方法,利用 DataX 进行二次开发,实现数据同步至高性能 OLAP 引擎。通过指定分区键或配置多条 SQL 实现任务拆分,显著减少了数据抽取时间,优化效果得到客户认可。
396 0
Quick 引擎-抽取性能提升
|
分布式计算 关系型数据库 MySQL
MySQL超时参数优化与DataX高效数据同步实践
通过合理设置MySQL的超时参数,可以有效地提升数据库的稳定性和性能。而DataX作为一种高效的数据同步工具,可以帮助企业轻松实现不同数据源之间的数据迁移。无论是优化MySQL参数还是使用DataX进行数据同步,都需要根据具体的应用场景来进行细致的配置和测试,以达到最佳效果。