match
值类型为数组,默认值为空
用于将指定的字段按照指定的格式解析.比如:
match =>["createtime", "yyyyMMdd","yyyy-MM-dd"]
target
值类型是字符串,默认值为“@timestamp”
将匹配的时间戳存储到给定的目标字段中。如果未提供,则默认更新@timestamp事件的字段。
timezone
值类型是字符串,没有默认值
用于为要被解析的时间指定一个时区,值为时区的canonical ID,一般不用设置,因为会根据当前系统的时区获取这个值.
这里设置的时区并不是logstash最终储存的时间的时区,logstash最终储存的时间为UTC标准时间
时间格式
Symbol |
Meaning |
Presentation |
Examples |
G |
era |
text |
AD |
C |
century of era (>=0) |
number |
20 |
Y |
year of era (>=0) |
year |
1996 |
x |
weekyear |
year |
1996 |
w |
week of weekyear |
number |
27 |
e |
day of week |
number |
2 |
E |
day of week |
text |
Tuesday; Tue |
y |
year |
year |
1996 |
D |
day of year |
number |
189 |
M |
month of year |
month |
July; Jul; 07 |
d |
day of month |
number |
10 |
a |
halfday of day |
text |
PM |
K |
hour of halfday (0~11) |
number |
0 |
h |
clockhour of halfday (1~12) |
number |
12 |
H |
hour of day (0~23) |
number |
0 |
k |
clockhour of day (1~24) |
number |
24 |
m |
minute of hour |
number |
30 |
s |
second of minute |
number |
55 |
S |
fraction of second |
number |
978 |
z |
time zone |
text |
Pacific Standard Time; PST |
Z |
time zone offset/id |
zone |
-0800; -08:00; America/Los_Angeles |
' |
escape for text |
delimiter |
|
'' |
single quote |
literal |
' |
IP分析插件geoip
描述
geoip插件能分析访问的ip分析出访问者的地址信息,GeoLite2数据库是免费的IP地理定位数据库,与MaxMind的GeoIP2数据库相当,但不太准确,geoip 库内只存有公共网络上的 IP 信息,查询不到结果的,会直接返回 null,而 logstash 的geoip 插件对 null 结果的处理是:不生成对应的geoip.字段。
source
这是必须设置的值,值类型是字符串
包含要通过geoip映射的IP地址或主机名的字段。如果此字段是数组,则仅使用第一个值。
database
指定数据库的路径,值类型是路径
Logstash应该使用的Maxmind数据库文件的路径。默认数据库是GeoLite2-City。GeoLite2-City,GeoLite2-Country,GeoLite2-ASN是Maxmind提供的免费数据库,受支持。GeoIP2-City,GeoIP2-ISP,GeoIP2-Country是Maxmind支持的商业数据库。
如果未指定,则默认为Logstash附带的GeoLite2 City数据库。
field
GeoIP 库数据较多,如果你不需要这么多内容,可以通过 fields 选项指定自己所需要的,比如:
fields =>["city_name", "continent_code", "country_code2","country_code3", "country_name", "dma_code","ip", "latitude", "longitude","postal_code", "region_name", "timezone"]
default_database_type
值类型是字符串,默认值为city,可选的值为city和ASN
该插件现在包括GeoLite2-City和GeoLite2-ASN数据库。如果database和default_database_type未设置,将选择GeoLite2-City数据库。要使用包含的GeoLite2-ASN数据库,请设置default_database_type为ASN。
多行编解码插件multiline
描述
此编解码器的最初目标是允许将来自文件的多行消息连接到单个事件中。例如,将Java异常和堆栈跟踪消息加入单个事件中。
negate
值类型是布尔值,默认值为false
否定正则表达式模式。negate是对pattern的结果做判断是否匹配,默认值是false代表匹配,而true代表不匹配,这里并没有反,因为negate本身是否定的意思,双重否定表肯定。
pattern
必须设置的,值类型是字符串
pattern后面加要匹配的正则表达式,可以使用grok正则表达式的模板来配置该选项。
what
这是必须的设置,值可以是任何的:previous,next
如果模式匹配,事件是否属于下一个或上一个事件,previous 值指定行匹配pattern选项的内容是上一行的一部分。 next 指定行匹配pattern选项的内容是下一行的一部分。
pattern_dir
值类型是数组,默认值为[]
Logstash默认带有一堆模式,如果你要添加其他模式,可以将匹配模式写到文件里
例如
NUMBER \d+
示例
codec=>multiline { pattern =>"^\[" negate => true what =>"previous" }
如果没有匹配到以“[”开头的行,那肯定是属于前一行的。
使用logstash收集日志操作
使用logstash收集NGINX日志
vim logstash-nginx.conf input { file { #使用file插件收集日志 path =>"/var/log/nginx/*.log" #指定日志路径 start_position =>"beginning" #指定日志读取的位置,默认是end,
beginning表示从文件开始的位置读取,而end表示从上次读取结束后的日志文件开始读取,但是如果记录过文件的读取信息,这个配置也就失去作用了。
} } filter { if [path] =~ "access" { #判断是否是access日志 mutate { replace => { type =>"nginx_access" } } #如果是access日志则添加一个type字段 grok { match => { "message"=> "%{COMBINEDAPACHELOG}" } #使用grok插件过滤access日志,将其转化成多个字段,%{COMBINEDAPACHELOG}是grok定义好的表达式 } date { match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ] #将收集到的日志日期作为时间戳 } } else if [path] =~ "error"{ #判断是否是错误日志 mutate { replace => { type =>"nginx_error" } } #如果是错误日志,则添加一个type字段 grok { match => { "message"=> "(?<datetime>\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2})\[(?<errtype>\w+)\] \S+: \*\d+ (?<errmsg>[^,]+),(?<errinfo>.*$)" } #使用grok插件过滤error日志,将其转化成多个字段,这里的表达式是自己定义的 } date { match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ] #将收集到的日志作为时间戳 } } } output { if [type] =~ "access" { #如果收集到的日志为access类型,那么就将日志输出到access的索引 elasticsearch { hosts =>["192.168.179.134:9200"] index =>"nginx_access-%{+YYYY.MM.dd}" } } else if [type] =~ "error"{ #如果收集到的日志为error类型,那么就将日志输出到error的索引 elasticsearch { hosts =>["192.168.179.134:9200"] index =>"nginx_error-%{+YYYY.MM.dd}" } } }
使用logstash结合rsyslog收集系统日志
rsyslog是日志收集工具,现在很多Linux都自带rsyslog,用其替换掉syslog。
rsyslog本身有一个配置文件/etc/rsyslog.conf,里面定义了日志文件,以及相应保存的地址。
一般通过rsyslog来收集日志信息,并发送到logstash。
配置rsyslog
vim /etc/rsrslog.conf *.* @@192.168.179.134:5514
配置logstash
vim rsyslog.conf input { syslog { host =>"192.168.179.134" port => 5514 } } output { stdout { codec => rubydebug } }
在命令行输入logger "helo world"可以查看logstash的输出
logger是一个shell命令接口,可以通过该接口使用Syslog的系统日志模块,还可以从命令行直接向系统日志文件写入一行信息。
使用Redis作为消息队列来收集日志
redis服务器是logstash官方推荐的broker(代理人)选择,broker角色也就意味着会同时存在输入和输出两个插件,发送消息的也就是输出插件被称作生产者,而接收消息的也就是输入插件被称作消费者。
redis消息队列作用说明:
- 防止Logstash和ES无法正常通信,从而丢失日志。
- 防止日志量过大导致ES无法承受大量写操作从而丢失日志。
- 应用程序(php,java)在输出日志时,可以直接输出到消息队列,从而 完成日志收集。
补充:如果redis使用的消息队列出现扩展瓶颈,可以使用更加强大的kafka,flume来代替。
安装Redis
wget http://download.redis.io/releases/redis-4.0.11.tar.gz #下载Redis源码
tar -zxfredis-4.0.11.tar.gz #解压Redis源码 cp -r redis-4.0.11 /usr/local/ mv redis-4.0.11/ redis cd /usr/local/redis/ cp redis.conf bin/ make PREFIX=/usr/local/redis install #编译安装Redis echo "export PATH=$PATH:/usr/local/redis/bin" >> /etc/profile 将Redis加入环境变量
启动Redis
1.前端模式启动
直接运行bin/redis-server将以前端模式启动,前端模式启动的缺点是ssh命令窗口关闭则redis-server程序结束,不推荐使用此方法
redis-server
2.后端模式启动
修改redis.conf配置文件, daemonize yes 以后端模式启动
vim /usr/local/redis/bin/redis.conf daemonize yes
执行如下命令启动redis:
redis-serverredis.conf
连接redis
redis-cli
关闭Redis
强行终止redis进程可能会导致redis持久化数据丢失。正确停止Redis的方式应该是向Redis发送SHUTDOWN命令,命令为:
redis-cli shutdown
写redis生产者和消费者配置文件
vim redis-consumer.conf input { redis { #定义redis消费者的配置 data_type => "list" #定义redis数据类型,redis支持三种数据类型,list,channel,pattern_channel,同的数据类型会导致实际采用不同的 Redis 命令操作,其中list,相当于队列;channel相当于发布订阅的某个特定的频道;pattern_channel相当于发布订阅某组频道。 key =>"redis_logstash" #定义redis列表或者频道的名称 host =>"192.168.179.134" #定义redis服务器的主机名 port => 6379 #定义redis服务器的端口 db => 1 #定义redis数据库编号 } } output { elasticsearch { #输出到elastic上 hosts =>"192.168.179.134" #定义elasticsearch的地址 index => "logstash_redis-%{+YYYY.MM.dd}" #添加一个elasticsearch索引 } stdout { codec => rubydebug } } vim redis-producer.conf input { stdin { } } output{ #定义redis生产者的配置 redis { host =>"192.168.179.134" #同redis消费者的配置 data_type => "list" db => 1 port => 6379 key => "logstash_redis" } }
启动redis和elasticsearch,logstash
redis-server/usr/local/redis/bin/redis.conf #后台运行redis redis-climonitor #开启redis监控 $ /elasticsearch/elasticsearch-6.4.0/bin/elasticsearch #开启elasticsearch [root@elasticelasticsearch-head]# grunt server #开启elastic-head插件 logstash -f/etc/logstash/conf.d/ #启动logstash netstat-ntlp #查看端口是否监听
在head插件上查看索引是否创建成功
使用logspout结合elk收集docker日志
随着容器的大量使用,现在docker已经在很多生产环境得到实践,不过,容器的日志,状态,确是一个大问题,我们知道,一般可以使用命令docker logs 来查看一个特定的容器,那如果想要收集当前机器所有容器的日志呢?或许我们可以将日志的输出记录到主机磁盘中,然后使用logstash 去收集,在你不考虑服务器性能的情况下,这当然也是一种方法,在这里我要介绍的使用logspout去进行docker日志的收集,这需要在你的主机上运行一个logspout的容器,负责将同一个主机上其他容器的日志,根据route设定,转发给不同的接收端,它是一个无状态的容器化程序,并不是用来管理日志文件或查看日志的,它主要是用于将主机上容器的日志发送到其它地方。目前它只捕获其它容器中的程序发送到stdout和stderr的日志。
Logspout 是 Docker 流行和轻量级的基于Alpine Linux构建的日志路由器,它将附加到主机中的所有容器,并将 Docker 日志流输出到 syslog 服务器
安装docker
1、安装依赖包
yum install-y yum-utils device-mapper-persistent-data lvm2
2、添加国内yum源
yum-config-manager--add-repo https://mirrors.ustc.edu.cn/docker-ce/linux/centos/docker-ce.repo
3、更新yum软件源缓存,并安装docker-ce
yummakecache fast && yum install docker-ce
4.设置镜像加速器,在/etc/docker/daemon.json中写入如下内容
{
"registry-mirrors": [
"https://registry.docker-cn.com"
]
}
阿里云镜像加速器地址 https://cr.console.aliyun.com/cn-hangzhou/mirrors
5.启动docker服务
systemctlstart docker
安装docker-comose (不是必须的)
curl -L "https://github.com/docker/compose/releases/download/1.22.0/docker-compose-$(uname-s)-$(uname -m)" -o /usr/local/bin/docker-compose
pull一个logspout的镜像
docker pull gliderlabs/logspout
写logstash配置文件并启动
vimlogspout.conf input { tcp { port => 5140 } udp { port => 5140 } } output { stdout { codec => rubydebug } elasticsearch { hosts =>"192.168.179.134" index => "logspout" } } logstash -f logspout.conf
启动logspout的容器
docker run --name="logspout" --volume=/var/run/docker.sock:/var/run/docker.sock --publish=192.168.179.134:8000:80
gliderlabs/logspout syslog+tcp:192.168.179.134:5140 #将收集到的日志转发至192.168.179.134:5140,默认采用udp协议,我修改成了tcp协议
logspout容器一启动就开始进行容器日志的转发,并打印到终端和elastic上
logspout将从没有-t选项启动的其他容器中收集日志,并且这些容器的日志驱动配置为系统默认的兼容journald和json-file的日志类型(即这些容器的日志可以通过docker logs命令查看)。
关于logspout的详细用法可以查看logspout的docker仓库https://hub.docker.com/r/gliderlabs/logspout/
我再启动一个新的centos容器,并输出一些信息
docker run -d --name=cen7 centos echo "helloworld" >>/var/log/lastlog
使用curl检查日志流
通过使用logspout的httpstream模块,可以通过curl时时查看logspout路由的日志,甚至可以不提供日志路由的URI:
curl 192.168.179.134:8000/logs
忽略指定的容器
可以通过在启动容器时设置环境变量来告诉logspout忽略特定容器,如下所示:
docker run-d -e 'LOGSPOUT=ignore' image
或者,通过在运行logspout时通过设置环境变量来添加您定义的标签:
docker run --name="logspout" \ -e EXCLUDE_LABEL=logspout.exclude \ --volume=/var/run/docker.sock:/var/run/docker.sock\ gliderlabs/logspout docker run -d --label logspout.exclude=trueimage
指定特定的容器
可以通过在URI上设置过滤器参数来告诉logspout仅包含某些容器:
指定容器名包含db的容器
docker run \ --volume=/var/run/docker.sock:/var/run/docker.sock\ gliderlabs/logspout\ raw://192.168.10.10:5000?filter.name=*_db
或者指定容器id:
docker run \ --volume=/var/run/docker.sock:/var/run/docker.sock\ gliderlabs/logspout\ raw://192.168.10.10:5000?filter.id=3b6ba57db54a
将容器日志直接路由至logstash
这样需要修改模块配置文件modules.go
添加logspout-logstash模块
_ "github.com/gliderlabs/logspout/transports/udp" _"github.com/looplab/logspout-logstash"
重新编写dockerfile并构建镜像
FROM gliderlabs/logspout:latest COPY./modules.go /src/modules.go
重新构建logspout后,可以通过指定环境变量ROUTE_URIS=logstash://host:port来给定logstash服务器的URI,当然也可以直接在dockerfile中指定logstash URI的环境变量
ENVROUTE_URIS=logstash://host:port。 docker run --name="logspout" --volume=/var/run/docker.sock:/var/run/docker.sock -e ROUTE_URIS=logstash://192.168.179.134:5140 registry.cn-beijing.aliyuncs.com/andyyoung01/logspout-logstash
在https://github.com/andyyoung01/logspout-logstash/tree/test_branch/custom上已经配置好了,可以直接clone重新构建