ELK+Kafka搭建分布式日志收集系统

简介: ELK+Kafka搭建分布式日志收集系统

正文


一、传统日志收集的弊端


       我们知道我们大多数是通过日志,然后判断程序哪里报错了,这样针对日志我们才能对症下一剂猛药。如果在集群环境中,成百上千的服务器,如果报错了,我们如何查找日志呢,一个一个日志文件这样排查么?那可就为难死我们了。


二、ELK收集系统过程


       基于Elasticsearch、Logstash、Kibana可以实现分布式日志收集系统,再加上Kibana的可视化系统,对数据进行分析,嗯真香。


     

333.png


在请求过程中创建AOP,拦截请求,然后在Aop方法中开启异步线程,将消息发送到Kafka(单机或者集群),logstash接收kafka的日志,经过消息过滤,然后发送到ElasticSearch系统,然后经过Kibana可视化界面,对日志进行搜索分析等。


三、搭建ELK系统


Zookeeper搭建


Kafka搭建


ElasticSearch搭建


Kibana搭建


Logstash搭建


本文演示基于Docker-compose,所有的均为单机


1、搭建docker-compose


#下载docker-compose文件
sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
#授权
sudo chmod +x /usr/local/bin/docker-compose


2、创建目录


mkdir -p /usr/local/docker-compose/elk


3、在上面目录创建docker-compose.yml文件  


version: '2'
services:
  zookeeper:
    image: zookeeper:latest
    container_name: zookeper
    ports:
      - "2181:2181"                 
  kafka:
    image: wurstmeister/kafka:latest 
    container_name: kafka
    volumes: 
        - /etc/localtime:/etc/localtime 
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.139.160   
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181       
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_LOG_RETENTION_HOURS: 120
      KAFKA_MESSAGE_MAX_BYTES: 10000000
      KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
      KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DELETE_RETENTION_MS: 1000                
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.15.2
    restart: always
    container_name: elasticsearch
    environment:
     - discovery.type=single-node #单点启动,实际生产不允许
     - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
    - 9200:9200
  kibana:
    image: docker.elastic.co/kibana/kibana:7.15.2
    restart: always
    container_name: kibana
    ports:
    - 5601:5601
    environment:
      - elasticsearch_url=http://192.168.139.160:9200
    depends_on:
      - elasticsearch
  logstash:
    image: docker.elastic.co/logstash/logstash:7.15.2
    volumes:
        -  /data/logstash/pipeline/:/usr/share/logstash/pipeline/
        -  /data/logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml 
        -  /data/logstash/config/pipelines.yml:/usr/share/logstash/config/pipelines.yml 
    restart: always
    container_name: logstash
    ports:
    - 9600:9600
    depends_on:
      - elasticsearch


4、启动.


1. #进入docker-compose所在的目录执行
2. [root@localhost elk]# docker-compose  up


四、代码


切面类


package com.xiaojie.elk.aop;
import com.alibaba.fastjson.JSONObject;
import com.xiaojie.elk.pojo.RequestPojo;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 日志切面类
 * @date 2021/12/5 16:51
 */
@Aspect
@Component
public class AopLogAspect {
    @Value("${server.port}")
    private String serverPort;
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    // 申明一个切点 里面是 execution表达式
    @Pointcut("execution(* com.xiaojie.elk.service.*.*(..))")
    private void serviceAspect() {
    }
    @Autowired
    private LogContainer logContainer;
    // 请求method前打印内容
    @Before(value = "serviceAspect()")
    public void methodBefore(JoinPoint joinPoint) {
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                .getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
        RequestPojo requestPojo = new RequestPojo();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式
        requestPojo.setRequestTime(df.format(new Date()));
        requestPojo.setUrl(request.getRequestURL().toString());
        requestPojo.setMethod(request.getMethod());
        requestPojo.setSignature(joinPoint.getSignature().toString());
        requestPojo.setArgs(Arrays.toString(joinPoint.getArgs()));
        // IP地址信息
        requestPojo.setAddress(getIpAddr(request) + ":" + serverPort);
        // 将日志信息投递到kafka中
        String log = JSONObject.toJSONString(requestPojo);
        logContainer.put(log);
    }
    // 在方法执行完结后打印返回内容
/*    @AfterReturning(returning = "o", pointcut = "serviceAspect()")
    public void methodAfterReturing(Object o) {
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                .getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
        JSONObject respJSONObject = new JSONObject();
        JSONObject jsonObject = new JSONObject();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式
        jsonObject.put("response_time", df.format(new Date()));
        jsonObject.put("response_content", JSONObject.toJSONString(o));
        // IP地址信息
        jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);
        respJSONObject.put("response", jsonObject);
        logContainer.put(respJSONObject.toJSONString());
    }*/
    /**
     * 异常通知
     *
     * @param point
     */
    @AfterThrowing(pointcut = "serviceAspect()", throwing = "e")
    public void serviceAspect(JoinPoint joinPoint, Exception e) {
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                .getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式
        RequestPojo requestPojo = new RequestPojo();
        requestPojo.setRequestTime(df.format(new Date()));
        requestPojo.setUrl(request.getRequestURL().toString());
        requestPojo.setMethod(request.getMethod());
        requestPojo.setSignature(joinPoint.getSignature().toString());
        requestPojo.setArgs(Arrays.toString(joinPoint.getArgs()));
        // IP地址信息
        requestPojo.setAddress(getIpAddr(request) + ":" + serverPort);
        requestPojo.setError(e.toString());
        // 将日志信息投递到kafka中
        String log = JSONObject.toJSONString(requestPojo);
        logContainer.put(log);
    }
    public static String getIpAddr(HttpServletRequest request) {
        //X-Forwarded-For(XFF)是用来识别通过HTTP代理或负载均衡方式连接到Web服务器的客户端最原始的IP地址的HTTP请求头字段。
        String ipAddress = request.getHeader("x-forwarded-for");
        if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
            ipAddress = request.getHeader("Proxy-Client-IP");
        }
        if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
            ipAddress = request.getHeader("WL-Proxy-Client-IP");
        }
        if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
            ipAddress = request.getRemoteAddr();
            if (ipAddress.equals("127.0.0.1") || ipAddress.equals("0:0:0:0:0:0:0:1")) {
                //根据网卡取本机配置的IP
                InetAddress inet = null;
                try {
                    inet = InetAddress.getLocalHost();
                } catch (UnknownHostException e) {
                    e.printStackTrace();
                }
                ipAddress = inet.getHostAddress();
            }
        }
        //对于通过多个代理的情况,第一个IP为客户端真实IP,多个IP按照','分割
        if (ipAddress != null && ipAddress.length() > 15) { //"***.***.***.***".length() = 15
            if (ipAddress.indexOf(",") > 0) {
                ipAddress = ipAddress.substring(0, ipAddress.indexOf(","));
            }
        }
        return ipAddress;
    }
}


