DataWorks中数据集成是es数据源,检索查询条件中怎么通过调度参数获取对应的时间戳?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在DataWorks中,当数据集成使用Elasticsearch(ES)作为数据源时,可以通过调度参数动态获取时间戳,并将其用于检索查询条件中。以下是具体实现方法和步骤:
在DataWorks的任务调度配置中,可以定义与时间相关的调度参数。这些参数会在任务运行时被替换为实际的时间值。
常用调度参数:
${yyyymmdd}
:表示当前业务日期,格式为YYYYMMDD
。${yyyy-mm-dd}
:表示当前业务日期,格式为YYYY-MM-DD
。${bizdate}
:表示业务日期,默认为前一天的日期,格式为YYYYMMDD
。${currentTimeStamp}
:表示当前系统时间戳。示例: 如果需要获取前一天的开始时间和结束时间,可以在调度参数中定义如下变量:
var1=${yyyymmdd-1} # 前一天的日期
var2=${yyyymmdd} # 当前日期
在数据集成任务中,Elasticsearch Reader支持通过脚本模式配置查询条件。可以将调度参数嵌入到查询条件中,以动态生成时间范围。
关键参数:
beginDateTime
:数据消费的开始时间位点,格式为yyyyMMddHHmmss
。endDateTime
:数据消费的结束时间位点,格式为yyyyMMddHHmmss
。示例脚本: 在Elasticsearch Reader的脚本中,可以使用${变量名}
的方式引用调度参数。例如:
{
"reader": {
"plugin": "elasticsearch",
"parameter": {
"connection": [
{
"endpoint": "http://your-es-endpoint",
"index": "your_index_name"
}
],
"query": "{\"range\": {\"timestamp_field\": {\"gte\": \"${beginDateTime}\", \"lt\": \"${endDateTime}\"}}}"
}
}
}
timestamp_field
:Elasticsearch中的时间字段名称。${beginDateTime}
和 ${endDateTime}
:分别表示查询的时间范围起点和终点。在任务运行时,调度参数会被替换为实际的时间值。例如: - 如果任务运行时间为2023-10-07
,且调度参数定义如下:
beginDateTime=${yyyymmdd-1}000000 # 前一天的0点0分0秒
endDateTime=${yyyymmdd}000000 # 当天的0点0分0秒
则实际替换后的时间范围为:
beginDateTime=20231006000000
endDateTime=20231007000000
beginDateTime
和endDateTime
必须符合yyyyMMddHHmmss
格式,否则可能导致查询失败。假设需要每天同步Elasticsearch中前一天的数据,可以按照以下步骤配置:
定义调度参数:
beginDateTime=${yyyymmdd-1}000000
endDateTime=${yyyymmdd}000000
配置Reader脚本:
{
"reader": {
"plugin": "elasticsearch",
"parameter": {
"connection": [
{
"endpoint": "http://your-es-endpoint",
"index": "your_index_name"
}
],
"query": "{\"range\": {\"@timestamp\": {\"gte\": \"${beginDateTime}\", \"lt\": \"${endDateTime}\"}}}"
}
}
}
提交并运行任务:
通过以上步骤,您可以利用DataWorks的调度参数动态获取时间戳,并将其应用于Elasticsearch数据源的检索查询条件中,从而实现灵活的数据同步需求。
DataWorks基于MaxCompute/Hologres/EMR/CDP等大数据引擎,为数据仓库/数据湖/湖仓一体等解决方案提供统一的全链路大数据开发治理平台。