Elastic Stack日志收集系统笔记 (logstash部分)(下)

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
日志服务 SLS,月写入数据量 50GB 1个月
简介: Elastic Stack日志收集系统笔记 (logstash部分)(下)

match


值类型为数组,默认值为空


用于将指定的字段按照指定的格式解析.比如:


match =>["createtime", "yyyyMMdd","yyyy-MM-dd"]


target


值类型是字符串,默认值为“@timestamp”


将匹配的时间戳存储到给定的目标字段中。如果未提供,则默认更新@timestamp事件的字段。


timezone


值类型是字符串,没有默认值


用于为要被解析的时间指定一个时区,值为时区的canonical ID,一般不用设置,因为会根据当前系统的时区获取这个值.


这里设置的时区并不是logstash最终储存的时间的时区,logstash最终储存的时间为UTC标准时间


时间格式


Symbol

Meaning

Presentation

Examples

G

era

text

AD

C

century of era (>=0)

number

20

Y

year of era (>=0)

year

1996

x

weekyear

year

1996

w

week of weekyear

number

27

e

day of week

number

2

E

day of week

text

Tuesday; Tue

y

year

year

1996

D

day of year

number

189

M

month of year

month

July; Jul; 07

d

day of month

number

10

a

halfday of day

text

PM

K

hour of halfday (0~11)

number

0

h

clockhour of halfday (1~12)

number

12

H

hour of day (0~23)

number

0

k

clockhour of day (1~24)

number

24

m

minute of hour

number

30

s

second of minute

number

55

S

fraction of second

number

978

z

time zone

text

Pacific Standard Time; PST

Z

time zone offset/id

zone

-0800; -08:00; America/Los_Angeles

'

escape for text

delimiter


''

single quote

literal

'


IP分析插件geoip


描述


geoip插件能分析访问的ip分析出访问者的地址信息,GeoLite2数据库是免费的IP地理定位数据库,与MaxMind的GeoIP2数据库相当,但不太准确,geoip 库内只存有公共网络上的 IP 信息,查询不到结果的,会直接返回 null,而 logstash 的geoip 插件对 null 结果的处理是:不生成对应的geoip.字段。


source


这是必须设置的值,值类型是字符串


包含要通过geoip映射的IP地址或主机名的字段。如果此字段是数组,则仅使用第一个值。


database


指定数据库的路径,值类型是路径


Logstash应该使用的Maxmind数据库文件的路径。默认数据库是GeoLite2-City。GeoLite2-City,GeoLite2-Country,GeoLite2-ASN是Maxmind提供的免费数据库,受支持。GeoIP2-City,GeoIP2-ISP,GeoIP2-Country是Maxmind支持的商业数据库。


如果未指定,则默认为Logstash附带的GeoLite2 City数据库。


field


GeoIP 库数据较多,如果你不需要这么多内容,可以通过 fields 选项指定自己所需要的,比如:


fields =>["city_name", "continent_code", "country_code2","country_code3", "country_name", "dma_code","ip", "latitude", "longitude","postal_code", "region_name", "timezone"]


default_database_type


值类型是字符串,默认值为city,可选的值为city和ASN


该插件现在包括GeoLite2-City和GeoLite2-ASN数据库。如果database和default_database_type未设置,将选择GeoLite2-City数据库。要使用包含的GeoLite2-ASN数据库,请设置default_database_type为ASN。


多行编解码插件multiline


描述


此编解码器的最初目标是允许将来自文件的多行消息连接到单个事件中。例如,将Java异常和堆栈跟踪消息加入单个事件中。


negate


值类型是布尔值,默认值为false


否定正则表达式模式。negate是对pattern的结果做判断是否匹配,默认值是false代表匹配,而true代表不匹配,这里并没有反,因为negate本身是否定的意思,双重否定表肯定。


pattern


必须设置的,值类型是字符串


pattern后面加要匹配的正则表达式,可以使用grok正则表达式的模板来配置该选项。


what


这是必须的设置,值可以是任何的:previous,next


如果模式匹配,事件是否属于下一个或上一个事件,previous 值指定行匹配pattern选项的内容是上一行的一部分。 next 指定行匹配pattern选项的内容是下一行的一部分。


pattern_dir


值类型是数组,默认值为[]


