Elasticsearch JDBC的使用-MySQL 数据源导入和增量索引、更新

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介:

在使用 Elasticsearch 的时候,经常会涉及到要将其它数据源的数据导入到 Elasticsearch 中,今天就来介绍一下关于 Elasticsearch 从 MySQL 导入数据和增量索引的实现 
这里要用到一个 Elasticsearch 的插件 elasticsearch-jdbc

需要的资源和版本 
Elasticsearch 版本:2.2.0 CSDN下载 
elasticsearch-jdbc 版本 : 2.2 CSDN下载

一、安装 jdbc

jdbc 的压缩包我已经放在了 /usr/local/src/ 目录下,可以去它的 GitHub地址 获取对应版本的压缩包

cd /usr/local/src/unzip ./elasticsearch-jdbc-2.2.0.0-dist.zip cp -r ./elasticsearch-jdbc-2.2.0.0 /usr/local/elasticsearch-2.2.0/jdbc2.2123123

这样就可以使用啦,jdbc 还提供了一些常用的例子,在 【ES安装目录/jdbc2.2/bin/ 】这个文件夹下,改一改就可以用,都是bash 文件,记得加运行权限哦

二、使用jdbc

我们先在 MySQL中创建一个用于测试的数据表 article ,并添加几条数据 
(注意, update_time 字段我加了ON UPDATE CURRENT_TIMESTAMP,数据发生改变就会更新此字段)

