如何不写一行代码把 Mysql json 字符串解析为 Elasticsearch 的独立字段-阿里云开发者社区

开发者社区> laoyang360> 正文

如何不写一行代码把 Mysql json 字符串解析为 Elasticsearch 的独立字段

简介: 1、事出有因 实战问题:有数百万数据需要导入 Elasticsearch 做性能对比测试,但当前数据存储在 Mysql 中,且核心字段以 Json 字符串形式存储。Mysql 存储如下所示:
+关注继续查看

image.png

链接

image.png

有没有又快又好的方法?接收同事是非开发人员,如果不写一行代码(脚本)就更好了!


2、方案探讨

2.1 前置认知

比较成熟同步方案选型。


Mysql 到 Elasticsearch 同步选定:logstash。


2.2 Json 字段的处理方案

2.2.1 方案一:遍历 Mysql,解析Json。


逐行遍历 Mysql,把 Json 字符串字段解析为单个字段,更新到Mysql中。


然后,logstash 同步到 Elasticsearch。


优点:很好理解,切实可行。


缺点:需要写解析代码,且涉及 Mysql 的逐行更新操作,慢且效率低。


2.2.2 方案二:logstash 中间环节用 json filter 插件过滤搞定 Json 串解析。


在 logstash 中间 filter 环节,加上 json 串的过滤。


举例如下(类似):


filter {

     json {

       source => "message",

       target => "doc"

     }

   }

实战参考:


https://www.elastic.co/guide/en/logstash/current/plugins-filters-json.html


优点:少了代码解析环节,借助插件实现。


缺点:需要修改 logstash 同步脚本,有一点学习成本。


2.2.3 方案三:Ingest 数据预处理搞定 json 解析。


既然 logstash json filter 插件能做数据解析,那么,与之对标的 Ingest 管道预处理中的 json processor 等 processor 组合肯定也能搞定。


优点1:少了代码解析环节,借助 Ingest processor 组合实现复杂数据预处理功能。


优点2:相比 logstash filter 更通俗易懂,小白也能快速上手。


缺点:占无。


3、实战一把

如前分析,方案一、二 也能搞定。


但是,方案三更方便,更适合技术小白人员甚至非技术人员。


我们就以方案三实战一把。


3.1 创建预处理管道

PUT _ingest/pipeline/text2json_pipeline

{

 "description": "describe pipeline",

 "processors": [

   {

     "json": {

       "field": "wb_detail",

       "target_field": "wb_json"

     }

   },

   {

     "script": {

       "source": """

           ctx.loc = ctx.wb_json.loc;

           ctx.cont = ctx.wb_json.wc;

           ctx.author = ctx.wb_json.usn;

           ctx.area = ctx.wb_json.uloc;

           ctx.url = ctx.wb_json.sr;

         """

     }

   },

   {

     "remove": {

       "field": "wb_json"

     }

   }

 ]

}

如上所示,应用了三个 process。


processor 1:json 处理。


将 wb_detail 源字符串 变成 wb_json json串。


wb_json 属于中间过度字段。


processor 2:script 处理。


将 wb_json json 串中的字段逐个字段切分。


processor 3:remove 删除字段处理。


删除中间过度字段 wb_json。


3.2 创建索引,并指定 default_pipeline

PUT test-003

{

 "settings": {

   "number_of_shards": 1,

   "number_of_replicas": 0,

   "refresh_interval": "30s",

   "index.default_pipeline":"text2json_pipeline"

 },

 "mappings": {

   "properties": {

     "area": {

       "type": "text",

       "analyzer": "ik_smart",

       "fields": {

         "keyword": {

           "type": "keyword"

         }

       }

     },

     "author": {

       "type": "keyword"

     },

     "cont": {

       "type": "text",

       "analyzer": "ik_max_word",

       "fields": {

         "keyword": {

           "type": "keyword"

         }

       }

     },

     "id": {

       "type": "long"

     },

     "loc": {

       "type": "keyword"

     },

     "publish_time": {

       "type": "date"

     },

     "publish_timestamp": {

       "type": "keyword"

     },

     "update_time": {

       "type": "date"

     },

     "url": {

       "type": "keyword"

     },

     "wb_detail": {

       "type": "keyword"

     },

     "wb_id": {

       "type": "keyword"

     }

   }

 }

}

