Kettle实现ES到ES循环增量抽取

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: Kettle实现ES到ES循环增量抽取

主页:写程序的小王叔叔的博客欢迎来访👀

支持:点赞收藏关注



本博客内容,实践前,请先逐一浏览,然后再逐一学习

1、效果

image.png

2、实现

2.1 创建数据库

Kettle安装使用

2.2 创建作业

2.2.1 初始化变量:设置变量,通过变量实现作业的循环更新初始值

parent_job.setVariable("isContinue","1");parent_job.setVariable("lastUpdateTime","");true;

2.2.2 创建核心转换

【见2.3】

2.2.3 写日志记录

image.png

isContinue=${isContinue}-------------------lastUpdateTime=${lastUpdateTime}===============

2.2.4 设置循环

image.png

通过【2.2】中设置,可以将基本循环抽取动作的作业可以实现循环。

2.3 创建转换:关键处!!!

思路:

1.通过MySQL中kettle业务抽取的时间备用表,进行设置最后一次修改更新时间。

2.设置基本循环单次抽取的条数,和基本抽取的json格式

3.设置抽取的数据源

4.解析抽取后的es中内置的hits-source的相关结构

5.成功解析之后,将抽取到的数据进行入库,同时变量获取最新的更新时间保存到MySQL中,便于下次更新使用

2.3.1 选择数据源

image.png

selectround(unix_timestamp(timetable_dev.modify_time)*1000) asmodifyTime, '1'asisContinuefromes_kettle.timetable_devWHEREindex_name='sta_resource_operation'

2.3.2 更新常量

image.png

{"from":0,"size":10,"query":{"bool":{"filter":[{"bool":{"must":[{"range":{"last_update_time":{"from":startTime,"to":null,"include_lower":true,"include_upper":true,"boost":1}}}],"adjust_pure_negative":true,"boost":1}}],"adjust_pure_negative":true,"boost":1}},"sort":[{"last_update_time":{"order":"asc"}}]}

2.3.4 参数替换

image.png

2.3.5 设置数据源基本请求信息

image.png

2.3.6 配置解析hits结构

image.png

2.3.7 配置解析的结构

image.png

2.3.8 筛选结构

image.png

2.3.9 最后如ES库

image.png

2.3.10 根据时间设置循环

image.png

执行SQL脚本:

update es_kettle.timetable_devset modify_time = FROM_UNIXTIME('?','%Y-%m-%d %H:%i:%S')where index_name ='sta_resource_operation'

以上就是ES通过作业,转换进行抽取到新的ES结果

3、注意事项

3.1)设置对应字段

3.2)组件之间的关联性

4、最后完成效果

image.png

image.png

转载声明:本文为博主原创文章,未经博主允许不得转载

⚠️注意 ~

💯本期内容就结束了,如果内容有误,麻烦大家评论区指出!

如有疑问❓可以在评论区💬或私信💬,尽我最大能力🏃‍♀️帮大家解决👨‍🏫!

如果我的文章有帮助到您,欢迎点赞+关注✔️鼓励博主🏃,您的鼓励是我分享的动力🏃🏃🏃~

相关文章
|
3月前
|
Java
ETL工具 Kettle 中 kettle循环传递变量_(最简单的方法)
本文详细介绍了如何在Kettle工具中使用循环传递变量,通过示例展示了如何将movies表数据按月插入到ods_movies表,涉及新建转换、获取变量、作业配置和执行,呈现了一个嵌套作业结构.
219 3
|
1月前
|
存储 JSON 自然语言处理
es索引文档过程
Elasticsearch 索引文档流程:先通过 REST API 或客户端创建索引,定义文档结构的映射;接着索引 JSON 格式的文档,Elasticsearch 解析、索引并存储;最后,文档以倒排索引形式存储,支持高效全文搜索。
40 5
|
6月前
|
缓存 索引
kibana上执行ES DSL语言查询数据并查看表结构与数据、删除索引、查看文件大小
kibana上执行ES DSL语言查询数据并查看表结构与数据、删除索引、查看文件大小
302 0
|
8月前
|
存储
ES批量写入数据
ES批量写入数据
288 1
|
8月前
|
算法 Apache 数据库
Sqoop的增量数据加载策略与示例
Sqoop的增量数据加载策略与示例
|
8月前
|
JSON 数据格式
es批量插入文件中的数据
es批量插入文件中的数据
|
JSON 移动开发 NoSQL
【ES系列九】——批量同步数据至ES
通过es官网提供的bulk方法进行实现
|
JSON 安全 数据安全/隐私保护
elasticdump迁移ES数据详解
elasticdump迁移ES数据详解
|
SQL Oracle 关系型数据库
基于变量方式实现kettle快速循环迁移表数据(八)
基于变量方式实现kettle快速循环迁移表数据(八)
390 0
基于变量方式实现kettle快速循环迁移表数据(八)
|
关系型数据库 MySQL Java
[玩转ES] ES批量/全量导入数据
[玩转ES] ES批量/全量导入数据
636 0
[玩转ES] ES批量/全量导入数据