Filebeat+Kafka+Logstash+Elasticsearch+Kibana 构建日志分析系统

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【8月更文挑战第13天】Filebeat+Kafka+Logstash+Elasticsearch+Kibana 构建日志分析系统

一、前言

随着时间的积累,日志数据会越来越多,当你需要查看并分析庞杂的日志数据时,可通过 Filebeat+Kafka+Logstash+Elasticsearch 采集日志数据到 Elasticsearch(简称ES)中,并通过 Kibana 进行可视化展示与分析。

本文介绍具体的实现方法。

二、背景信息

Kafka 是一种分布式、高吞吐、可扩展的消息队列服务,广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,已成为大数据生态中不可或缺的部分。在实际应用场景中,为了满足大数据实时检索的需求,一般可以使用 Filebeat 采集日志数据,将 Kafka 作为 Filebeat 的输出端。Kafka 实时接收到 Filebeat 采集的数据后,以 Logstash 作为输出端输出。输出到 Logstash 中的数据在格式或内容上可能不能满足你的需求,此时可以通过 Logstash 的 filter 插件过滤数据。最后将满足需求的数据输出到 ES 中进行分布式检索,并通过 Kibana 进行数据分析与展示。

简单处理流程如下:
image.png

三、操作流程

  1. 准备工作
    • 完成环境准备
    • 包括创建对应服务
    • 安装 Filebeat 。
  2. 配置 Filebeat:配置 Filebeat 的 input 为系统日志,outpu 为 Kafka,将日志数据采集到 Kafka 的指定 Topic 中。
  3. 配置 Logstash 管道:配置 Logstash 管道的 input 为 Kafka,output 为ES,使用 Logstash 消费 Topic 中的数据并传输到ES 中。
  4. 查看日志消费状态:在消息队列 Kafka 中查看日志数据的消费的状态,验证日志数据是否采集成功。
  5. 通过 Kibana 过滤日志数据:在 Kibana 控制台的 Discover 页面,通过 Filter 过滤出 Kafka 相关的日志。

四、准备工作

CenterOS 7.6 版本,推荐 8G 以上内存。

1、Docker 环境

执行命令如下:

# 在 docker 节点执行
# 腾讯云 docker hub 镜像
# export REGISTRY_MIRROR="https://mirror.ccs.tencentyun.com"
# DaoCloud 镜像
# export REGISTRY_MIRROR="http://f1361db2.m.daocloud.io"
# 阿里云 docker hub 镜像
export REGISTRY_MIRROR=https://registry.cn-hangzhou.aliyuncs.com

# 安装 docker
# 参考文档如下
# https://docs.docker.com/install/linux/docker-ce/centos/ 
# https://docs.docker.com/install/linux/linux-postinstall/

# 卸载旧版本
yum remove -y docker \
docker-client \
docker-client-latest \
docker-ce-cli \
docker-common \
docker-latest \
docker-latest-logrotate \
docker-logrotate \
docker-selinux \
docker-engine-selinux \
docker-engine

# 设置 yum repository
yum install -y yum-utils \
device-mapper-persistent-data \
lvm2
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

# 安装并启动 docker
yum install -y docker-ce-19.03.11 docker-ce-cli-19.03.11 containerd.io-1.2.13

mkdir /etc/docker || true

cat > /etc/docker/daemon.json <<EOF
{
  "registry-mirrors": ["${REGISTRY_MIRROR}"],
  "exec-opts": ["native.cgroupdriver=systemd"],
  "log-driver": "json-file",
  "log-opts": {
    "max-size": "100m"
  },
  "storage-driver": "overlay2",
  "storage-opts": [
    "overlay2.override_kernel_check=true"
  ]
}
EOF

mkdir -p /etc/systemd/system/docker.service.d

# Restart Docker
systemctl daemon-reload
systemctl enable docker
systemctl restart docker

# 关闭 防火墙
systemctl stop firewalld
systemctl disable firewalld

# 关闭 SeLinux
setenforce 0
sed -i "s/SELINUX=enforcing/SELINUX=disabled/g" /etc/selinux/config

# 关闭 swap
swapoff -a
yes | cp /etc/fstab /etc/fstab_bak
cat /etc/fstab_bak |grep -v swap > /etc/fstab

验证下 docker info:

[root@vm-1]# docker info
Client:
 Debug Mode: false

