21.4. Logstash configuration options

21.4.1. input

21.4.1.1. Standard input and output
root@netkiller ~ % /usr/share/logstash/bin/logstash -e "input {stdin{}} output {stdout{}}"
Helloworld
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path //usr/share/logstash/config/log4j2.properties. Using default config which logs to console
18:03:38.340 [[main]-pipeline-manager] INFO  logstash.pipeline - Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
18:03:38.356 [[main]-pipeline-manager] INFO  logstash.pipeline - Pipeline main started
The stdin plugin is now waiting for input:
2017-08-03T10:03:38.375Z localhost Helloworld
18:03:38.384 [Api Webserver] INFO  logstash.agent - Successfully started Logstash API endpoint {:port=>9601}
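
The -e flag passes the pipeline configuration as a string on the command line instead of reading it from a file with -f. Typing Helloworld shows the event echoed back with the timestamp and host name prepended.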
				
21.4.1.2. rubydebug

The rubydebug codec pretty-prints each event to the console in a JSON-like Ruby format:

root@netkiller ~ % /usr/share/logstash/bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
My name is neo
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path //usr/share/logstash/config/log4j2.properties. Using default config which logs to console
18:05:02.734 [[main]-pipeline-manager] INFO  logstash.pipeline - Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
18:05:02.747 [[main]-pipeline-manager] INFO  logstash.pipeline - Pipeline main started
The stdin plugin is now waiting for input:
{
    "@timestamp" => 2017-08-03T10:05:02.764Z,
      "@version" => "1",
          "host" => "localhost",
       "message" => "My name is neo"
}
18:05:02.782 [Api Webserver] INFO  logstash.agent - Successfully started Logstash API endpoint {:port=>9601}
				
21.4.1.3. Local files
				
input {
  file {
    type => "syslog"
    path => [ "/var/log/maillog", "/var/log/messages", "/var/log/secure" ]
    start_position => "beginning"
  }
}
output {
  stdout { codec => rubydebug }
  elasticsearch { 
    hosts => ["127.0.0.1:9200"] 
  }
}		
				
				

start_position => "beginning" tells Logstash to read the file from the start; without this option, the file input only tails data appended after startup.
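
Note that start_position only takes effect the first time Logstash sees a file; after that the read position is remembered in a sincedb file. A minimal sketch for testing, assuming you want the file re-read from the top on every restart, points sincedb at /dev/null:

input {
  file {
    path => "/var/log/messages"
    start_position => "beginning"
    sincedb_path => "/dev/null"    # do not persist the read position; re-read on every restart
  }
}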

21.4.1.3.1. Specifying a file type
					
input { 
 file { path =>"/var/log/messages" type =>"syslog"} 
 file { path =>"/var/log/apache/access.log" type =>"apache"} 
}					 
					
					
21.4.1.3.1.1. Nginx
						
input {
        file {
                type => "nginx_access"
                path => ["/usr/share/nginx/logs/test.access.log"]
        }
}
output {
        redis {
                host => "localhost"
                data_type => "list"
                key => "logstash:redis"
        }
}						
						
						
21.4.1.4. TCP/UDP
				
input {
  file {
    type => "syslog"
    path => [ "/var/log/secure", "/var/log/messages", "/var/log/syslog" ]
  }
  tcp {
    port => "5145"
    type => "syslog-network"
  }
  udp {
    port => "5145"
    type => "syslog-network"
  }
}
output {
  elasticsearch { 
    hosts => ["127.0.0.1:9200"] 
  }
}
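
You can verify the TCP input from another shell, assuming the nc (netcat) utility is installed:

echo "hello syslog-network" | nc localhost 5145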
				
				
21.4.1.5. Redis
				
input {
  redis {
    host => "127.0.0.1"
    port => "6379" 
    key => "logstash:demo"
    data_type => "list"
    codec  => "json"
    type => "logstash-redis-demo"
    tags => ["logstashdemo"]
  }
}

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
  }
}
				
				

Selecting Redis database 10:

				
root@netkiller /etc/logstash/conf.d % cat spring-boot-redis.conf 
input {
 redis {
  codec => json
  host => "localhost"
  port => 6379
  db => 10
  key => "logstash:redis"
  data_type => "list"
 }
}

output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "logstash-api"
  }
}
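
To verify the pipeline, push a test entry onto the list with redis-cli; the -n option selects database 10 to match db => 10 above:

redis-cli -n 10 LPUSH logstash:redis '{"message": "hello"}'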
				
				
21.4.1.6. Kafka

				
input {
  kafka {
   zk_connect => "kafka:2181"
   group_id => "logstash"
   topic_id => "apache_logs"
   consumer_threads => 16
  }
}		
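
The zk_connect and topic_id options belong to the legacy kafka input shipped with Logstash 1.x/2.x. A sketch for the newer plugin (Logstash 5.x and later), which connects to the brokers directly rather than to ZooKeeper:

input {
  kafka {
    bootstrap_servers => "kafka:9092"    # broker list instead of ZooKeeper
    group_id => "logstash"
    topics => ["apache_logs"]
    consumer_threads => 16
  }
}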
				
				
21.4.1.7. jdbc
				
root@netkiller /etc/logstash/conf.d % cat jdbc.conf 
input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "123456"
    schedule => "* * * * *"
    statement => "select * from article where id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric" 
    record_last_run => true
    last_run_metadata_path => "/var/tmp/article.last"
  }
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "123456"
    schedule => "* * * * *"	# cron expression; here it fires once every minute
    statement => "select * from article where ctime > :sql_last_value"
    use_column_value => true
    tracking_column => "ctime"
    tracking_column_type => "timestamp" 
    record_last_run => true
    last_run_metadata_path => "/var/tmp/article-ctime.last"
  }

}
output {
    elasticsearch {
    	hosts => "localhost:9200"
        index => "information"
        document_type => "article"
        document_id => "%{id}"
        action => "update"
        doc_as_upsert => true
    }
}				
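
On each scheduled run the value saved in last_run_metadata_path is substituted for :sql_last_value, so only rows added or changed since the previous run are selected. Because document_id => "%{id}" reuses the table's primary key, re-imported rows update the existing Elasticsearch documents instead of creating duplicates. To force a full re-import, delete the metadata files:

rm -f /var/tmp/article.last /var/tmp/article-ctime.last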
				
				

21.4.2. filter

21.4.2.1. Date formatting

Logstash timestamps default to ISO 8601. To parse fields in the yyyy-MM-dd HH:mm:ss format, use the date filter:

				

filter {
  date {
    match => [ "ctime", "yyyy-MM-dd HH:mm:ss" ]
    locale => "cn"
  }
 date {
    match => [ "mtime", "yyyy-MM-dd HH:mm:ss" ]
    locale => "cn"
  } 
}				
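
By default the date filter writes the parsed result into @timestamp. A sketch, assuming you want to leave @timestamp alone and overwrite the original field instead, uses the target option:

filter {
  date {
    match  => [ "ctime", "yyyy-MM-dd HH:mm:ss" ]
    target => "ctime"    # store the parsed date back into ctime instead of @timestamp
  }
}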
				
				
21.4.2.2. patterns

Create a directory for custom patterns and add a pattern file (here named nginx) inside it:

				
mkdir /usr/share/logstash/patterns
vim /usr/share/logstash/patterns/nginx

NGUSERNAME [a-zA-Z\.\@\-\+_%]+
NGUSER %{NGUSERNAME}
NGINXACCESS %{IPORHOST:clientip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{URI:referrer}|-)"|%{QS:referrer}) %{QS:agent}
				
				
				
filter {
  if [type] == "nginx-access" {
    grok {
      patterns_dir => ["/usr/share/logstash/patterns"]
      match => { "message" => "%{NGINXACCESS}" }
    }
  }
}
				
				
21.4.2.3. syslog
				
input {
  file {
    type => "syslog" 
    path => [ "/var/log/*.log", "/var/log/messages", "/var/log/syslog" ]
    sincedb_path => "/opt/logstash/sincedb-access"
  }
  syslog {
    type => "syslog"
    port => "5544"
  }
}
 
filter {
  if [type] == "syslog" {
    grok {
      match => [ "message", "%{SYSLOGBASE2}" ]
      add_tag => [ "syslog", "grokked" ]
    }
  }
}
 
output {
  elasticsearch { hosts => ["elk.netkiller.cn:9200"] }
}
				
				
21.4.2.4. csv
				
input {
    file {
        type => "SSRCode"
        path => "/SD/2015*/01*/*.csv"
        start_position => "beginning"
    }
}
 
filter {
	csv {
		columns => ["Code","Source"]
		separator => ","
	}
	kv {
		source => "uri"
		field_split => "&?"
		value_split => "="
	}
 
}
 
# output logs to console and to elasticsearch
output {
    stdout {}
    elasticsearch {
        hosts => ["172.16.1.1:9200"]
    }
}				
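
For example, a hypothetical input line A123,web would produce the fields Code => "A123" and Source => "web". The kv filter additionally splits a uri field, if one is present, into key/value pairs on & and ?, with = separating keys from values.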
				
				
21.4.2.5. Processing CSV files with ruby
				
input {
    stdin {}
}
filter {
    ruby {
        init => "
            begin
                @@csv_file    = 'output.csv'
                @@csv_headers = ['A','B','C']
                if File.zero?(@@csv_file) || !File.exist?(@@csv_file)
                    CSV.open(@@csv_file, 'w') do |csv|
                        csv << @@csv_headers
                    end
                end
            end
        "
        code => "
            begin
                event['@metadata']['csv_file']    = @@csv_file
                event['@metadata']['csv_headers'] = @@csv_headers
            end
        "
    }
    csv {
        columns => ["a", "b", "c"]
    }
}
output {
    csv {
        fields => ["a", "b", "c"]
        path   => "%{[@metadata][csv_file]}"
    }
    stdout {
        codec => rubydebug {
            metadata => true
        }
    }
}				
				
				

Test:

				
echo "1,2,3\n4,5,6\n7,8,9" | ./bin/logstash -f csv-headers.conf				
				
				

Output:

				
A,B,C
1,2,3
4,5,6
7,8,9
				
				
21.4.2.6. Running Ruby code

Date formatting: convert ISO 8601 timestamps to the %Y-%m-%d %H:%M:%S format.

Save the following to a configuration file named date.conf. The ctime and mtime fields must already exist on the event as timestamps (the jdbc input above, for example, produces these for datetime columns); .time converts the LogStash::Timestamp into a Ruby Time that strftime can format.

				
input {
	stdin{}
}
filter {

	ruby {
		init => "require 'time'"
        code => "event.set('ctime', event.get('ctime').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
    }

	ruby {
		init => "require 'time'"
        code => "event.set('mtime', event.get('mtime').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
    }
}
output {

	stdout {
		codec => rubydebug
	}

}
				
				

/usr/share/logstash/bin/logstash -f date.conf

21.4.2.7. Grok debugging tool

http://grokdebug.herokuapp.com

21.4.3. output

21.4.3.1. stdout
output {
	stdout { codec => rubydebug }
}
				
21.4.3.2. file (write to a file)
				
output {
    file {
        path => "/path/to/%{host}/%{+yyyy}/%{+MM}/%{+dd}.log.gz"
        message_format => "%{message}"
        gzip => true
    }
}				
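
Note that message_format was removed in version 3.x of the file output plugin; a line codec achieves the same result. A sketch for recent versions:

output {
    file {
        path => "/path/to/%{host}/%{+yyyy}/%{+MM}/%{+dd}.log.gz"
        codec => line { format => "%{message}" }    # replaces the removed message_format option
        gzip => true
    }
}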
				
				
21.4.3.3. elasticsearch
				
output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "logging"
  }
}				
				
				
21.4.3.3.1. Custom index names

This configuration creates one index per day:

					
index => "logstash-%{+YYYY.MM.dd}"

"_index" : "logstash-2017.03.22"	
					
					

Customizing the index name as logstash-%{type}-%{+YYYY.MM.dd}:

					
input {

    redis {
        data_type => "list"
        key => "logstash:redis"
        host => "127.0.0.1"
        port => 6379
        threads => 5
        codec => "json"
    }
}
filter {

}
output {

    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "logstash-%{type}-%{+YYYY.MM.dd}"
        document_type => "%{type}"
        workers => 1
        flush_size => 20
        idle_flush_time => 1
        template_overwrite => true
    }
    stdout{}
}					
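
Note that workers, flush_size, and idle_flush_time have been removed from recent versions of the elasticsearch output, and document_type is deprecated because Elasticsearch 6 and later allow only one mapping type per index. A minimal sketch for current versions keeps just the portable options:

output {
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "logstash-%{type}-%{+YYYY.MM.dd}"
        template_overwrite => true
    }
    stdout {}
}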
					
					
21.4.3.4. exec (run an external command)
				
output {
    exec {
        command => "sendsms.php \"%{message}\" -t %{user}"
    }
}
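
Field references such as %{message} are interpolated into the command line before it runs, so quote them carefully: an event containing shell metacharacters could otherwise break, or even inject, the command. Only feed trusted input to an exec output.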
				
				





Original source: the Netkiller series of handbooks
Author: 陈景峯
To republish, please contact the author, and be sure to credit the original source, the author, and this notice.
