ELK+Kafka搭建分布式日志收集系统

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
日志服务 SLS,月写入数据量 50GB 1个月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: ELK+Kafka搭建分布式日志收集系统

正文


一、传统日志收集的弊端


       我们知道我们大多数是通过日志,然后判断程序哪里报错了,这样针对日志我们才能对症下一剂猛药。如果在集群环境中,成百上千的服务器,如果报错了,我们如何查找日志呢,一个一个日志文件这样排查么?那可就为难死我们了。


二、ELK收集系统过程


       基于Elasticsearch、Logstash、Kibana可以实现分布式日志收集系统,再加上Kibana的可视化系统,对数据进行分析,嗯真香。


     

333.png


在请求过程中创建AOP,拦截请求,然后在Aop方法中开启异步线程,将消息发送到Kafka(单机或者集群),logstash接收kafka的日志,经过消息过滤,然后发送到ElasticSearch系统,然后经过Kibana可视化界面,对日志进行搜索分析等。


三、搭建ELK系统


Zookeeper搭建


Kafka搭建


ElasticSearch搭建


Kibana搭建


Logstash搭建


本文演示基于Docker-compose,所有的均为单机


1、搭建docker-compose


#下载docker-compose文件
sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
#授权
sudo chmod +x /usr/local/bin/docker-compose


2、创建目录


mkdir -p /usr/local/docker-compose/elk


3、在上面目录创建docker-compose.yml文件  


version: '2'
services:
  zookeeper:
    image: zookeeper:latest
    container_name: zookeper
    ports:
      - "2181:2181"                 
  kafka:
    image: wurstmeister/kafka:latest 
    container_name: kafka
    volumes: 
        - /etc/localtime:/etc/localtime 
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.139.160   
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181       
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_LOG_RETENTION_HOURS: 120
      KAFKA_MESSAGE_MAX_BYTES: 10000000
      KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
      KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DELETE_RETENTION_MS: 1000                
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.15.2
    restart: always
    container_name: elasticsearch
    environment:
     - discovery.type=single-node #单点启动,实际生产不允许
     - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
    - 9200:9200
  kibana:
    image: docker.elastic.co/kibana/kibana:7.15.2
    restart: always
    container_name: kibana
    ports:
    - 5601:5601
    environment:
      - elasticsearch_url=http://192.168.139.160:9200
    depends_on:
      - elasticsearch
  logstash:
    image: docker.elastic.co/logstash/logstash:7.15.2
    volumes:
        -  /data/logstash/pipeline/:/usr/share/logstash/pipeline/
        -  /data/logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml 
        -  /data/logstash/config/pipelines.yml:/usr/share/logstash/config/pipelines.yml 
    restart: always
    container_name: logstash
    ports:
    - 9600:9600
    depends_on:
      - elasticsearch


4、启动.


1. #进入docker-compose所在的目录执行
2. [root@localhost elk]# docker-compose  up


四、代码


切面类


package com.xiaojie.elk.aop;
import com.alibaba.fastjson.JSONObject;
import com.xiaojie.elk.pojo.RequestPojo;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 日志切面类
 * @date 2021/12/5 16:51
 */
@Aspect
@Component
public class AopLogAspect {
    @Value("${server.port}")
    private String serverPort;
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    // 申明一个切点 里面是 execution表达式
    @Pointcut("execution(* com.xiaojie.elk.service.*.*(..))")
    private void serviceAspect() {
    }
    @Autowired
    private LogContainer logContainer;
    // 请求method前打印内容
    @Before(value = "serviceAspect()")
    public void methodBefore(JoinPoint joinPoint) {
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                .getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
        RequestPojo requestPojo = new RequestPojo();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式
        requestPojo.setRequestTime(df.format(new Date()));
        requestPojo.setUrl(request.getRequestURL().toString());
        requestPojo.setMethod(request.getMethod());
        requestPojo.setSignature(joinPoint.getSignature().toString());
        requestPojo.setArgs(Arrays.toString(joinPoint.getArgs()));
        // IP地址信息
        requestPojo.setAddress(getIpAddr(request) + ":" + serverPort);
        // 将日志信息投递到kafka中
        String log = JSONObject.toJSONString(requestPojo);
        logContainer.put(log);
    }
    // 在方法执行完结后打印返回内容
/*    @AfterReturning(returning = "o", pointcut = "serviceAspect()")
    public void methodAfterReturing(Object o) {
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                .getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
        JSONObject respJSONObject = new JSONObject();
        JSONObject jsonObject = new JSONObject();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式
        jsonObject.put("response_time", df.format(new Date()));
        jsonObject.put("response_content", JSONObject.toJSONString(o));
        // IP地址信息
        jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);
        respJSONObject.put("response", jsonObject);
        logContainer.put(respJSONObject.toJSONString());
    }*/
    /**
     * 异常通知
     *
     * @param point
     */
    @AfterThrowing(pointcut = "serviceAspect()", throwing = "e")
    public void serviceAspect(JoinPoint joinPoint, Exception e) {
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                .getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式
        RequestPojo requestPojo = new RequestPojo();
        requestPojo.setRequestTime(df.format(new Date()));
        requestPojo.setUrl(request.getRequestURL().toString());
        requestPojo.setMethod(request.getMethod());
        requestPojo.setSignature(joinPoint.getSignature().toString());
        requestPojo.setArgs(Arrays.toString(joinPoint.getArgs()));
        // IP地址信息
        requestPojo.setAddress(getIpAddr(request) + ":" + serverPort);
        requestPojo.setError(e.toString());
        // 将日志信息投递到kafka中
        String log = JSONObject.toJSONString(requestPojo);
        logContainer.put(log);
    }
    public static String getIpAddr(HttpServletRequest request) {
        //X-Forwarded-For(XFF)是用来识别通过HTTP代理或负载均衡方式连接到Web服务器的客户端最原始的IP地址的HTTP请求头字段。
        String ipAddress = request.getHeader("x-forwarded-for");
        if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
            ipAddress = request.getHeader("Proxy-Client-IP");
        }
        if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
            ipAddress = request.getHeader("WL-Proxy-Client-IP");
        }
        if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
            ipAddress = request.getRemoteAddr();
            if (ipAddress.equals("127.0.0.1") || ipAddress.equals("0:0:0:0:0:0:0:1")) {
                //根据网卡取本机配置的IP
                InetAddress inet = null;
                try {
                    inet = InetAddress.getLocalHost();
                } catch (UnknownHostException e) {
                    e.printStackTrace();
                }
                ipAddress = inet.getHostAddress();
            }
        }
        //对于通过多个代理的情况,第一个IP为客户端真实IP,多个IP按照','分割
        if (ipAddress != null && ipAddress.length() > 15) { //"***.***.***.***".length() = 15
            if (ipAddress.indexOf(",") > 0) {
                ipAddress = ipAddress.substring(0, ipAddress.indexOf(","));
            }
        }
        return ipAddress;
    }
}


