DataWorks百问百答28:MongoDB时间戳类型字段如何实现增量同步?-阿里云开发者社区

开发者社区> DataWorks> 正文

DataWorks百问百答28:MongoDB时间戳类型字段如何实现增量同步?

简介: 结合赋值节点通过MongoDB时间戳类型字段实现增量同步场景示例

背景:数据集成无法同步MongoDB时间戳字段类型实现增量同步。
场景:定时获取10分钟的增量数据,MongoDB增量字段为时间戳格式数据。

设置任务依赖实现参数传递:

设置节点依赖关系,调度配置都设置10分钟调度
data4.png

1、使用两个赋值节点定义时间戳格式的时间

开始时间:
参数:day=$[yyyy-mm-dd] start_time=$[hh24:mi:ss- 1/24/60*10]
赋值语言选ODPS SQL:select UNIX_TIMESTAMP("${day} ${end_time}");
结束时间:
参数:day=$[yyyy-mm-dd] end_time=$[hh24:mi:ss]
赋值语言选ODPS SQL:select UNIX_TIMESTAMP("${day} ${end_time}");
data3.png

2、配置MongoDB同步节点

添加本节点输入参数 start_time和end_time,取值自上游的两个赋值节点
data2.png

MongoDB原始数据:

脚本模式配置示例代码,源端create_time是double类型,存的时间戳。
data1.png

"query": "{'create_time':{'$gte':${start_time},'$lt':${end_time}}}",

脚本配置示例

{
    "type": "job",
    "steps": [
        {
            "stepType": "mongodb",
            "parameter": {
                "datasource": "ds1",
                                "query": "{'create_time':{'$gte':${start_time},'$lt':${end_time}}}",
                "column": [
                    {
                        "name": "doc_id",
                        "type": "STRING"
                    },
                    {
                        "name": "create_time",
                        "type": "DOUBLE"
                    },
                    {
                        "name": "date_time",
                        "type": "DATE"
                    }
                ],
                "collectionName": "test1"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "odps",
            "parameter": {
                "partition": "",
                "truncate": false,
                "compress": false,
                "datasource": "odps_first",
                "column": [
                    "doc_id",
                    "create_time",
                    "date_time"
                ],
                "emptyAsNull": false,
                "table": "tablename"
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": ""
        },
        "speed": {
            "throttle": false,
            "concurrent": 2
        }
    }
}

DataWorks百问百答历史记录 请点击这里查看>>

更多DataWorks技术和产品信息,欢迎加入【DataWorks钉钉交流群】

版权声明:本文中所有内容均属于阿里云开发者社区所有,任何媒体、网站或个人未经阿里云开发者社区协议授权不得转载、链接、转贴或以其他方式复制发布/发表。申请授权请邮件developerteam@list.alibaba-inc.com,已获得阿里云开发者社区协议授权的媒体、网站,在转载使用时必须注明"稿件来源:阿里云开发者社区,原文作者姓名",违者本社区将依法追究责任。 如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至:developer2020@service.aliyun.com 进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
DataWorks
使用钉钉扫一扫加入圈子
+ 订阅

DataWorks作为飞天大数据平台操作系统,对接各种大数据计算引擎,以all in one box的方式提供专业高效、安全可靠的全域智能大数据平台,高效率完成数据全链路研发流程,建设企业数据治理体系。 从2009年飞天大数据平台写下第一行代码开始,DataWorks历经10年发展,形成一套成熟的产品功能体系,满足企业数据中台搭建需求。

官方博客
DataWorks产品官网