Logstash极简教程

本文涉及的产品
Elasticsearch Serverless通用抵扣包,测试体验金 200元
简介: 一个灵活的开源数据收集、处理、传输工具。logstash包含三个模块,输入、过滤和输出。其中输入、输出是必须的,过滤是可选的。logstash工作流程为从数据源中获取数据、对数据做过滤和简单清洗、输出到指定的目标中。

简介

一个灵活的开源数据收集、处理、传输工具。logstash包含三个模块,输入、过滤和输出。其中输入、输出是必须的,过滤是可选的。logstash工作流程为从数据源中获取数据、对数据做过滤和简单清洗、输出到指定的目标中。

1.png

输入:
2.png

输出:
3.png

logstash提供的功能依赖于logstash插件,默认安装包中已经集成了很多的开箱即用的插件,如
logstash-input-jdbc,logstash-output-elasticsearch,logstash-input-elasticsearch。你没看错,elasticsearch即可以作为logstash的输入,也可以作为输出。

Logstash的代码为JRuby,由于Jruby的运行于JVM中,那么它与Java就具备了天然的关联性。这为Java程序的集成工作提供了便利的基础。

下载

官网下载首页:https://www.elastic.co/cn/downloads/logstash

官网历史版本下载:https://www.elastic.co/cn/downloads/past-releases#logstash

官网7.15.2下载页:

4.png

安装

直接下载压缩包,解压之后里面包含windows和linux下的启动程序,windows下使用logstash.bat,linux下使用logstash。

windows

windwos下解压安装,安装之后得文件目录如下:
5.png

  • bin:可执行文件目录,启动程序为logstash.bat,logstash-plugin.bat为插件管理程序
  • config:配置文件目录
  • data:存放程序运行过程中产生得数据,和同步数据无关
  • jdk:安装完毕默认自带了一个jdk,启动得时候如果找不到java_home,就使用自己自带得jdk
  • lib:ruby的一些依赖库
  • logs:日志文件存放的目录,运行日志存放在logstash-plain.log文件中。需要注意的是此文件只会记录sql语句执行记录,不会记录从数据库查询到的结果。
  • TODO

启动

windows

为了测试一下安装是否成功,或者想体验一下logstash的输入和输出。进入config目录,新建一个logstash-test.conf配置文件,然后使用此配置文件启动logstash。

#启动命令:.\bin\logstash.bat  -f .\config\logstash-test.conf --path.data=/data/test
#测试:访问127.0.0.1:9600,能打开说明启动成功。在控制台输入hello,控制到输出会打印输入的信息
input {
    stdin{
    }
}
output {
    stdout{
    }
}

配置文件主要有两个模块,分别是input和output,功能是接收用户的控制台输入信息,并将其输出到控制台。

配置参考

1:注意尽量不要有空行
2:sql中不要出现type字段,如果必须同步表中的type列,使用别名,select type as new_type from user

全量同步mysql数据到es

本例在CentOS中实现,将每分钟全量同步一次mysql中的视图数据到es中。

本例要实现的功能是将mysql中的数据同步到es中,整体流程为:

  • 先在mysql数据库中建立视图,视图的字段根据es索引库的字段要求来设置
  • 准备es和logstash,确保mysql、logstash、es所在机器之间没有网络问题
  • es中不必预先创建索引库,logstash在同步过程中会自动创建索引库
  • logstash定时访问mysql并执行查询视图的sql语句,将查询结果同步到es中
    input {
    stdin {
    }
    jdbc {
    jdbc_connection_string => "jdbc:mysql://192.168.1.152:3306/cml"
    jdbc_user => "root"
    jdbc_password => "123456"
    jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    statement_filepath => "/usr/local/logstash-7.14.0/config/sql/person_info.sql"
    schedule => "* * * * *" 
    type => "type_cml"
    }
    }
    filter {
    json {
    source => "message"
    remove_field => ["message"]
    }
    }
    output {
    elasticsearch {
    hosts => "192.168.1.88:9955"
    index => "cml"
    document_id => "%{id}"
    }
    stdout {
    codec => json_lines
    }
    }
    

