DataWorks百问百答28:MongoDB时间戳类型字段如何实现增量同步?

本文涉及的产品
大数据开发治理平台DataWorks,Serverless资源组抵扣包300CU*H
简介: 结合赋值节点通过MongoDB时间戳类型字段实现增量同步场景示例

背景:数据集成无法同步MongoDB时间戳字段类型实现增量同步。
场景:定时获取10分钟的增量数据,MongoDB增量字段为时间戳格式数据。

设置任务依赖实现参数传递:

设置节点依赖关系,调度配置都设置10分钟调度
data4.png

1、使用两个赋值节点定义时间戳格式的时间

开始时间:
参数:day=$[yyyy-mm-dd] start_time=$[hh24:mi:ss- 1/24/60*10]
赋值语言选ODPS SQL:select UNIX_TIMESTAMP("${day} ${end_time}");
结束时间:
参数:day=$[yyyy-mm-dd] end_time=$[hh24:mi:ss]
赋值语言选ODPS SQL:select UNIX_TIMESTAMP("${day} ${end_time}");
data3.png

2、配置MongoDB同步节点

添加本节点输入参数 start_time和end_time,取值自上游的两个赋值节点
data2.png

MongoDB原始数据:

脚本模式配置示例代码,源端create_time是double类型,存的时间戳。
data1.png

"query": "{'create_time':{'$gte':${start_time},'$lt':${end_time}}}",

脚本配置示例

{
    "type": "job",
    "steps": [
        {
            "stepType": "mongodb",
            "parameter": {
                "datasource": "ds1",
                                "query": "{'create_time':{'$gte':${start_time},'$lt':${end_time}}}",
                "column": [
                    {
                        "name": "doc_id",
                        "type": "STRING"
                    },
                    {
                        "name": "create_time",
                        "type": "DOUBLE"
                    },
                    {
                        "name": "date_time",
                        "type": "DATE"
                    }
                ],
                "collectionName": "test1"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "odps",
            "parameter": {
                "partition": "",
                "truncate": false,
                "compress": false,
                "datasource": "odps_first",
                "column": [
                    "doc_id",
                    "create_time",
                    "date_time"
                ],
                "emptyAsNull": false,
                "table": "tablename"
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": ""
        },
        "speed": {
            "throttle": false,
            "concurrent": 2
        }
    }
}

DataWorks百问百答历史记录 请点击这里查看>>

更多DataWorks技术和产品信息,欢迎加入【DataWorks钉钉交流群】

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
一站式大数据开发治理平台DataWorks初级课程
DataWorks 从 2009 年开始,十ー年里一直支持阿里巴巴集团内部数据中台的建设,2019 年双 11 稳定支撑每日千万级的任务调度。每天阿里巴巴内部有数万名数据和算法工程师正在使用DataWorks,承了阿里巴巴 99%的据业务构建。本课程主要介绍了阿里巴巴大数据技术发展历程与 DataWorks 几大模块的基本能力。 课程目标  通过讲师的详细讲解与实际演示,学员可以一边学习一边进行实际操作,可以深入了解DataWorks各大模块的使用方式和具体功能,让学员对DataWorks数据集成、开发、分析、运维、安全、治理等方面有深刻的了解,加深对阿里云大数据产品体系的理解与认识。 适合人群  企业数据仓库开发人员  大数据平台开发人员  数据分析师  大数据运维人员  对于大数据平台、数据中台产品感兴趣的开发者
相关文章
|
3月前
|
人工智能 JSON NoSQL
Go MongoDB Driver 中的 A D M E 类型是什么
Go MongoDB Driver 中的 A D M E 类型是什么
39 1
|
4月前
|
SQL NoSQL MongoDB
MongoDB 索引类型介绍
MongoDB 索引类型介绍
80 3
|
4月前
|
DataWorks NoSQL 关系型数据库
DataWorks产品使用合集之如何新增MongoDB数据源并设置鉴权
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
消息中间件 SQL 分布式计算
DataWorks产品使用合集之如何离线增量同步Kafka数据,并指定时间范围进行同步
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
分布式计算 DataWorks NoSQL
DataWorks产品使用合集之怎么离线同步MongoDB的增量数据
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
DataWorks NoSQL fastjson
DataWorks操作报错合集之DataX进行MongoDB全量迁移的过程中,DataX的MongoDB Reader插件在初始化阶段找不到Fastjson 2.x版本的类库,该怎么办
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
3月前
|
DataWorks Kubernetes 大数据
飞天大数据平台产品问题之DataWorks提供的商业化服务如何解决
飞天大数据平台产品问题之DataWorks提供的商业化服务如何解决
|
3月前
|
SQL DataWorks 安全
DataWorks产品使用合集之如何实现分钟级调度
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
3月前
|
运维 DataWorks 监控
DataWorks产品使用合集之如何自定义UDTF
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
3月前
|
分布式计算 DataWorks API
DataWorks产品使用合集之如何设置把结果传入变量
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。

相关产品

  • 大数据开发治理平台 DataWorks