使用Kettle动态生成页码并实现分页数据同步

简介: 使用Kettle动态生成页码并实现分页数据同步

需求


将 DB2 数据库中的表数据导入另一个 DB2 数据库的表里面。

源表(DB2):table1

目标表(DB2):table2

数据量:千万级别


思路


当时直接使用 Kettle 将数据从源表导入到目标表中,但是考虑到数据量过于庞大,实际执行过程花费了很长时间,因此考虑采用分页导入的方式来进行数据传输,即:

根据实际情况设置一个每次处理的数据量,比如:5,000条,然后根据总的数据条数和每次处理的数据量计算出一共分几页。


假设总数据量有:10,000,000,所以页数为:10,000,000/5,000=2000页

注: 若存在小数,小数部分算一页,比如:20.3算21页


解决方案


根据上述思路,我们首先需要考虑如何计算得到总页数,以及页码。可以考虑增加一个辅助配置表来存放页码,这也是我在网上看到的处理方法,但是不符合工作需求,所以我考虑必须自动计算一个表的总页数,并生成页码。最后根据页码来循环导入数据。

主流程如下:onetable_A.kjb


image.png


流程说明:

  1. 首先我们建立一个作业流程,然后配置一个 START 节点;
  2. 第一个转换流程用于查询表数据的数目,并计算得到总页数,以及得到一个页码集合;
  3. 再建一个作业流程,来循环处理页码,主要是将第几页的数据从源表同步到目标表中。


根据表数据生成页码


首先我们来重点关注第一个转换流程,这也是本次实现过程中最重要的一点。

流程结构如下:


image.png


getTotal 节点用于查询表数据的数目,其实就是在系统表中做查询操作,具体实现如下:


image.png


点击预览按钮,结果如下:


image.png


Java 代码脚本部分主要是根据上一步得到的数据数目来计算页码,并生成一个页码集合。关于 Java 代码脚本的使用,这里就不做介绍了。构建过程如下:


1.jpgimage.png


Java 代码如下:


public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
  if (first) {
    first = false;
    /* TODO: Your code here. (Using info fields)
    FieldHelper infoField = get(Fields.Info, "info_field_name");
    RowSet infoStream = findInfoRowSet("info_stream_tag");
    Object[] infoRow = null;
    int infoRowCount = 0;
    // Read all rows from info step before calling getRow() method, which returns first row from any
    // input rowset. As rowMeta for info and input steps varies getRow() can lead to errors.
    while((infoRow = getRowFrom(infoStream)) != null){
      // do something with info data
      infoRowCount++;
    }
    */
  }
  Object[] r = getRow();
  if (r == null) {
    setOutputDone();
    return false;
  }
  // It is always safest to call createOutputRow() to ensure that your output row's Object[] is large
  // enough to handle any new fields you are creating in this step.
  /* TODO: Your code here. (See Sample)
  // Get the value from an input field
  String foobar = get(Fields.In, "a_fieldname").getString(r);
  foobar += "bar";
  // Set a value in a new output field
  get(Fields.Out, "output_fieldname").setValue(r, foobar);
  */
//此处创建 r,是为了获取输入参数TOTAL_SRC的值
r = createOutputRow(r, data.outputRowMeta.size());
double num = get(Fields.In, "TOTAL_SRC").getNumber(r);
int pageNum = 5000;
int pages = (int)num/pageNum +1;    //计算总页数
//生成页码,并输出
for(int i=0;i<pages;i++){
    //个人觉得r类似于输出器,如果想将每个页码都输出去,则必须独立进行声明,此步骤为本人测试所得
      r = createOutputRow(r, data.outputRowMeta.size());
    get(Fields.Out, "PAGE").setValue(r, i);     //将页码赋值给PAGE
    //get(Fields.Out, "TJOBSEQ").setValue(r, get(Fields.In, "JOB_SEQ").getString(r));
    //get(Fields.Out, "id").setValue(r, i);
  putRow(data.outputRowMeta, r);
} 
  // Send the row on to the next step.
  return true;
}
复制代码


