Logstash极简教程

简介: 一个灵活的开源数据收集、处理、传输工具。logstash包含三个模块,输入、过滤和输出。其中输入、输出是必须的,过滤是可选的。logstash工作流程为从数据源中获取数据、对数据做过滤和简单清洗、输出到指定的目标中。

简介

一个灵活的开源数据收集、处理、传输工具。logstash包含三个模块,输入、过滤和输出。其中输入、输出是必须的,过滤是可选的。logstash工作流程为从数据源中获取数据、对数据做过滤和简单清洗、输出到指定的目标中。

1.png

输入:
2.png

输出:
3.png

logstash提供的功能依赖于logstash插件,默认安装包中已经集成了很多的开箱即用的插件,如
logstash-input-jdbc,logstash-output-elasticsearch,logstash-input-elasticsearch。你没看错,elasticsearch即可以作为logstash的输入,也可以作为输出。

Logstash的代码为JRuby,由于Jruby的运行于JVM中,那么它与Java就具备了天然的关联性。这为Java程序的集成工作提供了便利的基础。

下载

官网下载首页:https://www.elastic.co/cn/downloads/logstash

官网历史版本下载:https://www.elastic.co/cn/downloads/past-releases#logstash

官网7.15.2下载页:

4.png

安装

直接下载压缩包,解压之后里面包含windows和linux下的启动程序,windows下使用logstash.bat,linux下使用logstash。

windows

windwos下解压安装,安装之后得文件目录如下:
5.png

  • bin:可执行文件目录,启动程序为logstash.bat,logstash-plugin.bat为插件管理程序
  • config:配置文件目录
  • data:存放程序运行过程中产生得数据,和同步数据无关
  • jdk:安装完毕默认自带了一个jdk,启动得时候如果找不到java_home,就使用自己自带得jdk
  • lib:ruby的一些依赖库
  • logs:日志文件存放的目录,运行日志存放在logstash-plain.log文件中。需要注意的是此文件只会记录sql语句执行记录,不会记录从数据库查询到的结果。
  • TODO

启动

windows

为了测试一下安装是否成功,或者想体验一下logstash的输入和输出。进入config目录,新建一个logstash-test.conf配置文件,然后使用此配置文件启动logstash。

#启动命令:.\bin\logstash.bat  -f .\config\logstash-test.conf --path.data=/data/test
#测试:访问127.0.0.1:9600,能打开说明启动成功。在控制台输入hello,控制到输出会打印输入的信息
input {
    stdin{
    }
}
output {
    stdout{
    }
}

配置文件主要有两个模块,分别是input和output,功能是接收用户的控制台输入信息,并将其输出到控制台。

配置参考

1:注意尽量不要有空行
2:sql中不要出现type字段,如果必须同步表中的type列,使用别名,select type as new_type from user

全量同步mysql数据到es

本例在CentOS中实现,将每分钟全量同步一次mysql中的视图数据到es中。

本例要实现的功能是将mysql中的数据同步到es中,整体流程为:

  • 先在mysql数据库中建立视图,视图的字段根据es索引库的字段要求来设置
  • 准备es和logstash,确保mysql、logstash、es所在机器之间没有网络问题
  • es中不必预先创建索引库,logstash在同步过程中会自动创建索引库
  • logstash定时访问mysql并执行查询视图的sql语句,将查询结果同步到es中
    input {
    stdin {
    }
    jdbc {
    jdbc_connection_string => "jdbc:mysql://192.168.1.152:3306/cml"
    jdbc_user => "root"
    jdbc_password => "123456"
    jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    statement_filepath => "/usr/local/logstash-7.14.0/config/sql/person_info.sql"
    schedule => "* * * * *" 
    type => "type_cml"
    }
    }
    filter {
    json {
    source => "message"
    remove_field => ["message"]
    }
    }
    output {
    elasticsearch {
    hosts => "192.168.1.88:9955"
    index => "cml"
    document_id => "%{id}"
    }
    stdout {
    codec => json_lines
    }
    }
    

配置解释:

  • statement_filepath:要执行的sql语句所在文件,本例中该文件内容为:select * from view_person_info
  • schedule:同步周期,这里并不是cron表达式,5个星分别代表分、时、天、月、年,本例中即每分钟第0秒执行一次
  • type:为本input指定一个类型,此类型并不是指同步到es中,在es中新建的类型
  • document_id:es中文档的id,一般使用mysql数据库中的id

