DataWorks百问百答28:MongoDB时间戳类型字段如何实现增量同步?

本文涉及的产品
大数据开发治理平台DataWorks,Serverless资源组抵扣包300CU*H
简介: 结合赋值节点通过MongoDB时间戳类型字段实现增量同步场景示例

背景:数据集成无法同步MongoDB时间戳字段类型实现增量同步。
场景:定时获取10分钟的增量数据,MongoDB增量字段为时间戳格式数据。

设置任务依赖实现参数传递:

设置节点依赖关系,调度配置都设置10分钟调度
data4.png

1、使用两个赋值节点定义时间戳格式的时间

开始时间:
参数:day=$[yyyy-mm-dd] start_time=$[hh24:mi:ss- 1/24/60*10]
赋值语言选ODPS SQL:select UNIX_TIMESTAMP("${day} ${end_time}");
结束时间:
参数:day=$[yyyy-mm-dd] end_time=$[hh24:mi:ss]
赋值语言选ODPS SQL:select UNIX_TIMESTAMP("${day} ${end_time}");
data3.png

2、配置MongoDB同步节点

添加本节点输入参数 start_time和end_time,取值自上游的两个赋值节点
data2.png

MongoDB原始数据:

脚本模式配置示例代码,源端create_time是double类型,存的时间戳。
data1.png

"query": "{'create_time':{'$gte':${start_time},'$lt':${end_time}}}",

脚本配置示例

{
    "type": "job",
    "steps": [
        {
            "stepType": "mongodb",
            "parameter": {
                "datasource": "ds1",
                                "query": "{'create_time':{'$gte':${start_time},'$lt':${end_time}}}",
                "column": [
                    {
                        "name": "doc_id",
                        "type": "STRING"
                    },
                    {
                        "name": "create_time",
                        "type": "DOUBLE"
                    },
                    {
                        "name": "date_time",
                        "type": "DATE"
                    }
                ],
                "collectionName": "test1"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "odps",
            "parameter": {
                "partition": "",
                "truncate": false,
                "compress": false,
                "datasource": "odps_first",
                "column": [
                    "doc_id",
                    "create_time",
                    "date_time"
                ],
                "emptyAsNull": false,
                "table": "tablename"
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": ""
        },
        "speed": {
            "throttle": false,
            "concurrent": 2
        }
    }
}

DataWorks百问百答历史记录 请点击这里查看>>

更多DataWorks技术和产品信息,欢迎加入【DataWorks钉钉交流群】

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
一站式大数据开发治理平台DataWorks初级课程
DataWorks 从 2009 年开始,十ー年里一直支持阿里巴巴集团内部数据中台的建设,2019 年双 11 稳定支撑每日千万级的任务调度。每天阿里巴巴内部有数万名数据和算法工程师正在使用DataWorks,承了阿里巴巴 99%的据业务构建。本课程主要介绍了阿里巴巴大数据技术发展历程与 DataWorks 几大模块的基本能力。 课程目标  通过讲师的详细讲解与实际演示,学员可以一边学习一边进行实际操作,可以深入了解DataWorks各大模块的使用方式和具体功能,让学员对DataWorks数据集成、开发、分析、运维、安全、治理等方面有深刻的了解,加深对阿里云大数据产品体系的理解与认识。 适合人群  企业数据仓库开发人员  大数据平台开发人员  数据分析师  大数据运维人员  对于大数据平台、数据中台产品感兴趣的开发者
相关文章
|
4月前
|
分布式计算 DataWorks NoSQL
DataWorks产品使用合集之同步Holo数据到ODPS的过程中,出现部分数据的值变为星号(),是什么原因
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
DataWorks产品使用合集之同步Holo数据到ODPS的过程中,出现部分数据的值变为星号(),是什么原因
|
3月前
|
数据采集 DataWorks 安全
DataWorks产品使用合集之如何判断数据库类型是否支持整库
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
3月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之如何将STRING类型转换为DATETIME类型
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
3月前
|
分布式计算 DataWorks NoSQL
DataWorks产品使用合集之怎么在同步脚本里进行列转行
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
3月前
|
存储 运维 DataWorks
DataWorks产品使用合集之怎么实现时间字段进行分区同步
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
3月前
|
SQL DataWorks 关系型数据库
DataWorks操作报错合集之如何处理在DI节点同步到OceanBase数据库时,出现SQLException: Not supported feature or function
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
3月前
|
SQL 分布式计算 DataWorks
DataWorks操作报错合集之如何解决datax同步任务时报错ODPS-0410042:Invalid signature value
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
3月前
|
数据采集 运维 DataWorks
DataWorks产品使用合集之如何从es同步数据到es
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
DataWorks NoSQL 关系型数据库
DataWorks产品使用合集之如何新增MongoDB数据源并设置鉴权
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
分布式计算 DataWorks 关系型数据库
MaxCompute产品使用合集之如何在DataWorks中实现离线同步多个分表到MC的多级分区表
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。

热门文章

最新文章

相关产品

  • 大数据开发治理平台 DataWorks