异步线程


package com.xiaojie.elk.aop;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 开启异步线程发送日志
 * @date 2021/12/5 16:50
 */
@Component
public class LogContainer {
    private static BlockingDeque<String> logDeque = new LinkedBlockingDeque<>();
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    public LogContainer() {
        // 初始化
        new LogThreadKafka().start();
    }
    /**
     * 存入日志
     *
     * @param log
     */
    public void put(String log) {
        logDeque.offer(log);
    }
    class LogThreadKafka extends Thread {
        @Override
        public void run() {
            while (true) {
                String log = logDeque.poll();
                if (!StringUtils.isEmpty(log)) {
                    // 将消息投递kafka中
                    kafkaTemplate.send("kafka-log", log);
                }
            }
        }
    }
}


五、验证效果

111.png


完整代码:spring-boot: Springboot整合redis、消息中间件等相关代码 elk模块

相关文章
WGLOG日志管理系统是怎么收集日志的
WGLOG通过部署Agent客户端采集日志,Agent持续收集指定日志文件并上报Server,Server负责展示与分析。Agent与Server需保持相同版本。官网下载地址:www.wgstart.com
|
7月前
|
Prometheus 监控 Cloud Native
基于docker搭建监控系统&日志收集
Prometheus 是一款由 SoundCloud 开发的开源监控报警系统及时序数据库(TSDB),支持多维数据模型和灵活查询语言,适用于大规模集群监控。它通过 HTTP 拉取数据,支持服务发现、多种图表展示(如 Grafana),并可结合 Loki 实现日志聚合。本文介绍其架构、部署及与 Docker 集成的监控方案。
670 122
基于docker搭建监控系统&日志收集
|
8月前
|
消息中间件 Java Kafka
Java 事件驱动架构设计实战与 Kafka 生态系统组件实操全流程指南
本指南详解Java事件驱动架构与Kafka生态实操,涵盖环境搭建、事件模型定义、生产者与消费者实现、事件测试及高级特性,助你快速构建高可扩展分布式系统。
384 7
|
10月前
|
监控 API 开发工具
HarmonyOS Next的HiLog日志系统完全指南:从入门到精通
本文深入解析HarmonyOS Next的HiLog日志系统,涵盖日志级别、核心API、隐私保护与高级回调功能,助你从入门到精通掌握这一重要开发工具。
|
7月前
|
Ubuntu
在Ubuntu系统上设置syslog日志轮替与大小限制
请注意,在修改任何系统级别配置之前,请务必备份相应得原始档案并理解每项变更可能带来得影响。
856 2
|
9月前
|
存储
WGLOG日志管理系统可以采集网络设备的日志吗
WGLOG日志审计系统提供开放接口,支持外部获取日志内容后发送至该接口,实现日志的存储与分析。详情请访问:https://www.wgstart.com/wglog/docs9.html
|
存储 消息中间件 缓存
MiniMax GenAI 可观测性分析 :基于阿里云 SelectDB 构建 PB 级别日志系统
基于阿里云SelectDB,MiniMax构建了覆盖国内及海外业务的日志可观测中台,总体数据规模超过数PB,日均新增日志写入量达数百TB。系统在P95分位查询场景下的响应时间小于3秒,峰值时刻实现了超过10GB/s的读写吞吐。通过存算分离、高压缩比算法和单副本热缓存等技术手段,MiniMax在优化性能的同时显著降低了建设成本,计算资源用量降低40%,热数据存储用量降低50%,为未来业务的高速发展和技术演进奠定了坚实基础。
580 1
MiniMax GenAI 可观测性分析 :基于阿里云 SelectDB 构建 PB 级别日志系统
|
存储 JSON Go
PHP 日志系统的最佳搭档:一个 Go 写的远程日志收集服务
为了不再 SSH 上去翻日志,我写了个 Go 小脚本,用来接收远程日志。PHP 负责记录日志,Go 负责存储和展示,按天存储、支持 API 访问、可远程管理,终于能第一时间知道项目炸了。
287 10
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
587 1