主页:写程序的小王叔叔的博客欢迎来访👀
支持:点赞收藏关注
本博客内容,实践前,请先逐一浏览,然后再逐一学习
1、效果
2、实现
2.1 创建数据库
2.2 创建作业
2.2.1 初始化变量:设置变量,通过变量实现作业的循环更新初始值
parent_job.setVariable("isContinue","1");parent_job.setVariable("lastUpdateTime","");true;
2.2.2 创建核心转换
【见2.3】
2.2.3 写日志记录
isContinue=${isContinue}-------------------lastUpdateTime=${lastUpdateTime}===============
2.2.4 设置循环
通过【2.2】中设置,可以将基本循环抽取动作的作业可以实现循环。
2.3 创建转换:关键处!!!
思路:
1.通过MySQL中kettle业务抽取的时间备用表,进行设置最后一次修改更新时间。
2.设置基本循环单次抽取的条数,和基本抽取的json格式
3.设置抽取的数据源
4.解析抽取后的es中内置的hits-source的相关结构
5.成功解析之后,将抽取到的数据进行入库,同时变量获取最新的更新时间保存到MySQL中,便于下次更新使用
2.3.1 选择数据源
selectround(unix_timestamp(timetable_dev.modify_time)*1000) asmodifyTime, '1'asisContinuefromes_kettle.timetable_devWHEREindex_name='sta_resource_operation'
2.3.2 更新常量
{"from":0,"size":10,"query":{"bool":{"filter":[{"bool":{"must":[{"range":{"last_update_time":{"from":startTime,"to":null,"include_lower":true,"include_upper":true,"boost":1}}}],"adjust_pure_negative":true,"boost":1}}],"adjust_pure_negative":true,"boost":1}},"sort":[{"last_update_time":{"order":"asc"}}]}
2.3.4 参数替换
2.3.5 设置数据源基本请求信息
2.3.6 配置解析hits结构
2.3.7 配置解析的结构
2.3.8 筛选结构
2.3.9 最后如ES库
2.3.10 根据时间设置循环
执行SQL脚本:
update es_kettle.timetable_devset modify_time = FROM_UNIXTIME('?','%Y-%m-%d %H:%i:%S')where index_name ='sta_resource_operation'
以上就是ES通过作业,转换进行抽取到新的ES结果
3、注意事项
3.1)设置对应字段
3.2)组件之间的关联性
4、最后完成效果
转载声明:本文为博主原创文章,未经博主允许不得转载
⚠️注意 ~
💯本期内容就结束了,如果内容有误,麻烦大家评论区指出!
如有疑问❓可以在评论区💬或私信💬,尽我最大能力🏃♀️帮大家解决👨🏫!
如果我的文章有帮助到您,欢迎点赞+关注✔️鼓励博主🏃,您的鼓励是我分享的动力🏃🏃🏃~