Server:
 Containers: 16
  Running: 11
  Paused: 0
  Stopped: 5
 Images: 22
 Server Version: 19.03.11
 Storage Driver: overlay2
  Backing Filesystem: xfs
  Supports d_type: true
  Native Overlay Diff: true
 Logging Driver: json-file
 Cgroup Driver: systemd
 Plugins:
  Volume: local
  Network: bridge host ipvlan macvlan null overlay
  Log: awslogs fluentd gcplogs gelf journald json-file local logentries splunk syslog
 Swarm: inactive
 Runtimes: runc
 Default Runtime: runc
 Init Binary: docker-init
 containerd version: 7ad184331fa3e55e52b890ea95e65ba581ae3429
 runc version: dc9208a3303feef5b3839f4323d9beb36df0a9dd
 init version: fec3683
 Security Options:
  seccomp
   Profile: default
 Kernel Version: 3.10.0-1127.el7.x86_64
 Operating System: CentOS Linux 7 (Core)
 OSType: linux
 Architecture: x86_64
 CPUs: 4
 Total Memory: 11.58GiB
 Name: vm-autotest-server
 ID: KQ5B:KAG5:LLB5:CUD4:NQZX:4GHL:5XLY:FM7X:KRJ5:X3WK:42GV:QLON
 Docker Root Dir: /var/lib/docker
 Debug Mode: false
 Registry: https://index.docker.io/v1/
 Labels:
 Experimental: false
 Insecure Registries:
  172.16.62.179:5000
  127.0.0.0/8
 Registry Mirrors:
  https://registry.cn-hangzhou.aliyuncs.com/
 Live Restore Enabled: false

2、Docker Compose 环境

Docker Compose是一个用于定义和运行多个 docker 容器应用的工具。使用 Compose 你可以用 YAML 文件来配置你的应用服务,然后使用一个命令,你就可以部署你配置的所有服务了。

 # 下载 Docker Compose
 sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose

 # 修改该文件的权限为可执行
 chmod +x /usr/local/bin/docker-compose

 # 验证信息
docker-compose --version

3、版本准备

组件 版本 部署方式
elasticsearch 7.6.2 Docker Compose
logstash 7.6.2 Docker Compose
kibana 7.6.2 Docker Compose
zookeeper latest Docker Compose
kafka latest Docker Compose
filebeat 7.4.2 二进制

4、环境初始化

执行命令如下:

# 需要设置系统内核参数,否则 ES 会因为内存不足无法启动
# 改变设置
sysctl -w vm.max_map_count=262144
# 使之立即生效
sysctl -p



# 创建 logstash 目录,并将 Logstash 的配置文件 logstash.conf 拷贝到该目录
mkdir -p /mydata/logstash

# 需要创建 elasticsearch/data 目录并设置权限,否则 ES 会因为无权限访问而启动失败
mkdir -p /mydata/elasticsearch/data/
chmod 777 /mydata/elasticsearch/data/

5、服务安装

docker-compose.yml 文件内容为:

version: '3'
services:
  elasticsearch:
    image: elasticsearch:7.6.2
    container_name: elasticsearch
    user: root
    environment:
      - "cluster.name=elasticsearch" #设置集群名称为elasticsearch
      - "discovery.type=single-node" #以单一节点模式启动
    volumes:
      - /mydata/elasticsearch/plugins:/usr/share/elasticsearch/plugins #插件文件挂载
      - /mydata/elasticsearch/data:/usr/share/elasticsearch/data #数据文件挂载
      - /etc/localtime:/etc/localtime:ro
      - /usr/share/zoneinfo:/usr/share/zoneinfo
    ports:
      - 9200:9200
      - 9300:9300
    networks:
      - elastic

  logstash:
    image: logstash:7.6.2
    container_name: logstash
    environment:
      - TZ=Asia/Shanghai
    volumes:
      - /mydata/logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf #挂载logstash的配置文件
    depends_on:
      - elasticsearch #kibana在elasticsearch启动之后再启动
    links:
      - elasticsearch:es #可以用es这个域名访问elasticsearch服务
    ports:
      - 5044:5044
    networks:
      - elastic

  kibana:
    image: kibana:7.6.2
    container_name: kibana
    links:
      - elasticsearch:es #可以用es这个域名访问elasticsearch服务
    depends_on:
      - elasticsearch #kibana在elasticsearch启动之后再启动
    environment:
      - "elasticsearch.hosts=http://es:9200" #设置访问elasticsearch的地址
      - /etc/localtime:/etc/localtime:ro
      - /usr/share/zoneinfo:/usr/share/zoneinfo
    ports:
      - 5601:5601
    networks:
      - elastic

  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    volumes:
      - /mydata/zookeeper/data:/data
      - /mydata/zookeeper/log:/datalog
      - /etc/localtime:/etc/localtime:ro
      - /usr/share/zoneinfo:/usr/share/zoneinfo
    networks:
      - elastic
    ports:
      - "2181:2181"

  kafka:
    container_name: kafka
    image: wurstmeister/kafka
    depends_on:
      - zookeeper
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - /mydata/kafka:/kafka
      - /etc/localtime:/etc/localtime:ro
    links:
      - zookeeper
    ports:
      - "9092:9092"
    networks:
      - elastic
    environment:
      - KAFKA_LISTENERS=INTERNAL://kafka:9092, OUT://kafka:29092
      - KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka:9092, OUT://kafka:29092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,OUT:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=OUT
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_MESSAGE_MAX_BYTES=2000000
      - KAFKA_CREATE_TOPICS=logs:1:1

