Logstash极简教程

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 一个灵活的开源数据收集、处理、传输工具。logstash包含三个模块,输入、过滤和输出。其中输入、输出是必须的,过滤是可选的。logstash工作流程为从数据源中获取数据、对数据做过滤和简单清洗、输出到指定的目标中。

简介

一个灵活的开源数据收集、处理、传输工具。logstash包含三个模块,输入、过滤和输出。其中输入、输出是必须的,过滤是可选的。logstash工作流程为从数据源中获取数据、对数据做过滤和简单清洗、输出到指定的目标中。

1.png

输入:
2.png

输出:
3.png

logstash提供的功能依赖于logstash插件,默认安装包中已经集成了很多的开箱即用的插件,如
logstash-input-jdbc,logstash-output-elasticsearch,logstash-input-elasticsearch。你没看错,elasticsearch即可以作为logstash的输入,也可以作为输出。

Logstash的代码为JRuby,由于Jruby的运行于JVM中,那么它与Java就具备了天然的关联性。这为Java程序的集成工作提供了便利的基础。

下载

官网下载首页:https://www.elastic.co/cn/downloads/logstash

官网历史版本下载:https://www.elastic.co/cn/downloads/past-releases#logstash

官网7.15.2下载页:

4.png

安装

直接下载压缩包,解压之后里面包含windows和linux下的启动程序,windows下使用logstash.bat,linux下使用logstash。

windows

windwos下解压安装,安装之后得文件目录如下:
5.png

  • bin:可执行文件目录,启动程序为logstash.bat,logstash-plugin.bat为插件管理程序
  • config:配置文件目录
  • data:存放程序运行过程中产生得数据,和同步数据无关
  • jdk:安装完毕默认自带了一个jdk,启动得时候如果找不到java_home,就使用自己自带得jdk
  • lib:ruby的一些依赖库
  • logs:日志文件存放的目录,运行日志存放在logstash-plain.log文件中。需要注意的是此文件只会记录sql语句执行记录,不会记录从数据库查询到的结果。
  • TODO

启动

windows

为了测试一下安装是否成功,或者想体验一下logstash的输入和输出。进入config目录,新建一个logstash-test.conf配置文件,然后使用此配置文件启动logstash。

#启动命令:.\bin\logstash.bat  -f .\config\logstash-test.conf --path.data=/data/test
#测试:访问127.0.0.1:9600,能打开说明启动成功。在控制台输入hello,控制到输出会打印输入的信息
input {
    stdin{
    }
}
output {
    stdout{
    }
}

配置文件主要有两个模块,分别是input和output,功能是接收用户的控制台输入信息,并将其输出到控制台。

配置参考

1:注意尽量不要有空行
2:sql中不要出现type字段,如果必须同步表中的type列,使用别名,select type as new_type from user

全量同步mysql数据到es

本例在CentOS中实现,将每分钟全量同步一次mysql中的视图数据到es中。

本例要实现的功能是将mysql中的数据同步到es中,整体流程为:

  • 先在mysql数据库中建立视图,视图的字段根据es索引库的字段要求来设置
  • 准备es和logstash,确保mysql、logstash、es所在机器之间没有网络问题
  • es中不必预先创建索引库,logstash在同步过程中会自动创建索引库
  • logstash定时访问mysql并执行查询视图的sql语句,将查询结果同步到es中
    input {
    stdin {
    }
    jdbc {
    jdbc_connection_string => "jdbc:mysql://192.168.1.152:3306/cml"
    jdbc_user => "root"
    jdbc_password => "123456"
    jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    statement_filepath => "/usr/local/logstash-7.14.0/config/sql/person_info.sql"
    schedule => "* * * * *" 
    type => "type_cml"
    }
    }
    filter {
    json {
    source => "message"
    remove_field => ["message"]
    }
    }
    output {
    elasticsearch {
    hosts => "192.168.1.88:9955"
    index => "cml"
    document_id => "%{id}"
    }
    stdout {
    codec => json_lines
    }
    }
    

配置解释:

  • statement_filepath:要执行的sql语句所在文件,本例中该文件内容为:select * from view_person_info
  • schedule:同步周期,这里并不是cron表达式,5个星分别代表分、时、天、月、年,本例中即每分钟第0秒执行一次
  • type:为本input指定一个类型,此类型并不是指同步到es中,在es中新建的类型
  • document_id:es中文档的id,一般使用mysql数据库中的id

启动logstash命令:

进入bin,执行  sh logstash -f ../config/test_person_info.conf

多个input同步mysql数据到不同es索引库

本例在windows中实现,5个input分别查询mysql中不同的5个视图,将查询结果分别同步到5个不同的es索引库


input {
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.152:3306/test_cml"
  jdbc_user => "root"
  jdbc_password => "123456"
  jdbc_driver_library => "D:\Application\logstash-7.14.0-windows-x86_64\JdbcDriver\mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\sql\standard_address_info.sql"
  schedule => "* * * * *"
  type => "type_standard_address_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.152:3306/test_cml"
  jdbc_user => "root"
  jdbc_password => "123456"
  jdbc_driver_library => "D:\Application\logstash-7.14.0-windows-x86_64\JdbcDriver\mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\sql\person_info.sql"
  schedule => "* * * * *"
  type => "type_person_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.152:3306/test_cml"
  jdbc_user => "root"
  jdbc_password => "123456"
  jdbc_driver_library => "D:\Application\logstash-7.14.0-windows-x86_64\JdbcDriver\mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\sql\house_info.sql"
  schedule => "* * * * *"
  type => "type_house_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.152:3306/test_cml
  jdbc_user => "root"
  jdbc_password => "123456"
  jdbc_driver_library => "D:\Application\logstash-7.14.0-windows-x86_64\JdbcDriver\mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\sql\company_info.sql"
  schedule => "* * * * *"
  type => "type_company_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.152:3306/test_cml
  jdbc_user => "root"
  jdbc_password => "123456"
  jdbc_driver_library => "D:\Application\logstash-7.14.0-windows-x86_64\JdbcDriver\mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\sql\adcode_fence_info.sql"
  schedule => "* * * * *"
  type => "type_adcode_fence_info"
  }
}
filter {
  json {
  source => "message"
  remove_field => ["message"]
  }
}
output {
  if[type]=="type_adcode_fence_info"{
    elasticsearch {
      hosts => "192.168.1.78:9955"
      index => "cml_type_adcode_fence_info"
      document_id => "%{id}"
    }
  }
  if[type]=="type_company_info"{
    elasticsearch {
      hosts => "192.168.1.78:9955"
      index => "cml_type_company_info"
      document_id => "%{id}"
    }
  }
  if[type]=="type_house_info"{
    elasticsearch {
      hosts => "192.168.1.78:9955"
      index => "cml_type_house_info"
      document_id => "%{id}"
    }
  }
  if[type]=="type_person_info"{
    elasticsearch {
      hosts => "192.168.1.78:9955"
      index => "cml_type_person_info"
      document_id => "%{id}"
    }
  }
  if[type]=="type_standard_address_info"{
    elasticsearch {
      hosts => "192.168.1.78:9955"
      index => "cml_type_standard_address_info"
      document_id => "%{id}"
    }
  }
  stdout {
  codec => json_lines
  }
}

启动命令:

进入 bin 目录,执行 .\logstash.bat -f ..\config\cml.conf

增量同步

增量同步是指根据数据库中某表的某列作为标识,根据该标识的值同步数据,同步一次之后将标识的值更新为和当前数据库一致,下次同步的时候按照大于当前标识值进行同步。常见的在电商系统中,根据商品id增量同步数据。

本例演示根据id同步数据,需按照如下要求配置 input-jdbc 插件:

  • use_column_value 设置为 true,表示新增一个追踪标识,该标识使用数据库字段列的值。设置为时false,:sql_last_value反映上一次执行查询的时间
  • tracking_column 设置为 id ,即使用数据库中的id 列作为追踪标识。
    如果 use_column_value 为真,需配置此参数. 数据库中,该 column 必须是递增的。
  • tracking_column_type:可选的值有两个["numeric", "timestamp"],也就是说我们只能根据数据库中某一个数值类型或者时间类型设置 user_column_value。
  • record_last_run 设置为 true ,表示记录上次执行结果, 如果为真,将会把上次执行的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
  • last_run_metadata_path 上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值.比如上次数据库有 10000 条记录,查询完后该文件中就会有数字 10000 这样的记录,下次执行 SQL 查询可以从 10001 条处开始.我们只需要在 SQL 语句中 WHERE MY_ID > :sql_last_value 即可. 其中 :sql_last_value 取得就是该文件中的值(10000).
  • clean_run 设置为false
  • 在sql中添加where条件,如 SELECT * from test_table where id > :sql_last_value

6.png

house_info.conf


