Elastic Stack日志收集系统笔记 (logstash部分)(下)

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
日志服务 SLS,月写入数据量 50GB 1个月
简介: Elastic Stack日志收集系统笔记 (logstash部分)(下)

match


值类型为数组,默认值为空


用于将指定的字段按照指定的格式解析.比如:


match =>["createtime", "yyyyMMdd","yyyy-MM-dd"]


target


值类型是字符串,默认值为“@timestamp”


将匹配的时间戳存储到给定的目标字段中。如果未提供,则默认更新@timestamp事件的字段。


timezone


值类型是字符串,没有默认值


用于为要被解析的时间指定一个时区,值为时区的canonical ID,一般不用设置,因为会根据当前系统的时区获取这个值.


这里设置的时区并不是logstash最终储存的时间的时区,logstash最终储存的时间为UTC标准时间


时间格式


Symbol

Meaning

Presentation

Examples

G

era

text

AD

C

century of era (>=0)

number

20

Y

year of era (>=0)

year

1996

x

weekyear

year

1996

w

week of weekyear

number

27

e

day of week

number

2

E

day of week

text

Tuesday; Tue

y

year

year

1996

D

day of year

number

189

M

month of year

month

July; Jul; 07

d

day of month

number

10

a

halfday of day

text

PM

K

hour of halfday (0~11)

number

0

h

clockhour of halfday (1~12)

number

12

H

hour of day (0~23)

number

0

k

clockhour of day (1~24)

number

24

m

minute of hour

number

30

s

second of minute

number

55

S

fraction of second

number

978

z

time zone

text

Pacific Standard Time; PST

Z

time zone offset/id

zone

-0800; -08:00; America/Los_Angeles

'

escape for text

delimiter


''

single quote

literal

'


IP分析插件geoip


描述


geoip插件能分析访问的ip分析出访问者的地址信息,GeoLite2数据库是免费的IP地理定位数据库,与MaxMind的GeoIP2数据库相当,但不太准确,geoip 库内只存有公共网络上的 IP 信息,查询不到结果的,会直接返回 null,而 logstash 的geoip 插件对 null 结果的处理是:不生成对应的geoip.字段。


source


这是必须设置的值,值类型是字符串


包含要通过geoip映射的IP地址或主机名的字段。如果此字段是数组,则仅使用第一个值。


database


指定数据库的路径,值类型是路径


Logstash应该使用的Maxmind数据库文件的路径。默认数据库是GeoLite2-City。GeoLite2-City,GeoLite2-Country,GeoLite2-ASN是Maxmind提供的免费数据库,受支持。GeoIP2-City,GeoIP2-ISP,GeoIP2-Country是Maxmind支持的商业数据库。


如果未指定,则默认为Logstash附带的GeoLite2 City数据库。


field


GeoIP 库数据较多,如果你不需要这么多内容,可以通过 fields 选项指定自己所需要的,比如:


fields =>["city_name", "continent_code", "country_code2","country_code3", "country_name", "dma_code","ip", "latitude", "longitude","postal_code", "region_name", "timezone"]


default_database_type


值类型是字符串,默认值为city,可选的值为city和ASN


该插件现在包括GeoLite2-City和GeoLite2-ASN数据库。如果database和default_database_type未设置,将选择GeoLite2-City数据库。要使用包含的GeoLite2-ASN数据库,请设置default_database_type为ASN。


多行编解码插件multiline


描述


此编解码器的最初目标是允许将来自文件的多行消息连接到单个事件中。例如,将Java异常和堆栈跟踪消息加入单个事件中。


negate


值类型是布尔值,默认值为false


否定正则表达式模式。negate是对pattern的结果做判断是否匹配,默认值是false代表匹配,而true代表不匹配,这里并没有反,因为negate本身是否定的意思,双重否定表肯定。


pattern


必须设置的,值类型是字符串


pattern后面加要匹配的正则表达式,可以使用grok正则表达式的模板来配置该选项。


what


这是必须的设置,值可以是任何的:previous,next


如果模式匹配,事件是否属于下一个或上一个事件,previous 值指定行匹配pattern选项的内容是上一行的一部分。 next 指定行匹配pattern选项的内容是下一行的一部分。


