01 引言
通过前面的博文,我们对DataX有了一定的深入了解了:
- 《DataX教程(01)- 入门》
- 《DataX教程(02)- IDEA运行DataX完整流程(填完所有的坑)》
- 《DataX教程(03)- 源码解读(超详细版)
- 《DataX教程(04)- 配置完整解读》
- 《DataX教程(05)- DataX Web项目实践》
- 《DataX教程(06)- DataX调优》
- 《DataX教程(07)- 图解DataX任务分配及执行流程》
- 《DataX教程(08)- 监控与汇报》
随着对DataX深入学习,我提出了一个疑问,究竟DataX是如何做到限速的?本文来讲解下。
02 逆向定位代码
我们知道是在core.json文件里面的speed方法里面限速DataX的,可以通过record记录数和byte字节数来限速:
这个配置在CoreConstant
类里面定义了:
选中常量,右键使用IDEA的Find Usages…可以看到有两个地方调用了这个值:
接下来,看看这两个配置在Channel类如何实现限速的。
03 Channel类里实现限速
从下图,可以看到在Channel初始化时,顺带初始化了限速的记录数(recordSpeed)以及字节数(byteSpeed) ,接下来Control+F看看recordSpeed在哪里调用了。
可以看到在statPush
方法里面用到了:
statPush整个流程的描述:
- 判断byteSpeed(bps)和recordSpeed(tps)是否都大于0?如果不是,则退出;
- 根据当前的byteSpeed和设定的byteSpeed对比,求出睡眠时间(公式:currentByteSpeed * interval / this.byteSpeed- interval;)
- 根据当前的recordSpeed和设定的recordSpeed对比,求出睡眠时间(公式:currentRecordSpeed * interval / this.recordSpeed - interval;)
- 取休眠时间最大值;
- Thread.sleep(sleepTime)来休眠
下面贴上statPush的完整代码:
private void statPush(long recordSize, long byteSize) { currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_RECORDS, recordSize); currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_BYTES, byteSize); //在读的时候进行统计waitCounter即可,因为写(pull)的时候可能正在阻塞,但读的时候已经能读到这个阻塞的counter数 currentCommunication.setLongCounter(CommunicationTool.WAIT_READER_TIME, waitReaderTime); currentCommunication.setLongCounter(CommunicationTool.WAIT_WRITER_TIME, waitWriterTime); boolean isChannelByteSpeedLimit = (this.byteSpeed > 0); boolean isChannelRecordSpeedLimit = (this.recordSpeed > 0); if (!isChannelByteSpeedLimit && !isChannelRecordSpeedLimit) { return; } long lastTimestamp = lastCommunication.getTimestamp(); long nowTimestamp = System.currentTimeMillis(); long interval = nowTimestamp - lastTimestamp; if (interval - this.flowControlInterval >= 0) { long byteLimitSleepTime = 0; long recordLimitSleepTime = 0; if (isChannelByteSpeedLimit) { long currentByteSpeed = (CommunicationTool.getTotalReadBytes(currentCommunication) - CommunicationTool.getTotalReadBytes(lastCommunication)) * 1000 / interval; if (currentByteSpeed > this.byteSpeed) { // 计算根据byteLimit得到的休眠时间 byteLimitSleepTime = currentByteSpeed * interval / this.byteSpeed - interval; } } if (isChannelRecordSpeedLimit) { long currentRecordSpeed = (CommunicationTool.getTotalReadRecords(currentCommunication) - CommunicationTool.getTotalReadRecords(lastCommunication)) * 1000 / interval; if (currentRecordSpeed > this.recordSpeed) { // 计算根据recordLimit得到的休眠时间 recordLimitSleepTime = currentRecordSpeed * interval / this.recordSpeed - interval; } } // 休眠时间取较大值 long sleepTime = byteLimitSleepTime < recordLimitSleepTime ? recordLimitSleepTime : byteLimitSleepTime; if (sleepTime > 0) { try { Thread.sleep(sleepTime); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_BYTES, currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES)); lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_BYTES, currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_BYTES)); lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_RECORDS, currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS)); lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_RECORDS, currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_RECORDS)); lastCommunication.setTimestamp(nowTimestamp); } }
04 文末
通过阅读本文可以知道DataX
的限速原理了,如有疑问的童鞋,欢迎留言,本文完!