input {
  stdin {
  }
  jdbc {
  #mysql jdbc connection string to our backup databse  后面的test对应mysql中的test数据库
  jdbc_connection_string => "jdbc:mysql://192.168.1.152:3306/test_table"
  jdbc_user => "root"
  jdbc_password => "123456"
  jdbc_driver_library => "D:\Application\logstash-7.14.0-windows-x86_64\JdbcDriver\mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "5"
  statement_filepath => "D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\sql\house_info.sql"
  schedule => "* * * * *"
  #使用其它字段追踪,而不是用时间
  use_column_value => true
  #追踪的字段,如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的.比如:ID.
  tracking_column => id
  #是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
  record_last_run => true
  #上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值.比如上次数据库有 10000 条记录,查询完后该文件中就会有数字 10000 这样的记录,下次执行 SQL 查询可以从 10001 条处开始.我们只需要在 SQL 语句中 WHERE MY_ID > :sql_last_value 即可. 其中 :sql_last_value 取得就是该文件中的值(10000).
  last_run_metadata_path => "D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\lastRunMetadata\house_info.txt"
  #是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
  clean_run => false
  #是否将 column 名称转小写
  lowercase_column_names => false
  #设定ES索引类型
  type => "type_house_info"
  }
}
filter {
  json {
  source => "message"
  remove_field => ["message"]
  }
}
output {
  elasticsearch {
  hosts => "192.168.1.78:9955"
  index => "increment_house_info"
  document_id => "%{id}"
  }
  stdout {
  codec => json_lines
  }
}

house_info.sql


SELECT * from test_table where id > :sql_last_value

house_info.txt


--- 1440

若运行过程中,修改house_info.txt 中的追踪标识的值,无法生效,需要重新启动logstash

多个input增量同步

increment.conf


input {
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/person_info.sql"
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/person_info.txt"
  clean_run => false
  lowercase_column_names => false
  type => "_doc_person_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/adcode_fence_info.sql"
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/adcode_fence_info.txt"
  clean_run => false
  lowercase_column_names => false
  type => "_doc_adcode_fence_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/standard_address_info.sql"
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/standard_address_info.txt"
  clean_run => false
  lowercase_column_names => false
  type => "_doc_standard_address_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/company_info.sql"
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/company_info.txt"
  clean_run => false
  lowercase_column_names => false
  type => "_doc_company_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/house_info.sql"
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/house_info.txt"
  clean_run => false
  lowercase_column_names => false
  type => "_doc_house_info"
  }
}
filter {
  json {
  source => "message"
  remove_field => ["message"]
  }
}
output {
if [type]=="_doc_person_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_person_info"
  document_id => "%{id}"
  }
stdout {
  codec => json_lines
}
}
if [type]=="_doc_adcode_fence_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_adcode_fence_info"
  document_id => "%{id}"
  }
stdout {
  codec => json_lines
}
}
if [type]=="_doc_standard_address_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_standard_address_info"
  document_id => "%{id}"
  }
stdout {
  codec => json_lines
}
}
if [type]=="_doc_company_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_company_info"
  document_id => "%{id}"
  }
stdout {
  codec => json_lines
}
}
if [type]=="_doc_house_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_house_info"
  document_id => "%{id}"
  }
}
stdout {
  codec => json_lines
}
}

5个input对应5个sql、5个txt。5个sql中使用 sql_last_value 作为变量

7.png

txt的内容为:

8.png

sql的内容为:

9.png

解决时区问题

logstash默认使用的时区是 UTC (世界标准时间),此时间比中国地区时间早8小时,那么就会出现一下问题:

  • ES中的@timestamp 字段的值比当前时间小8小时
  • 数据库中的update_time、create_time等日期、时间类型的字段值同步到ES后,ES中显示的数据和数据库数据不一致
  • 做增量同步的时候,若根据 update_time 字段进行查询,会出现不准确的情况
  • ......

解决办法:

  • jdbc_default_timezone :input-jdbc 插件的此配置需设置为中国时区,如 "Asia/Shanghai"
  • plugin_timezone: input-jdbc 插件的此配置需设置为 local
  • 增加过滤器,在过滤器中编写 ruby 脚本,读取数据库中的日期、时间字段、并增加8小时

test_total.conf 配置参考:


input {
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
#以下对应着要执行的sql的绝对路径。
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/person_info.sql"
#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新(测试结果,不同的话请留言指出)
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/person_info.txt"
  clean_run => false
  lowercase_column_names => false
  jdbc_default_timezone => "Asia/Shanghai"
  plugin_timezone => "local"
  type => "_doc_person_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
#以下对应着要执行的sql的绝对路径。
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/adcode_fence_info.sql"
#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新(测试结果,不同的话请留言指出)
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/adcode_fence_info.txt"
  clean_run => false
  lowercase_column_names => false
  jdbc_default_timezone => "Asia/Shanghai"
  plugin_timezone => "local"
  type => "_doc_adcode_fence_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
#以下对应着要执行的sql的绝对路径。
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/standard_address_info.sql"
#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新(测试结果,不同的话请留言指出)
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/standard_address_info.txt"
  clean_run => false
  lowercase_column_names => false
  jdbc_default_timezone => "Asia/Shanghai"
  plugin_timezone => "local"
  type => "_doc_standard_address_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
#以下对应着要执行的sql的绝对路径。
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/company_info.sql"
#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新(测试结果,不同的话请留言指出)
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/company_info.txt"
  clean_run => false
  lowercase_column_names => false
  jdbc_default_timezone => "Asia/Shanghai"
  plugin_timezone => "local"
  type => "_doc_company_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
#以下对应着要执行的sql的绝对路径。
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/house_info.sql"
#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新(测试结果,不同的话请留言指出)
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/house_info.txt"
  clean_run => false
  lowercase_column_names => false
  jdbc_default_timezone => "Asia/Shanghai"
  plugin_timezone => "local"
  type => "_doc_house_info"
  }
}
filter {
  #start,解决es中系统级字段@timestamp时间比本地时间少8小时的问题
  #将@timestamp字段加8小时,赋值给一个变量temptimestamp
  ruby {
    code => "event.set('temptimestamp', event.get('@timestamp').time.localtime + 8*60*60)"
  }
  #将变量temptimestamp的值赋值给@timestamp
  ruby {
    code => "event.set('@timestamp',event.get('temptimestamp'))"
  }
  #删除临时变量temptimestamp
  mutate {
    remove_field => ["temptimestamp"]
  }
  #end,解决es中系统级字段@timestamp时间比本地时间少8小时的问题
  #start,解决mysql中的update_time字段的值同步到es后少8小时的问题
  ruby {
    code => "event.set('temp_update_time', event.get('update_time').time.localtime + 8*60*60)"
  }
  ruby {
    code => "event.set('update_time',event.get('temp_update_time'))"
  }
  mutate {
    remove_field => ["temp_update_time"]
  }
  #end,解决mysql中的update_time字段的值同步到es后少8小时的问题
  #start,解决mysql中的create_time字段的值同步到es后少8小时的问题
  ruby {
    code => "event.set('temp_create_time', event.get('create_time').time.localtime + 8*60*60)"
  }
  ruby {
    code => "event.set('create_time',event.get('temp_create_time'))"
  }
  mutate {
    remove_field => ["temp_create_time"]
  }
  #end,解决mysql中的create_time字段的值同步到es后少8小时的问题
}
output {
if [type]=="_doc_person_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_person_info"
  document_id => "%{id}"
  }
stdout {
  codec => json_lines
}
}
if [type]=="_doc_adcode_fence_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_adcode_fence_info"
  document_id => "%{id}"
  }
stdout {
  codec => json_lines
}
}
if [type]=="_doc_standard_address_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_standard_address_info"
  document_id => "%{id}"
  }
stdout {
  codec => json_lines
}
}
if [type]=="_doc_company_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_company_info"
  document_id => "%{id}"
  }
stdout {
  codec => json_lines
}
}
if [type]=="_doc_house_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_house_info"
  document_id => "%{id}"
  }
}
stdout {
  codec => json_lines
}
}

多库同步、增加过滤器

//todo

多库同步,将两个mysql库中的数据同步到同一个es索引库中,如从商品所在数据库获取商品数据,从库存所数据库获取库存数据,然后将他们的信息同步到es中。

增加过滤器,如使用过滤器进行脱敏处理;增加非数据库字段

json同步

//todo
mysql8 , 表字段为json,存储数据为json(非json字符串,就是json),同步到es失败问题

插件

简介

logstash通过各种插件的配合来实现数据同步的功能,插件可分为三类:输入插件、过滤插件、输出插件。如 input-jdbc 就是一个输入插件,用来从数据库中获取数据。

常用命令

查看可用插件
bin/logstash-plugin list

