Filebeat+Kafka+Logstash+Elasticsearch+Kibana 构建日志分析系统

本文涉及的产品
MSE Nacos 企业版免费试用,1600元额度,限量50份
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【8月更文挑战第13天】Filebeat+Kafka+Logstash+Elasticsearch+Kibana 构建日志分析系统

一、前言

随着时间的积累,日志数据会越来越多,当你需要查看并分析庞杂的日志数据时,可通过 Filebeat+Kafka+Logstash+Elasticsearch 采集日志数据到 Elasticsearch(简称ES)中,并通过 Kibana 进行可视化展示与分析。

本文介绍具体的实现方法。

二、背景信息

Kafka 是一种分布式、高吞吐、可扩展的消息队列服务,广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,已成为大数据生态中不可或缺的部分。在实际应用场景中,为了满足大数据实时检索的需求,一般可以使用 Filebeat 采集日志数据,将 Kafka 作为 Filebeat 的输出端。Kafka 实时接收到 Filebeat 采集的数据后,以 Logstash 作为输出端输出。输出到 Logstash 中的数据在格式或内容上可能不能满足你的需求,此时可以通过 Logstash 的 filter 插件过滤数据。最后将满足需求的数据输出到 ES 中进行分布式检索,并通过 Kibana 进行数据分析与展示。

简单处理流程如下:
image.png

三、操作流程

  1. 准备工作
    • 完成环境准备
    • 包括创建对应服务
    • 安装 Filebeat 。
  2. 配置 Filebeat:配置 Filebeat 的 input 为系统日志,outpu 为 Kafka,将日志数据采集到 Kafka 的指定 Topic 中。
  3. 配置 Logstash 管道:配置 Logstash 管道的 input 为 Kafka,output 为ES,使用 Logstash 消费 Topic 中的数据并传输到ES 中。
  4. 查看日志消费状态:在消息队列 Kafka 中查看日志数据的消费的状态,验证日志数据是否采集成功。
  5. 通过 Kibana 过滤日志数据:在 Kibana 控制台的 Discover 页面,通过 Filter 过滤出 Kafka 相关的日志。

四、准备工作

CenterOS 7.6 版本,推荐 8G 以上内存。

1、Docker 环境

执行命令如下:

# 在 docker 节点执行
# 腾讯云 docker hub 镜像
# export REGISTRY_MIRROR="https://mirror.ccs.tencentyun.com"
# DaoCloud 镜像
# export REGISTRY_MIRROR="http://f1361db2.m.daocloud.io"
# 阿里云 docker hub 镜像
export REGISTRY_MIRROR=https://registry.cn-hangzhou.aliyuncs.com

# 安装 docker
# 参考文档如下
# https://docs.docker.com/install/linux/docker-ce/centos/ 
# https://docs.docker.com/install/linux/linux-postinstall/

# 卸载旧版本
yum remove -y docker \
docker-client \
docker-client-latest \
docker-ce-cli \
docker-common \
docker-latest \
docker-latest-logrotate \
docker-logrotate \
docker-selinux \
docker-engine-selinux \
docker-engine

# 设置 yum repository
yum install -y yum-utils \
device-mapper-persistent-data \
lvm2
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

# 安装并启动 docker
yum install -y docker-ce-19.03.11 docker-ce-cli-19.03.11 containerd.io-1.2.13

mkdir /etc/docker || true

cat > /etc/docker/daemon.json <<EOF
{
  "registry-mirrors": ["${REGISTRY_MIRROR}"],
  "exec-opts": ["native.cgroupdriver=systemd"],
  "log-driver": "json-file",
  "log-opts": {
    "max-size": "100m"
  },
  "storage-driver": "overlay2",
  "storage-opts": [
    "overlay2.override_kernel_check=true"
  ]
}
EOF

mkdir -p /etc/systemd/system/docker.service.d

# Restart Docker
systemctl daemon-reload
systemctl enable docker
systemctl restart docker

# 关闭 防火墙
systemctl stop firewalld
systemctl disable firewalld

# 关闭 SeLinux
setenforce 0
sed -i "s/SELINUX=enforcing/SELINUX=disabled/g" /etc/selinux/config

# 关闭 swap
swapoff -a
yes | cp /etc/fstab /etc/fstab_bak
cat /etc/fstab_bak |grep -v swap > /etc/fstab
AI 代码解读

验证下 docker info:

[root@vm-1]# docker info
Client:
 Debug Mode: false