networks:
  elastic:

将该文件上传的 linux 服务器上,执行 docker-compose up 命令即可启动所有服务。

[root@vm-1]# docker-compose -f docker-compose.yml up -d
[root@vm-1]# docker-compose ps
    Name                   Command               State                         Ports                       
-----------------------------------------------------------------------------------------------------------
elasticsearch   /usr/local/bin/docker-entr ...   Up      0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp    
kafka           start-kafka.sh                   Up      0.0.0.0:9092->9092/tcp                            
kibana          /usr/local/bin/dumb-init - ...   Up      0.0.0.0:5601->5601/tcp                            
logstash        /usr/local/bin/docker-entr ...   Up      0.0.0.0:5044->5044/tcp, 9600/tcp                  
zookeeper       /bin/sh -c /usr/sbin/sshd  ...   Up      0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
[root@vm-autotest-server elk]#

filebeat 客户端安装方式:

curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.4.2-linux-x86_64.tar.gz

tar xzvf filebeat-7.4.2-linux-x86_64.tar.gz
cd filebeat-7.4.2-linux-x86_64

6、服务设置

当所有依赖服务启动完成后,需要对以下服务进行一些设置。

# elasticsearch 需要安装中文分词器 IKAnalyzer,并重新启动。
docker exec -it elasticsearch /bin/bash
#此命令需要在容器中运行
elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.6.2/elasticsearch-analysis-ik-7.6.2.zip
docker restart elasticsearch

# logstas h需要安装 json_lines 插件,并重新启动。
docker exec -it logstash /bin/bash
logstash-plugin install logstash-codec-json_lines
docker restart logstash

五、配置 Filebeat

修改 filebeat.yml 文件内容

ilebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/*.log


filebeat.config.modules:
  path: ${
   
   path.config}/modules.d/*.yml
  reload.enabled: false

setup.template.settings:
  index.number_of_shards: 1

setup.dashboards.enabled: false

setup.kibana:
  host: "http://kafka:5601"
output.kafka:
    hosts: ["kafka:9092"]
    topic: 'logs'
    codec.json:
      pretty: false

参数说明:

参数 说明
type 输入类型。设置为log,表示输入源为日志。
enabled 设置配置是否生效。true表示生效,false表示不生效。
paths 需要监控的日志文件的路径。多个日志可在当前路径下另起一行写入日志文件路径。
hosts 消息队列Kafka实例的接入点。
topic 日志输出到消息队列Kafka的Topic,请指定为已创建的Topic。

注意:
客户端 hosts 添加 kafka 对应 server 的 ip 地址 以及 filebeat 配置建议使用 ansible。

[root@vm-1# cat /etc/hosts
172.16.62.179 kafka

# 客户端启动服务
[root@vm-1#./filebeat &

更多配置请参见:

六、配置 Logstash 管道

修改 logstash.conf 内容:

input {
   
   
#    # 来源beats
#    beats {
   
   
        # 端口
#        port => "5044"
#    }
  kafka {
   
   
    bootstrap_servers => "kafka:29092"
    topics => ["logs"]
    group_id => "logstash"
    codec => json
  }

}


# 分析、过滤插件,可以多个
# filter {
   
   
#    grok {
   
   
#        match => { "message" => "%{COMBINEDAPACHELOG}"}
#    }
#    geoip {
   
   
#        source => "clientip"
#    }
# }


output {
   
   
    # 选择elasticsearch
    elasticsearch {
   
   
        hosts => ["http://es:9200"]
        #index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
        index => "logs-%{+YYYY.MM.dd}"
    }
}

input 参数说明:

参数 说明
bootstrap_servers 消息队列 Kafka 实例的接入点
group_id 指定已创建的 Consumer Group 的名称。
topics 指定为已创建的 Topic 的名称,需要与 Filebeat 中配置的 Topic 名称保持一致。
codec 设置为 json,表示解析 JSON 格式的字段,便于在 Kibana 中分析。

output 参数说明:

参数 说明
hosts ES的访问地址,取值为http://:9200
user 访问 ES 的用户名,默认为 elastic。
password 访问 ES 的密码。
index 索引名称。设置为 logs‐%{+YYYY.MM.dd} 表示索引名称以 logs 为前缀,以日期为后缀,例如 logs-2021.09.28

注意:
logstash 中最为关键的地方在于 filter,为了调试 filter 的配置。

更多配置请参见:

七、查看 kafka 日志消费状态

操作命令如下:

# 进入容器
docker exec -it kafka bash

# kafka 默认安装在 /opt/kafka
cd opt/kafka

# 要想查询消费数据,必须要指定组
bash-5.1# bin/kafka-consumer-groups.sh --bootstrap-server 172.16.62.179:9092 --list
logstash

# 查看 topic
bash-5.1# bin/kafka-topics.sh --list --zookeeper 172.16.62.179:2181
__consumer_offsets
logs

# 查看消费情况
bash-5.1# bin/kafka-consumer-groupsdescribe --bootstrap-server 172.16.62.179:9092 --group logstash

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
logstash        logs            0          107335          107335          0               logstash-0-c6d82a1c-0f14-4372-b49f-8cd476f54d90 /172.19.0.2     logstash-0

#参数解释:
#--describe  显示详细信息
#--bootstrap-server 指定kafka连接地址
#--group 指定组。

字段解释:

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
topic名字 分区id 当前已消费的条数 总条数 未消费的条数 消费id 主机ip 客户端id

从上面的信息可以看出,topic 为 logs 总共消费了 107335 条信息, 未消费的条数为 0。也就是说,消费数据没有积压的情况.

八、查看 ES 内容

通过 elasticsearch-head 插件查看 ES 中是否收到了由 logstash 发送过来的日志

在这里插入图片描述

九、通过 Kibana 过滤日志数据

1、创建 index-pattern

打开 es,进入首页后,点击“connect to your Elasticsearch index”
image.png

填入 es 中的索引名,支持正则匹配,输入 Index pattern(本文使用 logs-*),单击 Next step。
image.png

选择“@timestamp”作为时间过滤字段,然后点击“create index pattern”:
image.png

创建完成后:
image.png

2、查看日志

在左侧导航栏,单击 Discover。
image.png

从页面左侧的下拉列表中,选择已创建的索引模式(logs-*)。
在页面右上角,选择一段时间,查看对应时间段内的 Filebeat 采集的日志数据。

十、小结

在企业实际项目中,elk 是比较成熟且广泛使用的技术方案。logstash 性能稍弱于 filebeat,一般不直接运行于采集点,推荐使用filebeat。在日志进入elk前,从经验性角度,前置 kafka,一方面作为队列和缓冲,另一方面提供了统一的入口渠道。

源码地址:

目录
相关文章
|
11天前
|
NoSQL 关系型数据库 Redis
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
mall在linux环境下的部署(基于Docker容器),docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongodb、minio详细教程,拉取镜像、运行容器
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
|
4天前
|
消息中间件 存储 监控
Kafka的logs目录下的文件都是什么日志?
Kafka的logs目录下的文件都是什么日志?
17 11
|
23小时前
|
JSON 自然语言处理 数据库
ElasticSearch基础1——索引和文档。Kibana,RestClient操作索引和文档+黑马旅游ES库导入
概念、ik分词器、倒排索引、索引和文档的增删改查、RestClient对索引和文档的增删改查
ElasticSearch基础1——索引和文档。Kibana,RestClient操作索引和文档+黑马旅游ES库导入
|
26天前
|
数据可视化 Docker 容器
一文教会你如何通过Docker安装elasticsearch和kibana 【详细过程+图解】
这篇文章提供了通过Docker安装Elasticsearch和Kibana的详细过程和图解,包括下载镜像、创建和启动容器、处理可能遇到的启动失败情况(如权限不足和配置文件错误)、测试Elasticsearch和Kibana的连接,以及解决空间不足的问题。文章还特别指出了配置文件中空格的重要性以及环境变量中字母大小写的问题。
一文教会你如何通过Docker安装elasticsearch和kibana 【详细过程+图解】
|
20天前
|
消息中间件 Java Kafka
【Azure 事件中心】开启 Apache Flink 制造者 Producer 示例代码中的日志输出 (连接 Azure Event Hub Kafka 终结点)
【Azure 事件中心】开启 Apache Flink 制造者 Producer 示例代码中的日志输出 (连接 Azure Event Hub Kafka 终结点)
|
30天前
|
自然语言处理 Docker 容器
ElasticSearch 实现分词全文检索 - ES、Kibana、IK分词器安装
ElasticSearch 实现分词全文检索 - ES、Kibana、IK分词器安装
19 0
|
21天前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
57 9
|
1月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
50 3
|
16天前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
16天前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。