启动logstash命令:

进入bin,执行  sh logstash -f ../config/test_person_info.conf

多个input同步mysql数据到不同es索引库

本例在windows中实现,5个input分别查询mysql中不同的5个视图,将查询结果分别同步到5个不同的es索引库


input {
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.152:3306/test_cml"
  jdbc_user => "root"
  jdbc_password => "123456"
  jdbc_driver_library => "D:\Application\logstash-7.14.0-windows-x86_64\JdbcDriver\mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\sql\standard_address_info.sql"
  schedule => "* * * * *"
  type => "type_standard_address_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.152:3306/test_cml"
  jdbc_user => "root"
  jdbc_password => "123456"
  jdbc_driver_library => "D:\Application\logstash-7.14.0-windows-x86_64\JdbcDriver\mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\sql\person_info.sql"
  schedule => "* * * * *"
  type => "type_person_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.152:3306/test_cml"
  jdbc_user => "root"
  jdbc_password => "123456"
  jdbc_driver_library => "D:\Application\logstash-7.14.0-windows-x86_64\JdbcDriver\mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\sql\house_info.sql"
  schedule => "* * * * *"
  type => "type_house_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.152:3306/test_cml
  jdbc_user => "root"
  jdbc_password => "123456"
  jdbc_driver_library => "D:\Application\logstash-7.14.0-windows-x86_64\JdbcDriver\mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\sql\company_info.sql"
  schedule => "* * * * *"
  type => "type_company_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.152:3306/test_cml
  jdbc_user => "root"
  jdbc_password => "123456"
  jdbc_driver_library => "D:\Application\logstash-7.14.0-windows-x86_64\JdbcDriver\mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\sql\adcode_fence_info.sql"
  schedule => "* * * * *"
  type => "type_adcode_fence_info"
  }
}
filter {
  json {
  source => "message"
  remove_field => ["message"]
  }
}
output {
  if[type]=="type_adcode_fence_info"{
    elasticsearch {
      hosts => "192.168.1.78:9955"
      index => "cml_type_adcode_fence_info"
      document_id => "%{id}"
    }
  }
  if[type]=="type_company_info"{
    elasticsearch {
      hosts => "192.168.1.78:9955"
      index => "cml_type_company_info"
      document_id => "%{id}"
    }
  }
  if[type]=="type_house_info"{
    elasticsearch {
      hosts => "192.168.1.78:9955"
      index => "cml_type_house_info"
      document_id => "%{id}"
    }
  }
  if[type]=="type_person_info"{
    elasticsearch {
      hosts => "192.168.1.78:9955"
      index => "cml_type_person_info"
      document_id => "%{id}"
    }
  }
  if[type]=="type_standard_address_info"{
    elasticsearch {
      hosts => "192.168.1.78:9955"
      index => "cml_type_standard_address_info"
      document_id => "%{id}"
    }
  }
  stdout {
  codec => json_lines
  }
}

启动命令:

进入 bin 目录,执行 .\logstash.bat -f ..\config\cml.conf

增量同步

增量同步是指根据数据库中某表的某列作为标识,根据该标识的值同步数据,同步一次之后将标识的值更新为和当前数据库一致,下次同步的时候按照大于当前标识值进行同步。常见的在电商系统中,根据商品id增量同步数据。

本例演示根据id同步数据,需按照如下要求配置 input-jdbc 插件:

  • use_column_value 设置为 true,表示新增一个追踪标识,该标识使用数据库字段列的值。设置为时false,:sql_last_value反映上一次执行查询的时间
  • tracking_column 设置为 id ,即使用数据库中的id 列作为追踪标识。
    如果 use_column_value 为真,需配置此参数. 数据库中,该 column 必须是递增的。
  • tracking_column_type:可选的值有两个["numeric", "timestamp"],也就是说我们只能根据数据库中某一个数值类型或者时间类型设置 user_column_value。
  • record_last_run 设置为 true ,表示记录上次执行结果, 如果为真,将会把上次执行的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
  • last_run_metadata_path 上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值.比如上次数据库有 10000 条记录,查询完后该文件中就会有数字 10000 这样的记录,下次执行 SQL 查询可以从 10001 条处开始.我们只需要在 SQL 语句中 WHERE MY_ID > :sql_last_value 即可. 其中 :sql_last_value 取得就是该文件中的值(10000).
  • clean_run 设置为false
  • 在sql中添加where条件,如 SELECT * from test_table where id > :sql_last_value