pattern_dir


值类型是数组,默认值为[]


Logstash默认带有一堆模式,如果你要添加其他模式,可以将匹配模式写到文件里

例如


NUMBER \d+


示例


codec=>multiline  {
                        pattern =>"^\["
                        negate => true
                        what =>"previous"
}


如果没有匹配到以“[”开头的行,那肯定是属于前一行的。


使用logstash收集日志操作


使用logstash收集NGINX日志


vim logstash-nginx.conf
input {
        file {                                                   #使用file插件收集日志
          path =>"/var/log/nginx/*.log"           #指定日志路径
          start_position =>"beginning"            #指定日志读取的位置,默认是end,


beginning表示从文件开始的位置读取,而end表示从上次读取结束后的日志文件开始读取,但是如果记录过文件的读取信息,这个配置也就失去作用了。


 

}
}
filter {
        if [path] =~ "access" {                                            #判断是否是access日志
          mutate { replace => { type =>"nginx_access" } }     #如果是access日志则添加一个type字段
          grok {
            match => { "message"=> "%{COMBINEDAPACHELOG}" }     #使用grok插件过滤access日志,将其转化成多个字段,%{COMBINEDAPACHELOG}是grok定义好的表达式
          }
          date {
            match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]      #将收集到的日志日期作为时间戳
          }
        }
        else if [path] =~ "error"{                                        #判断是否是错误日志
          mutate { replace => { type =>"nginx_error" } }       #如果是错误日志,则添加一个type字段
          grok {
            match => { "message"=> "(?<datetime>\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2})\[(?<errtype>\w+)\] \S+: \*\d+ (?<errmsg>[^,]+),(?<errinfo>.*$)" }           #使用grok插件过滤error日志,将其转化成多个字段,这里的表达式是自己定义的  
          }
          date {
            match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]      #将收集到的日志作为时间戳
          }
        }
}
output {
        if [type] =~ "access" {                   #如果收集到的日志为access类型,那么就将日志输出到access的索引
          elasticsearch {
            hosts =>["192.168.179.134:9200"]
            index =>"nginx_access-%{+YYYY.MM.dd}"
          }
        }
        else if [type] =~ "error"{               #如果收集到的日志为error类型,那么就将日志输出到error的索引
          elasticsearch {
            hosts =>["192.168.179.134:9200"]
            index =>"nginx_error-%{+YYYY.MM.dd}"
          }
        }
}


使用logstash结合rsyslog收集系统日志


rsyslog是日志收集工具,现在很多Linux都自带rsyslog,用其替换掉syslog。


rsyslog本身有一个配置文件/etc/rsyslog.conf,里面定义了日志文件,以及相应保存的地址。


一般通过rsyslog来收集日志信息,并发送到logstash。


配置rsyslog


vim /etc/rsrslog.conf
*.* @@192.168.179.134:5514


配置logstash


vim rsyslog.conf
input {
        syslog {
           host =>"192.168.179.134"
          port => 5514
        }
}
output {
        stdout {
codec => rubydebug
        }
}


在命令行输入logger "helo world"可以查看logstash的输出


logger是一个shell命令接口,可以通过该接口使用Syslog的系统日志模块,还可以从命令行直接向系统日志文件写入一行信息。


使用Redis作为消息队列来收集日志


redis服务器是logstash官方推荐的broker(代理人)选择,broker角色也就意味着会同时存在输入和输出两个插件,发送消息的也就是输出插件被称作生产者,而接收消息的也就是输入插件被称作消费者。


redis消息队列作用说明:


  1.    防止Logstash和ES无法正常通信,从而丢失日志。
  2.    防止日志量过大导致ES无法承受大量写操作从而丢失日志。
  3.    应用程序(php,java)在输出日志时,可以直接输出到消息队列,从而  完成日志收集。


补充:如果redis使用的消息队列出现扩展瓶颈,可以使用更加强大的kafka,flume来代替


安装Redis


wget http://download.redis.io/releases/redis-4.0.11.tar.gz  #下载Redis源码