安装新插件
方案1、从https://github.com/logstash-plugins/安装新插件
bin/logstash-plugin install logstash-output-exec

方案2、本地安装
bin/logstash-plugin install /data/my-plugin.gem

schedule

logstash中的 schedule 使用的并不是 linux的cron表达式,5个星分别代表分、时、天、月、年,可以支持时区。

例子:

  • " *":每年每月每天每小时每分钟执行一次
  • " 5 1-3 *":每年1至3月每天上午5点的每分钟执行一次
  • "0 ":每年每月每日每小时的第0分钟执行一次
  • "0 6 * America/Chicago":将在美国芝加哥时间(即UTC/GMT -5)的每天上午6点0分执行一次

pipelines

windows中,在配置 path.config 的时候路径需要使用【\】,如: path.config: D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\test_total_increment.conf

简介

piplines 用于以多实例的方式启动 logstash

配置参考

以下示例配置了两个 pipelines,第一个 syncmain 根据id同步数据库数据到es,第二个 syncupdate 根据 update_time 同步数据库数据到es。

pipelines.yml

- pipeline.id: sync_main
  pipeline.workers: 4
  pipeline.batch.size: 1
  queue.type: persisted
  path.config: "/usr/local/logstash-7.14.0/config/test_total.conf"
- pipeline.id: sync_update
  pipeline.workers: 4
  pipeline.batch.size: 1
  queue.type: persisted
  path.config: "/usr/local/logstash-7.14.0/config/test_update.conf"

配置说明:

  • pipeline.workers: 有几个逻辑cpu就设置为几个
  • path.config:logstash 配置文件

启动命令:

进入bin目录,执行     sh logstash -r

添加 pipelines 后,logstash 将不以 -f test_total.conf 的方式启动。-r 表示自动 reload 配置文件的更新,也就是说更新了 test_total.conf 之后,无需更新启动logstash ,配置项会动态自动加载。

引用

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
15小时前
|
存储 JSON Java
ELK 圣经:Elasticsearch、Logstash、Kibana 从入门到精通
ELK是一套强大的日志管理和分析工具,广泛应用于日志监控、故障排查、业务分析等场景。本文档将详细介绍ELK的各个组件及其配置方法,帮助读者从零开始掌握ELK的使用。
|
存储 数据可视化 Java
Logstash快速入门
Logstash快速入门
149 1
|
存储 Linux 数据处理
Logstash 7.11安装配置
Logstash 是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中。Logstash 能够动态地采集、转换和传输数据,不受格式或复杂度的影响。利用 Grok 从非结构化数据中派生出结构,从 IP 地址解码出地理坐标,匿名化或排除敏感字段,并简化整体处理过程。数据往往以各种各样的形式,或分散或集中地存在于很多系统中。Logstash 支持各种输入选择,可以同时从众多常用来源捕捉事件。能够以连续的流式传输方式,轻松地从您的日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。
326 0
Logstash 7.11安装配置
|
存储 数据可视化 网络协议
ElasticStack常用工具:ElasticSearch、Kibana、Beats、Logstash
ElasticStack常用工具:ElasticSearch、Kibana、Beats、Logstash
92 0
|
消息中间件 JSON NoSQL
logstash 7.6.2 基础教程
logstash 7.6.2 基础教程
304 0
|
Java 关系型数据库
Logstash 安装
https://www.elastic.co/cn/downloads/logstash官网一、下载logstash[root@jiaxin-ceshi ~]# cd /usr/local/src/[root@jiaxin-ceshi src]# wget https://artifacts.
9008 0
|
编解码
collectd 与 logstash配置
节点 node1: 配置logstash node2: 配置collectd, collectd收集本地的信息, 通过配置将信息发送到node1节点 node1安装配置logstash rpm -ivh logstash.
1229 0
|
编解码 索引 开发者
logstash的使用教程
一、简单使用 cd logstash_HOME bin/logstash -e 'input { stdin { } } output { stdout {} }' 启动 Logstash 后,再键入 Hello hiekay,结果如下: image.png 在生产环境中,Logstash 的管道要复杂很多,可能需要配置多个输入、过滤器和输出插件。
2799 0
|
编解码 监控 NoSQL
Logstash 讲解与实战应用
原文网址:http://blog.51cto.com/tchuairen/1840596 一、Logstash 介绍 Logstash 是一款强大的数据处理工具,它可以实现数据传输,格式处理,格式化输出,还有强大的插件功能,常用于日志处理。
914 0