6.png

house_info.conf


input {
  stdin {
  }
  jdbc {
  #mysql jdbc connection string to our backup databse  后面的test对应mysql中的test数据库
  jdbc_connection_string => "jdbc:mysql://192.168.1.152:3306/test_table"
  jdbc_user => "root"
  jdbc_password => "123456"
  jdbc_driver_library => "D:\Application\logstash-7.14.0-windows-x86_64\JdbcDriver\mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "5"
  statement_filepath => "D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\sql\house_info.sql"
  schedule => "* * * * *"
  #使用其它字段追踪,而不是用时间
  use_column_value => true
  #追踪的字段,如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的.比如:ID.
  tracking_column => id
  #是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
  record_last_run => true
  #上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值.比如上次数据库有 10000 条记录,查询完后该文件中就会有数字 10000 这样的记录,下次执行 SQL 查询可以从 10001 条处开始.我们只需要在 SQL 语句中 WHERE MY_ID > :sql_last_value 即可. 其中 :sql_last_value 取得就是该文件中的值(10000).
  last_run_metadata_path => "D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\lastRunMetadata\house_info.txt"
  #是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
  clean_run => false
  #是否将 column 名称转小写
  lowercase_column_names => false
  #设定ES索引类型
  type => "type_house_info"
  }
}
filter {
  json {
  source => "message"
  remove_field => ["message"]
  }
}
output {
  elasticsearch {
  hosts => "192.168.1.78:9955"
  index => "increment_house_info"
  document_id => "%{id}"
  }
  stdout {
  codec => json_lines
  }
}

house_info.sql


SELECT * from test_table where id > :sql_last_value

house_info.txt


--- 1440

若运行过程中,修改house_info.txt 中的追踪标识的值,无法生效,需要重新启动logstash

多个input增量同步

increment.conf


input {
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/person_info.sql"
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/person_info.txt"
  clean_run => false
  lowercase_column_names => false
  type => "_doc_person_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/adcode_fence_info.sql"
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/adcode_fence_info.txt"
  clean_run => false
  lowercase_column_names => false
  type => "_doc_adcode_fence_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/standard_address_info.sql"
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/standard_address_info.txt"
  clean_run => false
  lowercase_column_names => false
  type => "_doc_standard_address_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/company_info.sql"
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/company_info.txt"
  clean_run => false
  lowercase_column_names => false
  type => "_doc_company_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/house_info.sql"
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/house_info.txt"
  clean_run => false
  lowercase_column_names => false
  type => "_doc_house_info"
  }
}
filter {
  json {
  source => "message"
  remove_field => ["message"]
  }
}
output {
if [type]=="_doc_person_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_person_info"
  document_id => "%{id}"
  }
stdout {
  codec => json_lines
}
}
if [type]=="_doc_adcode_fence_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_adcode_fence_info"
  document_id => "%{id}"
  }
stdout {
  codec => json_lines
}
}
if [type]=="_doc_standard_address_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_standard_address_info"
  document_id => "%{id}"
  }
stdout {
  codec => json_lines
}
}
if [type]=="_doc_company_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_company_info"
  document_id => "%{id}"
  }
stdout {
  codec => json_lines
}
}
if [type]=="_doc_house_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_house_info"
  document_id => "%{id}"
  }
}
stdout {
  codec => json_lines
}
}

5个input对应5个sql、5个txt。5个sql中使用 sql_last_value 作为变量

7.png

txt的内容为:

8.png

sql的内容为:

9.png

解决时区问题

logstash默认使用的时区是 UTC (世界标准时间),此时间比中国地区时间早8小时,那么就会出现一下问题:

  • ES中的@timestamp 字段的值比当前时间小8小时
  • 数据库中的update_time、create_time等日期、时间类型的字段值同步到ES后,ES中显示的数据和数据库数据不一致
  • 做增量同步的时候,若根据 update_time 字段进行查询,会出现不准确的情况
  • ......