配置解释:

  • statement_filepath:要执行的sql语句所在文件,本例中该文件内容为:select * from view_person_info
  • schedule:同步周期,这里并不是cron表达式,5个星分别代表分、时、天、月、年,本例中即每分钟第0秒执行一次
  • type:为本input指定一个类型,此类型并不是指同步到es中,在es中新建的类型
  • document_id:es中文档的id,一般使用mysql数据库中的id

启动logstash命令:

进入bin,执行  sh logstash -f ../config/test_person_info.conf

多个input同步mysql数据到不同es索引库

本例在windows中实现,5个input分别查询mysql中不同的5个视图,将查询结果分别同步到5个不同的es索引库


input {
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.152:3306/test_cml"
  jdbc_user => "root"
  jdbc_password => "123456"
  jdbc_driver_library => "D:\Application\logstash-7.14.0-windows-x86_64\JdbcDriver\mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\sql\standard_address_info.sql"
  schedule => "* * * * *"
  type => "type_standard_address_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.152:3306/test_cml"
  jdbc_user => "root"
  jdbc_password => "123456"
  jdbc_driver_library => "D:\Application\logstash-7.14.0-windows-x86_64\JdbcDriver\mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\sql\person_info.sql"
  schedule => "* * * * *"
  type => "type_person_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.152:3306/test_cml"
  jdbc_user => "root"
  jdbc_password => "123456"
  jdbc_driver_library => "D:\Application\logstash-7.14.0-windows-x86_64\JdbcDriver\mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\sql\house_info.sql"
  schedule => "* * * * *"
  type => "type_house_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.152:3306/test_cml
  jdbc_user => "root"
  jdbc_password => "123456"
  jdbc_driver_library => "D:\Application\logstash-7.14.0-windows-x86_64\JdbcDriver\mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\sql\company_info.sql"
  schedule => "* * * * *"
  type => "type_company_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.152:3306/test_cml
  jdbc_user => "root"
  jdbc_password => "123456"
  jdbc_driver_library => "D:\Application\logstash-7.14.0-windows-x86_64\JdbcDriver\mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\sql\adcode_fence_info.sql"
  schedule => "* * * * *"
  type => "type_adcode_fence_info"
  }
}
filter {
  json {
  source => "message"
  remove_field => ["message"]
  }
}
output {
  if[type]=="type_adcode_fence_info"{
    elasticsearch {
      hosts => "192.168.1.78:9955"
      index => "cml_type_adcode_fence_info"
      document_id => "%{id}"
    }
  }
  if[type]=="type_company_info"{
    elasticsearch {
      hosts => "192.168.1.78:9955"
      index => "cml_type_company_info"
      document_id => "%{id}"
    }
  }
  if[type]=="type_house_info"{
    elasticsearch {
      hosts => "192.168.1.78:9955"
      index => "cml_type_house_info"
      document_id => "%{id}"
    }
  }
  if[type]=="type_person_info"{
    elasticsearch {
      hosts => "192.168.1.78:9955"
      index => "cml_type_person_info"
      document_id => "%{id}"
    }
  }
  if[type]=="type_standard_address_info"{
    elasticsearch {
      hosts => "192.168.1.78:9955"
      index => "cml_type_standard_address_info"
      document_id => "%{id}"
    }
  }
  stdout {
  codec => json_lines
  }
}

启动命令:

进入 bin 目录,执行 .\logstash.bat -f ..\config\cml.conf

增量同步

增量同步是指根据数据库中某表的某列作为标识,根据该标识的值同步数据,同步一次之后将标识的值更新为和当前数据库一致,下次同步的时候按照大于当前标识值进行同步。常见的在电商系统中,根据商品id增量同步数据。

