搭建ELK日志收集,保姆级教程

简介: 本文介绍了分布式日志采集的背景及ELK与Kafka的整合应用。传统多服务器环境下,日志查询效率低下,因此需要集中化日志管理。ELK(Elasticsearch、Logstash、Kibana)应运而生,但单独使用ELK在性能上存在瓶颈,故结合Kafka实现高效的日志采集与处理。文章还详细讲解了基于Docker Compose构建ELK+Kafka环境的方法、验证步骤,以及如何在Spring Boot项目中整合ELK+Kafka,并通过Logback配置实现日志的采集与展示。

分布式日志采集产生背景

在传统项目中,如果在生产环境中,有多台不同的服务器集群,如果生产环境需要通过日志定位项目的Bug的话,需要在每台节点上使用传统的命令方式查询,这样效率非常低下。

因此我们需要集中化的管理日志,ELK则应运而生。

为什么ELK需要结合Kafka

如果只整合elk 不结合kafka这样的话 每个服务器节点上都会安装Logstash做读写日志IO操作,可能性能不是很好,而且比较冗余。

ELK+Kafka环境构建

基于docker-compose构建ELK+Kafka环境

注:所有的ip地址不能是localhost或者127.0.0.1

version: '3'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.15.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
  kibana:
    image: docker.elastic.co/kibana/kibana:7.15.0
    container_name: kibana
    depends_on:
      - elasticsearch
    ports:
      - "5601:5601"
  logstash:
    image: docker.elastic.co/logstash/logstash:7.15.0
    container_name: logstash
#    command: --config /etc/logstash/conf.d/*.conf
    volumes:
      - ./logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf
    ports:
      - "5044:5044"      
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.137.1:9092
      - KAFKA_LISTENERS=PLAINTEXT://:9092
  kafka-manager:  
    image: sheepkiller/kafka-manager                ## 镜像:开源的web管理kafka集群的界面
    environment:
        ZK_HOSTS: 192.168.137.1:2181                   ## 修改:宿主机IP
    ports:  
      - "9001:9000"

验证elk+kafka 环境

访问:zk http://127.0.0.1:2181
访问:es http://127.0.0.1:9200/
访问:kibana http://127.0.0.1:5601/app/kibana#/dev_tools/console?_g=()

SpringBoot项目整合ELK+Kafka

添加maven依赖

<!-- kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!--logstash 整合logback-->
        <dependency>
            <groupId>net.logstash.logback</groupId>
            <artifactId>logstash-logback-encoder</artifactId>
            <version>7.4</version>
            <exclusions>
                <exclusion>
                    <groupId>ch.qos.logback</groupId>
                    <artifactId>logback-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--logback 整合 kafka-->
        <dependency>
            <groupId>com.github.danielwegener</groupId>
            <artifactId>logback-kafka-appender</artifactId>
            <version>0.2.0-RC2</version>
            <scope>runtime</scope>
        </dependency>

配置文件配置

spring:
  kafka:
    listener:
      #设置是否批量消费,默认 single(单条),batch(批量)
      type: single
    # 集群地址 不能是localhost/127.0.0.1
    bootstrap-servers: 192.168.137.1:9092
    # 生产者配置
    producer:
      # 重试次数
      retries: 3
      # 应答级别
      # acks=0 把消息发送到kafka就认为发送成功
      # acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
      # acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
      acks: all
      # 批量处理的最大大小 单位 byte
      batch-size: 4096
      # 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
      buffer-memory: 33554432
      # 客户端ID
      client-id: logstash
      # Key 序列化类
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value 序列化类
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 消息压缩:none、lz4、gzip、snappy,默认为 none。
      compression-type: gzip
      properties:
        linger:
          # 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
          ms: 1000
        max:
          block:
            # KafkaProducer.send() 和 partitionsFor() 方法的最长阻塞时间 单位 ms
            ms: 6000
    # 消费者配置
    consumer:
      # 默认消费者组
      group-id: logstash
      # 自动提交 offset 默认 true
      enable-auto-commit: false
      # 自动提交的频率 单位 ms
      auto-commit-interval: 1000
      # 批量消费最大数量
      max-poll-records: 100
      # Key 反序列化类
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value 反序列化类
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 当kafka中没有初始offset或offset超出范围时将自动重置offset
      # earliest:重置为分区中最小的offset
      # latest:重置为分区中最新的offset(消费分区中新产生的数据)
      # none:只要有一个分区不存在已提交的offset,就抛出异常
      auto-offset-reset: latest
      properties:
        session:
          timeout:
            # session超时,超过这个时间consumer没有发送心跳,就会触发rebalance操作
            ms: 120000
        request:
          timeout:
            # 请求超时
            ms: 120000
    template:
      default-topic: tiger-log