解决办法:

  • jdbc_default_timezone :input-jdbc 插件的此配置需设置为中国时区,如 "Asia/Shanghai"
  • plugin_timezone: input-jdbc 插件的此配置需设置为 local
  • 增加过滤器,在过滤器中编写 ruby 脚本,读取数据库中的日期、时间字段、并增加8小时

test_total.conf 配置参考:


input {
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
#以下对应着要执行的sql的绝对路径。
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/person_info.sql"
#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新(测试结果,不同的话请留言指出)
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/person_info.txt"
  clean_run => false
  lowercase_column_names => false
  jdbc_default_timezone => "Asia/Shanghai"
  plugin_timezone => "local"
  type => "_doc_person_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
#以下对应着要执行的sql的绝对路径。
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/adcode_fence_info.sql"
#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新(测试结果,不同的话请留言指出)
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/adcode_fence_info.txt"
  clean_run => false
  lowercase_column_names => false
  jdbc_default_timezone => "Asia/Shanghai"
  plugin_timezone => "local"
  type => "_doc_adcode_fence_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
#以下对应着要执行的sql的绝对路径。
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/standard_address_info.sql"
#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新(测试结果,不同的话请留言指出)
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/standard_address_info.txt"
  clean_run => false
  lowercase_column_names => false
  jdbc_default_timezone => "Asia/Shanghai"
  plugin_timezone => "local"
  type => "_doc_standard_address_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
#以下对应着要执行的sql的绝对路径。
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/company_info.sql"
#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新(测试结果,不同的话请留言指出)
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/company_info.txt"
  clean_run => false
  lowercase_column_names => false
  jdbc_default_timezone => "Asia/Shanghai"
  plugin_timezone => "local"
  type => "_doc_company_info"
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://192.168.1.88:3306/ngh-smart-data-center"
  jdbc_user => "root"
  jdbc_password => "root"
  jdbc_driver_library => "/usr/local/logstash-7.14.0/driver/mysql-connector-java-5.1.49.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
#以下对应着要执行的sql的绝对路径。
  statement_filepath => "/usr/local/logstash-7.14.0/config/sql/house_info.sql"
#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新(测试结果,不同的话请留言指出)
  schedule => "* * * * *"
  use_column_value => true
  tracking_column => id
  record_last_run => true
  last_run_metadata_path => "/usr/local/logstash-7.14.0/config/lastRunMetadata/house_info.txt"
  clean_run => false
  lowercase_column_names => false
  jdbc_default_timezone => "Asia/Shanghai"
  plugin_timezone => "local"
  type => "_doc_house_info"
  }
}
filter {
  #start,解决es中系统级字段@timestamp时间比本地时间少8小时的问题
  #将@timestamp字段加8小时,赋值给一个变量temptimestamp
  ruby {
    code => "event.set('temptimestamp', event.get('@timestamp').time.localtime + 8*60*60)"
  }
  #将变量temptimestamp的值赋值给@timestamp
  ruby {
    code => "event.set('@timestamp',event.get('temptimestamp'))"
  }
  #删除临时变量temptimestamp
  mutate {
    remove_field => ["temptimestamp"]
  }
  #end,解决es中系统级字段@timestamp时间比本地时间少8小时的问题
  #start,解决mysql中的update_time字段的值同步到es后少8小时的问题
  ruby {
    code => "event.set('temp_update_time', event.get('update_time').time.localtime + 8*60*60)"
  }
  ruby {
    code => "event.set('update_time',event.get('temp_update_time'))"
  }
  mutate {
    remove_field => ["temp_update_time"]
  }
  #end,解决mysql中的update_time字段的值同步到es后少8小时的问题
  #start,解决mysql中的create_time字段的值同步到es后少8小时的问题
  ruby {
    code => "event.set('temp_create_time', event.get('create_time').time.localtime + 8*60*60)"
  }
  ruby {
    code => "event.set('create_time',event.get('temp_create_time'))"
  }
  mutate {
    remove_field => ["temp_create_time"]
  }
  #end,解决mysql中的create_time字段的值同步到es后少8小时的问题
}
output {
if [type]=="_doc_person_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_person_info"
  document_id => "%{id}"
  }
stdout {
  codec => json_lines
}
}
if [type]=="_doc_adcode_fence_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_adcode_fence_info"
  document_id => "%{id}"
  }
