DataX教程(08)- 监控与汇报

简介: DataX教程(08)- 监控与汇报

01 引言


通过前面的博文,我们对DataX有了一定的深入了解了:


  • 《DataX教程(01)- 入门》
  • 《DataX教程(02)- IDEA运行DataX完整流程(填完所有的坑)》
  • 《DataX教程(03)- 源码解读(超详细版)
  • 《DataX教程(04)- 配置完整解读》
  • 《DataX教程(05)- DataX Web项目实践》
  • 《DataX教程(06)- DataX调优》
  • 《DataX教程(07)- 图解DataX任务分配及执行流程》


本文主要讲解DataX的监控与汇报功能。


02 监控功能


2.1 ErrorRecordChecker


在JobContainer类里面,可以看到引用了一个类ErrorRecordChecker,它在JobContainer初始化的时候做了初始操作。

f3b343379c6f48d2804a2541139f3066.png


ErrorChecker是一个监控类,主要用来检查任务是否到达错误记录限制。有检查条数(recordLimit)和百分比(percentageLimit)两种方式:


  • errorRecord表示出错条数不能大于限制数,当超过时任务失败。比如errorRecord为0表示不容许任何脏数据;
  • errorPercentage表示出错比例,在任务结束时校验;
  • errorRecord优先级高于errorPercentage。


2.2 ErrorRecordChecker源码


Control+O可以看到ErrorRecordChecker,有如下几个方法:

b959084d2d924582af5387d02bca767a.png这里主要做简要描述,


① 构造函数ErrorRecordChecker(Configuration configuration):主要就是从任务配置文件job.json里面获取errorLimit.record错误记录数限制及errorLimit.percentage错误记录百分比的值:

public ErrorRecordChecker(Configuration configuration) {
     this(configuration.getLong(CoreConstant.DATAX_JOB_SETTING_ERRORLIMIT_RECORD),
             configuration.getDouble(CoreConstant.DATAX_JOB_SETTING_ERRORLIMIT_PERCENT));
 }


② 检查错误记录数限制checkRecordLimit(Communication communication):主要就是从communication里获取总共的错误记录数,然后判断是否超出配置的值,如果是,则抛出异常

 public void checkRecordLimit(Communication communication) {
        if (recordLimit == null) {
            return;
        }
        long errorNumber = CommunicationTool.getTotalErrorRecords(communication);
        if (recordLimit < errorNumber) {
            LOG.debug(
                    String.format("Error-limit set to %d, error count check.",
                            recordLimit));
            throw DataXException.asDataXException(
                    FrameworkErrorCode.PLUGIN_DIRTY_DATA_LIMIT_EXCEED,
                    String.format("脏数据条数检查不通过,限制是[%d]条,但实际上捕获了[%d]条.",
                            recordLimit, errorNumber));
        }
    }


③ 检查错误记录百分比checkPercentageLimit(Communication communication):主要就是从communication里获取总共的错误记录数与总数的百分比值,然后判断是否超出配置的值,如果是,则抛出异常:

public void checkPercentageLimit(Communication communication) {
      if (percentageLimit == null) {
          return;
      }
      LOG.debug(String.format(
              "Error-limit set to %f, error percent check.", percentageLimit));
      long total = CommunicationTool.getTotalReadRecords(communication);
      long error = CommunicationTool.getTotalErrorRecords(communication);
      if (total > 0 && ((double) error / (double) total) > percentageLimit) {
          throw DataXException.asDataXException(
                  FrameworkErrorCode.PLUGIN_DIRTY_DATA_LIMIT_EXCEED,
                  String.format("脏数据百分比检查不通过,限制是[%f],但实际上捕获到[%f].",
                          percentageLimit, ((double) error / (double) total)));
      }
  }


好了,这里就讲完了ErrorRecordChecker的功能了,注意check方法里面有一个Communication类,这是一个通讯类,主要用来保存当前任务的状态信息的,接下来也会讲解。


2.3 ErrorRecordChecker检查时机


Control点击可以看到ErrorRecordChecker被JobContainer调用(初始化,前面已讲),以及在AbstractScheduler任务任务调度schedule方法执行的时候调用了。

f4d9afc395604c4181625129b6367960.png


再看看check方法在哪里调用了,经过追踪,可以分析得出:


  • 在JobContainer的schedule方法结束后会调用,检查整个任务的错误记录数
  • 在AbstractScheduler的schedule方法,里面开了一个while死循环,不断去采集任务的状态,检查的时间间隔配置(core.container.job.sleepInterval)在core.json里面的job.sleepInterval里配置。