本例演示根据id同步数据,需按照如下要求配置 input-jdbc 插件:

  • use_column_value 设置为 true,表示新增一个追踪标识,该标识使用数据库字段列的值。设置为时false,:sql_last_value反映上一次执行查询的时间
  • tracking_column 设置为 id ,即使用数据库中的id 列作为追踪标识。
    如果 use_column_value 为真,需配置此参数. 数据库中,该 column 必须是递增的。
  • tracking_column_type:可选的值有两个["numeric", "timestamp"],也就是说我们只能根据数据库中某一个数值类型或者时间类型设置 user_column_value。
  • record_last_run 设置为 true ,表示记录上次执行结果, 如果为真,将会把上次执行的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
  • last_run_metadata_path 上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值.比如上次数据库有 10000 条记录,查询完后该文件中就会有数字 10000 这样的记录,下次执行 SQL 查询可以从 10001 条处开始.我们只需要在 SQL 语句中 WHERE MY_ID > :sql_last_value 即可. 其中 :sql_last_value 取得就是该文件中的值(10000).
  • clean_run 设置为false
  • 在sql中添加where条件,如 SELECT * from test_table where id > :sql_last_value

6.png

house_info.conf


input {
  stdin {
  }
  jdbc {
  #mysql jdbc connection string to our backup databse  后面的test对应mysql中的test数据库
  jdbc_connection_string => "jdbc:mysql://192.168.1.152:3306/test_table"
  jdbc_user => "root"
  jdbc_password => "123456"
  jdbc_driver_library => "D:\Application\logstash-7.14.0-windows-x86_64\JdbcDriver\mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "5"
  statement_filepath => "D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\sql\house_info.sql"
  schedule => "* * * * *"
  #使用其它字段追踪,而不是用时间
  use_column_value => true
  #追踪的字段,如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的.比如:ID.
  tracking_column => id
  #是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
  record_last_run => true
  #上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值.比如上次数据库有 10000 条记录,查询完后该文件中就会有数字 10000 这样的记录,下次执行 SQL 查询可以从 10001 条处开始.我们只需要在 SQL 语句中 WHERE MY_ID > :sql_last_value 即可. 其中 :sql_last_value 取得就是该文件中的值(10000).
  last_run_metadata_path => "D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\lastRunMetadata\house_info.txt"
  #是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
  clean_run => false
  #是否将 column 名称转小写
  lowercase_column_names => false
  #设定ES索引类型
  type => "type_house_info"
  }
}
filter {
  json {
  source => "message"
  remove_field => ["message"]
  }
}
output {
  elasticsearch {
  hosts => "192.168.1.78:9955"
  index => "increment_house_info"
  document_id => "%{id}"
  }
  stdout {
  codec => json_lines
  }
}

house_info.sql


SELECT * from test_table where id > :sql_last_value

house_info.txt


--- 1440

若运行过程中,修改house_info.txt 中的追踪标识的值,无法生效,需要重新启动logstash

多个input增量同步

increment.conf


input {
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/person_info.sql"
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/person_info.txt"
  clean_run => false
  lowercase_column_names => false
  type => "_doc_person_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/adcode_fence_info.sql"
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/adcode_fence_info.txt"
  clean_run => false
  lowercase_column_names => false
  type => "_doc_adcode_fence_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/standard_address_info.sql"
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/standard_address_info.txt"
  clean_run => false
  lowercase_column_names => false
  type => "_doc_standard_address_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/company_info.sql"
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/company_info.txt"
  clean_run => false
  lowercase_column_names => false
  type => "_doc_company_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/house_info.sql"
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/house_info.txt"
  clean_run => false
  lowercase_column_names => false
  type => "_doc_house_info"
  }
}
filter {
  json {
  source => "message"
  remove_field => ["message"]
  }
}
output {
if [type]=="_doc_person_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_person_info"
  document_id => "%{id}"
  }