Logstash默认带有一堆模式,如果你要添加其他模式,可以将匹配模式写到文件里

例如


NUMBER \d+


示例


codec=>multiline  {
                        pattern =>"^\["
                        negate => true
                        what =>"previous"
}


如果没有匹配到以“[”开头的行,那肯定是属于前一行的。


使用logstash收集日志操作


使用logstash收集NGINX日志


vim logstash-nginx.conf
input {
        file {                                                   #使用file插件收集日志
          path =>"/var/log/nginx/*.log"           #指定日志路径
          start_position =>"beginning"            #指定日志读取的位置,默认是end,


beginning表示从文件开始的位置读取,而end表示从上次读取结束后的日志文件开始读取,但是如果记录过文件的读取信息,这个配置也就失去作用了。


 

}
}
filter {
        if [path] =~ "access" {                                            #判断是否是access日志
          mutate { replace => { type =>"nginx_access" } }     #如果是access日志则添加一个type字段
          grok {
            match => { "message"=> "%{COMBINEDAPACHELOG}" }     #使用grok插件过滤access日志,将其转化成多个字段,%{COMBINEDAPACHELOG}是grok定义好的表达式
          }
          date {
            match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]      #将收集到的日志日期作为时间戳
          }
        }
        else if [path] =~ "error"{                                        #判断是否是错误日志
          mutate { replace => { type =>"nginx_error" } }       #如果是错误日志,则添加一个type字段
          grok {
            match => { "message"=> "(?<datetime>\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2})\[(?<errtype>\w+)\] \S+: \*\d+ (?<errmsg>[^,]+),(?<errinfo>.*$)" }           #使用grok插件过滤error日志,将其转化成多个字段,这里的表达式是自己定义的  
          }
          date {
            match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]      #将收集到的日志作为时间戳
          }
        }
}
output {
        if [type] =~ "access" {                   #如果收集到的日志为access类型,那么就将日志输出到access的索引
          elasticsearch {
            hosts =>["192.168.179.134:9200"]
            index =>"nginx_access-%{+YYYY.MM.dd}"
          }
        }
        else if [type] =~ "error"{               #如果收集到的日志为error类型,那么就将日志输出到error的索引
          elasticsearch {
            hosts =>["192.168.179.134:9200"]
            index =>"nginx_error-%{+YYYY.MM.dd}"
          }
        }
}


使用logstash结合rsyslog收集系统日志


rsyslog是日志收集工具,现在很多Linux都自带rsyslog,用其替换掉syslog。


rsyslog本身有一个配置文件/etc/rsyslog.conf,里面定义了日志文件,以及相应保存的地址。


一般通过rsyslog来收集日志信息,并发送到logstash。


配置rsyslog


vim /etc/rsrslog.conf
*.* @@192.168.179.134:5514


配置logstash


vim rsyslog.conf
input {
        syslog {
           host =>"192.168.179.134"
          port => 5514
        }
}
output {
        stdout {
codec => rubydebug
        }
}


在命令行输入logger "helo world"可以查看logstash的输出


logger是一个shell命令接口,可以通过该接口使用Syslog的系统日志模块,还可以从命令行直接向系统日志文件写入一行信息。


使用Redis作为消息队列来收集日志


redis服务器是logstash官方推荐的broker(代理人)选择,broker角色也就意味着会同时存在输入和输出两个插件,发送消息的也就是输出插件被称作生产者,而接收消息的也就是输入插件被称作消费者。


redis消息队列作用说明:


  1.    防止Logstash和ES无法正常通信,从而丢失日志。
  2.    防止日志量过大导致ES无法承受大量写操作从而丢失日志。
  3.    应用程序(php,java)在输出日志时,可以直接输出到消息队列,从而  完成日志收集。


补充:如果redis使用的消息队列出现扩展瓶颈,可以使用更加强大的kafka,flume来代替


安装Redis


wget http://download.redis.io/releases/redis-4.0.11.tar.gz  #下载Redis源码


tar -zxfredis-4.0.11.tar.gz      #解压Redis源码
cp -r redis-4.0.11 /usr/local/   
mv redis-4.0.11/ redis
cd /usr/local/redis/
cp redis.conf bin/
make PREFIX=/usr/local/redis install     #编译安装Redis
echo "export PATH=$PATH:/usr/local/redis/bin" >> /etc/profile   将Redis加入环境变量