DROP TABLE IF EXISTS `article`;CREATE TABLE `article` (  `id` mediumint(8) unsigned NOT NULL AUTO_INCREMENT,  `subject` varchar(150) NOT NULL,  `author` varchar(15) DEFAULT NULL,  `create_time` timestamp NULL DEFAULT NULL,  `update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,  PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;# 数据INSERT INTO `article` VALUES ('1', '"闺蜜"崔顺实被韩检方传唤 韩总统府促彻查真相', 'jam', '2016-10-31 17:49:21', '2016-10-31 17:50:21');INSERT INTO `article` VALUES ('2', '韩举行"护国训练" 青瓦台:决不许国家安全出问题', 'jam00', '2016-10-31 17:50:39', '2016-10-31 17:50:51');INSERT INTO `article` VALUES ('3', '媒体称FBI已经取得搜查令 检视希拉里电邮', 'tomi', '2016-10-31 17:51:03', '2016-10-31 17:51:08');INSERT INTO `article` VALUES ('4', '村上春树获安徒生奖 演讲中谈及欧洲排外问题', 'jason', '2016-10-31 17:51:38', '2016-10-31 17:51:41');INSERT INTO `article` VALUES ('5', '希拉里团队炮轰FBI 参院民主党领袖批其“违法”', 'tommy', '2016-10-31 17:52:07', '2016-10-31 17:52:09');123456789101112131415123456789101112131415

1、数据源导入

首先执行全部数据导入(注:ES 使用的是默认配置) 
我们写一个名叫 mysql-article.sh 的bash脚本,并放在 /usr/local/elasticsearch-2.2.0/jdbc2.2/bin/mysql-article.sh 下面,脚本内容如下(内容注释会在后面给出)

#执行/usr/local/elasticsearch-2.2.0/jdbc2.2/bin/mysql-article.sh  #文件内容如下#!/bin/shDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" bin=${DIR}/../bin lib=${DIR}/../lib echo ' {     "type" : "jdbc",     "jdbc" : {         "url" : "jdbc:mysql://localhost:3306/test",         "user" : "root",         "password" : "123456",         "sql" : "select *, id as _id from article",         "index" : "jdbctest",         "type" : "article",         "index_settings" : {             "analysis" : {                 "analyzer" : {                     "ik" : {                         "tokenizer" : "ik"                     }                 }             }         },         "type_mapping": {             "article" : {                 "properties" : {                     "id" : {                         "type" : "integer",                         "index" : "not_analyzed"                     },                     "subject" : {                         "type" : "string",                         "analyzer" : "ik"                     },                     "author" : {                         "type" : "string",                         "analyzer" : "ik"                     },                     "create_time" : {                         "type" : "date"                     },                     "update_time" : {                         "type" : "date"                     }                 }             }         }     } } ' | java \     -cp "${lib}/*" \     -Dlog4j.configurationFile=${bin}/log4j2.xml \     org.xbib.tools.Runner \     org.xbib.tools.JDBCImporter1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575812345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758

执行后会自动创建 jdbctest 索引(若不存在) ,article 类型 和几个对应的字段,这里因为有中文,我使用了 ik 分词器(如何使用?) 
若执行失败,请查看日志文件,jdbc 的日志存放在 /usr/local/elasticsearch-2.2.0/logs/jdbc.log 
查看是否导入成功

curl -XGET 'http://localhost:9200/jdbctest/article/_search?pretty'#返回{  "took" : 33,  "timed_out" : false,  "_shards" : {    "total" : 5,    "successful" : 5,    "failed" : 0   },  "hits" : {    "total" : 5,    "max_score" : 1.0,    "hits" : [ {      "_index" : "jdbctest",      "_type" : "article",      "_id" : "5",      "_score" : 1.0,      "_source" : {        "id" : 5,        "subject" : "希拉里团队炮轰FBI 参院民主党领袖批其“违法”",        "author" : "tommy",        "create_time" : "2016-10-31T17:52:07.000+08:00",        "update_time" : "2016-10-31T17:52:09.000+08:00"       }     }, {      "_index" : "jdbctest",      "_type" : "article",      "_id" : "2",      "_score" : 1.0,      "_source" : {        "id" : 2,        "subject" : "韩举行"护国训练" 青瓦台:决不许国家安全出问题",        "author" : "jam00",        "create_time" : "2016-10-31T17:50:39.000+08:00",        "update_time" : "2016-10-31T17:50:51.000+08:00"       }     }, {      "_index" : "jdbctest",      "_type" : "article",      "_id" : "4",      "_score" : 1.0,      "_source" : {        "id" : 4,        "subject" : "村上春树获安徒生奖 演讲中谈及欧洲排外问题",        "author" : "jason",        "create_time" : "2016-10-31T17:51:38.000+08:00",        "update_time" : "2016-10-31T17:51:41.000+08:00"       }     }, {      "_index" : "jdbctest",      "_type" : "article",      "_id" : "1",      "_score" : 1.0,      "_source" : {        "id" : 1,        "subject" : ""闺蜜"崔顺实被韩检方传唤 韩总统府促彻查真相",        "author" : "jam",        "create_time" : "2016-10-31T17:49:21.000+08:00",        "update_time" : "2016-10-31T17:50:21.000+08:00"       }     }, {      "_index" : "jdbctest",      "_type" : "article",      "_id" : "3",      "_score" : 1.0,      "_source" : {        "id" : 3,        "subject" : "媒体称FBI已经取得搜查令 检视希拉里电邮",        "author" : "tomi",        "create_time" : "2016-10-31T17:51:03.000+08:00",        "update_time" : "2016-10-31T17:51:08.000+08:00"       }     } ]   } }1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757612345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576

内容已成功导入到 Elasticsearch 中

2、增量索引、更新

如果我们对数据做了更改或是有新数据加入,若再执行全部导入,就有点得不偿失了 
这里我们就要用到jdbc 的两个属性 statefile(状态文件) 和 schedule(计划任务时间),并且 sql 语句也要改成动态的 
改动如下

"statefile" : "statefile-article.json","schedule" : "0 0-59 0-23 ? * *","sql" : [     {        "statement" : "select *, id as _id from article where update_time > ?",        "parameter" : [ "$metrics.lastexecutionstart" ]     } ],1234567812345678

改动后的完整文件 mysql-article.sh

#!/bin/shDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" bin=${DIR}/../bin lib=${DIR}/../lib echo ' {     "type" : "jdbc",     "jdbc" : {         "url" : "jdbc:mysql://localhost:3306/test",         "user" : "root",         "password" : "123456",         "statefile" : "statefile-article.json",         "schedule" : "0 0-59 0-23 ? * *",         "sql" : [             {                 "statement" : "select *, id as _id from article where update_time > ?",                 "parameter" : [ "$metrics.lastexecutionstart" ]             }         ],         "index" : "jdbctest",         "type" : "article",         "index_settings" : {             "analysis" : {             "analyzer" : {                 "ik" : {                     "tokenizer" : "ik"                 }             }         }         },         "type_mapping": {             "article" : {                 "properties" : {                     "id" : {                         "type" : "integer",                         "index" : "not_analyzed"                     },                     "subject" : {                         "type" : "string",                         "analyzer" : "ik"                     },                     "author" : {                         "type" : "string",                         "analyzer" : "ik"                     },                     "create_time" : {                         "type" : "date"                     },                     "update_time" : {                         "type" : "date"                     }                 }             }         }     } } ' | java \     -cp "${lib}/*" \     -Dlog4j.configurationFile=${bin}/log4j2.xml \     org.xbib.tools.Runner \     org.xbib.tools.JDBCImporter12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061621234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162

运行该文件 :/usr/local/elasticsearch-2.2.0/jdbc2.2/bin/mysql-article.sh 
可以看到 命令行端 被占用,一直在运行,并且在 mysql-article.sh 的同级目录下生成了一个 statefile-article.json 的文件,sql 语句中需要的数据 lastexecutionstart 就保存在该文件中 
现在我们来改动一下MySQL 中的数据,增加一条数据,并修改一条 id 等于 5 的数据

INSERT INTO article() VALUES(NULL,'测试JDBC','jam00','2016-11-01 13:34:15','2016-11-01 13:34:15');UPDATE article SET `subject`='测试JDBC-改动' WHERE id=5;1212

最多等一分钟,再看看ES 中的数据

curl -XGET 'http://localhost:9200/jdbctest/article/_search?pretty' -d '{     "sort": {          "id": { "order": "desc" }     } }'# 返回 ..."hits" : [ {      "_index" : "jdbctest",      "_type" : "article",      "_id" : "6",      "_score" : null,      "_source" : {        "id" : 6,        "subject" : "测试JDBC",        "author" : "jam00",        "create_time" : "2016-11-01T13:34:15.000+08:00",        "update_time" : "2016-11-01T13:34:15.000+08:00"       },      "sort" : [ 6 ]     }, {      "_index" : "jdbctest",      "_type" : "article",      "_id" : "5",      "_score" : null,      "_source" : {        "id" : 5,        "subject" : "测试JDBC-改动",        "author" : "tommy",        "create_time" : "2016-10-31T17:52:07.000+08:00",        "update_time" : "2016-11-01T13:35:41.000+08:00"       },      "sort" : [ 5 ]     }...12345678910111213141516171819202122232425262728293031323334351234567891011121314151617181920212223242526272829303132333435

测试成功。 
为了让 mysql-article.sh 后台执行,我们可以使用 nohup 命令

nohup /usr/local/elasticsearch-2.2.0/jdbc2.2/bin/mysql-article.sh &11

当我们想停止执行的时候。

ps aux |grep jdbc2.2#返回root     26118  0.0  0.1 106092  1212 pts/0    S    14:03   0:00 /bin/sh /usr/local/elasticsearch-2.2.0/jdbc2.2/bin/mysql-article.sh root     26123 11.0  4.4 1079192 44932 pts/0   Sl   14:03   0:00 java -cp /usr/local/elasticsearch-2.2.0/jdbc2.2/bin/../lib/* -Dlog4j.configurationFile=/usr/local/elasticsearch-2.2.0/jdbc2.2/bin/../bin/log4j2.xml org.xbib.tools.Runner org.xbib.tools.JDBCImporter# 使用 kill 命令关闭进程, 26123 就是上面一句返回的进程号,不用杀掉 26118 ,杀掉26123 这个进程,26118 进程会自动关闭kill -9 26123 12345671234567

至此,MySQL 数据源的 增量索引和更新就完成了。

3、bash 文件释义

增量索引的bash文件注释如下,更多详细配置请查阅官方文档

#!/bin/sh# 当前脚本的绝对路径DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" bin=${DIR}/../bin lib=${DIR}/../lib echo ' {     "type" : "jdbc",     "jdbc" : {         # 链接 mysql 的 test 数据库         "url" : "jdbc:mysql://localhost:3306/test",         # mysql 用户         "user" : "root",         # mysql 密码         "password" : "123456",         # 计划任务状态文件         "statefile" : "statefile-article.json",         # 计划任务时间 这里是每分钟执行一次         "schedule" : "0 0-59 0-23 ? * *",         # 执行导入的sql 语句         "sql" : [             {                 "statement" : "select *, id as _id from article where update_time > ?",                 "parameter" : [ "$metrics.lastexecutionstart" ]             }         ],         # 索引名称 jdbctest         "index" : "jdbctest",         # 类型名称 article         "type" : "article",         # 类型设置         "index_settings" : {             "analysis" : {             "analyzer" : {                 "ik" : {                         # 涉及到中文使用ik 分词                     "tokenizer" : "ik"                 }             }         }         },         # 类型中的字段映射         "type_mapping": {             # 类型名称             "article" : {                 "properties" : {                     # 对应的字段                     "id" : {                         # 字段类型                         "type" : "integer",                         # 当成一个准确的值进行索引(全匹配)                         "index" : "not_analyzed"                     },                     "subject" : {                         "type" : "string",                         "analyzer" : "ik"                     },                     "author" : {                         "type" : "string",                         "analyzer" : "ik"                     },                     "create_time" : {                         "type" : "date"                     },                     "update_time" : {                         "type" : "date"                     }                 }             }         }     } } ' | java \     -cp "${lib}/*" \     -Dlog4j.configurationFile=${bin}/log4j2.xml \     org.xbib.tools.Runner \     org.xbib.tools.JDBCImporter123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778

这里选几个属性来介绍一下 
url数据库链接串,所以把这个链接串改成其它数据源,这个脚本也可以使用(前提是那个数据源中有对应的 article 表) 
statefile :计划任务状态文件名称。它长这样:

{   "type" : "jdbc",   "jdbc" : {     "index_settings" : {       "analysis" : {         "analyzer" : {           "ik" : {             "tokenizer" : "ik"           }        }      }    },     "index" : "jdbctest",     "schedule" : "0 0-59 0-23 ? * *",     "sql" : [ {       "statement" : "select *, id as _id from article where update_time > ?",       "parameter" : [ "$metrics.lastexecutionstart" ]    } ],     "metrics" : {       "lastexecutionend" : "2016-11-01T06:01:01.441Z",       "lastexecutionstart" : "2016-11-01T06:01:01.125Z",       "counter" : "23"     },     "type" : "article",     "statefile" : "statefile-article.json",     "user" : "root",     "password" : "123456",     "url" : "jdbc:mysql://localhost:3306/test",     "type_mapping" : {       "article" : {         "properties" : {           "create_time" : {             "type" : "date"           },           "id" : {             "type" : "integer",             "index" : "not_analyzed"           },           "author" : {             "type" : "string",             "analyzer" : "ik"           },           "update_time" : {             "type" : "date"           },           "subject" : {             "type" : "string",             "analyzer" : "ik"           }        }      }    }  }}123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354

其实 jdbc 每次执行的就是这个文件,执行完成后就覆盖此文件,改变的只是 metrics 属性内的时间,而 lastexecutionstart 这个时间就是我们下面 sql 语句要用到的最后更新时间 
schedule : 计划任务时间表。表示多久执行一次更新。下面有几个例子 
0 0-59 0-23 ? * *:每分钟执行一次 
0 0/5 0-23 ? * * :每五分钟执行一次;当分钟等于 0,5,10,15…55的时候执行 
我还是贴一个官方的字段描述

字段名称 允许的值 允许的特殊字符
Seconds 0-59 , - * /
Minutes 0-59 , - * /
Hours 0-23 , - * /
Day-of-month 1-31 , - * ? / L W
Month 1-12 or JAN-DEC , - * /
Day-of-Week 1-7 or SUN-SAT , - * ? / L #
Year (Optional) empty, 1970-2199 , - * /

详细注释请点击查看

sql:支持两种方式,一种是直接写sql语句,一种是有条件的sql语句。一般我们会在sql语句中使用”field as _id “这样的方式来指定这条数据在ES 中的唯一标识(field字段为唯一标识) 
parameter 属性中的可选的动态参数有

$now - the current timestamp $state - the state, one of: BEFORE_FETCH, FETCH, AFTER_FETCH, IDLE, EXCEPTION $metrics.counter - a counter $lastrowcount - number of rows from last statement $lastexceptiondate - SQL timestamp of last exception $lastexception - full stack trace of last exception $metrics.lastexecutionstart - SQL timestamp of the time when last execution started $metrics.lastexecutionend - SQL timestamp of the time when last execution ended $metrics.totalrows - total number of rows fetched $metrics.totalbytes - total number of bytes fetched $metrics.failed - total number of failed SQL executions $metrics.succeeded - total number of succeeded SQL executions123456789101112123456789101112

在上面例子中的 sql

select *, id as _id from article where update_time > ?

表示获取更新时间(update_time)大于 最后执行时间($metrics.lastexecutionstart)的所有数据

其它如 index、type_mapping 之类的属性就不一一介绍了,很容易理解

赶快动手试一下吧!

http://blog.csdn.net/jam00/article/details/52984382

本文转自  陈小龙哈   51CTO博客,原文链接:http://blog.51cto.com/chenxiaolong/1895117

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
18天前
|
存储 关系型数据库 MySQL
阿里面试:为什么要索引?什么是MySQL索引?底层结构是什么?
尼恩是一位资深架构师,他在自己的读者交流群中分享了关于MySQL索引的重要知识点。索引是帮助MySQL高效获取数据的数据结构,主要作用包括显著提升查询速度、降低磁盘I/O次数、优化排序与分组操作以及提升复杂查询的性能。MySQL支持多种索引类型,如主键索引、唯一索引、普通索引、全文索引和空间数据索引。索引的底层数据结构主要是B+树,它能够有效支持范围查询和顺序遍历,同时保持高效的插入、删除和查找性能。尼恩还强调了索引的优缺点,并提供了多个面试题及其解答,帮助读者在面试中脱颖而出。相关资料可在公众号【技术自由圈】获取。
|
1月前
|
SQL 关系型数据库 MySQL
案例剖析:MySQL唯一索引并发插入导致死锁!
案例剖析:MySQL唯一索引并发插入导致死锁!
案例剖析:MySQL唯一索引并发插入导致死锁!
|
26天前
|
存储 关系型数据库 MySQL
Mysql(4)—数据库索引
数据库索引是用于提高数据检索效率的数据结构,类似于书籍中的索引。它允许用户快速找到数据,而无需扫描整个表。MySQL中的索引可以显著提升查询速度,使数据库操作更加高效。索引的发展经历了从无索引、简单索引到B-树、哈希索引、位图索引、全文索引等多个阶段。
58 3
Mysql(4)—数据库索引
|
20天前
|
存储 关系型数据库 MySQL
如何在MySQL中进行索引的创建和管理?
【10月更文挑战第16天】如何在MySQL中进行索引的创建和管理?
42 1
|
22天前
|
Java 关系型数据库 MySQL
mysql5.7 jdbc驱动
遵循上述步骤,即可在Java项目中高效地集成MySQL 5.7 JDBC驱动,实现数据库的访问与管理。
66 1
|
9天前
|
监控 关系型数据库 MySQL
数据库优化:MySQL索引策略与查询性能调优实战
【10月更文挑战第27天】本文深入探讨了MySQL的索引策略和查询性能调优技巧。通过介绍B-Tree索引、哈希索引和全文索引等不同类型,以及如何创建和维护索引,结合实战案例分析查询执行计划,帮助读者掌握提升查询性能的方法。定期优化索引和调整查询语句是提高数据库性能的关键。
46 0
|
10天前
|
监控 关系型数据库 MySQL
数据库优化:MySQL索引策略与查询性能调优实战
【10月更文挑战第26天】数据库作为现代应用系统的核心组件,其性能优化至关重要。本文主要探讨MySQL的索引策略与查询性能调优。通过合理创建索引(如B-Tree、复合索引)和优化查询语句(如使用EXPLAIN、优化分页查询),可以显著提升数据库的响应速度和稳定性。实践中还需定期审查慢查询日志,持续优化性能。
41 0
|
21天前
|
监控 关系型数据库 MySQL
mysql8索引优化
综上所述,深入理解和有效实施这些索引优化策略,是解锁MySQL 8.0数据库高性能查询的关键。
28 0
|
25天前
|
SQL 关系型数据库 MySQL
美团面试:mysql 索引失效?怎么解决? (重点知识,建议收藏,读10遍+)
本文详细解析了MySQL索引失效的多种场景及解决方法,包括破坏最左匹配原则、索引覆盖原则、前缀匹配原则、`ORDER BY`排序不当、`OR`关键字使用不当、索引列上有计算或函数、使用`NOT IN`和`NOT EXISTS`不当、列的比对等。通过实例演示和`EXPLAIN`命令分析,帮助读者深入理解索引失效的原因,并提供相应的优化建议。文章还推荐了《尼恩Java面试宝典》等资源,助力面试者提升技术水平,顺利通过面试。
|
28天前
|
关系型数据库 MySQL 数据库
深入浅出MySQL索引优化:提升数据库性能的关键
在这个数据驱动的时代,数据库性能的优劣直接关系到应用的响应速度和用户体验。MySQL作为广泛使用的数据库之一,其索引优化是提升查询性能的关键。本文将带你一探MySQL索引的内部机制,分析索引的类型及其适用场景,并通过实际案例演示如何诊断和优化索引,以实现数据库性能的飞跃。

热门文章

最新文章

下一篇
无影云桌面