在DataWorks中,MongoDB时间戳类型数据可以通过以下步骤实现增量同步:
设置任务依赖实现参数传递:设置节点依赖关系,调度配置都设置10分钟调度
1、使用两个赋值节点定义时间戳格式的时间开始时间:
参数:day=$[yyyy-mm-dd] start_time=$[hh24:mi:ss- 1/24/60*10]
赋值语言选ODPS SQL:select UNIX_TIMESTAMP("unknown unknown");
结束时间:
参数:day=$[yyyy-mm-dd] end_time=$[hh24:mi:ss]
赋值语言选ODPS SQL:select UNIX_TIMESTAMP("unknown unknown");
2、配置MongoDB同步节点添加本节点输入参数 start_time和end_time,取值自上游的两个赋值节点
MongoDB原始数据:脚本模式配置示例代码,源端create_time是double类型,存的时间戳。
"query": "{'create_time':{'$gte':unknown,'$lt':unknown}}",
脚本配置示例{ "type": "job", "steps": [ { "stepType": "mongodb", "parameter": { "datasource": "ds1", "query": "{'create_time':{'$gte':unknown,'$lt':unknown}}", "column": [ { "name": "doc_id", "type": "STRING" }, { "name": "create_time", "type": "DOUBLE" }, { "name": "date_time", "type": "DATE" } ], "collectionName": "test1" }, "name": "Reader", "category": "reader" }, { "stepType": "odps", "parameter": { "partition": "", "truncate": false, "compress": false, "datasource": "odps_first", "column": [ "doc_id", "create_time", "date_time" ], "emptyAsNull": false, "table": "tablename" }, "name": "Writer", "category": "writer" } ], "version": "2.0", "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] }, "setting": { "errorLimit": { "record": "" }, "speed": { "throttle": false, "concurrent": 2 } }},此回答整理自钉群“DataWorks交流群(答疑@机器人)”
在DataWorks中,实现MongoDB时间戳类型增量同步,可以使用以下方法:
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
DataWorks基于MaxCompute/Hologres/EMR/CDP等大数据引擎,为数据仓库/数据湖/湖仓一体等解决方案提供统一的全链路大数据开发治理平台。