stdout {
  codec => json_lines
}
}
if [type]=="_doc_adcode_fence_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_adcode_fence_info"
  document_id => "%{id}"
  }
stdout {
  codec => json_lines
}
}
if [type]=="_doc_standard_address_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_standard_address_info"
  document_id => "%{id}"
  }
stdout {
  codec => json_lines
}
}
if [type]=="_doc_company_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_company_info"
  document_id => "%{id}"
  }
stdout {
  codec => json_lines
}
}
if [type]=="_doc_house_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_house_info"
  document_id => "%{id}"
  }
}
stdout {
  codec => json_lines
}
}

5个input对应5个sql、5个txt。5个sql中使用 sql_last_value 作为变量

7.png

txt的内容为:

8.png

sql的内容为:

9.png

解决时区问题

logstash默认使用的时区是 UTC (世界标准时间),此时间比中国地区时间早8小时,那么就会出现一下问题:

  • ES中的@timestamp 字段的值比当前时间小8小时
  • 数据库中的update_time、create_time等日期、时间类型的字段值同步到ES后,ES中显示的数据和数据库数据不一致
  • 做增量同步的时候,若根据 update_time 字段进行查询,会出现不准确的情况
  • ......

解决办法:

  • jdbc_default_timezone :input-jdbc 插件的此配置需设置为中国时区,如 "Asia/Shanghai"
  • plugin_timezone: input-jdbc 插件的此配置需设置为 local
  • 增加过滤器,在过滤器中编写 ruby 脚本,读取数据库中的日期、时间字段、并增加8小时

test_total.conf 配置参考:


input {
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
#以下对应着要执行的sql的绝对路径。
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/person_info.sql"
#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新(测试结果,不同的话请留言指出)
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/person_info.txt"
  clean_run => false
  lowercase_column_names => false
  jdbc_default_timezone => "Asia/Shanghai"
  plugin_timezone => "local"
  type => "_doc_person_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
#以下对应着要执行的sql的绝对路径。
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/adcode_fence_info.sql"
#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新(测试结果,不同的话请留言指出)
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/adcode_fence_info.txt"
  clean_run => false
  lowercase_column_names => false
  jdbc_default_timezone => "Asia/Shanghai"
  plugin_timezone => "local"
  type => "_doc_adcode_fence_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
#以下对应着要执行的sql的绝对路径。
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/standard_address_info.sql"
#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新(测试结果,不同的话请留言指出)
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/standard_address_info.txt"
  clean_run => false
  lowercase_column_names => false
  jdbc_default_timezone => "Asia/Shanghai"
  plugin_timezone => "local"
  type => "_doc_standard_address_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
#以下对应着要执行的sql的绝对路径。
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/company_info.sql"
#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新(测试结果,不同的话请留言指出)
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/company_info.txt"
  clean_run => false
  lowercase_column_names => false
  jdbc_default_timezone => "Asia/Shanghai"
  plugin_timezone => "local"
  type => "_doc_company_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
#以下对应着要执行的sql的绝对路径。
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/house_info.sql"
#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新(测试结果,不同的话请留言指出)
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/house_info.txt"
  clean_run => false
  lowercase_column_names => false
  jdbc_default_timezone => "Asia/Shanghai"
  plugin_timezone => "local"
  type => "_doc_house_info"
  }
}
filter {
  #start,解决es中系统级字段@timestamp时间比本地时间少8小时的问题
  #将@timestamp字段加8小时,赋值给一个变量temptimestamp
  ruby {
    code => "event.set('temptimestamp', event.get('@timestamp').time.localtime + 8*60*60)"
  }
  #将变量temptimestamp的值赋值给@timestamp
  ruby {
    code => "event.set('@timestamp',event.get('temptimestamp'))"
  }
  #删除临时变量temptimestamp
  mutate {
    remove_field => ["temptimestamp"]
  }
  #end,解决es中系统级字段@timestamp时间比本地时间少8小时的问题
  #start,解决mysql中的update_time字段的值同步到es后少8小时的问题
  ruby {
    code => "event.set('temp_update_time', event.get('update_time').time.localtime + 8*60*60)"
  }
  ruby {
    code => "event.set('update_time',event.get('temp_update_time'))"
  }
  mutate {
    remove_field => ["temp_update_time"]
  }
  #end,解决mysql中的update_time字段的值同步到es后少8小时的问题
  #start,解决mysql中的create_time字段的值同步到es后少8小时的问题
  ruby {
    code => "event.set('temp_create_time', event.get('create_time').time.localtime + 8*60*60)"
  }
  ruby {
    code => "event.set('create_time',event.get('temp_create_time'))"
  }
  mutate {
    remove_field => ["temp_create_time"]
  }
  #end,解决mysql中的create_time字段的值同步到es后少8小时的问题
}
output {
if [type]=="_doc_person_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_person_info"
  document_id => "%{id}"
  }
