ELK+Kafka搭建分布式日志收集系统

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
日志服务 SLS,月写入数据量 50GB 1个月
简介: ELK+Kafka搭建分布式日志收集系统

正文


一、传统日志收集的弊端


       我们知道我们大多数是通过日志,然后判断程序哪里报错了,这样针对日志我们才能对症下一剂猛药。如果在集群环境中,成百上千的服务器,如果报错了,我们如何查找日志呢,一个一个日志文件这样排查么?那可就为难死我们了。


二、ELK收集系统过程


       基于Elasticsearch、Logstash、Kibana可以实现分布式日志收集系统,再加上Kibana的可视化系统,对数据进行分析,嗯真香。


     

333.png


在请求过程中创建AOP,拦截请求,然后在Aop方法中开启异步线程,将消息发送到Kafka(单机或者集群),logstash接收kafka的日志,经过消息过滤,然后发送到ElasticSearch系统,然后经过Kibana可视化界面,对日志进行搜索分析等。


三、搭建ELK系统


Zookeeper搭建


Kafka搭建


ElasticSearch搭建


Kibana搭建


Logstash搭建


本文演示基于Docker-compose,所有的均为单机


1、搭建docker-compose


#下载docker-compose文件
sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
#授权
sudo chmod +x /usr/local/bin/docker-compose


2、创建目录


mkdir -p /usr/local/docker-compose/elk


3、在上面目录创建docker-compose.yml文件  


version: '2'
services:
  zookeeper:
    image: zookeeper:latest
    container_name: zookeper
    ports:
      - "2181:2181"                 
  kafka:
    image: wurstmeister/kafka:latest 
    container_name: kafka
    volumes: 
        - /etc/localtime:/etc/localtime 
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.139.160   
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181       
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_LOG_RETENTION_HOURS: 120
      KAFKA_MESSAGE_MAX_BYTES: 10000000
      KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
      KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DELETE_RETENTION_MS: 1000                
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.15.2
    restart: always
    container_name: elasticsearch
    environment:
     - discovery.type=single-node #单点启动,实际生产不允许
     - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
    - 9200:9200
  kibana:
    image: docker.elastic.co/kibana/kibana:7.15.2
    restart: always
    container_name: kibana
    ports:
    - 5601:5601
    environment:
      - elasticsearch_url=http://192.168.139.160:9200
    depends_on:
      - elasticsearch
  logstash:
    image: docker.elastic.co/logstash/logstash:7.15.2
    volumes:
        -  /data/logstash/pipeline/:/usr/share/logstash/pipeline/
        -  /data/logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml 
        -  /data/logstash/config/pipelines.yml:/usr/share/logstash/config/pipelines.yml 
    restart: always
    container_name: logstash
    ports:
    - 9600:9600
    depends_on:
      - elasticsearch


4、启动.


1. #进入docker-compose所在的目录执行
2. [root@localhost elk]# docker-compose  up


四、代码


切面类


package com.xiaojie.elk.aop;
import com.alibaba.fastjson.JSONObject;
import com.xiaojie.elk.pojo.RequestPojo;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 日志切面类
 * @date 2021/12/5 16:51
 */
@Aspect
@Component
public class AopLogAspect {
    @Value("${server.port}")
    private String serverPort;
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    // 申明一个切点 里面是 execution表达式
    @Pointcut("execution(* com.xiaojie.elk.service.*.*(..))")
    private void serviceAspect() {
    }
    @Autowired
    private LogContainer logContainer;
    // 请求method前打印内容
    @Before(value = "serviceAspect()")
    public void methodBefore(JoinPoint joinPoint) {
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                .getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
        RequestPojo requestPojo = new RequestPojo();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式
        requestPojo.setRequestTime(df.format(new Date()));
        requestPojo.setUrl(request.getRequestURL().toString());
        requestPojo.setMethod(request.getMethod());
        requestPojo.setSignature(joinPoint.getSignature().toString());
        requestPojo.setArgs(Arrays.toString(joinPoint.getArgs()));
        // IP地址信息
        requestPojo.setAddress(getIpAddr(request) + ":" + serverPort);
        // 将日志信息投递到kafka中
        String log = JSONObject.toJSONString(requestPojo);
        logContainer.put(log);
    }
    // 在方法执行完结后打印返回内容
/*    @AfterReturning(returning = "o", pointcut = "serviceAspect()")
    public void methodAfterReturing(Object o) {
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                .getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
        JSONObject respJSONObject = new JSONObject();
        JSONObject jsonObject = new JSONObject();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式
        jsonObject.put("response_time", df.format(new Date()));
        jsonObject.put("response_content", JSONObject.toJSONString(o));
        // IP地址信息
        jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);
        respJSONObject.put("response", jsonObject);
        logContainer.put(respJSONObject.toJSONString());
    }*/
    /**
     * 异常通知
     *
     * @param point
     */
    @AfterThrowing(pointcut = "serviceAspect()", throwing = "e")
    public void serviceAspect(JoinPoint joinPoint, Exception e) {
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                .getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式
        RequestPojo requestPojo = new RequestPojo();
        requestPojo.setRequestTime(df.format(new Date()));
        requestPojo.setUrl(request.getRequestURL().toString());
        requestPojo.setMethod(request.getMethod());
        requestPojo.setSignature(joinPoint.getSignature().toString());
        requestPojo.setArgs(Arrays.toString(joinPoint.getArgs()));
        // IP地址信息
        requestPojo.setAddress(getIpAddr(request) + ":" + serverPort);
        requestPojo.setError(e.toString());
        // 将日志信息投递到kafka中
        String log = JSONObject.toJSONString(requestPojo);
        logContainer.put(log);
    }
    public static String getIpAddr(HttpServletRequest request) {
        //X-Forwarded-For(XFF)是用来识别通过HTTP代理或负载均衡方式连接到Web服务器的客户端最原始的IP地址的HTTP请求头字段。
        String ipAddress = request.getHeader("x-forwarded-for");
        if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
            ipAddress = request.getHeader("Proxy-Client-IP");
        }
        if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
            ipAddress = request.getHeader("WL-Proxy-Client-IP");
        }
        if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
            ipAddress = request.getRemoteAddr();
            if (ipAddress.equals("127.0.0.1") || ipAddress.equals("0:0:0:0:0:0:0:1")) {
                //根据网卡取本机配置的IP
                InetAddress inet = null;
                try {
                    inet = InetAddress.getLocalHost();
                } catch (UnknownHostException e) {
                    e.printStackTrace();
                }
                ipAddress = inet.getHostAddress();
            }
        }
        //对于通过多个代理的情况,第一个IP为客户端真实IP,多个IP按照','分割
        if (ipAddress != null && ipAddress.length() > 15) { //"***.***.***.***".length() = 15
            if (ipAddress.indexOf(",") > 0) {
                ipAddress = ipAddress.substring(0, ipAddress.indexOf(","));
            }
        }
        return ipAddress;
    }
}