启动Redis


1.前端模式启动


直接运行bin/redis-server将以前端模式启动,前端模式启动的缺点是ssh命令窗口关闭则redis-server程序结束,不推荐使用此方法


redis-server


2.后端模式启动


修改redis.conf配置文件, daemonize yes 以后端模式启动


vim /usr/local/redis/bin/redis.conf
daemonize yes


执行如下命令启动redis:


redis-serverredis.conf


连接redis


redis-cli


关闭Redis


强行终止redis进程可能会导致redis持久化数据丢失。正确停止Redis的方式应该是向Redis发送SHUTDOWN命令,命令为:


redis-cli shutdown


写redis生产者和消费者配置文件


vim redis-consumer.conf
input {
        redis {         #定义redis消费者的配置
          data_type => "list"          #定义redis数据类型,redis支持三种数据类型,list,channel,pattern_channel,同的数据类型会导致实际采用不同的 Redis 命令操作,其中list,相当于队列;channel相当于发布订阅的某个特定的频道;pattern_channel相当于发布订阅某组频道。
          key =>"redis_logstash"       #定义redis列表或者频道的名称
          host =>"192.168.179.134"   #定义redis服务器的主机名
          port => 6379                      #定义redis服务器的端口
          db => 1                              #定义redis数据库编号
        }
}
output {
        elasticsearch {                           #输出到elastic上
          hosts =>"192.168.179.134"     #定义elasticsearch的地址
          index => "logstash_redis-%{+YYYY.MM.dd}"     #添加一个elasticsearch索引
        }
        stdout { codec => rubydebug }
}
vim redis-producer.conf 
 input { stdin { } }
output{                                #定义redis生产者的配置
        redis {
           host =>"192.168.179.134"     #同redis消费者的配置
           data_type => "list"
           db => 1
           port => 6379
           key => "logstash_redis"
        }
}


启动redis和elasticsearch,logstash


redis-server/usr/local/redis/bin/redis.conf     #后台运行redis
redis-climonitor                #开启redis监控
$ /elasticsearch/elasticsearch-6.4.0/bin/elasticsearch    #开启elasticsearch
[root@elasticelasticsearch-head]# grunt server          #开启elastic-head插件
logstash -f/etc/logstash/conf.d/       #启动logstash
netstat-ntlp                                  #查看端口是否监听


640.jpg


在head插件上查看索引是否创建成功


640.jpg


使用logspout结合elk收集docker日志


随着容器的大量使用,现在docker已经在很多生产环境得到实践,不过,容器的日志,状态,确是一个大问题,我们知道,一般可以使用命令docker logs 来查看一个特定的容器,那如果想要收集当前机器所有容器的日志呢?或许我们可以将日志的输出记录到主机磁盘中,然后使用logstash 去收集,在你不考虑服务器性能的情况下,这当然也是一种方法,在这里我要介绍的使用logspout去进行docker日志的收集,这需要在你的主机上运行一个logspout的容器,负责将同一个主机上其他容器的日志,根据route设定,转发给不同的接收端,它是一个无状态的容器化程序,并不是用来管理日志文件或查看日志的,它主要是用于将主机上容器的日志发送到其它地方。目前它只捕获其它容器中的程序发送到stdout和stderr的日志。


Logspout 是 Docker 流行和轻量级的基于Alpine Linux构建的日志路由器,它将附加到主机中的所有容器,并将 Docker 日志流输出到 syslog 服务器


安装docker


1、安装依赖包


yum install-y yum-utils device-mapper-persistent-data lvm2


2、添加国内yum源


yum-config-manager--add-repo https://mirrors.ustc.edu.cn/docker-ce/linux/centos/docker-ce.repo


3、更新yum软件源缓存,并安装docker-ce


yummakecache fast && yum install docker-ce


4.设置镜像加速器,在/etc/docker/daemon.json中写入如下内容


{

 "registry-mirrors": [

   "https://registry.docker-cn.com"

 ]

}


阿里云镜像加速器地址 https://cr.console.aliyun.com/cn-hangzhou/mirrors


5.启动docker服务


systemctlstart docker


安装docker-comose   (不是必须的)