虽然我们想要得到的页码为整数类型,但是在设置 PAGE 类型时需要设置为 String 类型,否则会报错,如下图所示:


image.png


并不影响后续的使用。


对于下述流程进行测试验证。


image.pngimage.png

执行结果为:

1.jpg


从结果可以看出,每个页码都被正确输出。那么接下来我们需要将页码复制到结果中,传递到接下来的作业流程中。


根据页码循环同步数据


页码生成完毕后,接下来就是根据页码从源表查询数据,然后同步到目标表中。流程设计如下:


image.png


传进来的页码必须先从结果中获取到,然后再定义为变量,才能被后续所使用。


第一个转换流程的内部实现如下:


image.png


为了区分,我们将变量名叫做 EPAGE。


接下来就是数据同步,查看 getData_Epage。


image.png


首先需要根据页码从源表中获取到数据,注意这里为了使用页码条件,查询得到的结果不得不多了一列结果。


image.png


表输出时,注意不要勾选裁剪表,需要指定数据库字段,将上面查到的结果中多的一列给剔除掉。如果不想在此处做数据库字段指定操作,可以修改表输入中的查询语句。


select
REC_CREATOR,
REC_CREATE_TIME,
....
from (select ROW_NUMBER() over() as a, g001.* from
ZGROD112.D112_L2_FV_QD_CPCSEG002 g001) as temp 
where a>=(5*${EPAGE}+1) and a<=(5*(${EPAGE}+1))
复制代码


至此,关于大规模数据表之间的同步操作结束。


总结


本例重点讲述了如何根据表数据的数目动态生成页码,从而减少了页码配置表的构建步骤,最终减轻数据同步对于内存资源的占用。


拓展


实际工作中,我们不可能只对一张表做数据同步,往往会针对多张表做同步操作,所以对每张表还会有一个循环处理操作。关于这种情况的处理,需要一个额外的配置表,用来存放源系统和目标系统基本信息,以及源表和目标表信息,如果需要做增量同步操作,还可以加几个字段。后续有时间可以简单写一下。


目录
相关文章
|
3天前
|
缓存 监控
遇到Hologres慢查询列表的导出功能出现问题,无法下载查询结果的情况
【2月更文挑战第20天】遇到Hologres慢查询列表的导出功能出现问题,无法下载查询结果的情况
60 1
|
SQL 前端开发 JavaScript
eggjs 怎么实现获取账单列表接口并且实现列表数据分页查询功能?
eggjs 怎么实现获取账单列表接口并且实现列表数据分页查询功能?
152 0
eggjs 怎么实现获取账单列表接口并且实现列表数据分页查询功能?
|
3天前
|
SQL 关系型数据库 MySQL
MYSQL基础知识之【添加数据,查询数据】
MYSQL基础知识之【添加数据,查询数据】
29 0
|
6月前
|
关系型数据库 MySQL PostgreSQL
ruoyi数据源修改为PostgreSQL分页错误
ruoyi数据源修改为PostgreSQL分页错误
|
7月前
|
SQL DataWorks 数据处理
在DataWorks中,您可以将抽取数据的SQL和数据集成任务放在一起
在DataWorks中,您可以将抽取数据的SQL和数据集成任务放在一起
48 4
|
10月前
|
SQL 关系型数据库 MySQL
解决MySQL中分页查询时多页有重复数据,实际只有一条数据的问题
使用主键进行分页查询就不会产生数据排序重复/缺失的问题
140 1
|
11月前
|
前端开发 数据库
分页查询文件上传的主页面
分页查询文件上传的主页面,可参考
29 0
|
关系型数据库 MySQL 数据库
数据库学习-更新数据作业示例【带源码】
MySQL数据库 “更新数据” 习题示例,包含源码,能建立起对于如何更新数据的基本概念
132 0
数据库学习-更新数据作业示例【带源码】
|
关系型数据库 MySQL 数据库
数据库学习-查询数据作业示例【带源码】
MySQL数据库 “查询数据 ” 习题示例,包含源码,能建立起对于查询数据 的基本概念
180 0
数据库学习-查询数据作业示例【带源码】