通过 default_pipeline 提前指定预处理管道的方式非常巧妙,避免了一次 reindex 操作。


相当于在写入环节同时做了数据的处理。


3.3 logstash 数据同步

之前同步讲的很多了,这里就不做具体字段含义的讲解,基本见名释义,很好理解。不明白的读者,留言讨论或者加 wx:elastic 6 讨论。


input {

 stdin {

 }

 jdbc {

 # mysql jdbc connection string to our backup databse  

 jdbc_connection_string => "jdbc:mysql://172.21.0.x:3306/weibo_base"

 # the user we wish to excute our statement as

 jdbc_user => "root"

 jdbc_password => "XXXXX"

 record_last_run => "true"

 use_column_value => "true"

 tracking_column => "id"

 last_run_metadata_path => "/home/elasticsearch/logstash-7.6.0/sync/test_info"

 clean_run => "false"

 # the path to our downloaded jdbc driver

 jdbc_driver_library => "/home/elasticsearch/mysql-connector-java-5.1.47.jar"

 # the name of the driver class for mysql

 jdbc_driver_class => "com.mysql.jdbc.Driver"

 jdbc_paging_enabled => "true"

 jdbc_page_size => "500"

# 以下对应着要执行的sql的绝对路径

 statement_filepath => "/home/elasticsearch/logstash-7.6.0/sync/jdbc_test.sql"

#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新

schedule => "* * * * *"

 }

}

filter {

}

output {

 elasticsearch {

#ESIP地址与端口

 hosts => "172.21.0.x:9200"

#ES索引名称(自己定义的)

 index => "test-003"

 user => "elastic"

 password => "XXXXXX"

#自增ID编号

 document_id => "%{id}"

 }

 stdout {

#以JSON格式输出

 codec => json_lines

 }

}

以上三步,搞定。


4、看效果

有图有真相。

数据源 json 字符串已经拆分为独立字段:area、loc、author 等。

拆分结果达到预期,就加了管道预处理一下,没有写一行脚本。

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
json字符串还原map
今天有个需求,由于调用第三方接口一直调不通,但是又需要先把接口写好供前端调用,于是想到以前写的一个json字符串还原的类。这里做个笔记。 package com.
983 0
ML之DS:仅需一行代码实现对某字段下的所有数值实现同一机制的改变或转换(比如全部转为str类型/全部取平方值)
ML之DS:仅需一行代码实现对某字段下的所有数值实现同一机制的改变或转换(比如全部转为str类型/全部取平方值)
23 0
iOS剪切板UIPasteboard开发应用解析(一)
iOS剪切板UIPasteboard开发应用解析
17 0
SAS进阶《深入解析SAS》之Base SAS基础、读取外部数据到SAS数据集
SAS进阶《深入解析SAS》之Base SAS基础、读取外部数据到SAS数据集 前言:在学习完《SAS编程与商业案例》后,虽然能够接手公司的基本工作,但是为了更深入的SAS学习,也为了站在更高的一个层次上去掌握和优化公司工作,故而又咬紧牙关加紧学习《深入解析SAS》.就目前的两本书的对比,显而易见的是本书比《SAS编程…》要厚的多,再者内容更新是2015年著,最后是内容详实
1811 0
Python零基础学习代码实践 —— 提取字符串里面的单词数
str = input() str1 = str.strip() index = 0 count = 0 while index < len(str1): while str1[index] != " ": index += 1 if index == le...
1102 0
+关注
348
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
《零基础CSS入门教程》
立即下载
《零基础HTML入门教程》
立即下载
《2021云上架构与运维峰会演讲合集》
立即下载