curl -L "https://github.com/docker/compose/releases/download/1.22.0/docker-compose-$(uname-s)-$(uname -m)" -o /usr/local/bin/docker-compose


pull一个logspout的镜像


docker pull gliderlabs/logspout


写logstash配置文件并启动


vimlogspout.conf
input {
        tcp {
          port => 5140
        }
        udp {
          port => 5140
        }
}
output {
        stdout {
          codec => rubydebug
        }
        elasticsearch {
          hosts =>"192.168.179.134"
          index => "logspout"
        }
}
logstash -f logspout.conf

启动logspout的容器


docker run --name="logspout" --volume=/var/run/docker.sock:/var/run/docker.sock --publish=192.168.179.134:8000:80


gliderlabs/logspout syslog+tcp:192.168.179.134:5140       #将收集到的日志转发至192.168.179.134:5140,默认采用udp协议,我修改成了tcp协议


logspout容器一启动就开始进行容器日志的转发,并打印到终端和elastic上


logspout将从没有-t选项启动的其他容器中收集日志,并且这些容器的日志驱动配置为系统默认的兼容journald和json-file的日志类型(即这些容器的日志可以通过docker logs命令查看)。


关于logspout的详细用法可以查看logspout的docker仓库https://hub.docker.com/r/gliderlabs/logspout/


640.jpg


我再启动一个新的centos容器,并输出一些信息

docker run -d --name=cen7 centos echo "helloworld" >>/var/log/lastlog

640.png


使用curl检查日志流


通过使用logspout的httpstream模块,可以通过curl时时查看logspout路由的日志,甚至可以不提供日志路由的URI:


curl 192.168.179.134:8000/logs


640.png


忽略指定的容器


可以通过在启动容器时设置环境变量来告诉logspout忽略特定容器,如下所示:


docker run-d -e 'LOGSPOUT=ignore' image


或者,通过在运行logspout时通过设置环境变量来添加您定义的标签:

docker run --name="logspout" \
    -e EXCLUDE_LABEL=logspout.exclude \
    --volume=/var/run/docker.sock:/var/run/docker.sock\
    gliderlabs/logspout
 docker run -d --label logspout.exclude=trueimage

指定特定的容器


可以通过在URI上设置过滤器参数来告诉logspout仅包含某些容器:


指定容器名包含db的容器


docker run \
--volume=/var/run/docker.sock:/var/run/docker.sock\
gliderlabs/logspout\
raw://192.168.10.10:5000?filter.name=*_db


或者指定容器id:


docker run \
--volume=/var/run/docker.sock:/var/run/docker.sock\
gliderlabs/logspout\
raw://192.168.10.10:5000?filter.id=3b6ba57db54a


将容器日志直接路由至logstash


这样需要修改模块配置文件modules.go


添加logspout-logstash模块


_ "github.com/gliderlabs/logspout/transports/udp"
_"github.com/looplab/logspout-logstash"


重新编写dockerfile并构建镜像


FROM  gliderlabs/logspout:latest
COPY./modules.go /src/modules.go


重新构建logspout后,可以通过指定环境变量ROUTE_URIS=logstash://host:port来给定logstash服务器的URI,当然也可以直接在dockerfile中指定logstash URI的环境变量


ENVROUTE_URIS=logstash://host:port。
docker run --name="logspout" --volume=/var/run/docker.sock:/var/run/docker.sock -e ROUTE_URIS=logstash://192.168.179.134:5140 registry.cn-beijing.aliyuncs.com/andyyoung01/logspout-logstash