Server:
 Containers: 16
  Running: 11
  Paused: 0
  Stopped: 5
 Images: 22
 Server Version: 19.03.11
 Storage Driver: overlay2
  Backing Filesystem: xfs
  Supports d_type: true
  Native Overlay Diff: true
 Logging Driver: json-file
 Cgroup Driver: systemd
 Plugins:
  Volume: local
  Network: bridge host ipvlan macvlan null overlay
  Log: awslogs fluentd gcplogs gelf journald json-file local logentries splunk syslog
 Swarm: inactive
 Runtimes: runc
 Default Runtime: runc
 Init Binary: docker-init
 containerd version: 7ad184331fa3e55e52b890ea95e65ba581ae3429
 runc version: dc9208a3303feef5b3839f4323d9beb36df0a9dd
 init version: fec3683
 Security Options:
  seccomp
   Profile: default
 Kernel Version: 3.10.0-1127.el7.x86_64
 Operating System: CentOS Linux 7 (Core)
 OSType: linux
 Architecture: x86_64
 CPUs: 4
 Total Memory: 11.58GiB
 Name: vm-autotest-server
 ID: KQ5B:KAG5:LLB5:CUD4:NQZX:4GHL:5XLY:FM7X:KRJ5:X3WK:42GV:QLON
 Docker Root Dir: /var/lib/docker
 Debug Mode: false
 Registry: https://index.docker.io/v1/
 Labels:
 Experimental: false
 Insecure Registries:
  172.16.62.179:5000
  127.0.0.0/8
 Registry Mirrors:
  https://registry.cn-hangzhou.aliyuncs.com/
 Live Restore Enabled: false
AI 代码解读

2、Docker Compose 环境

Docker Compose是一个用于定义和运行多个 docker 容器应用的工具。使用 Compose 你可以用 YAML 文件来配置你的应用服务,然后使用一个命令,你就可以部署你配置的所有服务了。

 # 下载 Docker Compose
 sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose

 # 修改该文件的权限为可执行
 chmod +x /usr/local/bin/docker-compose

 # 验证信息
docker-compose --version
AI 代码解读

3、版本准备

组件 版本 部署方式
elasticsearch 7.6.2 Docker Compose
logstash 7.6.2 Docker Compose
kibana 7.6.2 Docker Compose
zookeeper latest Docker Compose
kafka latest Docker Compose
filebeat 7.4.2 二进制

4、环境初始化

执行命令如下:

# 需要设置系统内核参数,否则 ES 会因为内存不足无法启动
# 改变设置
sysctl -w vm.max_map_count=262144
# 使之立即生效
sysctl -p



# 创建 logstash 目录,并将 Logstash 的配置文件 logstash.conf 拷贝到该目录
mkdir -p /mydata/logstash

# 需要创建 elasticsearch/data 目录并设置权限,否则 ES 会因为无权限访问而启动失败
mkdir -p /mydata/elasticsearch/data/
chmod 777 /mydata/elasticsearch/data/
AI 代码解读

5、服务安装

docker-compose.yml 文件内容为:

version: '3'
services:
  elasticsearch:
    image: elasticsearch:7.6.2
    container_name: elasticsearch
    user: root
    environment:
      - "cluster.name=elasticsearch" #设置集群名称为elasticsearch
      - "discovery.type=single-node" #以单一节点模式启动
    volumes:
      - /mydata/elasticsearch/plugins:/usr/share/elasticsearch/plugins #插件文件挂载
      - /mydata/elasticsearch/data:/usr/share/elasticsearch/data #数据文件挂载
      - /etc/localtime:/etc/localtime:ro
      - /usr/share/zoneinfo:/usr/share/zoneinfo
    ports:
      - 9200:9200
      - 9300:9300
    networks:
      - elastic

  logstash:
    image: logstash:7.6.2
    container_name: logstash
    environment:
      - TZ=Asia/Shanghai
    volumes:
      - /mydata/logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf #挂载logstash的配置文件
    depends_on:
      - elasticsearch #kibana在elasticsearch启动之后再启动
    links:
      - elasticsearch:es #可以用es这个域名访问elasticsearch服务
    ports:
      - 5044:5044
    networks:
      - elastic

  kibana:
    image: kibana:7.6.2
    container_name: kibana
    links:
      - elasticsearch:es #可以用es这个域名访问elasticsearch服务
    depends_on:
      - elasticsearch #kibana在elasticsearch启动之后再启动
    environment:
      - "elasticsearch.hosts=http://es:9200" #设置访问elasticsearch的地址
      - /etc/localtime:/etc/localtime:ro
      - /usr/share/zoneinfo:/usr/share/zoneinfo
    ports:
      - 5601:5601
    networks:
      - elastic

  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    volumes:
      - /mydata/zookeeper/data:/data
      - /mydata/zookeeper/log:/datalog
      - /etc/localtime:/etc/localtime:ro
      - /usr/share/zoneinfo:/usr/share/zoneinfo
    networks:
      - elastic
    ports:
      - "2181:2181"

  kafka:
    container_name: kafka
    image: wurstmeister/kafka
    depends_on:
      - zookeeper
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - /mydata/kafka:/kafka
      - /etc/localtime:/etc/localtime:ro
    links:
      - zookeeper
    ports:
      - "9092:9092"
    networks:
      - elastic
    environment:
      - KAFKA_LISTENERS=INTERNAL://kafka:9092, OUT://kafka:29092
      - KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka:9092, OUT://kafka:29092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,OUT:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=OUT
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_MESSAGE_MAX_BYTES=2000000
      - KAFKA_CREATE_TOPICS=logs:1:1