最后贴下,AbstractScheduler的schedule方法实现实时采集的代码:

 while (true) {
  /**
     * step 1: collect job stat
     * step 2: getReport info, then report it
     * step 3: errorLimit do check
     * step 4: dealSucceedStat();
     * step 5: dealKillingStat();
     * step 6: dealFailedStat();
     * step 7: refresh last job stat, and then sleep for next while
     *
     * above steps, some ones should report info to DS
     *
     */
    Communication nowJobContainerCommunication = this.containerCommunicator.collect();
    nowJobContainerCommunication.setTimestamp(System.currentTimeMillis());
    LOG.debug(nowJobContainerCommunication.toString());
    //汇报周期
    long now = System.currentTimeMillis();
    if (now - lastReportTimeStamp > jobReportIntervalInMillSec) {
        Communication reportCommunication = CommunicationTool
                .getReportCommunication(nowJobContainerCommunication, lastJobContainerCommunication, totalTasks);
        this.containerCommunicator.report(reportCommunication);
        lastReportTimeStamp = now;
        lastJobContainerCommunication = nowJobContainerCommunication;
    }
    errorLimit.checkRecordLimit(nowJobContainerCommunication);
    if (nowJobContainerCommunication.getState() == State.SUCCEEDED) {
        LOG.info("Scheduler accomplished all tasks.");
        break;
    }
    if (isJobKilling(this.getJobId())) {
        dealKillingStat(this.containerCommunicator, totalTasks);
    } else if (nowJobContainerCommunication.getState() == State.FAILED) {
        dealFailedStat(this.containerCommunicator, nowJobContainerCommunication.getThrowable());
    }
    Thread.sleep(jobSleepIntervalInMillSec);
}


03 汇报功能


3.1 汇报运行流程


友情提示:可能图片较大,建议下载下来使用图片编辑器查看。


首先贴上一张图,里面描述的是Scheduler调度器与ErrorRecordChecker错误检查器及Communicator通讯者的整个调用关系,从上往下看:

d41f567617344e92a617dc7ef809e35e.png


3.2 汇报的运行流程


3.2.1 汇报的几个角色


汇报主要有几个重要的角色:


  • AbstractCommunicator通讯者抽象类:主要用来做通讯的协调;
  • Communication通讯的信息载体:主要用来存放通讯过程中产生的信息,为单例;
  • LocalTGCommunicationManager通讯信息载体工厂:根据任务id来获取通讯信息载体单例的工厂;
  • CommunicationTool信息载体工具类:此工具类是通讯业务层的处理,主要用来收集当前信息,并写入到Communication通讯的信息载体;
  • AbstractReporter信息上报:用来上报通讯信息。


3.2.2 汇报的流程


简要的流程描述:


  1. 首先根据配置new一个通讯者对象,有两种,分别为“StandAloneJobContainerCommunicator”、“StandAloneTGContainerCommunicator”,生成后,注入进Scheduler调度者,此时,Scheduler就有了一个Communicator工具了;
  2. 通讯者Communicator使用collect方法生成通讯的载体,也就是Communication,用来存放任务的相关信息,ErrorRecorder就是从这个Communication里获取当前任务的信息的;
  3. Scheduler调度器类里面,使用Communicator通讯工具的collect方法来获取communication通讯载体单例(获取单例方法在LocalTGCommunicationManager类,里面定义了Map,key为任务id,value为Communication通讯载体);
  4. Scheduler获取到Communication通讯载体后,使用CommunicationTool工具类把当前任务的状态信息写入;
  5. 最后使用reporter来上报Communication信息。


3.3 什么时候写信息内容


前面的3.1和3.2只做到了通讯类Communicator和通讯信息载体Communication的初始化,以及上报的流程,但是没有针对到哪里写入内容到Communication?这里直接看写入信息到Communication的地方,核心内容在TaskGroupContainer里面,下面来看看:


①首先根据任务id获取Communication的代码地方,在内部类TaskExecutor构造函数的地方:

ac024090c819402880bad907393e4087.png


②把Communication注入进Channel通道类,Channel通道类主要做内容的记录(核心:统计和限速都在这里):d15738f35d4a40258987d03cd262a8df.png


③Channel注入进了BufferedRecordExchanger或BufferedRecordTransformerExchanger,

而这连个Exchanger主要是为了记录RecordSender记录发送者、RecordReceiver记录接收者、TransformerExchanger的内容,就是记录ETL这3个模块里面的内容

7880fdf109ea4ff2ba15916ab2d8adfa.png


根据流程,可以看到Channel类使用来收集ETL的信息的,那么看看Channel这个类的一些核心方法。


3.4 Channel通讯信息接收


Channel类有很多的方法,Control+O可以看到:

c835ea017cf54783af8845d699c5a391.png


举个例子,可以看看Channelpush(final Record r)方法:

public void push(final Record r) {
     Validate.notNull(r, "record不能为空.");
     this.doPush(r);
     this.statPush(1L, r.getByteSize());
 }


进入statPush方法:

private void statPush(long recordSize, long byteSize) {
     currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_RECORDS,
             recordSize);
     currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_BYTES,
             byteSize);
     //在读的时候进行统计waitCounter即可,因为写(pull)的时候可能正在阻塞,但读的时候已经能读到这个阻塞的counter数
     currentCommunication.setLongCounter(CommunicationTool.WAIT_READER_TIME, waitReaderTime);
     currentCommunication.setLongCounter(CommunicationTool.WAIT_WRITER_TIME, waitWriterTime);
     boolean isChannelByteSpeedLimit = (this.byteSpeed > 0);
     boolean isChannelRecordSpeedLimit = (this.recordSpeed > 0);
     if (!isChannelByteSpeedLimit && !isChannelRecordSpeedLimit) {
         return;
     }
     long lastTimestamp = lastCommunication.getTimestamp();
     long nowTimestamp = System.currentTimeMillis();
     long interval = nowTimestamp - lastTimestamp;
     if (interval - this.flowControlInterval >= 0) {
         long byteLimitSleepTime = 0;
         long recordLimitSleepTime = 0;
         if (isChannelByteSpeedLimit) {
             long currentByteSpeed = (CommunicationTool.getTotalReadBytes(currentCommunication) -
                     CommunicationTool.getTotalReadBytes(lastCommunication)) * 1000 / interval;
             if (currentByteSpeed > this.byteSpeed) {
                 // 计算根据byteLimit得到的休眠时间
                 byteLimitSleepTime = currentByteSpeed * interval / this.byteSpeed
                         - interval;
             }
         }
         if (isChannelRecordSpeedLimit) {
             long currentRecordSpeed = (CommunicationTool.getTotalReadRecords(currentCommunication) -
                     CommunicationTool.getTotalReadRecords(lastCommunication)) * 1000 / interval;
             if (currentRecordSpeed > this.recordSpeed) {
                 // 计算根据recordLimit得到的休眠时间
                 recordLimitSleepTime = currentRecordSpeed * interval / this.recordSpeed
                         - interval;
             }
         }
         // 休眠时间取较大值
         long sleepTime = byteLimitSleepTime < recordLimitSleepTime ?
                 recordLimitSleepTime : byteLimitSleepTime;
         if (sleepTime > 0) {
             try {
                 Thread.sleep(sleepTime);
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
             }
         }
         lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_BYTES,
                 currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES));
         lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_BYTES,
                 currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_BYTES));
         lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_RECORDS,
                 currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS));
         lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_RECORDS,
                 currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_RECORDS));
         lastCommunication.setTimestamp(nowTimestamp);
     }
 }


可以看到把内容都设置进Communication信息载体了,这里还有其它的方法如pushAll等。大家Control鼠标点一下就能trace整个调用链了,其实就是不同的插件调用触发Exchanger方法,然后在Exchanger里面调用Channel的方法来记录到Communication信息载体。


04 文末


好了,到此把DataX的监控与汇报功能讲解完毕了,有疑问的童鞋欢迎留言,谢谢大家的阅读,本文完!

目录
相关文章
|
Web App开发 前端开发 Java
数据探查中心Apache Zeppelin二次开发初体验
数据探查中心Apache Zeppelin二次开发初体验
906 0
数据探查中心Apache Zeppelin二次开发初体验
|
5月前
|
SQL 消息中间件 监控
实时计算 Flink版产品使用问题之怎么使用Metric Reporters监控作业
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
8月前
|
SQL 存储 监控
实时计算 Flink版产品使用合集之Checkpoint监控和反压监控在哪里看
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
Java 关系型数据库 MySQL
DataX教程(10)- DataX插件热插拔原理
DataX教程(10)- DataX插件热插拔原理
674 0
|
8月前
|
机器学习/深度学习 人工智能 流计算
人工智能平台PAI 操作报错合集之在集群上提交了包含alink相关功能的flink任务,但是却报错如何解决
阿里云人工智能平台PAI (Platform for Artificial Intelligence) 是阿里云推出的一套全面、易用的机器学习和深度学习平台,旨在帮助企业、开发者和数据科学家快速构建、训练、部署和管理人工智能模型。在使用阿里云人工智能平台PAI进行操作时,可能会遇到各种类型的错误。以下列举了一些常见的报错情况及其可能的原因和解决方法。
|
调度 DataX 容器
DataX教程(07)- 图解DataX任务分配及执行流程
DataX教程(07)- 图解DataX任务分配及执行流程
615 0
DataX教程(07)- 图解DataX任务分配及执行流程
|
监控 调度 DataX
DataX教程(08)- 监控与汇报
DataX教程(08)- 监控与汇报
575 0
|
监控 DataX
DataX教程(09)- DataX是如何做到限速的?
DataX教程(09)- DataX是如何做到限速的?
388 0
|
JSON Java DataX
DataX教程(04)- 配置完整解读
DataX教程(04)- 配置完整解读
2653 0
|
存储 数据采集 SQL
大数据数据采集的数据迁移(同步/传输)的Sqoop之基本命令和使用的job作业
在大数据领域中,Sqoop是一款非常流行的数据迁移工具。它可以将关系型数据库中的数据快速地移动到Hadoop生态系统中,方便我们进行更深入的分析和处理。本文将介绍Sqoop的基本命令及如何使用Sqoop来创建和运行job作业,希望能够为大家提供一些参考和帮助。
195 0