相关mysql数据库脚本文件
mysql.conf 配置文件(Logstash 管道配置:通过 jdbc 输入插件将 MySQL 数据同步到 Elasticsearch)
# Logstash pipeline: polls a MySQL table through the jdbc input plugin and
# indexes the rows into Elasticsearch, echoing every event to stdout.
#
# Active flow: table cwdz_bsfbzjbank_flow -> type "pufa" -> index "pufa_0615".
# The "pfflow" and "kpinfo" inputs are kept below as disabled templates; their
# output branches stay in place so they can be re-enabled by uncommenting the
# matching input.
input {
  # --- Disabled input: cwdz_bsfbzjbank_flow tagged as type "pfflow" ---
  # jdbc {
  #   jdbc_driver_library => "F:/Install/repository/mysql/mysql-connector-java/8.0.20/mysql-connector-java-8.0.20.jar"
  #   jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
  #   # Connector/J 8.0 and later: be sure to append
  #   # &useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true
  #   jdbc_connection_string => "jdbc:mysql://10.134.22.232:3307/cpmcbiddingx_report?characterEncoding=utf8"
  #   jdbc_user => "root"
  #   jdbc_password => "password"
  #   schedule => "* * * * *"
  #   # Query to run; two options: 1) statement_filepath pointing at a .sql file; 2) an inline SQL statement
  #   # statement_filepath => "filename.sql"
  #   statement => "SELECT * FROM cwdz_bsfbzjbank_flow WHERE acct_date >= :sql_last_value"
  #   #id,project_id,project_code,project_name,package_id,package_code,deptid,org_name,order_no,supplier_code,company_name,amount,DATE_FORMAT(acct_date,'%Y-%m-%d %T') as acct_date,payee_name,payee_acctno,payee_bankno,payee_bankname,pay_type,tenant_id
  #   # Whether to page the query
  #   jdbc_paging_enabled => "true"
  #   jdbc_page_size => "5000"
  #   # Whether to record a column's value, used for incremental (near-real-time) sync
  #   use_column_value => true
  #   # Column to record; usually the primary key id or an update time, used by the SQL to fetch the newest rows
  #   tracking_column => "acct_date"
  #   tracking_column_type => "timestamp"
  #   # Whether to discard the previously recorded value
  #   clean_run => false
  #   # Where the recorded value is stored
  #   last_run_metadata_path => "D:/software/elasticsearch/03.elasticsearch/logstash-7.13.1/config/logstash_metadata2"
  #   # Keys written to ES are lowercased by default; this option controls that
  #   lowercase_column_names => true
  #   # jdbc_default_timezone => "Asia/Shanghai"
  #   plugin_timezone => "local"
  #   type => "pfflow"
  # }

  # --- Active input: cwdz_bsfbzjbank_flow tagged as type "pufa" ---
  jdbc {
    jdbc_driver_library => "F:/Install/repository/mysql/mysql-connector-java/8.0.20/mysql-connector-java-8.0.20.jar"
    # Connector/J 8.x driver class; the legacy com.mysql.jdbc.Driver name is deprecated.
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    # Connector/J 8.0 and later: be sure to include serverTimezone=UTC
    jdbc_connection_string => "jdbc:mysql://10.134.22.232:3307/cpmcbiddingx_report?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"
    jdbc_user => "root"
    # NOTE(review): plaintext credential; consider a ${VAR} reference resolved from the
    # environment or the Logstash keystore instead of committing the password.
    jdbc_password => "password"
    # Cron-style schedule: run the statement once every minute.
    schedule => "* * * * *"
    # statement_filepath => "filename.sql"
    # Works around the 8-hour time difference.
    # NOTE(review): this convert_tz shift interacts with serverTimezone=UTC and
    # jdbc_default_timezone below - verify against real rows before changing any of them.
    statement => "SELECT * FROM cwdz_bsfbzjbank_flow WHERE acct_date > convert_tz(:sql_last_value, '+00:00','-08:00') order by acct_date asc"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "5000"
    # Track acct_date of the last row read and expose it to the query as :sql_last_value.
    use_column_value => true
    tracking_column => "acct_date"
    tracking_column_type => "timestamp"
    # Keep the stored tracking value between runs.
    clean_run => false
    last_run_metadata_path => "D:/software/elasticsearch/03.elasticsearch/logstash-7.13.1/config/logstash_metadata15"
    lowercase_column_names => true
    jdbc_default_timezone => "Asia/Shanghai"
    plugin_timezone => "local"
    # Routes events to the matching branch in the output section.
    type => "pufa"
  }

  # --- Disabled input: cpmc_dz_report_kpinfo tagged as type "kpinfo" ---
  # jdbc {
  #   jdbc_driver_library => "F:/Install/repository/mysql/mysql-connector-java/8.0.20/mysql-connector-java-8.0.20.jar"
  #   jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
  #   # Connector/J 8.0 and later: be sure to include serverTimezone=UTC
  #   jdbc_connection_string => "jdbc:mysql://10.134.22.232:3307/cpmcbiddingx_report?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"
  #   jdbc_user => "root"
  #   jdbc_password => "password"
  #   schedule => "* * * * *"
  #   # statement_filepath => "filename.sql"
  #   statement => "SELECT * FROM cpmc_dz_report_kpinfo WHERE created_time >= :sql_last_value"
  #   # jdbc_paging_enabled => "true"
  #   jdbc_page_size => "5000"
  #   use_column_value => true
  #   tracking_column => "created_time"
  #   tracking_column_type => "timestamp"
  #   clean_run => false
  #   last_run_metadata_path => "D:/software/elasticsearch/03.elasticsearch/logstash-7.13.1/config/logstash_metadata2"
  #   lowercase_column_names => true
  #   jdbc_default_timezone => "Asia/Shanghai"
  #   plugin_timezone => "local"
  #   type => "kpinfo"
  # }
}

# Elasticsearch uses UTC by default, which is 8 hours behind China time;
# enable the following filter to shift the timestamps.
#filter {
#  ruby {
#    code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
#  }
#  ruby {
#    code => "event.set('@timestamp',event.get('timestamp'))"
#  }
#  mutate {
#    remove_field => ["timestamp"]
#  }
#  ruby {
#    code => "event.set('acct_date', event.get('acct_date').time.localtime + 8*60*60)"
#  }
#}

output {
  # One branch per input type; document_id => "%{id}" makes a re-read row
  # overwrite its existing document instead of creating a duplicate.
  if [type] == "pfflow" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "pfflow_0611"
      document_id => "%{id}"
    }
  }

  if [type] == "kpinfo" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "kpinfo_0615"
      #document_id => "%{id}"
    }
  }

  if [type] == "pufa" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "pufa_0615"
      document_id => "%{id}"
    }
  }

  #elasticsearch {
  #  # ES host and port
  #  hosts => ["127.0.0.1:9200"]
  #  # Index name (customizable)
  #  index => "0603logstash"
  #  # The source table must have an id column; it maps to the document id
  #  #document_id => "%{id}"
  #  #document_type => "cpmc_dz_project"
  #  #template =>"D:/software/elasticsearch/03.elasticsearch/logstash-7.13.1/config/nm_course_template.json"
  #  #template_name =>"nm_course"
  #  #template_overwrite =>"true"
  #}

  stdout {
    # Print every event as one JSON document per line
    codec => json_lines
  }
}
启动:进入 logstash 的 bin 目录,在 cmd 中执行命令:logstash -f "mysql.conf"