networks:
  elastic:
AI 代码解读

将该文件上传的 linux 服务器上,执行 docker-compose up 命令即可启动所有服务。

[root@vm-1]# docker-compose -f docker-compose.yml up -d
[root@vm-1]# docker-compose ps
    Name                   Command               State                         Ports                       
-----------------------------------------------------------------------------------------------------------
elasticsearch   /usr/local/bin/docker-entr ...   Up      0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp    
kafka           start-kafka.sh                   Up      0.0.0.0:9092->9092/tcp                            
kibana          /usr/local/bin/dumb-init - ...   Up      0.0.0.0:5601->5601/tcp                            
logstash        /usr/local/bin/docker-entr ...   Up      0.0.0.0:5044->5044/tcp, 9600/tcp                  
zookeeper       /bin/sh -c /usr/sbin/sshd  ...   Up      0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
[root@vm-autotest-server elk]#
AI 代码解读

filebeat 客户端安装方式:

curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.4.2-linux-x86_64.tar.gz

tar xzvf filebeat-7.4.2-linux-x86_64.tar.gz
cd filebeat-7.4.2-linux-x86_64
AI 代码解读

6、服务设置

当所有依赖服务启动完成后,需要对以下服务进行一些设置。

# elasticsearch 需要安装中文分词器 IKAnalyzer,并重新启动。
docker exec -it elasticsearch /bin/bash
#此命令需要在容器中运行
elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.6.2/elasticsearch-analysis-ik-7.6.2.zip
docker restart elasticsearch

# logstas h需要安装 json_lines 插件,并重新启动。
docker exec -it logstash /bin/bash
logstash-plugin install logstash-codec-json_lines
docker restart logstash
AI 代码解读

五、配置 Filebeat

修改 filebeat.yml 文件内容

ilebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/*.log


filebeat.config.modules:
  path: ${
   
   path.config}/modules.d/*.yml
  reload.enabled: false

setup.template.settings:
  index.number_of_shards: 1

setup.dashboards.enabled: false

setup.kibana:
  host: "http://kafka:5601"
output.kafka:
    hosts: ["kafka:9092"]
    topic: 'logs'
    codec.json:
      pretty: false
AI 代码解读

参数说明:

参数 说明
type 输入类型。设置为log,表示输入源为日志。
enabled 设置配置是否生效。true表示生效,false表示不生效。
paths 需要监控的日志文件的路径。多个日志可在当前路径下另起一行写入日志文件路径。
hosts 消息队列Kafka实例的接入点。
topic 日志输出到消息队列Kafka的Topic,请指定为已创建的Topic。

注意:
客户端 hosts 添加 kafka 对应 server 的 ip 地址 以及 filebeat 配置建议使用 ansible。

[root@vm-1# cat /etc/hosts
172.16.62.179 kafka

# 客户端启动服务
[root@vm-1#./filebeat &
AI 代码解读

更多配置请参见:

六、配置 Logstash 管道

修改 logstash.conf 内容:

input {
   
   
#    # 来源beats
#    beats {
   
   
        # 端口
#        port => "5044"
#    }
  kafka {
   
   
    bootstrap_servers => "kafka:29092"
    topics => ["logs"]
    group_id => "logstash"
    codec => json
  }

}


# 分析、过滤插件,可以多个
# filter {
   
   
#    grok {
   
   
#        match => { "message" => "%{COMBINEDAPACHELOG}"}
#    }
#    geoip {
   
   
#        source => "clientip"
#    }
# }


output {
   
   
    # 选择elasticsearch
    elasticsearch {
   
   
        hosts => ["http://es:9200"]
        #index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
        index => "logs-%{+YYYY.MM.dd}"
    }
}
AI 代码解读

input 参数说明:

参数 说明
bootstrap_servers 消息队列 Kafka 实例的接入点
group_id 指定已创建的 Consumer Group 的名称。
topics 指定为已创建的 Topic 的名称,需要与 Filebeat 中配置的 Topic 名称保持一致。
codec 设置为 json,表示解析 JSON 格式的字段,便于在 Kibana 中分析。

output 参数说明:

参数 说明
hosts ES的访问地址,取值为http://:9200
user 访问 ES 的用户名,默认为 elastic。
password 访问 ES 的密码。
index 索引名称。设置为 logs‐%{+YYYY.MM.dd} 表示索引名称以 logs 为前缀,以日期为后缀,例如 logs-2021.09.28

注意:
logstash 中最为关键的地方在于 filter,为了调试 filter 的配置。

更多配置请参见:

七、查看 kafka 日志消费状态

操作命令如下:

# 进入容器
docker exec -it kafka bash

# kafka 默认安装在 /opt/kafka
cd opt/kafka

# 要想查询消费数据,必须要指定组
bash-5.1# bin/kafka-consumer-groups.sh --bootstrap-server 172.16.62.179:9092 --list
logstash

# 查看 topic
bash-5.1# bin/kafka-topics.sh --list --zookeeper 172.16.62.179:2181
__consumer_offsets
logs

# 查看消费情况
bash-5.1# bin/kafka-consumer-groupsdescribe --bootstrap-server 172.16.62.179:9092 --group logstash

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
logstash        logs            0          107335          107335          0               logstash-0-c6d82a1c-0f14-4372-b49f-8cd476f54d90 /172.19.0.2     logstash-0

#参数解释:
#--describe  显示详细信息
#--bootstrap-server 指定kafka连接地址
#--group 指定组。
AI 代码解读

字段解释:

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
topic名字 分区id 当前已消费的条数 总条数 未消费的条数 消费id 主机ip 客户端id

从上面的信息可以看出,topic 为 logs 总共消费了 107335 条信息, 未消费的条数为 0。也就是说,消费数据没有积压的情况.

八、查看 ES 内容

通过 elasticsearch-head 插件查看 ES 中是否收到了由 logstash 发送过来的日志

在这里插入图片描述

九、通过 Kibana 过滤日志数据

1、创建 index-pattern

打开 es,进入首页后,点击“connect to your Elasticsearch index”
image.png

填入 es 中的索引名,支持正则匹配,输入 Index pattern(本文使用 logs-*),单击 Next step。
image.png

选择“@timestamp”作为时间过滤字段,然后点击“create index pattern”:
image.png

创建完成后:
image.png

2、查看日志

在左侧导航栏,单击 Discover。
image.png

从页面左侧的下拉列表中,选择已创建的索引模式(logs-*)。
在页面右上角,选择一段时间,查看对应时间段内的 Filebeat 采集的日志数据。

十、小结

在企业实际项目中,elk 是比较成熟且广泛使用的技术方案。logstash 性能稍弱于 filebeat,一般不直接运行于采集点,推荐使用filebeat。在日志进入elk前,从经验性角度,前置 kafka,一方面作为队列和缓冲,另一方面提供了统一的入口渠道。

源码地址:

目录
打赏
0
4
3
2
113
分享
相关文章
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
390 5
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
250 4
MiniMax GenAI 可观测性分析 :基于阿里云 SelectDB 构建 PB 级别日志系统
基于阿里云SelectDB,MiniMax构建了覆盖国内及海外业务的日志可观测中台,总体数据规模超过数PB,日均新增日志写入量达数百TB。系统在P95分位查询场景下的响应时间小于3秒,峰值时刻实现了超过10GB/s的读写吞吐。通过存算分离、高压缩比算法和单副本热缓存等技术手段,MiniMax在优化性能的同时显著降低了建设成本,计算资源用量降低40%,热数据存储用量降低50%,为未来业务的高速发展和技术演进奠定了坚实基础。
MiniMax GenAI 可观测性分析 :基于阿里云 SelectDB 构建 PB 级别日志系统
海量日志接入 Elasticsearch Serverless 应用降本70%以上
本文将探讨在日志场景下,使用阿里云Elasticsearch Serverless相较于基于ECS自建Elasticsearch集群的成本与性能优势,展示如何通过Serverless架构实现高达 70%以上的成本节约。
368 0
金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践
中信银行信用卡中心每日新增日志数据 140 亿条(80TB),全量归档日志量超 40PB,早期基于 Elasticsearch 构建的日志云平台,面临存储成本高、实时写入性能差、文本检索慢以及日志分析能力不足等问题。因此使用 Apache Doris 替换 Elasticsearch,实现资源投入降低 50%、查询速度提升 2~4 倍,同时显著提高了运维效率。
金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
189 1
开源日志分析Elasticsearch
【10月更文挑战第22天】
139 5
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问