# 指定logback配置文件,因为查找优先级问题,最好手动配置上,避免其他依赖导致未使用到自定义的logback文件
logging:
  config: classpath:logback-spring.xml

配置logback-spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
  <include resource="org/springframework/boot/logging/logback/defaults.xml"/>
  <include resource="org/springframework/boot/logging/logback/console-appender.xml"/>
  <!-- 这里把整个测试环境的 模块复制过来了,里面包含 输入到 kafka 的配置-->
  <springProfile name="dev">
    <!-- 日志输出格式 -->
    <property name="console.log.pattern"
      value="%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) --- [%green(%thread)] %cyan(%-40.40(%logger{40})) : %msg%n"/>
    <property name="log.pattern" value="%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"/>
    <property name="log_name_prefix" value="tiger-ucenter"/>
    <property name="log.path" value="logs/tiger-ucenter"/>
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
      <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
        <pattern>${console.log.pattern}</pattern>
      </encoder>
      <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
        <level>INFO</level>
      </filter>
    </appender>
    <appender name="file_info" class="ch.qos.logback.core.rolling.RollingFileAppender">
      <File>${log.path}/${log_name_prefix}-info.log</File>
      <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
        <pattern>${log.pattern}</pattern>
      </encoder>
      <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
        <level>INFO</level>
      </filter>
      <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
        <FileNamePattern>${log.path}/backup/${log_name_prefix}-info.%d{yyyy-MM-dd}.log
        </FileNamePattern>
      </rollingPolicy>
    </appender>
    <appender name="file_error"
      class="ch.qos.logback.core.rolling.RollingFileAppender">
      <File>${log.path}/${log_name_prefix}-error.log</File>
      <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
        <pattern>${log.pattern}</pattern>
      </encoder>
      <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
        <level>ERROR</level>
      </filter>
      <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
        <FileNamePattern>${log.path}/backup/${log_name_prefix}-error.%d{yyyy-MM-dd}.log
        </FileNamePattern>
      </rollingPolicy>
    </appender>
    <!--        kafka的appender配置-->
    <appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
      <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
        <providers class="net.logstash.logback.composite.loggingevent.LoggingEventJsonProviders">
          <pattern>
            <pattern>
              {"app":"${APP}",
              "profile":"${PROFILES_ACTIVE}",
              "thread": "%thread",
              "logger": "%logger{5}",
              "message":"%msg",
              "app_name":"${APP_NAME}",
              "env_name":"${ENV_NAME}",
              "hostname":"${HOSTNAME}",
              "captain_seq":"${CAPTAIN_SEQ}",
              "captain_gen":"${CAPTAIN_GEN}",
              "build_name":"${BUILD_NAME}",
              "build_git_version":"${BUILD_GIT_VERSION}",
              "build_git_hash":"${BUILD_GIT_HASH}",
              "build_timestamp":"${BUILD_TIMESTAMP}",
              "date":"%d{yyyy-MM-dd HH:mm:ss.SSS}",
              "level":"%level",
              "stack_trace":"%exception"
              }
            </pattern>
                    </pattern>
                </providers>
            </encoder>
            <topic>tiger-log</topic>
            <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"/>
            <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
            <producerConfig>bootstrap.servers=192.168.137.1:9092</producerConfig>
                 
            <producerConfig>retries=1</producerConfig>
                 
            <producerConfig>batch-size=16384</producerConfig>
                 
            <producerConfig>buffer-memory=33554432</producerConfig>
                 
            <producerConfig>properties.max.request.size==2097152</producerConfig>
            <appender-ref ref="console"/>
        </appender>
        <root level="INFO">
            <appender-ref ref="console"/>
            <appender-ref ref="file_error"/>
            <appender-ref ref="file_info"/>
            <appender-ref ref="kafkaAppender"/>
        </root>
    </springProfile>