tar -zxfredis-4.0.11.tar.gz      #解压Redis源码
cp -r redis-4.0.11 /usr/local/   
mv redis-4.0.11/ redis
cd /usr/local/redis/
cp redis.conf bin/
make PREFIX=/usr/local/redis install     #编译安装Redis
echo "export PATH=$PATH:/usr/local/redis/bin" >> /etc/profile   将Redis加入环境变量


启动Redis


1.前端模式启动


直接运行bin/redis-server将以前端模式启动,前端模式启动的缺点是ssh命令窗口关闭则redis-server程序结束,不推荐使用此方法


redis-server


2.后端模式启动


修改redis.conf配置文件, daemonize yes 以后端模式启动


vim /usr/local/redis/bin/redis.conf
daemonize yes


执行如下命令启动redis:


redis-serverredis.conf


连接redis


redis-cli


关闭Redis


强行终止redis进程可能会导致redis持久化数据丢失。正确停止Redis的方式应该是向Redis发送SHUTDOWN命令,命令为:


redis-cli shutdown


写redis生产者和消费者配置文件


vim redis-consumer.conf
input {
        redis {         #定义redis消费者的配置
          data_type => "list"          #定义redis数据类型,redis支持三种数据类型,list,channel,pattern_channel,同的数据类型会导致实际采用不同的 Redis 命令操作,其中list,相当于队列;channel相当于发布订阅的某个特定的频道;pattern_channel相当于发布订阅某组频道。
          key =>"redis_logstash"       #定义redis列表或者频道的名称
          host =>"192.168.179.134"   #定义redis服务器的主机名
          port => 6379                      #定义redis服务器的端口
          db => 1                              #定义redis数据库编号
        }
}
output {
        elasticsearch {                           #输出到elastic上
          hosts =>"192.168.179.134"     #定义elasticsearch的地址
          index => "logstash_redis-%{+YYYY.MM.dd}"     #添加一个elasticsearch索引
        }
        stdout { codec => rubydebug }
}
vim redis-producer.conf 
 input { stdin { } }
output{                                #定义redis生产者的配置
        redis {
           host =>"192.168.179.134"     #同redis消费者的配置
           data_type => "list"
           db => 1
           port => 6379
           key => "logstash_redis"
        }
}


启动redis和elasticsearch,logstash


redis-server/usr/local/redis/bin/redis.conf     #后台运行redis
redis-climonitor                #开启redis监控
$ /elasticsearch/elasticsearch-6.4.0/bin/elasticsearch    #开启elasticsearch
[root@elasticelasticsearch-head]# grunt server          #开启elastic-head插件
logstash -f/etc/logstash/conf.d/       #启动logstash
netstat-ntlp                                  #查看端口是否监听


640.jpg


在head插件上查看索引是否创建成功


640.jpg


使用logspout结合elk收集docker日志


随着容器的大量使用,现在docker已经在很多生产环境得到实践,不过,容器的日志,状态,确是一个大问题,我们知道,一般可以使用命令docker logs 来查看一个特定的容器,那如果想要收集当前机器所有容器的日志呢?或许我们可以将日志的输出记录到主机磁盘中,然后使用logstash 去收集,在你不考虑服务器性能的情况下,这当然也是一种方法,在这里我要介绍的使用logspout去进行docker日志的收集,这需要在你的主机上运行一个logspout的容器,负责将同一个主机上其他容器的日志,根据route设定,转发给不同的接收端,它是一个无状态的容器化程序,并不是用来管理日志文件或查看日志的,它主要是用于将主机上容器的日志发送到其它地方。目前它只捕获其它容器中的程序发送到stdout和stderr的日志。


Logspout 是 Docker 流行和轻量级的基于Alpine Linux构建的日志路由器,它将附加到主机中的所有容器,并将 Docker 日志流输出到 syslog 服务器


安装docker


1、安装依赖包


yum install-y yum-utils device-mapper-persistent-data lvm2


2、添加国内yum源


yum-config-manager--add-repo https://mirrors.ustc.edu.cn/docker-ce/linux/centos/docker-ce.repo


3、更新yum软件源缓存,并安装docker-ce


yummakecache fast && yum install docker-ce


