简介
一个灵活的开源数据收集、处理、传输工具。logstash包含三个模块,输入、过滤和输出。其中输入、输出是必须的,过滤是可选的。logstash工作流程为从数据源中获取数据、对数据做过滤和简单清洗、输出到指定的目标中。
输入:
输出:
logstash提供的功能依赖于logstash插件,默认安装包中已经集成了很多的开箱即用的插件,如
logstash-input-jdbc,logstash-output-elasticsearch,logstash-input-elasticsearch。你没看错,elasticsearch即可以作为logstash的输入,也可以作为输出。
Logstash的代码为JRuby,由于Jruby的运行于JVM中,那么它与Java就具备了天然的关联性。这为Java程序的集成工作提供了便利的基础。
下载
官网下载首页:https://www.elastic.co/cn/downloads/logstash
官网历史版本下载:https://www.elastic.co/cn/downloads/past-releases#logstash
官网7.15.2下载页:
安装
直接下载压缩包,解压之后里面包含windows和linux下的启动程序,windows下使用logstash.bat,linux下使用logstash。
windows
windwos下解压安装,安装之后得文件目录如下:
- bin:可执行文件目录,启动程序为logstash.bat,logstash-plugin.bat为插件管理程序
- config:配置文件目录
- data:存放程序运行过程中产生得数据,和同步数据无关
- jdk:安装完毕默认自带了一个jdk,启动得时候如果找不到java_home,就使用自己自带得jdk
- lib:ruby的一些依赖库
- logs:日志文件存放的目录,运行日志存放在logstash-plain.log文件中。需要注意的是此文件只会记录sql语句执行记录,不会记录从数据库查询到的结果。
- TODO
启动
windows
为了测试一下安装是否成功,或者想体验一下logstash的输入和输出。进入config目录,新建一个logstash-test.conf配置文件,然后使用此配置文件启动logstash。
#启动命令:.\bin\logstash.bat -f .\config\logstash-test.conf --path.data=/data/test
#测试:访问127.0.0.1:9600,能打开说明启动成功。在控制台输入hello,控制到输出会打印输入的信息
input {
stdin{
}
}
output {
stdout{
}
}
配置文件主要有两个模块,分别是input和output,功能是接收用户的控制台输入信息,并将其输出到控制台。
配置参考
1:注意尽量不要有空行
2:sql中不要出现type字段,如果必须同步表中的type列,使用别名,select type as new_type from user
全量同步mysql数据到es
本例在CentOS中实现,将每分钟全量同步一次mysql中的视图数据到es中。
本例要实现的功能是将mysql中的数据同步到es中,整体流程为:
- 先在mysql数据库中建立视图,视图的字段根据es索引库的字段要求来设置
- 准备es和logstash,确保mysql、logstash、es所在机器之间没有网络问题
- es中不必预先创建索引库,logstash在同步过程中会自动创建索引库
- logstash定时访问mysql并执行查询视图的sql语句,将查询结果同步到es中
input { stdin { } jdbc { jdbc_connection_string => "jdbc:mysql://192.168.1.152:3306/cml" jdbc_user => "root" jdbc_password => "123456" jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" statement_filepath => "/usr/local/logstash-7.14.0/config/sql/person_info.sql" schedule => "* * * * *" type => "type_cml" } } filter { json { source => "message" remove_field => ["message"] } } output { elasticsearch { hosts => "192.168.1.88:9955" index => "cml" document_id => "%{id}" } stdout { codec => json_lines } }
配置解释:
- statement_filepath:要执行的sql语句所在文件,本例中该文件内容为:select * from view_person_info
- schedule:同步周期,这里并不是cron表达式,5个星分别代表分、时、天、月、年,本例中即每分钟第0秒执行一次
- type:为本input指定一个类型,此类型并不是指同步到es中,在es中新建的类型
- document_id:es中文档的id,一般使用mysql数据库中的id
启动logstash命令:
进入bin,执行 sh logstash -f ../config/test_person_info.conf
多个input同步mysql数据到不同es索引库
本例在windows中实现,5个input分别查询mysql中不同的5个视图,将查询结果分别同步到5个不同的es索引库
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://192.168.1.152:3306/test_cml"
jdbc_user => "root"
jdbc_password => "123456"
jdbc_driver_library => "D:\Application\logstash-7.14.0-windows-x86_64\JdbcDriver\mysql-connector-java-5.1.49.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
statement_filepath => "D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\sql\standard_address_info.sql"
schedule => "* * * * *"
type => "type_standard_address_info"
}
jdbc {
jdbc_connection_string => "jdbc:mysql://192.168.1.152:3306/test_cml"
jdbc_user => "root"
jdbc_password => "123456"
jdbc_driver_library => "D:\Application\logstash-7.14.0-windows-x86_64\JdbcDriver\mysql-connector-java-5.1.49.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
statement_filepath => "D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\sql\person_info.sql"
schedule => "* * * * *"
type => "type_person_info"
}
jdbc {
jdbc_connection_string => "jdbc:mysql://192.168.1.152:3306/test_cml"
jdbc_user => "root"
jdbc_password => "123456"
jdbc_driver_library => "D:\Application\logstash-7.14.0-windows-x86_64\JdbcDriver\mysql-connector-java-5.1.49.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
statement_filepath => "D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\sql\house_info.sql"
schedule => "* * * * *"
type => "type_house_info"
}
jdbc {
jdbc_connection_string => "jdbc:mysql://192.168.1.152:3306/test_cml
jdbc_user => "root"
jdbc_password => "123456"
jdbc_driver_library => "D:\Application\logstash-7.14.0-windows-x86_64\JdbcDriver\mysql-connector-java-5.1.49.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
statement_filepath => "D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\sql\company_info.sql"
schedule => "* * * * *"
type => "type_company_info"
}
jdbc {
jdbc_connection_string => "jdbc:mysql://192.168.1.152:3306/test_cml
jdbc_user => "root"
jdbc_password => "123456"
jdbc_driver_library => "D:\Application\logstash-7.14.0-windows-x86_64\JdbcDriver\mysql-connector-java-5.1.49.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
statement_filepath => "D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\sql\adcode_fence_info.sql"
schedule => "* * * * *"
type => "type_adcode_fence_info"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
if[type]=="type_adcode_fence_info"{
elasticsearch {
hosts => "192.168.1.78:9955"
index => "cml_type_adcode_fence_info"
document_id => "%{id}"
}
}
if[type]=="type_company_info"{
elasticsearch {
hosts => "192.168.1.78:9955"
index => "cml_type_company_info"
document_id => "%{id}"
}
}
if[type]=="type_house_info"{
elasticsearch {
hosts => "192.168.1.78:9955"
index => "cml_type_house_info"
document_id => "%{id}"
}
}
if[type]=="type_person_info"{
elasticsearch {
hosts => "192.168.1.78:9955"
index => "cml_type_person_info"
document_id => "%{id}"
}
}
if[type]=="type_standard_address_info"{
elasticsearch {
hosts => "192.168.1.78:9955"
index => "cml_type_standard_address_info"
document_id => "%{id}"
}
}
stdout {
codec => json_lines
}
}
启动命令:
进入 bin 目录,执行 .\logstash.bat -f ..\config\cml.conf
增量同步
增量同步是指根据数据库中某表的某列作为标识,根据该标识的值同步数据,同步一次之后将标识的值更新为和当前数据库一致,下次同步的时候按照大于当前标识值进行同步。常见的在电商系统中,根据商品id增量同步数据。
本例演示根据id同步数据,需按照如下要求配置 input-jdbc 插件:
- use_column_value 设置为 true,表示新增一个追踪标识,该标识使用数据库字段列的值。设置为时false,:sql_last_value反映上一次执行查询的时间
- tracking_column 设置为 id ,即使用数据库中的id 列作为追踪标识。
如果 use_column_value 为真,需配置此参数. 数据库中,该 column 必须是递增的。 - tracking_column_type:可选的值有两个["numeric", "timestamp"],也就是说我们只能根据数据库中某一个数值类型或者时间类型设置 user_column_value。
- record_last_run 设置为 true ,表示记录上次执行结果, 如果为真,将会把上次执行的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
- last_run_metadata_path 上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值.比如上次数据库有 10000 条记录,查询完后该文件中就会有数字 10000 这样的记录,下次执行 SQL 查询可以从 10001 条处开始.我们只需要在 SQL 语句中 WHERE MY_ID > :sql_last_value 即可. 其中 :sql_last_value 取得就是该文件中的值(10000).
- clean_run 设置为false
- 在sql中添加where条件,如 SELECT * from test_table where id > :sql_last_value
house_info.conf
input {
stdin {
}
jdbc {
#mysql jdbc connection string to our backup databse 后面的test对应mysql中的test数据库
jdbc_connection_string => "jdbc:mysql://192.168.1.152:3306/test_table"
jdbc_user => "root"
jdbc_password => "123456"
jdbc_driver_library => "D:\Application\logstash-7.14.0-windows-x86_64\JdbcDriver\mysql-connector-java-5.1.49.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "5"
statement_filepath => "D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\sql\house_info.sql"
schedule => "* * * * *"
#使用其它字段追踪,而不是用时间
use_column_value => true
#追踪的字段,如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的.比如:ID.
tracking_column => id
#是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
record_last_run => true
#上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值.比如上次数据库有 10000 条记录,查询完后该文件中就会有数字 10000 这样的记录,下次执行 SQL 查询可以从 10001 条处开始.我们只需要在 SQL 语句中 WHERE MY_ID > :sql_last_value 即可. 其中 :sql_last_value 取得就是该文件中的值(10000).
last_run_metadata_path => "D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\lastRunMetadata\house_info.txt"
#是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
clean_run => false
#是否将 column 名称转小写
lowercase_column_names => false
#设定ES索引类型
type => "type_house_info"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
elasticsearch {
hosts => "192.168.1.78:9955"
index => "increment_house_info"
document_id => "%{id}"
}
stdout {
codec => json_lines
}
}
house_info.sql
SELECT * from test_table where id > :sql_last_value
house_info.txt
--- 1440
若运行过程中,修改house_info.txt 中的追踪标识的值,无法生效,需要重新启动logstash
多个input增量同步
increment.conf
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
jdbc_user => "root"
jdbc_password => "root"
jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
statement_filepath => "/usr/local/logstash-7.14.0/config/sql/person_info.sql"
schedule => "* * * * *"
use_column_value => true
tracking_column => id
record_last_run => true
last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/person_info.txt"
clean_run => false
lowercase_column_names => false
type => "_doc_person_info"
}
jdbc {
jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
jdbc_user => "root"
jdbc_password => "root"
jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
statement_filepath => "/usr/local/logstash-7.14.0/config/sql/adcode_fence_info.sql"
schedule => "* * * * *"
use_column_value => true
tracking_column => id
record_last_run => true
last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/adcode_fence_info.txt"
clean_run => false
lowercase_column_names => false
type => "_doc_adcode_fence_info"
}
jdbc {
jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
jdbc_user => "root"
jdbc_password => "root"
jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
statement_filepath => "/usr/local/logstash-7.14.0/config/sql/standard_address_info.sql"
schedule => "* * * * *"
use_column_value => true
tracking_column => id
record_last_run => true
last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/standard_address_info.txt"
clean_run => false
lowercase_column_names => false
type => "_doc_standard_address_info"
}
jdbc {
jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
jdbc_user => "root"
jdbc_password => "root"
jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
statement_filepath => "/usr/local/logstash-7.14.0/config/sql/company_info.sql"
schedule => "* * * * *"
use_column_value => true
tracking_column => id
record_last_run => true
last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/company_info.txt"
clean_run => false
lowercase_column_names => false
type => "_doc_company_info"
}
jdbc {
jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
jdbc_user => "root"
jdbc_password => "root"
jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
statement_filepath => "/usr/local/logstash-7.14.0/config/sql/house_info.sql"
schedule => "* * * * *"
use_column_value => true
tracking_column => id
record_last_run => true
last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/house_info.txt"
clean_run => false
lowercase_column_names => false
type => "_doc_house_info"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
if [type]=="_doc_person_info" {
elasticsearch {
hosts => "192.168.1.88:9955"
index => "test_person_info"
document_id => "%{id}"
}
stdout {
codec => json_lines
}
}
if [type]=="_doc_adcode_fence_info" {
elasticsearch {
hosts => "192.168.1.88:9955"
index => "test_adcode_fence_info"
document_id => "%{id}"
}
stdout {
codec => json_lines
}
}
if [type]=="_doc_standard_address_info" {
elasticsearch {
hosts => "192.168.1.88:9955"
index => "test_standard_address_info"
document_id => "%{id}"
}
stdout {
codec => json_lines
}
}
if [type]=="_doc_company_info" {
elasticsearch {
hosts => "192.168.1.88:9955"
index => "test_company_info"
document_id => "%{id}"
}
stdout {
codec => json_lines
}
}
if [type]=="_doc_house_info" {
elasticsearch {
hosts => "192.168.1.88:9955"
index => "test_house_info"
document_id => "%{id}"
}
}
stdout {
codec => json_lines
}
}
5个input对应5个sql、5个txt。5个sql中使用 sql_last_value 作为变量
txt的内容为:
sql的内容为:
解决时区问题
logstash默认使用的时区是 UTC (世界标准时间),此时间比中国地区时间早8小时,那么就会出现一下问题:
- ES中的@timestamp 字段的值比当前时间小8小时
- 数据库中的update_time、create_time等日期、时间类型的字段值同步到ES后,ES中显示的数据和数据库数据不一致
- 做增量同步的时候,若根据 update_time 字段进行查询,会出现不准确的情况
- ......
解决办法:
- jdbc_default_timezone :input-jdbc 插件的此配置需设置为中国时区,如 "Asia/Shanghai"
- plugin_timezone: input-jdbc 插件的此配置需设置为 local
- 增加过滤器,在过滤器中编写 ruby 脚本,读取数据库中的日期、时间字段、并增加8小时
test_total.conf 配置参考:
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
jdbc_user => "root"
jdbc_password => "root"
jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
#以下对应着要执行的sql的绝对路径。
statement_filepath => "/usr/local/logstash-7.14.0/config/sql/person_info.sql"
#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新(测试结果,不同的话请留言指出)
schedule => "* * * * *"
use_column_value => true
tracking_column => id
record_last_run => true
last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/person_info.txt"
clean_run => false
lowercase_column_names => false
jdbc_default_timezone => "Asia/Shanghai"
plugin_timezone => "local"
type => "_doc_person_info"
}
jdbc {
jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
jdbc_user => "root"
jdbc_password => "root"
jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
#以下对应着要执行的sql的绝对路径。
statement_filepath => "/usr/local/logstash-7.14.0/config/sql/adcode_fence_info.sql"
#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新(测试结果,不同的话请留言指出)
schedule => "* * * * *"
use_column_value => true
tracking_column => id
record_last_run => true
last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/adcode_fence_info.txt"
clean_run => false
lowercase_column_names => false
jdbc_default_timezone => "Asia/Shanghai"
plugin_timezone => "local"
type => "_doc_adcode_fence_info"
}
jdbc {
jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
jdbc_user => "root"
jdbc_password => "root"
jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
#以下对应着要执行的sql的绝对路径。
statement_filepath => "/usr/local/logstash-7.14.0/config/sql/standard_address_info.sql"
#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新(测试结果,不同的话请留言指出)
schedule => "* * * * *"
use_column_value => true
tracking_column => id
record_last_run => true
last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/standard_address_info.txt"
clean_run => false
lowercase_column_names => false
jdbc_default_timezone => "Asia/Shanghai"
plugin_timezone => "local"
type => "_doc_standard_address_info"
}
jdbc {
jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
jdbc_user => "root"
jdbc_password => "root"
jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
#以下对应着要执行的sql的绝对路径。
statement_filepath => "/usr/local/logstash-7.14.0/config/sql/company_info.sql"
#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新(测试结果,不同的话请留言指出)
schedule => "* * * * *"
use_column_value => true
tracking_column => id
record_last_run => true
last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/company_info.txt"
clean_run => false
lowercase_column_names => false
jdbc_default_timezone => "Asia/Shanghai"
plugin_timezone => "local"
type => "_doc_company_info"
}
jdbc {
jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
jdbc_user => "root"
jdbc_password => "root"
jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
#以下对应着要执行的sql的绝对路径。
statement_filepath => "/usr/local/logstash-7.14.0/config/sql/house_info.sql"
#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新(测试结果,不同的话请留言指出)
schedule => "* * * * *"
use_column_value => true
tracking_column => id
record_last_run => true
last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/house_info.txt"
clean_run => false
lowercase_column_names => false
jdbc_default_timezone => "Asia/Shanghai"
plugin_timezone => "local"
type => "_doc_house_info"
}
}
filter {
#start,解决es中系统级字段@timestamp时间比本地时间少8小时的问题
#将@timestamp字段加8小时,赋值给一个变量temptimestamp
ruby {
code => "event.set('temptimestamp', event.get('@timestamp').time.localtime + 8*60*60)"
}
#将变量temptimestamp的值赋值给@timestamp
ruby {
code => "event.set('@timestamp',event.get('temptimestamp'))"
}
#删除临时变量temptimestamp
mutate {
remove_field => ["temptimestamp"]
}
#end,解决es中系统级字段@timestamp时间比本地时间少8小时的问题
#start,解决mysql中的update_time字段的值同步到es后少8小时的问题
ruby {
code => "event.set('temp_update_time', event.get('update_time').time.localtime + 8*60*60)"
}
ruby {
code => "event.set('update_time',event.get('temp_update_time'))"
}
mutate {
remove_field => ["temp_update_time"]
}
#end,解决mysql中的update_time字段的值同步到es后少8小时的问题
#start,解决mysql中的create_time字段的值同步到es后少8小时的问题
ruby {
code => "event.set('temp_create_time', event.get('create_time').time.localtime + 8*60*60)"
}
ruby {
code => "event.set('create_time',event.get('temp_create_time'))"
}
mutate {
remove_field => ["temp_create_time"]
}
#end,解决mysql中的create_time字段的值同步到es后少8小时的问题
}
output {
if [type]=="_doc_person_info" {
elasticsearch {
hosts => "192.168.1.88:9955"
index => "test_person_info"
document_id => "%{id}"
}
stdout {
codec => json_lines
}
}
if [type]=="_doc_adcode_fence_info" {
elasticsearch {
hosts => "192.168.1.88:9955"
index => "test_adcode_fence_info"
document_id => "%{id}"
}
stdout {
codec => json_lines
}
}
if [type]=="_doc_standard_address_info" {
elasticsearch {
hosts => "192.168.1.88:9955"
index => "test_standard_address_info"
document_id => "%{id}"
}
stdout {
codec => json_lines
}
}
if [type]=="_doc_company_info" {
elasticsearch {
hosts => "192.168.1.88:9955"
index => "test_company_info"
document_id => "%{id}"
}
stdout {
codec => json_lines
}
}
if [type]=="_doc_house_info" {
elasticsearch {
hosts => "192.168.1.88:9955"
index => "test_house_info"
document_id => "%{id}"
}
}
stdout {
codec => json_lines
}
}
多库同步、增加过滤器
//todo
多库同步,将两个mysql库中的数据同步到同一个es索引库中,如从商品所在数据库获取商品数据,从库存所数据库获取库存数据,然后将他们的信息同步到es中。
增加过滤器,如使用过滤器进行脱敏处理;增加非数据库字段
json同步
//todo
mysql8 , 表字段为json,存储数据为json(非json字符串,就是json),同步到es失败问题
插件
简介
logstash通过各种插件的配合来实现数据同步的功能,插件可分为三类:输入插件、过滤插件、输出插件。如 input-jdbc 就是一个输入插件,用来从数据库中获取数据。
常用命令
查看可用插件
bin/logstash-plugin list
安装新插件
方案1、从https://github.com/logstash-plugins/安装新插件
bin/logstash-plugin install logstash-output-exec
方案2、本地安装
bin/logstash-plugin install /data/my-plugin.gem
schedule
logstash中的 schedule 使用的并不是 linux的cron表达式,5个星分别代表分、时、天、月、年,可以支持时区。
例子:
- " *":每年每月每天每小时每分钟执行一次
- " 5 1-3 *":每年1至3月每天上午5点的每分钟执行一次
- "0 ":每年每月每日每小时的第0分钟执行一次
- "0 6 * America/Chicago":将在美国芝加哥时间(即UTC/GMT -5)的每天上午6点0分执行一次
pipelines
windows中,在配置 path.config 的时候路径需要使用【\】,如: path.config: D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\test_total_increment.conf
简介
piplines 用于以多实例的方式启动 logstash
配置参考
以下示例配置了两个 pipelines,第一个 syncmain 根据id同步数据库数据到es,第二个 syncupdate 根据 update_time 同步数据库数据到es。
pipelines.yml
- pipeline.id: sync_main
pipeline.workers: 4
pipeline.batch.size: 1
queue.type: persisted
path.config: "/usr/local/logstash-7.14.0/config/test_total.conf"
- pipeline.id: sync_update
pipeline.workers: 4
pipeline.batch.size: 1
queue.type: persisted
path.config: "/usr/local/logstash-7.14.0/config/test_update.conf"
配置说明:
- pipeline.workers: 有几个逻辑cpu就设置为几个
- path.config:logstash 配置文件
启动命令:
进入bin目录,执行 sh logstash -r
添加 pipelines 后,logstash 将不以 -f test_total.conf 的方式启动。-r 表示自动 reload 配置文件的更新,也就是说更新了 test_total.conf 之后,无需更新启动logstash ,配置项会动态自动加载。