https://github.com/andyyoung01/logspout-logstash/tree/test_branch/custom上已经配置好了,可以直接clone重新构建

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
1月前
|
存储 前端开发 数据可视化
Grafana Loki,轻量级日志系统
本文介绍了基于Grafana、Loki和Alloy构建的轻量级日志系统。Loki是一个由Grafana Labs开发的日志聚合系统,具备高可用性和多租户支持,专注于日志而非指标,通过标签索引而非内容索引实现高效存储。Alloy则是用于收集和转发日志至Loki的强大工具。文章详细描述了系统的架构、组件及其工作流程,并提供了快速搭建指南,包括准备步骤、部署命令及验证方法。此外,还展示了如何使用Grafana查看日志,以及一些基本的LogQL查询示例。最后,作者探讨了Loki架构的独特之处,提出了“巨型单体模块化”的概念,即一个应用既可单体部署也可分布式部署,整体协同实现全部功能。
435 69
Grafana Loki,轻量级日志系统
|
2月前
|
存储 安全 Java
Spring Boot 3 集成Spring AOP实现系统日志记录
本文介绍了如何在Spring Boot 3中集成Spring AOP实现系统日志记录功能。通过定义`SysLog`注解和配置相应的AOP切面,可以在方法执行前后自动记录日志信息,包括操作的开始时间、结束时间、请求参数、返回结果、异常信息等,并将这些信息保存到数据库中。此外,还使用了`ThreadLocal`变量来存储每个线程独立的日志数据,确保线程安全。文中还展示了项目实战中的部分代码片段,以及基于Spring Boot 3 + Vue 3构建的快速开发框架的简介与内置功能列表。此框架结合了当前主流技术栈,提供了用户管理、权限控制、接口文档自动生成等多项实用特性。
99 8
|
3月前
|
存储 监控 安全
什么是事件日志管理系统?事件日志管理系统有哪些用处?
事件日志管理系统是IT安全的重要工具,用于集中收集、分析和解释来自组织IT基础设施各组件的事件日志,如防火墙、路由器、交换机等,帮助提升网络安全、实现主动威胁检测和促进合规性。系统支持多种日志类型,包括Windows事件日志、Syslog日志和应用程序日志,通过实时监测、告警及可视化分析,为企业提供强大的安全保障。然而,实施过程中也面临数据量大、日志管理和分析复杂等挑战。EventLog Analyzer作为一款高效工具,不仅提供实时监测与告警、可视化分析和报告功能,还支持多种合规性报告,帮助企业克服挑战,提升网络安全水平。
131 2
|
4月前
|
存储 监控 安全
|
4月前
|
存储 Linux Docker
centos系统清理docker日志文件
通过以上方法,可以有效清理和管理CentOS系统中的Docker日志文件,防止日志文件占用过多磁盘空间。选择合适的方法取决于具体的应用场景和需求,可以结合手动清理、logrotate和调整日志驱动等多种方式,确保系统的高效运行。
395 2
|
4月前
|
存储 JSON 监控
开源日志分析Logstash
【10月更文挑战第22天】
92 1
|
5月前
|
XML JSON 监控
告别简陋:Java日志系统的最佳实践
【10月更文挑战第19天】 在Java开发中,`System.out.println()` 是最基本的输出方法,但它在实际项目中往往被认为是不专业和不足够的。本文将探讨为什么在现代Java应用中应该避免使用 `System.out.println()`,并介绍几种更先进的日志解决方案。
113 1
|
5月前
|
监控 网络协议 安全
Linux系统日志管理
Linux系统日志管理
103 3
|
18天前
|
存储 缓存 关系型数据库
图解MySQL【日志】——Redo Log
Redo Log(重做日志)是数据库中用于记录数据页修改的物理日志,确保事务的持久性和一致性。其主要作用包括崩溃恢复、提高性能和保证事务一致性。Redo Log 通过先写日志的方式,在内存中缓存修改操作,并在适当时候刷入磁盘,减少随机写入带来的性能损耗。WAL(Write-Ahead Logging)技术的核心思想是先将修改操作记录到日志文件中,再择机写入磁盘,从而实现高效且安全的数据持久化。Redo Log 的持久化过程涉及 Redo Log Buffer 和不同刷盘时机的控制参数(如 `innodb_flush_log_at_trx_commit`),以平衡性能与数据安全性。
27 5
图解MySQL【日志】——Redo Log
|
4月前
|
XML 安全 Java
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
本文介绍了Java日志框架的基本概念和使用方法,重点讨论了SLF4J、Log4j、Logback和Log4j2之间的关系及其性能对比。SLF4J作为一个日志抽象层,允许开发者使用统一的日志接口,而Log4j、Logback和Log4j2则是具体的日志实现框架。Log4j2在性能上优于Logback,推荐在新项目中使用。文章还详细说明了如何在Spring Boot项目中配置Log4j2和Logback,以及如何使用Lombok简化日志记录。最后,提供了一些日志配置的最佳实践,包括滚动日志、统一日志格式和提高日志性能的方法。
1154 31
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板