root@netkiller ~ % /usr/share/logstash/bin/logstash -e "input {stdin{}} output {stdout{}}" Helloworld ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console. WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults Could not find log4j2 configuration at path //usr/share/logstash/config/log4j2.properties. Using default config which logs to console 18:03:38.340 [[main]-pipeline-manager] INFO logstash.pipeline - Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000} 18:03:38.356 [[main]-pipeline-manager] INFO logstash.pipeline - Pipeline main started The stdin plugin is now waiting for input: 2017-08-03T10:03:38.375Z localhost Helloworld 18:03:38.384 [Api Webserver] INFO logstash.agent - Successfully started Logstash API endpoint {:port=>9601}
The rubydebug codec pretty-prints events to the console in a JSON-like format:
root@netkiller ~ % /usr/share/logstash/bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
My name is neo
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path //usr/share/logstash/config/log4j2.properties. Using default config which logs to console
18:05:02.734 [[main]-pipeline-manager] INFO  logstash.pipeline - Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
18:05:02.747 [[main]-pipeline-manager] INFO  logstash.pipeline - Pipeline main started
The stdin plugin is now waiting for input:
{
    "@timestamp" => 2017-08-03T10:05:02.764Z,
      "@version" => "1",
          "host" => "localhost",
       "message" => "My name is neo"
}
18:05:02.782 [Api Webserver] INFO  logstash.agent - Successfully started Logstash API endpoint {:port=>9601}
input { file { type => "syslog" path => [ "/var/log/maillog", "/var/log/messages", "/var/log/secure" ] start_position => "beginning" } } output { stdout { codec => rubydebug } elasticsearch { hosts => ["127.0.0.1:9200"] } }
start_position => "beginning" 从头开始读,如果没有这个选项,只会读取最后更新的数据。
input { file { path =>"/var/log/messages" type =>"syslog"} file { path =>"/var/log/apache/access.log" type =>"apache"} }
input { file { type => "syslog" path => [ "/var/log/secure", "/var/log/messages", "/var/log/syslog" ] } tcp { port => "5145" type => "syslog-network" } udp { port => "5145" type => "syslog-network" } } output { elasticsearch { hosts => ["127.0.0.1:9200"] } }
input {
    redis {
        host => "127.0.0.1"
        port => "6379"
        key => "logstash:demo"
        data_type => "list"
        codec => "json"
        type => "logstash-redis-demo"
        tags => ["logstashdemo"]
    }
}
output {
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
    }
}
Specifying Redis database 10:
root@netkiller /etc/logstash/conf.d % cat spring-boot-redis.conf
input {
    redis {
        codec => json
        host => "localhost"
        port => 6379
        db => 10
        key => "logstash:redis"
        data_type => "list"
    }
}
output {
    stdout {
        codec => rubydebug
    }
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "logstash-api"
    }
}
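With data_type => "list", Logstash pops events off the Redis list, so a test event can be pushed with redis-cli (the JSON payload is illustrative):

redis-cli -n 10 RPUSH "logstash:redis" '{"message": "hello from redis"}'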
input {
    kafka {
        zk_connect => "kafka:2181"
        group_id => "logstash"
        topic_id => "apache_logs"
        consumer_threads => 16
    }
}
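The zk_connect and topic_id options belong to the older Kafka input plugin, which consumed via ZooKeeper; newer plugin versions (Logstash 5.x and later) talk to the brokers directly. A rough equivalent, assuming a broker listening on kafka:9092:

input {
    kafka {
        bootstrap_servers => "kafka:9092"
        group_id => "logstash"
        topics => ["apache_logs"]
        consumer_threads => 16
    }
}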
root@netkiller /etc/logstash/conf.d % cat jdbc.conf
input {
    jdbc {
        jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
        jdbc_user => "cms"
        jdbc_password => "123456"
        schedule => "* * * * *"
        statement => "select * from article where id > :sql_last_value"
        use_column_value => true
        tracking_column => "id"
        tracking_column_type => "numeric"
        record_last_run => true
        last_run_metadata_path => "/var/tmp/article.last"
    }
    jdbc {
        jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
        jdbc_user => "cms"
        jdbc_password => "123456"
        schedule => "* * * * *" # cron-style schedule expression; this one fires every minute
        statement => "select * from article where ctime > :sql_last_value"
        use_column_value => true
        tracking_column => "ctime"
        tracking_column_type => "timestamp"
        record_last_run => true
        last_run_metadata_path => "/var/tmp/article-ctime.last"
    }
}
output {
    elasticsearch {
        hosts => "localhost:9200"
        index => "information"
        document_type => "article"
        document_id => "%{id}"
        action => "update"
        doc_as_upsert => true
    }
}
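The :sql_last_value placeholder is replaced with the value persisted at last_run_metadata_path, which the plugin stores as YAML; inspecting that file confirms that incremental syncing is working (the value shown is illustrative):

root@netkiller ~ % cat /var/tmp/article.last
--- 12345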
The default timestamp format is ISO 8601. To convert it to yyyy-MM-dd HH:mm:ss, see:
filter {
    date {
        match => [ "ctime", "yyyy-MM-dd HH:mm:ss" ]
        locale => "cn"
    }
    date {
        match => [ "mtime", "yyyy-MM-dd HH:mm:ss" ]
        locale => "cn"
    }
}
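By default the date filter writes the parsed result into @timestamp; to normalize a field in place and leave @timestamp alone, the filter's target option can be used (a sketch, reusing the ctime field from above):

filter {
    date {
        match => [ "ctime", "yyyy-MM-dd HH:mm:ss" ]
        # write the parsed date back into ctime instead of @timestamp
        target => "ctime"
    }
}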
Create a grok pattern file under /usr/share/logstash/patterns:
mkdir /usr/share/logstash/patterns
vim /usr/share/logstash/patterns/nginx

NGUSERNAME [a-zA-Z\.\@\-\+_%]+
NGUSER %{NGUSERNAME}
NGINXACCESS %{IPORHOST:clientip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{URI:referrer}|-)"|%{QS:referrer}) %{QS:agent}
filter { if [type] == "nginx-access" { grok { match => { "message" => "%{NGINXACCESS}" } } } }
input { file { type => "syslog" path => [ "/var/log/*.log", "/var/log/messages", "/var/log/syslog" ] sincedb_path => "/opt/logstash/sincedb-access" } syslog { type => "syslog" port => "5544" } } filter { grok { type => "syslog" match => [ "message", "%{SYSLOGBASE2}" ] add_tag => [ "syslog", "grokked" ] } } output { elasticsearch { host => "elk.netkiller.cn" } }
input { file { type => "SSRCode" path => "/SD/2015*/01*/*.csv" start_position => "beginning" } } filter { csv { columns => ["Code","Source"] separator => "," } kv { source => "uri" field_split => "&?" value_split => "=" } } # output logs to console and to elasticsearch output { stdout {} elasticsearch { hosts => ["172.16.1.1:9200"] } }
input {
    stdin {}
}
filter {
    ruby {
        init => "
            require 'csv'
            begin
                @@csv_file = 'output.csv'
                @@csv_headers = ['A','B','C']
                if File.zero?(@@csv_file) || !File.exist?(@@csv_file)
                    CSV.open(@@csv_file, 'w') do |csv|
                        csv << @@csv_headers
                    end
                end
            end
        "
        code => "
            begin
                event['@metadata']['csv_file'] = @@csv_file
                event['@metadata']['csv_headers'] = @@csv_headers
            end
        "
    }
    csv {
        columns => ["a", "b", "c"]
    }
}
output {
    csv {
        fields => ["a", "b", "c"]
        path => "%{[@metadata][csv_file]}"
    }
    stdout {
        codec => rubydebug {
            metadata => true
        }
    }
}
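The event['...'] hash access above is the pre-5.x Ruby event API; on Logstash 5.x and later the ruby filter must use event.get/event.set instead, along the lines of this sketch:

filter {
    ruby {
        # Logstash 5.x+ event API: direct hash-style access was removed
        code => "
            event.set('[@metadata][csv_file]', @@csv_file)
            event.set('[@metadata][csv_headers]', @@csv_headers)
        "
    }
}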
Test:
echo "1,2,3\n4,5,6\n7,8,9" | ./bin/logstash -f csv-headers.conf
Output:
A,B,C
1,2,3
4,5,6
7,8,9
Date formatting: convert ISO 8601 timestamps to %Y-%m-%d %H:%M:%S.
Save the following content to the configuration file date.conf:
input {
    stdin {}
}
filter {
    ruby {
        init => "require 'time'"
        code => "event.set('ctime', event.get('ctime').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
    }
    ruby {
        init => "require 'time'"
        code => "event.set('mtime', event.get('mtime').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
    }
}
output {
    stdout {
        codec => rubydebug
    }
}
/usr/share/logstash/bin/logstash -f date.conf
output {
    file {
        path => "/path/to/%{host}/%{+yyyy}/%{+MM}/%{+dd}.log.gz"
        message_format => "%{message}"
        gzip => true
    }
}
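message_format was removed from the file output in later plugin versions; on current releases the equivalent is a line codec with a format setting (a sketch):

output {
    file {
        path => "/path/to/%{host}/%{+yyyy}/%{+MM}/%{+dd}.log.gz"
        codec => line { format => "%{message}" }
        gzip => true
    }
}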
output {
    stdout {
        codec => rubydebug
    }
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "logging"
    }
}
Configuring one index per day:
index => "logstash-%{+YYYY.MM.dd}" "_index" : "logstash-2017.03.22"
The index name can be customized, e.g. logstash-%{type}-%{+YYYY.MM.dd}:
input {
    redis {
        data_type => "list"
        key => "logstash:redis"
        host => "127.0.0.1"
        port => 6379
        threads => 5
        codec => "json"
    }
}
filter {
}
output {
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "logstash-%{type}-%{+YYYY.MM.dd}"
        document_type => "%{type}"
        workers => 1
        flush_size => 20
        idle_flush_time => 1
        template_overwrite => true
    }
    stdout {}
}
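Once events are flowing, the daily indices can be confirmed through the Elasticsearch cat API:

curl 'http://127.0.0.1:9200/_cat/indices?v'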
Original source: the Netkiller series of handbooks
Author: 陈景峯
Please contact the author before reprinting, and be sure to credit the original source, the author, and include this notice.