异步线程


package com.xiaojie.elk.aop;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 开启异步线程发送日志
 * @date 2021/12/5 16:50
 */
@Component
public class LogContainer {
    private static BlockingDeque<String> logDeque = new LinkedBlockingDeque<>();
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    public LogContainer() {
        // 初始化
        new LogThreadKafka().start();
    }
    /**
     * 存入日志
     *
     * @param log
     */
    public void put(String log) {
        logDeque.offer(log);
    }
    class LogThreadKafka extends Thread {
        @Override
        public void run() {
            while (true) {
                String log = logDeque.poll();
                if (!StringUtils.isEmpty(log)) {
                    // 将消息投递kafka中
                    kafkaTemplate.send("kafka-log", log);
                }
            }
        }
    }
}


五、验证效果

111.png


完整代码:spring-boot: Springboot整合redis、消息中间件等相关代码 elk模块

相关文章
|
11天前
|
消息中间件 安全 Kafka
Apache Kafka安全加固指南:保护你的消息传递系统
【10月更文挑战第24天】在现代企业环境中,数据的安全性和隐私保护至关重要。Apache Kafka作为一款广泛使用的分布式流处理平台,其安全性直接影响着业务的稳定性和用户数据的安全。作为一名资深的Kafka使用者,我深知加强Kafka安全性的重要性。本文将从个人角度出发,分享我在实践中积累的经验,帮助读者了解如何有效地保护Kafka消息传递系统的安全性。
39 7
|
7天前
|
存储 运维 负载均衡
构建高可用性GraphRAG系统:分布式部署与容错机制
【10月更文挑战第28天】作为一名数据科学家和系统架构师,我在构建和维护大规模分布式系统方面有着丰富的经验。最近,我负责了一个基于GraphRAG(Graph Retrieval-Augmented Generation)模型的项目,该模型用于构建一个高可用性的问答系统。在这个过程中,我深刻体会到分布式部署和容错机制的重要性。本文将详细介绍如何在生产环境中构建一个高可用性的GraphRAG系统,包括分布式部署方案、负载均衡、故障检测与恢复机制等方面的内容。
51 4
构建高可用性GraphRAG系统:分布式部署与容错机制
|
11天前
|
消息中间件 Java Kafka
初识Apache Kafka:搭建你的第一个消息队列系统
【10月更文挑战第24天】在数字化转型的浪潮中,数据成为了企业决策的关键因素之一。而高效的数据处理能力,则成为了企业在竞争中脱颖而出的重要武器。在这个背景下,消息队列作为连接不同系统和服务的桥梁,其重要性日益凸显。Apache Kafka 是一款开源的消息队列系统,以其高吞吐量、可扩展性和持久性等特点受到了广泛欢迎。作为一名技术爱好者,我对 Apache Kafka 产生了浓厚的兴趣,并决定亲手搭建一套属于自己的消息队列系统。
29 2
初识Apache Kafka:搭建你的第一个消息队列系统
|
7天前
|
存储 监控 安全
|
26天前
|
消息中间件 中间件 数据库
NServiceBus:打造企业级服务总线的利器——深度解析这一面向消息中间件如何革新分布式应用开发与提升系统可靠性
【10月更文挑战第9天】NServiceBus 是一个面向消息的中间件,专为构建分布式应用程序设计,特别适用于企业级服务总线(ESB)。它通过消息队列实现服务间的解耦,提高系统的可扩展性和容错性。在 .NET 生态中,NServiceBus 提供了强大的功能,支持多种传输方式如 RabbitMQ 和 Azure Service Bus。通过异步消息传递模式,各组件可以独立运作,即使某部分出现故障也不会影响整体系统。 示例代码展示了如何使用 NServiceBus 发送和接收消息,简化了系统的设计和维护。
41 3
|
9天前
|
消息中间件 Ubuntu Java
Ubuntu系统上安装Apache Kafka
Ubuntu系统上安装Apache Kafka
|
1月前
|
存储 开发框架 .NET
C#语言如何搭建分布式文件存储系统
C#语言如何搭建分布式文件存储系统
65 2
|
22天前
|
消息中间件 存储 监控
消息队列系统中的确认机制在分布式系统中如何实现?
消息队列系统中的确认机制在分布式系统中如何实现?
|
1月前
|
存储 分布式计算 监控
C# 创建一个分布式文件存储系统需要怎么设计??
C# 创建一个分布式文件存储系统需要怎么设计??
29 0
|
23天前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。