一、Logstash的介绍
Logstash 它是 ES 下的一款开源软件,它能够同时从多个来源采集数据、转换数据,然后将数据发送到 Eleasticsearch 中创建索引。
我们在项目中通常使用 Logstash 将数据库(如 MySQL)中的数据采用到ES索引中。也就是Logstash的工作是从MySQL中读取数据,向ES中创建索引,这里需要提前创建mapping的模板文件以便logstash使用。
二、下载Logstash
官网下载地址:https://www.elastic.co/cn/downloads/logstash
解压:
这是解压后的目录结构。
三、安装logstash-input-jdbc
为什么要安装logstash-input-jdbc呢?
因为版本不同,自身所带的插件也不同,本文所说的是6.x的版本。而6.x版本本身不带logstash-input-jdbc插件,需要手动安装。Logstash5.x以上版本本身自带有logstash-input-jdbc。
logstash-input-jdbc 是ruby开发的,先下载ruby并安装
下载地址: https://rubyinstaller.org/downloads/
下载2.5版本即可。
安装完成查看是否安装成功
安装成功后我们可以在logstash根目录下的以下目录查看对应的插件版本:
如果想省事的话,可以下载解压本文提供的logstash-6.2.1.zip,此logstash中已集成了logstash-input-jdbc插件。
四、配置mysql.conf
在logstash的config目录下配置mysql.conf文件供logstash使用,logstash会根据mysql.conf文件的配置的地址从MySQL中读取数据向ES中写入索引。(本文讲的数据库是MySQL)
参考 https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html
配置输入数据源和输出数据源。
input { stdin { } jdbc { jdbc_connection_string=>"jdbc:mysql://localhost:3306/jy_brand?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC"# the user we wish to excute our statement asjdbc_user=>"root"jdbc_password=>123456# the path to our downloaded jdbc driver jdbc_driver_library=>"F:/develop/maven/repository3/mysql/mysql-connector-java/5.1.41/mysql-connector-java-5.1.41.jar"# the name of the driver class for mysqljdbc_driver_class=>"com.mysql.jdbc.Driver"jdbc_paging_enabled=>"true"jdbc_page_size=>"50000"#要执行的sql文件#statement_filepath => "/conf/course.sql"statement=>"select * from course_pub where timestamp > date_add(:sql_last_value,INTERVAL 8 HOUR)"#定时配置schedule=>"* * * * *"record_last_run=>truelast_run_metadata_path=>"D:/ElasticSearch/logstash-6.2.1/config/logstash_metadata" } } output { elasticsearch { #ES的ip地址和端口hosts=>"localhost:6200"#hosts => ["localhost:6200","localhost:6202","localhost:6203"]#ES索引库名称index=>"jy_brand"document_id=>"%{id}"document_type=>"doc"template=>"D:/ElasticSearch/logstash-6.2.1/config/jy_brand_template.json"template_name=>"jy_brand"template_overwrite=>"true" } stdout { #日志输出codec=>json_lines } }
特别说明:
1、ES采用UTC时区问题
ES采用UTC 时区,比北京时间早8小时,所以ES读取数据时让最后更新时间加8小时
wheretimestamp> date_add(:sql_last_value,INTERVAL 8 HOUR)
2、增量同步数据
logstash每个执行完成会在D:/ElasticSearch/logstash-6.2.1/config/logstash_metadata记录执行时间下次以此时间为基准进行增量同步数据到索引库。
完结!