stdout {
  codec => json_lines
}
}
if [type]=="_doc_adcode_fence_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_adcode_fence_info"
  document_id => "%{id}"
  }
stdout {
  codec => json_lines
}
}
if [type]=="_doc_standard_address_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_standard_address_info"
  document_id => "%{id}"
  }
stdout {
  codec => json_lines
}
}
if [type]=="_doc_company_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_company_info"
  document_id => "%{id}"
  }
stdout {
  codec => json_lines
}
}
if [type]=="_doc_house_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_house_info"
  document_id => "%{id}"
  }
}
stdout {
  codec => json_lines
}
}

多库同步、增加过滤器

//todo

多库同步,将两个mysql库中的数据同步到同一个es索引库中,如从商品所在数据库获取商品数据,从库存所数据库获取库存数据,然后将他们的信息同步到es中。

增加过滤器,如使用过滤器进行脱敏处理;增加非数据库字段

json同步

//todo
mysql8 , 表字段为json,存储数据为json(非json字符串,就是json),同步到es失败问题

插件

简介

logstash通过各种插件的配合来实现数据同步的功能,插件可分为三类:输入插件、过滤插件、输出插件。如 input-jdbc 就是一个输入插件,用来从数据库中获取数据。

常用命令

查看可用插件
bin/logstash-plugin list

安装新插件
方案1、从https://github.com/logstash-plugins/安装新插件
bin/logstash-plugin install logstash-output-exec

方案2、本地安装
bin/logstash-plugin install /data/my-plugin.gem

schedule

logstash中的 schedule 使用的并不是 linux的cron表达式,5个星分别代表分、时、天、月、年,可以支持时区。

例子:

  • " *":每年每月每天每小时每分钟执行一次
  • " 5 1-3 *":每年1至3月每天上午5点的每分钟执行一次
  • "0 ":每年每月每日每小时的第0分钟执行一次
  • "0 6 * America/Chicago":将在美国芝加哥时间(即UTC/GMT -5)的每天上午6点0分执行一次

pipelines

windows中,在配置 path.config 的时候路径需要使用【\】,如: path.config: D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\test_total_increment.conf

简介

piplines 用于以多实例的方式启动 logstash

配置参考

以下示例配置了两个 pipelines,第一个 syncmain 根据id同步数据库数据到es,第二个 syncupdate 根据 update_time 同步数据库数据到es。

pipelines.yml

- pipeline.id: sync_main
  pipeline.workers: 4
  pipeline.batch.size: 1
  queue.type: persisted
  path.config: "/usr/local/logstash-7.14.0/config/test_total.conf"
- pipeline.id: sync_update
  pipeline.workers: 4
  pipeline.batch.size: 1
  queue.type: persisted
  path.config: "/usr/local/logstash-7.14.0/config/test_update.conf"

配置说明:

  • pipeline.workers: 有几个逻辑cpu就设置为几个
  • path.config:logstash 配置文件