异步线程


package com.xiaojie.elk.aop;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 开启异步线程发送日志
 * @date 2021/12/5 16:50
 */
@Component
public class LogContainer {
    private static BlockingDeque<String> logDeque = new LinkedBlockingDeque<>();
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    public LogContainer() {
        // 初始化
        new LogThreadKafka().start();
    }
    /**
     * 存入日志
     *
     * @param log
     */
    public void put(String log) {
        logDeque.offer(log);
    }
    class LogThreadKafka extends Thread {
        @Override
        public void run() {
            while (true) {
                String log = logDeque.poll();
                if (!StringUtils.isEmpty(log)) {
                    // 将消息投递kafka中
                    kafkaTemplate.send("kafka-log", log);
                }
            }
        }
    }
}


五、验证效果

111.png


完整代码:spring-boot: Springboot整合redis、消息中间件等相关代码 elk模块

相关文章
|
1月前
|
消息中间件 安全 Kafka
Apache Kafka安全加固指南:保护你的消息传递系统
【10月更文挑战第24天】在现代企业环境中,数据的安全性和隐私保护至关重要。Apache Kafka作为一款广泛使用的分布式流处理平台,其安全性直接影响着业务的稳定性和用户数据的安全。作为一名资深的Kafka使用者,我深知加强Kafka安全性的重要性。本文将从个人角度出发,分享我在实践中积累的经验,帮助读者了解如何有效地保护Kafka消息传递系统的安全性。
86 7
|
19天前
|
存储 监控 安全
什么是事件日志管理系统?事件日志管理系统有哪些用处?
事件日志管理系统是IT安全的重要工具,用于集中收集、分析和解释来自组织IT基础设施各组件的事件日志,如防火墙、路由器、交换机等,帮助提升网络安全、实现主动威胁检测和促进合规性。系统支持多种日志类型,包括Windows事件日志、Syslog日志和应用程序日志,通过实时监测、告警及可视化分析,为企业提供强大的安全保障。然而,实施过程中也面临数据量大、日志管理和分析复杂等挑战。EventLog Analyzer作为一款高效工具,不仅提供实时监测与告警、可视化分析和报告功能,还支持多种合规性报告,帮助企业克服挑战,提升网络安全水平。
|
1月前
|
消息中间件 Java Kafka
初识Apache Kafka:搭建你的第一个消息队列系统
【10月更文挑战第24天】在数字化转型的浪潮中,数据成为了企业决策的关键因素之一。而高效的数据处理能力,则成为了企业在竞争中脱颖而出的重要武器。在这个背景下,消息队列作为连接不同系统和服务的桥梁,其重要性日益凸显。Apache Kafka 是一款开源的消息队列系统,以其高吞吐量、可扩展性和持久性等特点受到了广泛欢迎。作为一名技术爱好者,我对 Apache Kafka 产生了浓厚的兴趣,并决定亲手搭建一套属于自己的消息队列系统。
62 2
初识Apache Kafka:搭建你的第一个消息队列系统
|
1月前
|
存储 Linux Docker
centos系统清理docker日志文件
通过以上方法,可以有效清理和管理CentOS系统中的Docker日志文件,防止日志文件占用过多磁盘空间。选择合适的方法取决于具体的应用场景和需求,可以结合手动清理、logrotate和调整日志驱动等多种方式,确保系统的高效运行。
54 2
|
2月前
|
XML JSON 监控
告别简陋:Java日志系统的最佳实践
【10月更文挑战第19天】 在Java开发中,`System.out.println()` 是最基本的输出方法,但它在实际项目中往往被认为是不专业和不足够的。本文将探讨为什么在现代Java应用中应该避免使用 `System.out.println()`,并介绍几种更先进的日志解决方案。
61 1
|
2月前
|
监控 网络协议 安全
Linux系统日志管理
Linux系统日志管理
60 3
|
2月前
|
监控 应用服务中间件 网络安全
#637481#基于django和neo4j的日志分析系统
#637481#基于django和neo4j的日志分析系统
39 4
|
2月前
|
存储 消息中间件 大数据
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
47 4
|
2月前
|
存储 消息中间件 大数据
大数据-70 Kafka 高级特性 物理存储 日志存储 日志清理: 日志删除与日志压缩
大数据-70 Kafka 高级特性 物理存储 日志存储 日志清理: 日志删除与日志压缩
51 1
|
1月前
|
消息中间件 Ubuntu Java
Ubuntu系统上安装Apache Kafka
Ubuntu系统上安装Apache Kafka
下一篇
DataWorks