</configuration>

打开kibana查看收集的日志

相关文章
|
14天前
|
缓存 Java 应用服务中间件
Spring Boot配置优化:Tomcat+数据库+缓存+日志,全场景教程
本文详解Spring Boot十大核心配置优化技巧,涵盖Tomcat连接池、数据库连接池、Jackson时区、日志管理、缓存策略、异步线程池等关键配置,结合代码示例与通俗解释,助你轻松掌握高并发场景下的性能调优方法,适用于实际项目落地。
203 4
|
3月前
|
Ubuntu 应用服务中间件 网络安全
ELK完整部署教程
本文介绍了在Ubuntu 22.04上部署ELK(Elasticsearch、Logstash、Kibana)及Filebeat的完整步骤。内容涵盖Elasticsearch安装与安全配置、Kibana启用SSL与注册、Logstash配置输入输出,以及Filebeat采集日志并发送至Logstash或Elasticsearch的方法。适用于本地非容器环境的日志收集与可视化搭建。
|
7月前
|
数据可视化 关系型数据库 MySQL
ELK实现nginx、mysql、http的日志可视化实验
通过本文的步骤,你可以成功配置ELK(Elasticsearch, Logstash, Kibana)来实现nginx、mysql和http日志的可视化。通过Kibana,你可以直观地查看和分析日志数据,从而更好地监控和管理系统。希望这些步骤能帮助你在实际项目中有效地利用ELK来处理日志数据。
511 90
|
存储 消息中间件 网络协议
日志平台-ELK实操系列(一)
日志平台-ELK实操系列(一)
|
11月前
|
存储 监控 安全
|
12月前
|
数据采集 监控 Java
SpringBoot日志全方位超详细手把手教程,零基础可学习 日志如何配置及SLF4J的使用......
本文是关于SpringBoot日志的详细教程,涵盖日志的定义、用途、SLF4J框架的使用、日志级别、持久化、文件分割及格式配置等内容。
739 0
SpringBoot日志全方位超详细手把手教程,零基础可学习 日志如何配置及SLF4J的使用......
|
存储 消息中间件 监控
Java日志详解:日志级别,优先级、配置文件、常见日志管理系统ELK、日志收集分析
Java日志详解:日志级别,优先级、配置文件、常见日志管理系统、日志收集分析。日志级别从小到大的关系(优先级从低到高): ALL < TRACE < DEBUG < INFO < WARN < ERROR < FATAL < OFF 低级别的会输出高级别的信息,高级别的不会输出低级别的信息
|
5月前
|
监控 容灾 算法
阿里云 SLS 多云日志接入最佳实践:链路、成本与高可用性优化
本文探讨了如何高效、经济且可靠地将海外应用与基础设施日志统一采集至阿里云日志服务(SLS),解决全球化业务扩展中的关键挑战。重点介绍了高性能日志采集Agent(iLogtail/LoongCollector)在海外场景的应用,推荐使用LoongCollector以获得更优的稳定性和网络容错能力。同时分析了多种网络接入方案,包括公网直连、全球加速优化、阿里云内网及专线/CEN/VPN接入等,并提供了成本优化策略和多目标发送配置指导,帮助企业构建稳定、低成本、高可用的全球日志系统。
622 54
|
10月前
|
监控 安全 Apache
什么是Apache日志?为什么Apache日志分析很重要?
Apache是全球广泛使用的Web服务器软件,支持超过30%的活跃网站。它通过接收和处理HTTP请求,与后端服务器通信,返回响应并记录日志,确保网页请求的快速准确处理。Apache日志分为访问日志和错误日志,对提升用户体验、保障安全及优化性能至关重要。EventLog Analyzer等工具可有效管理和分析这些日志,增强Web服务的安全性和可靠性。
259 9