背景:数据集成无法同步MongoDB时间戳字段类型实现增量同步。
场景:定时获取10分钟的增量数据,MongoDB增量字段为时间戳格式数据。
设置任务依赖实现参数传递:
设置节点依赖关系,调度配置都设置10分钟调度
1、使用两个赋值节点定义时间戳格式的时间
开始时间:
参数:day=$[yyyy-mm-dd] start_time=$[hh24:mi:ss- 1/24/60*10]
赋值语言选ODPS SQL:select UNIX_TIMESTAMP("${day} ${end_time}");
结束时间:
参数:day=$[yyyy-mm-dd] end_time=$[hh24:mi:ss]
赋值语言选ODPS SQL:select UNIX_TIMESTAMP("${day} ${end_time}");
2、配置MongoDB同步节点
添加本节点输入参数 start_time和end_time,取值自上游的两个赋值节点
MongoDB原始数据:
脚本模式配置示例代码,源端create_time是double类型,存的时间戳。
"query": "{'create_time':{'$gte':${start_time},'$lt':${end_time}}}",
脚本配置示例
{
"type": "job",
"steps": [
{
"stepType": "mongodb",
"parameter": {
"datasource": "ds1",
"query": "{'create_time':{'$gte':${start_time},'$lt':${end_time}}}",
"column": [
{
"name": "doc_id",
"type": "STRING"
},
{
"name": "create_time",
"type": "DOUBLE"
},
{
"name": "date_time",
"type": "DATE"
}
],
"collectionName": "test1"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "odps",
"parameter": {
"partition": "",
"truncate": false,
"compress": false,
"datasource": "odps_first",
"column": [
"doc_id",
"create_time",
"date_time"
],
"emptyAsNull": false,
"table": "tablename"
},
"name": "Writer",
"category": "writer"
}
],
"version": "2.0",
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": ""
},
"speed": {
"throttle": false,
"concurrent": 2
}
}
}
DataWorks百问百答历史记录 请点击这里查看>>
更多DataWorks技术和产品信息,欢迎加入【DataWorks钉钉交流群】