stdout {
  codec => json_lines
}
}
if [type]=="_doc_standard_address_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_standard_address_info"
  document_id => "%{id}"
  }
stdout {
  codec => json_lines
}
}
if [type]=="_doc_company_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_company_info"
  document_id => "%{id}"
  }
stdout {
  codec => json_lines
}
}
if [type]=="_doc_house_info" {
  elasticsearch {
  hosts => "192.168.1.88:9955"
  index => "test_house_info"
  document_id => "%{id}"
  }
}
stdout {
  codec => json_lines
}
}

多库同步、增加过滤器

//todo

多库同步,将两个mysql库中的数据同步到同一个es索引库中,如从商品所在数据库获取商品数据,从库存所数据库获取库存数据,然后将他们的信息同步到es中。

增加过滤器,如使用过滤器进行脱敏处理;增加非数据库字段

json同步

//todo
mysql8 , 表字段为json,存储数据为json(非json字符串,就是json),同步到es失败问题

插件

简介

logstash通过各种插件的配合来实现数据同步的功能,插件可分为三类:输入插件、过滤插件、输出插件。如 input-jdbc 就是一个输入插件,用来从数据库中获取数据。

常用命令

查看可用插件
bin/logstash-plugin list

安装新插件
方案1、从https://github.com/logstash-plugins/安装新插件
bin/logstash-plugin install logstash-output-exec

方案2、本地安装
bin/logstash-plugin install /data/my-plugin.gem

schedule

logstash中的 schedule 使用的并不是 linux的cron表达式,5个星分别代表分、时、天、月、年,可以支持时区。

例子:

  • " *":每年每月每天每小时每分钟执行一次
  • " 5 1-3 *":每年1至3月每天上午5点的每分钟执行一次
  • "0 ":每年每月每日每小时的第0分钟执行一次
  • "0 6 * America/Chicago":将在美国芝加哥时间(即UTC/GMT -5)的每天上午6点0分执行一次

pipelines

windows中,在配置 path.config 的时候路径需要使用【\】,如: path.config: D:\Application\logstash-7.14.0-windows-x86_64\logstash-7.14.0\config\test_total_increment.conf

简介

piplines 用于以多实例的方式启动 logstash

配置参考

以下示例配置了两个 pipelines,第一个 syncmain 根据id同步数据库数据到es,第二个 syncupdate 根据 update_time 同步数据库数据到es。

pipelines.yml

- pipeline.id: sync_main
  pipeline.workers: 4
  pipeline.batch.size: 1
  queue.type: persisted
  path.config: "/usr/local/logstash-7.14.0/config/test_total.conf"
- pipeline.id: sync_update
  pipeline.workers: 4
  pipeline.batch.size: 1
  queue.type: persisted
  path.config: "/usr/local/logstash-7.14.0/config/test_update.conf"

配置说明:

  • pipeline.workers: 有几个逻辑cpu就设置为几个
  • path.config:logstash 配置文件

启动命令:

进入bin目录,执行     sh logstash -r

添加 pipelines 后,logstash 将不以 -f test_total.conf 的方式启动。-r 表示自动 reload 配置文件的更新,也就是说更新了 test_total.conf 之后,无需更新启动logstash ,配置项会动态自动加载。

引用