4.设置镜像加速器,在/etc/docker/daemon.json中写入如下内容


{

 "registry-mirrors": [

   "https://registry.docker-cn.com"

 ]

}


阿里云镜像加速器地址 https://cr.console.aliyun.com/cn-hangzhou/mirrors


5.启动docker服务


systemctlstart docker


安装docker-comose   (不是必须的)


curl -L "https://github.com/docker/compose/releases/download/1.22.0/docker-compose-$(uname-s)-$(uname -m)" -o /usr/local/bin/docker-compose


pull一个logspout的镜像


docker pull gliderlabs/logspout


写logstash配置文件并启动


vimlogspout.conf
input {
        tcp {
          port => 5140
        }
        udp {
          port => 5140
        }
}
output {
        stdout {
          codec => rubydebug
        }
        elasticsearch {
          hosts =>"192.168.179.134"
          index => "logspout"
        }
}
logstash -f logspout.conf

启动logspout的容器


docker run --name="logspout" --volume=/var/run/docker.sock:/var/run/docker.sock --publish=192.168.179.134:8000:80


gliderlabs/logspout syslog+tcp:192.168.179.134:5140       #将收集到的日志转发至192.168.179.134:5140,默认采用udp协议,我修改成了tcp协议


logspout容器一启动就开始进行容器日志的转发,并打印到终端和elastic上


logspout将从没有-t选项启动的其他容器中收集日志,并且这些容器的日志驱动配置为系统默认的兼容journald和json-file的日志类型(即这些容器的日志可以通过docker logs命令查看)。


关于logspout的详细用法可以查看logspout的docker仓库https://hub.docker.com/r/gliderlabs/logspout/


640.jpg


我再启动一个新的centos容器,并输出一些信息

docker run -d --name=cen7 centos echo "helloworld" >>/var/log/lastlog

640.png


使用curl检查日志流


通过使用logspout的httpstream模块,可以通过curl时时查看logspout路由的日志,甚至可以不提供日志路由的URI:


curl 192.168.179.134:8000/logs


640.png


忽略指定的容器


可以通过在启动容器时设置环境变量来告诉logspout忽略特定容器,如下所示:


docker run-d -e 'LOGSPOUT=ignore' image


或者,通过在运行logspout时通过设置环境变量来添加您定义的标签:

docker run --name="logspout" \
    -e EXCLUDE_LABEL=logspout.exclude \
    --volume=/var/run/docker.sock:/var/run/docker.sock\
    gliderlabs/logspout
 docker run -d --label logspout.exclude=trueimage

指定特定的容器


可以通过在URI上设置过滤器参数来告诉logspout仅包含某些容器:


指定容器名包含db的容器


docker run \
--volume=/var/run/docker.sock:/var/run/docker.sock\
gliderlabs/logspout\
raw://192.168.10.10:5000?filter.name=*_db


或者指定容器id:


docker run \
--volume=/var/run/docker.sock:/var/run/docker.sock\
gliderlabs/logspout\
raw://192.168.10.10:5000?filter.id=3b6ba57db54a


将容器日志直接路由至logstash


这样需要修改模块配置文件modules.go


添加logspout-logstash模块


_ "github.com/gliderlabs/logspout/transports/udp"
_"github.com/looplab/logspout-logstash"


重新编写dockerfile并构建镜像


FROM  gliderlabs/logspout:latest
COPY./modules.go /src/modules.go


重新构建logspout后,可以通过指定环境变量ROUTE_URIS=logstash://host:port来给定logstash服务器的URI,当然也可以直接在dockerfile中指定logstash URI的环境变量


ENVROUTE_URIS=logstash://host:port。
docker run --name="logspout" --volume=/var/run/docker.sock:/var/run/docker.sock -e ROUTE_URIS=logstash://192.168.179.134:5140 registry.cn-beijing.aliyuncs.com/andyyoung01/logspout-logstash


https://github.com/andyyoung01/logspout-logstash/tree/test_branch/custom上已经配置好了,可以直接clone重新构建

