概述
- 你可以通过使用$在Logstash插件的配置中设置环境变量引用。
- 在Logstash启动时,每个引用将被环境变量的值所取代。
- 该替换是区分大小写的。
- 对未定义变量的引用会引发Logstash配置错误。
- 你可以通过使用$的形式给出一个默认值。如果环境变量是未定义的,Logstash会使用默认值。
- 你可以在任何插件选项类型中添加环境变量引用:字符串、数字、布尔值、数组或哈希。
- 环境变量是不可改变的。如果你更新了环境变量,你将不得不重新启动Logstash以获取更新的值。
环境变量引用只支持在插件配置中,不支持在条件中。解决这个限制的一个方法是添加一个带有环境变量值的新事件字段,并在条件中使用该新字段。
例子
下面的例子告诉你如何使用环境变量来设置一些常用的配置选项的值。
设置TCP端口
下面是一个使用环境变量设置TCP端口的示例:
input { tcp { port => "${TCP_PORT}" } }
现在让我们设置TCP_PORT的值:
export TCP_PORT=12345
在启动时,Logstash使用以下配置:
input { tcp { port => 12345 } }
如果没有设置TCP_PORT环境变量,Logstash将返回一个配置错误。
你可以通过指定一个默认值来解决这个问题:
input { tcp { port => "${TCP_PORT:54321}" } }
现在,如果变量未定义,Logstash不会返回一个配置错误,而是使用默认值:
input { tcp { port => 54321 } }
如果定义了环境变量,Logstash将使用为变量指定的值而不是默认值。
设置标签的值
下面是一个使用环境变量来设置标签值的例子:
filter { mutate { add_tag => [ "tag1", "${ENV_TAG}" ] } }
让我们设置ENV_TAG的值:
export ENV_TAG="tag2"
在启动时,Logstash使用以下配置:
filter { mutate { add_tag => [ "tag1", "tag2" ] } }
设置文件路径
下面是一个使用环境变量来设置日志文件路径的例子:
filter { mutate { add_field => { "my_path" => "${HOME}/file.log" } } }
让我们设置HOME的值:
export HOME="/path"
在启动时,Logstash使用以下配置:
filter { mutate { add_field => { "my_path" => "/path/file.log" } } }
Logstach配置案例
这些示例说明了如何配置Logstash来过滤事件、处理Apache日志和syslog消息,并使用条件来控制过滤器或输出所处理的事件。
如果您需要帮助构建grok模式,请尝试grok调试器。Grok调试器是基于基本许可证的X-Pack特性,因此可以免费使用。
配置过滤
过滤器是一种内联处理机制,可以灵活地对数据进行切片,以满足您的需求。让我们来看看一些过滤器的作用。下面的配置文件设置grok和date过滤器。
input { stdin { } } filter { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } date { match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ] } } output { elasticsearch { hosts => ["localhost:9200"] } stdout { codec => rubydebug } }
运行Logstash配置如下:
bin/logstash -f logstash-filter.conf
现在,将以下行粘贴到终端并按回车键,以便它将由stdin输入处理:
127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] "GET /xampp/status.php HTTP/1.1" 200 3891 "http://cadenza/xampp/navi.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0"
你应该看到一些返回到标准输出的东西,看起来像这样:
{ "message" => "127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] \"GET /xampp/status.php HTTP/1.1\" 200 3891 \"http://cadenza/xampp/navi.php\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\"", "@timestamp" => "2013-12-11T08:01:45.000Z", "@version" => "1", "host" => "cadenza", "clientip" => "127.0.0.1", "ident" => "-", "auth" => "-", "timestamp" => "11/Dec/2013:00:01:45 -0800", "verb" => "GET", "request" => "/xampp/status.php", "httpversion" => "1.1", "response" => "200", "bytes" => "3891", "referrer" => "\"http://cadenza/xampp/navi.php\"", "agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\"" }
如您所见,Logstash(在grok过滤器的帮助下)能够解析日志行(它碰巧是Apache的“组合日志”格式),并将其分解为许多不同的离散信息位。一旦您开始查询和分析日志数据,这将非常有用。例如,您将能够轻松地运行关于HTTP响应代码、IP地址、引用者等的报告。Logstash中包含了许多现成的grok模式,因此,如果您需要解析一种常见的日志格式,很可能已经有人为您完成了这些工作。有关更多信息,请参阅GitHub上的Logstash grok模式列表。
本例中使用的另一个过滤器是日期过滤器。这个过滤器解析出一个时间戳,并将其用作事件的时间戳(无论您在什么时候接收日志数据)。您会注意到,本例中的@timestamp字段被设置为2013年12月11日,即使Logstash在之后的某个时间点接收事件。当回填日志时,这很方便。它使您能够告诉Logstash“使用此值作为此事件的时间戳”。
处理Apache日志
让我们做一些实际有用的事情:处理apache2访问日志文件!我们将从本地主机上的文件中读取输入,并根据需要使用条件来处理事件。首先,创建一个类似logstash-apache.conf的文件,包含以下内容(你可以根据需要更改日志的文件路径):
input { file { path => "/tmp/access_log" start_position => "beginning" } } filter { if [path] =~ "access" { mutate { replace => { "type" => "apache_access" } } grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } } date { match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ] } } output { elasticsearch { hosts => ["localhost:9200"] } stdout { codec => rubydebug } }
然后,用以下日志条目创建上面配置的输入文件(在这个例子中,"/tmp/access_log")(或者从你自己的web服务器上使用一些):
71.141.244.242 - kurt [18/May/2011:01:48:10 -0700] "GET /admin HTTP/1.1" 301 566 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.2.3) Gecko/20100401 Firefox/3.6.3" 134.39.72.245 - - [18/May/2011:12:40:18 -0700] "GET /favicon.ico HTTP/1.1" 200 1189 "-" "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; InfoPath.2; .NET4.0C; .NET4.0E)" 98.83.179.51 - - [18/May/2011:19:35:08 -0700] "GET /css/main.css HTTP/1.1" 200 1837 "http://www.safesand.com/information.htm" "Mozilla/5.0 (Windows NT 6.0; WOW64; rv:2.0.1) Gecko/20100101 Firefox/4.0.1"
现在,运行带有-f指令的Logstash以传递到配置文件中:
bin/logstash -f logstash-apache.conf
现在你应该在Elasticsearch中看到你的apache日志数据了! Logstash打开并读取指定的输入文件,处理它遇到的每个事件。任何记录在该文件中的额外行也将被捕获,由Logstash作为事件处理,并存储在Elasticsearch中。作为额外的奖励,它们在存储时,字段 "type "被设置为 "apache_access"(这是由输入配置中的type⇒"apache_access "行完成的)。
在这个配置中,Logstash只监视apache access_log,但要同时监视access_log和error_log(实际上是任何与*log匹配的文件)是很容易的,只要改变上述配置中的一行即可:
input { file { path => "/tmp/*_log" ...
当您重新启动Logstash时,它将同时处理错误和访问日志。但是,如果您检查数据(可能使用elasticsearch-kopf),您将看到access_log被分割成离散的字段,而error_log则没有。这是因为我们使用了grok过滤器来匹配标准的组合apache日志格式,并自动将数据分割为单独的字段。如果我们可以根据它的格式来控制一行的解析方式,那不是很好吗?嗯,我们可以…
注意,Logstash没有重新处理在access_log文件中已经看到的事件。当从文件中读取数据时,Logstash会保存它的位置,并且只在添加新行时处理它们。整洁!
使用条件
您可以使用条件来控制筛选器或输出处理哪些事件。例如,可以根据事件出现在哪个文件(access_log、error_log和其他以“log”结尾的随机文件)对每个事件进行标记。
input { file { path => "/tmp/*_log" } } filter { if [path] =~ "access" { mutate { replace => { type => "apache_access" } } grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } date { match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ] } } else if [path] =~ "error" { mutate { replace => { type => "apache_error" } } } else { mutate { replace => { type => "random_logs" } } } } output { elasticsearch { hosts => ["localhost:9200"] } stdout { codec => rubydebug } }
这个示例使用type字段标记所有事件,但实际上并没有解析错误或随机文件。有很多类型的错误日志,它们应该如何标记取决于你使用的是什么日志。
类似地,您可以使用条件将事件指向特定的输出。例如,你可以:
- 提醒nagios任何状态为5xx的apache事件
- 记录任何4xx状态到Elasticsearch
- 通过statsd记录所有状态代码
要告诉nagios任何具有5xx状态码的http事件,首先需要检查type字段的值。如果是apache,那么你可以检查status字段是否包含一个5xx错误。如果是,将其发送到nagios。如果不是5xx错误,检查状态字段是否包含4xx错误。如果是,将其发送到Elasticsearch。最后,不管status字段包含什么,将所有apache状态代码发送到statsd:
output { if [type] == "apache" { if [status] =~ /^5\d\d/ { nagios { ... } } else if [status] =~ /^4\d\d/ { elasticsearch { ... } } statsd { increment => "apache.%{status}" } } }
处理系统日志
Syslog是Logstash最常见的用例之一,而且它处理得非常好(只要日志行大致符合RFC3164的要求)。Syslog是事实上的UNIX网络日志标准,从客户机发送消息到本地文件,或者通过rsyslog发送到集中的日志服务器。在这个例子中,你不需要一个有效的syslog实例;我们将从命令行中伪造它,这样你就能感觉到会发生什么。
首先,让我们为Logstash + syslog制作一个简单的配置文件,称为logstash-syslog.conf。
input { tcp { port => 5000 type => syslog } udp { port => 5000 type => syslog } } filter { if [type] == "syslog" { grok { match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" } add_field => [ "received_at", "%{@timestamp}" ] add_field => [ "received_from", "%{host}" ] } date { match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ] } } } output { elasticsearch { hosts => ["localhost:9200"] } stdout { codec => rubydebug } }
通常,客户机将连接到端口5000上的Logstash实例并发送其消息。对于本例,我们将telnet到Logstash并输入日志行(类似于我们之前在STDIN中输入日志行的方式)。打开另一个shell窗口与Logstash syslog输入进行交互,并输入以下命令:
telnet localhost 5000
复制并粘贴下面的行作为示例。(您可以自己尝试一些,但请记住,如果grok过滤器对您的数据不正确,它们可能无法解析)。
Dec 23 12:11:43 louis postfix/smtpd[31499]: connect from unknown[95.75.93.154] Dec 23 14:42:56 louis named[16000]: client 199.48.164.7#64817: query (cache) 'amsterdamboothuren.com/MX/IN' denied Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log) Dec 22 18:28:06 louis rsyslogd: [origin software="rsyslogd" swVersion="4.2.0" x-pid="2253" x-info="http://www.rsyslog.com"] rsyslogd was HUPed, type 'lightweight'. Dec 9 12:11:43 test main[31499]: xxxx
现在您应该看到Logstash在处理和解析消息时的原始shell中的输出!
{ "message" => "Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)", "@timestamp" => "2013-12-23T22:30:01.000Z", "@version" => "1", "type" => "syslog", "host" => "0:0:0:0:0:0:0:1:52617", "syslog_timestamp" => "Dec 23 14:30:01", "syslog_hostname" => "louis", "syslog_program" => "CRON", "syslog_pid" => "619", "syslog_message" => "(www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)", "received_at" => "2013-12-23 22:49:22 UTC", "received_from" => "0:0:0:0:0:0:0:1:52617", "syslog_severity_code" => 5, "syslog_facility_code" => 1, "syslog_facility" => "user-level", "syslog_severity" => "notice" }
安全配置
Logstash Elasticsearch插件(输出、输入、过滤和监控)支持通过HTTPS进行身份验证和加密。
要在安全集群中使用Logstash,您需要为Logstash配置身份验证凭据。如果身份验证失败,Logstash将抛出一个异常,处理管道将停止。
如果集群中启用了加密,还需要在Logstash配置中启用TLS/SSL。
如果希望使用X-Pack监控Logstash实例,并将监控数据存储在安全的Elasticsearch集群中,则必须为具有适当权限的用户配置Logstash的用户名和密码。
除了为Logstash配置身份验证凭据之外,还需要授予授权用户访问Logstash索引的权限。
配置Logstash使用Basic Authentication
Logstash需要能够管理索引模板,创建索引,以及在它创建的索引中写入和删除文档。
为Logstash设置身份验证凭据:
- 使用Kibana的Management > Roles UI或角色API创建一个
logstash_writer
角色。对于集群权限,添加manage_index_templates和monitor。对于索引添加权限write
,create
, andcreate_index
- 在kibana的管理端中,添加一个
logstash_internel
用户,关联logstash_writer
角色 - 将Logstash配置为您刚才创建的logstash_internal用户进行身份验证。您可以在Logstash.conf文件中为每个Elasticsearch插件分别配置凭据。例如:
input { elasticsearch { ... user => logstash_internal password => x-pack-test-password } } filter { elasticsearch { ... user => logstash_internal password => x-pack-test-password } } output { elasticsearch { ... user => logstash_internal password => x-pack-test-password } }
发送数据到Elasticsearch
Elasticsearch输出插件可以在Elasticsearch中存储时间序列数据集(如日志、事件和指标)和非时间序列数据。
建议使用数据流选项将时间序列数据集(如日志、指标和事件)索引到Elasticsearch中:
- data_stream
- data_stream_auto_routing
- data_stream_dataset
- data_stream_namespace
- data_stream_sync_fields
- data_stream_type
Data stream 配置案例
示例:基本默认配置
output { elasticsearch { hosts => "hostname" data_stream => "true" } }
这个例子展示了处理数据流的最小设置。与data_stream.* 事件字段被路由到相应的数据流。如果字段缺失,则将缺省路由到logs-generic-logstash。
例如:自定义数据流名称
output { elasticsearch { hosts => "hostname" data_stream => "true" data_stream_type => "metrics" data_stream_dataset => "foo" data_stream_namespace => "bar" } }
写入不同的索引:最佳实践
如果你发送事件到相同的Elasticsearch集群,但是你的目标是不同的索引,你可以:
- 使用不同的Elasticsearch输出,每个输出都有不同的索引参数值
- 使用一个Elasticsearch输出,并使用动态变量替换索引参数
每个Elasticsearch输出都是一个连接到集群的新客户端:
- 它必须初始化客户端并连接到Elasticsearch(如果你有更多的客户端,重启时间会更长)
- 它有一个关联的连接池
为了最小化打开到Elasticsearch的连接数量,最大化批量大小并减少“小”批量请求的数量(这很容易填满队列),通常使用单个Elasticsearch输出更有效。
例子:
output { elasticsearch { index => "%{[some_field][sub_field]}-%{+YYYY.MM.dd}" } }
如果事件中没有包含目标索引前缀的字段,该怎么办?
可以使用mutate筛选器和条件添加[@metadata]字段来为每个事件设置目标索引。[@metadata]字段将不会被发送到Elasticsearch。
例子:
filter { if [log_type] in [ "test", "staging" ] { mutate { add_field => { "[@metadata][target_index]" => "test-%{+YYYY.MM}" } } } else if [log_type] == "production" { mutate { add_field => { "[@metadata][target_index]" => "prod-%{+YYYY.MM.dd}" } } } else { mutate { add_field => { "[@metadata][target_index]" => "unknown-%{+YYYY}" } } } } output { elasticsearch { index => "%{[@metadata][target_index]}" } }
一个实际使用中的样例:
input { file { path => "/tmp/b_log" start_position => "beginning" } } filter { grok { patterns_dir => ["/opt/server/logstash-7.11.1/extrapattern"] match => { "message" => "%{LOGDATESTAMP:[@metadata][timestamp]}.*%{KQMARK:kqmark}\s*%{JAVALOGMESSAGE:javamessage}" } } date { match => [ "[@metadata][timestamp]", "yyyy-MM-dd HH:mm:ss.SSS" ] timezone => "Asia/Shanghai" locale => "zh-CN" } json { source => "javamessage" remove_field => [ "javamessage" ] } } output { if [kqmark] == "kqlog" { elasticsearch { hosts => ["127.0.0.1:9200","127.0.0.1:9300","127.0.0.1:9400"] index => "logstash-%{+YYYY.MM.dd}" user => "logstash_internal" password => "123456" } } }