相关实践学习
以电商场景为例搭建AI语义搜索应用
本实验旨在通过阿里云Elasticsearch结合阿里云搜索开发工作台AI模型服务,构建一个高效、精准的语义搜索系统,模拟电商场景,深入理解AI搜索技术原理并掌握其实现过程。
ElasticSearch 最新快速入门教程
本课程由千锋教育提供。全文搜索的需求非常大。而开源的解决办法Elasricsearch(Elastic)就是一个非常好的工具。目前是全文搜索引擎的首选。本系列教程由浅入深讲解了在CentOS7系统下如何搭建ElasticSearch,如何使用Kibana实现各种方式的搜索并详细分析了搜索的原理,最后讲解了在Java应用中如何集成ElasticSearch并实现搜索。  
目录
相关文章
|
消息中间件 SQL 分布式计算
一篇文章搞定数据同步工具SeaTunnel
一篇文章搞定数据同步工具SeaTunnel
10459 1
|
计算机视觉
U盘使用技巧:U盘自动启运行应用程序(autorun.inf无法运行终极解决方案)
U盘使用技巧:U盘自动启运行应用程序(autorun.inf无法运行终极解决方案)
U盘使用技巧:U盘自动启运行应用程序(autorun.inf无法运行终极解决方案)
|
4月前
|
人工智能 安全 算法
【云故事探索】NO.18:易点天下:以全栈AI营销能力引领全球增长新周期,阿里云“全球一张网”筑牢中国企业出海底座
位于西安的易点天下,以算法与数据驱动,助力中国品牌出海。自2011年成立以来,业务覆盖全球220+国家,通过AIGC、Agentic AI等技术,携手阿里云构建智能营销全链路,推动跨境电商、新能源等领域全球化布局,成为中国企业走向世界的重要推手。
|
5月前
|
编解码 缓存 Java
Java 高效实现 WAV 音频拼接彻底摆脱 FFmpeg 的轻量本地方案
本文介绍一种纯Java实现的高效WAV音频拼接方案,无需依赖FFmpeg。通过解析WAV文件结构,利用内存映射与流式写入,实现零转码、低CPU占用的高性能拼接,适用于TTS、播客、嵌入式等场景,具备跨平台、易部署、高稳定性的优势。
Java 高效实现 WAV 音频拼接彻底摆脱 FFmpeg 的轻量本地方案
|
人工智能 自然语言处理 API
【活动系列】在阿里云百炼构建企业级多模态应用,发布作品赢取礼品
本次活动旨在鼓励开发者围绕AI应用开发实训课中的音视频交互和多模态RAG能力,在实训群内上传智能体效果截图或视频。活动时间为2025年1月22日至3月31日,分为作品提交、评审和结果公布三个阶段。参与者需在阿里云百炼平台上创建应用,并在规定时间内提交作品。奖项设置包括磁吸充电宝、定制保温杯和折叠雨伞等丰厚礼品。所有作品必须为原创,且需使用阿里云百炼平台完成。详细操作指南及注意事项请参见活动页面。
4313 10
|
10月前
|
数据采集 搜索推荐 应用服务中间件
如何通过 noindex 阻止网页被搜索引擎编入索引?
在一些网站中,通过`robots.txt`可以控制哪些站点资源或目录能被搜索引擎索引,但是随着站点页面增加,之前允许的索引页面常常不希望被索引,如果直接修改 `robots.txt`,影响会比较大,所以页面级的控制就很有必要。
255 0
如何通过 noindex 阻止网页被搜索引擎编入索引?
|
数据采集
使用 Puppeteer 绕过 Captcha:实现商家数据自动化采集
本文介绍了如何使用Puppeteer结合代理IP和用户伪装技术,轻松绕过大众点评的Captcha验证,实现商家信息的高效采集。通过配置Puppeteer、设置代理和用户伪装参数、模拟人类操作等步骤,成功提取了目标页面的数据。该方法不仅提高了爬虫的稳定性和隐蔽性,还为市场研究和商业分析提供了有力支持。注意,数据采集需遵守法律法规及网站政策。
509 1
使用 Puppeteer 绕过 Captcha:实现商家数据自动化采集
|
机器学习/深度学习 自然语言处理
预训练语义模型作为特征提取器的方法
预训练语义模型作为特征提取器的方法
|
数据采集 存储 监控
使用Java构建实时监控和警报系统的最佳实践
使用Java构建实时监控和警报系统的最佳实践
|
机器学习/深度学习 人工智能 安全
SentinelOne监测中隔离的文件,人工如何取消隔离
SentinelOne 的 Agent 在终端设备上实时监测系统的活动,包括文件操作、网络通信、内存访问等, SentinelOne 使用人工智能和机器学习技术对监测到的活动进行行为分析,识别潜在的威胁,包括已知的恶意软件和未知的零日攻击。 基于行为分析和实时监测,SentinelOne 快速识别出可能的威胁,并进行准确的威胁分类,包括病毒、勒索软件、恶意脚本等。 SentinelOne 可以自动采取响应措施,如隔离受感染的设备、终止恶意进程、删除恶意文件等,以尽快减轻威胁带来的影响。当技术人员发现隔离的文件没有危害时,可以手动隔离。文章阐述了怎么手动撤销的过程。
2392 0
SentinelOne监测中隔离的文件,人工如何取消隔离

热门文章

最新文章

下一篇
开通oss服务