DataX教程(09)- DataX是如何做到限速的?

简介: DataX教程(09)- DataX是如何做到限速的?

01 引言


通过前面的博文,我们对DataX有了一定的深入了解了:


  • 《DataX教程(01)- 入门》
  • 《DataX教程(02)- IDEA运行DataX完整流程(填完所有的坑)》
  • 《DataX教程(03)- 源码解读(超详细版)
  • 《DataX教程(04)- 配置完整解读》
  • 《DataX教程(05)- DataX Web项目实践》
  • 《DataX教程(06)- DataX调优》
  • 《DataX教程(07)- 图解DataX任务分配及执行流程》
  • 《DataX教程(08)- 监控与汇报》


随着对DataX深入学习,我提出了一个疑问,究竟DataX是如何做到限速的?本文来讲解下。


02 逆向定位代码


我们知道是在core.json文件里面的speed方法里面限速DataX的,可以通过record记录数和byte字节数来限速:

3c7f672b284c40bd84ed63fe7294b351.png


这个配置在CoreConstant类里面定义了:

82637dcfb35741b9a24e2beb78609bce.png


选中常量,右键使用IDEA的Find Usages…可以看到有两个地方调用了这个值:

619b15a9b128436fa8b0184bcd52ed11.png


接下来,看看这两个配置在Channel类如何实现限速的。


03 Channel类里实现限速


从下图,可以看到在Channel初始化时,顺带初始化了限速的记录数(recordSpeed)以及字节数(byteSpeed) ,接下来Control+F看看recordSpeed在哪里调用了。

adecc747142c47e3b2c0120cd9ec4f16.png


可以看到在statPush方法里面用到了:

ce37d84103864df1a9bfc17a7e72699e.png


statPush整个流程的描述:


  1. 判断byteSpeed(bps)和recordSpeed(tps)是否都大于0?如果不是,则退出;
  2. 根据当前的byteSpeed和设定的byteSpeed对比,求出睡眠时间(公式:currentByteSpeed * interval / this.byteSpeed- interval;)
  3. 根据当前的recordSpeed和设定的recordSpeed对比,求出睡眠时间(公式:currentRecordSpeed * interval / this.recordSpeed - interval;)
  4. 取休眠时间最大值;
  5. Thread.sleep(sleepTime)来休眠


下面贴上statPush的完整代码:

private void statPush(long recordSize, long byteSize) {
      currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_RECORDS,
              recordSize);
      currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_BYTES,
              byteSize);
      //在读的时候进行统计waitCounter即可,因为写(pull)的时候可能正在阻塞,但读的时候已经能读到这个阻塞的counter数
      currentCommunication.setLongCounter(CommunicationTool.WAIT_READER_TIME, waitReaderTime);
      currentCommunication.setLongCounter(CommunicationTool.WAIT_WRITER_TIME, waitWriterTime);
      boolean isChannelByteSpeedLimit = (this.byteSpeed > 0);
      boolean isChannelRecordSpeedLimit = (this.recordSpeed > 0);
      if (!isChannelByteSpeedLimit && !isChannelRecordSpeedLimit) {
          return;
      }
      long lastTimestamp = lastCommunication.getTimestamp();
      long nowTimestamp = System.currentTimeMillis();
      long interval = nowTimestamp - lastTimestamp;
      if (interval - this.flowControlInterval >= 0) {
          long byteLimitSleepTime = 0;
          long recordLimitSleepTime = 0;
          if (isChannelByteSpeedLimit) {
              long currentByteSpeed = (CommunicationTool.getTotalReadBytes(currentCommunication) -
                      CommunicationTool.getTotalReadBytes(lastCommunication)) * 1000 / interval;
              if (currentByteSpeed > this.byteSpeed) {
                  // 计算根据byteLimit得到的休眠时间
                  byteLimitSleepTime = currentByteSpeed * interval / this.byteSpeed
                          - interval;
              }
          }
          if (isChannelRecordSpeedLimit) {
              long currentRecordSpeed = (CommunicationTool.getTotalReadRecords(currentCommunication) -
                      CommunicationTool.getTotalReadRecords(lastCommunication)) * 1000 / interval;
              if (currentRecordSpeed > this.recordSpeed) {
                  // 计算根据recordLimit得到的休眠时间
                  recordLimitSleepTime = currentRecordSpeed * interval / this.recordSpeed
                          - interval;
              }
          }
          // 休眠时间取较大值
          long sleepTime = byteLimitSleepTime < recordLimitSleepTime ?
                  recordLimitSleepTime : byteLimitSleepTime;
          if (sleepTime > 0) {
              try {
                  Thread.sleep(sleepTime);
              } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
              }
          }
          lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_BYTES,
                  currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES));
          lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_BYTES,
                  currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_BYTES));
          lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_RECORDS,
                  currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS));
          lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_RECORDS,
                  currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_RECORDS));
          lastCommunication.setTimestamp(nowTimestamp);
      }
  }


04 文末


通过阅读本文可以知道DataX的限速原理了,如有疑问的童鞋,欢迎留言,本文完!

目录
相关文章
|
6月前
|
Java 数据处理 调度
Dataphin常见问题之离线管道同步数据datax就报连接超时如何解决
Dataphin是阿里云提供的一站式数据处理服务,旨在帮助企业构建一体化的智能数据处理平台。Dataphin整合了数据建模、数据处理、数据开发、数据服务等多个功能,支持企业更高效地进行数据治理和分析。
|
4月前
|
DataWorks NoSQL fastjson
DataWorks操作报错合集之DataX进行MongoDB全量迁移的过程中,DataX的MongoDB Reader插件在初始化阶段找不到Fastjson 2.x版本的类库,该怎么办
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
4月前
|
关系型数据库 MySQL 数据库
实时计算 Flink版操作报错合集之在处理PostgreSQL数据库遇到报错。该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
DataWorks Java 调度
DataWorks产品使用合集之进行离线同步时,如何使用DataX的Reader插件来实现源端过滤
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
72 0
DataWorks产品使用合集之进行离线同步时,如何使用DataX的Reader插件来实现源端过滤
|
6月前
|
存储 监控 关系型数据库
DataX 概述、部署、数据同步运用示例
DataX是阿里巴巴开源的离线数据同步工具,支持多种数据源之间的高效传输。其特点是多数据源支持、可扩展性、灵活配置、高效传输、任务调度监控和活跃的开源社区支持。DataX通过Reader和Writer插件实现数据源的读取和写入,采用Framework+plugin架构。部署简单,解压即可用。示例展示了如何配置DataX同步MySQL到HDFS,并提供了速度和内存优化建议。此外,还解决了NULL值同步问题及配置文件变量传参的方法。
2542 5
|
6月前
|
消息中间件 Oracle 关系型数据库
实时计算 Flink版操作报错合集之一直无法正常运行,并且网络状况良好,是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
84 8
|
5月前
|
运维 Java 关系型数据库
实时计算 Flink版操作报错合集之网络带宽不够的报错是怎样的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
6月前
|
SQL NoSQL 关系型数据库
实时计算 Flink版产品使用合集之断点续传的步骤是什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
Java 关系型数据库 MySQL
DataX教程(10)- DataX插件热插拔原理
DataX教程(10)- DataX插件热插拔原理
551 0
|
6月前
|
分布式计算 大数据 数据处理
MaxCompute操作报错合集之在开发环境中配置MaxCompute参数进行调度,但参数解析不出来,如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。