相关实践学习
通过日志服务实现云资源OSS的安全审计
本实验介绍如何通过日志服务实现云资源OSS的安全审计。
相关文章
|
1月前
|
Prometheus 监控 Cloud Native
基于docker搭建监控系统&日志收集
Prometheus 是一款由 SoundCloud 开发的开源监控报警系统及时序数据库(TSDB),支持多维数据模型和灵活查询语言,适用于大规模集群监控。它通过 HTTP 拉取数据,支持服务发现、多种图表展示(如 Grafana),并可结合 Loki 实现日志聚合。本文介绍其架构、部署及与 Docker 集成的监控方案。
261 122
基于docker搭建监控系统&日志收集
WGLOG日志管理系统是怎么收集日志的
WGLOG通过部署Agent客户端采集日志,Agent持续收集指定日志文件并上报Server,Server负责展示与分析。Agent与Server需保持相同版本。官网下载地址:www.wgstart.com
|
4月前
|
监控 API 开发工具
HarmonyOS Next的HiLog日志系统完全指南:从入门到精通
本文深入解析HarmonyOS Next的HiLog日志系统,涵盖日志级别、核心API、隐私保护与高级回调功能,助你从入门到精通掌握这一重要开发工具。
246 1
|
27天前
|
Ubuntu
在Ubuntu系统上设置syslog日志轮替与大小限制
请注意,在修改任何系统级别配置之前,请务必备份相应得原始档案并理解每项变更可能带来得影响。
94 2
|
3月前
|
存储
WGLOG日志管理系统可以采集网络设备的日志吗
WGLOG日志审计系统提供开放接口,支持外部获取日志内容后发送至该接口,实现日志的存储与分析。详情请访问:https://www.wgstart.com/wglog/docs9.html
|
7月前
|
存储 消息中间件 缓存
MiniMax GenAI 可观测性分析 :基于阿里云 SelectDB 构建 PB 级别日志系统
基于阿里云SelectDB,MiniMax构建了覆盖国内及海外业务的日志可观测中台,总体数据规模超过数PB,日均新增日志写入量达数百TB。系统在P95分位查询场景下的响应时间小于3秒,峰值时刻实现了超过10GB/s的读写吞吐。通过存算分离、高压缩比算法和单副本热缓存等技术手段,MiniMax在优化性能的同时显著降低了建设成本,计算资源用量降低40%,热数据存储用量降低50%,为未来业务的高速发展和技术演进奠定了坚实基础。
287 1
MiniMax GenAI 可观测性分析 :基于阿里云 SelectDB 构建 PB 级别日志系统
|
7月前
|
存储 JSON Go
PHP 日志系统的最佳搭档:一个 Go 写的远程日志收集服务
为了不再 SSH 上去翻日志,我写了个 Go 小脚本,用来接收远程日志。PHP 负责记录日志,Go 负责存储和展示,按天存储、支持 API 访问、可远程管理,终于能第一时间知道项目炸了。
123 10
|
5月前
|
监控 容灾 算法
阿里云 SLS 多云日志接入最佳实践:链路、成本与高可用性优化
本文探讨了如何高效、经济且可靠地将海外应用与基础设施日志统一采集至阿里云日志服务(SLS),解决全球化业务扩展中的关键挑战。重点介绍了高性能日志采集Agent(iLogtail/LoongCollector)在海外场景的应用,推荐使用LoongCollector以获得更优的稳定性和网络容错能力。同时分析了多种网络接入方案,包括公网直连、全球加速优化、阿里云内网及专线/CEN/VPN接入等,并提供了成本优化策略和多目标发送配置指导,帮助企业构建稳定、低成本、高可用的全球日志系统。
644 54
|
10月前
|
监控 安全 Apache
什么是Apache日志?为什么Apache日志分析很重要?
Apache是全球广泛使用的Web服务器软件,支持超过30%的活跃网站。它通过接收和处理HTTP请求,与后端服务器通信,返回响应并记录日志,确保网页请求的快速准确处理。Apache日志分为访问日志和错误日志,对提升用户体验、保障安全及优化性能至关重要。EventLog Analyzer等工具可有效管理和分析这些日志,增强Web服务的安全性和可靠性。
270 9

热门文章

最新文章