启动命令:

进入bin目录,执行     sh logstash -r

添加 pipelines 后,logstash 将不以 -f test_total.conf 的方式启动。-r 表示自动 reload 配置文件的更新,也就是说更新了 test_total.conf 之后,无需更新启动logstash ,配置项会动态自动加载。

引用

相关实践学习
以电商场景为例搭建AI语义搜索应用
本实验旨在通过阿里云Elasticsearch结合阿里云搜索开发工作台AI模型服务,构建一个高效、精准的语义搜索系统,模拟电商场景,深入理解AI搜索技术原理并掌握其实现过程。
ElasticSearch 最新快速入门教程
本课程由千锋教育提供。全文搜索的需求非常大。而开源的解决办法Elasricsearch(Elastic)就是一个非常好的工具。目前是全文搜索引擎的首选。本系列教程由浅入深讲解了在CentOS7系统下如何搭建ElasticSearch,如何使用Kibana实现各种方式的搜索并详细分析了搜索的原理,最后讲解了在Java应用中如何集成ElasticSearch并实现搜索。  
目录
相关文章
|
9月前
|
存储 人工智能 搜索推荐
Memobase:开源AI长期记忆系统,让AI真正记住每个用户的秘密武器
Memobase 是一个开源的长期记忆系统,专为生成式 AI 应用设计,通过用户画像和时间感知记忆功能,帮助 AI 记住、理解并适应用户需求。
1632 0
|
JSON 缓存 前端开发
Django视图层探索:GET/POST请求处理、参数传递与响应方式详解
Django视图层探索:GET/POST请求处理、参数传递与响应方式详解
|
Kubernetes 网络安全 容器
在K8S中,有个服务使用service的nodeport进行暴露,发现访问不到如何排查?
在K8S中,有个服务使用service的nodeport进行暴露,发现访问不到如何排查?
|
12月前
|
数据采集
使用 Puppeteer 绕过 Captcha:实现商家数据自动化采集
本文介绍了如何使用Puppeteer结合代理IP和用户伪装技术,轻松绕过大众点评的Captcha验证,实现商家信息的高效采集。通过配置Puppeteer、设置代理和用户伪装参数、模拟人类操作等步骤,成功提取了目标页面的数据。该方法不仅提高了爬虫的稳定性和隐蔽性,还为市场研究和商业分析提供了有力支持。注意,数据采集需遵守法律法规及网站政策。
343 1
使用 Puppeteer 绕过 Captcha:实现商家数据自动化采集
|
自然语言处理 应用服务中间件 nginx
一文教会你 分词器elasticsearch-analysis-ik 的安装使用【自定义分词库】
这篇文章是关于如何在Elasticsearch中安装和使用ik分词器的详细教程,包括版本匹配、安装步骤、分词测试、自定义词库配置以及创建使用ik分词器的索引的方法。
一文教会你 分词器elasticsearch-analysis-ik 的安装使用【自定义分词库】
|
存储 JSON OLAP
Hologres支持哪些数据格式?
【8月更文挑战第20天】Hologres支持哪些数据格式?
378 1
|
SQL API Python
`bandit`是一个Python静态代码分析工具,专注于查找常见的安全漏洞,如SQL注入、跨站脚本(XSS)等。
`bandit`是一个Python静态代码分析工具,专注于查找常见的安全漏洞,如SQL注入、跨站脚本(XSS)等。
|
运维 关系型数据库 Java
实时计算 Flink版产品使用问题之如何设置白名单
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
Python
Python reStructuredText风格注释详解
reStructuredText风格注释是Python代码注释的一种标准化格式,它提供了一种规范的注释格式,使得代码更加易读、易于维护。reStructuredText风格注释使用两个等号来包围注释标题,并按照一定规范编写。通过使用reStructuredText风格注释,我们可以为代码提供清晰的文档和说明,使得代码更加易读、易于维护。
506 2

热门文章

最新文章