淘东电商项目(50) -ELK+Kafka分布式日志收集(实现篇)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 淘东电商项目(50) -ELK+Kafka分布式日志收集(实现篇)

引言

本文代码已提交至Github(版本号:da9e2df57a8650a18003bdbde1f24c3c4e21c8a4),有兴趣的同学可以下载来看看:https://github.com/ylw-github/taodong-shop

前面已经把分布式日志收集(ELK+Kafka)的原理以及环境搭建成功了,有兴趣的同学可以参阅下:

本文目录结构:

l____引言

l____ 1. 写在本文前

l____ 2. ELK+Kafka日志采集基础模块

l____ 3. 服务集成日志采集模块

l____ 4. Logstash配置

l____ 5. 测试

1. 写在本文前

在原理篇,ELK+Kafka实现分布式日志收集的原理图如下:

为了让大家更加清楚实际场景的应用,我修改了原理图:

流程:每个微服务以及logstash都有一个日志的主题topic,如上图商品搜索服务发布的日志主题为goods_logs,第一个logstash订阅的日志主题也为 logstash,当商品搜服务AOP拦截到日志时,会发布内容到Kafka集群,Kafka作为MQ服务器,会转发消息到订阅该主题的logstashlogstash接收到消息,会写入到ES集群,用户可以使用Kibana查看日志信息。

下文将以商品服务日志采集为案例来讲解:

2. ELK+Kafka日志采集基础模块

为了方便每个微服务使用ELK+kafka日志采集,所以把它做成一个基础设施模块,每个微服务只要依赖便可实现日志发送到Kafka进行采集。好,现在开始讲解整合过程。

1.创建taodong-shop-basics-elk-kafka基础设施模块:

2.maven添加kafka依赖:

<dependencies>
  <!-- springboot 整合Kafka -->
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
  </dependency>
  <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
  </dependency>
</dependencies>

3.定义Kafka发送工具类(注意主题在这里写死,为product_log):

/**
 * description: kafka发送工具类
 * create by: YangLinWei
 * create time: 2020/4/29 4:05 下午
 */
@Component
@Slf4j
public class KafkaSender<T> {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    /**
     * kafka 发送消息
     *
     * @param obj 消息对象
     */
    public void send(T obj) {
        String jsonObj = JSON.toJSONString(obj);
        log.info("send message = {}", jsonObj);
        // 发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("product_log", jsonObj);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.info("The message failed to be sent:" + throwable.getMessage());
            }
            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                // TODO 业务处理
                log.info("The message was sent successfully: " + stringObjectSendResult.toString());
            }
        });
    }
}

4.定义AOP切面类,拦截日志:

/**
 * description: ELK拦截日志信息
 * create by: YangLinWei
 * create time: 2020/4/29 4:07 下午
 */
@Aspect
@Component
public class AopLogAspect {
    @Autowired
    private KafkaSender<JSONObject> kafkaSender;
  /**
   * description: 申明一个切点(里面是 execution表达式)
   */
    @Pointcut("execution(* com.ylw.service.*.service.impl.*.*(..))")
    private void serviceAspect() {
    }
  /**
   * description: 请求method前打印内容
   */
    @Before(value = "serviceAspect()")
    public void methodBefore(JoinPoint joinPoint) {
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                .getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
        JSONObject jsonObject = new JSONObject();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式
        jsonObject.put("request_time", df.format(new Date()));
        jsonObject.put("request_url", request.getRequestURL().toString());
        jsonObject.put("request_method", request.getMethod());
        jsonObject.put("signature", joinPoint.getSignature());
        jsonObject.put("request_args", Arrays.toString(joinPoint.getArgs()));
        JSONObject requestJsonObject = new JSONObject();
        requestJsonObject.put("request", jsonObject);
        kafkaSender.send(requestJsonObject);
    }
  /**
   * description: 在方法执行完结后打印返回内容
   */
    @AfterReturning(returning = "o", pointcut = "serviceAspect()")
    public void methodAfterReturing(Object o) {
        JSONObject respJSONObject = new JSONObject();
        JSONObject jsonObject = new JSONObject();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式
        jsonObject.put("response_time", df.format(new Date()));
        jsonObject.put("response_content", JSONObject.toJSONString(o));
        respJSONObject.put("response", jsonObject);
        kafkaSender.send(respJSONObject);
    }
}

3. 服务集成日志采集模块

1.在商品微服务添加日志采集基础模块taodong-shop-basics-elk-kafk

<dependency>
  <groupId>com.ylw</groupId>
  <artifactId>taodong-shop-basics-elk-kafka</artifactId>
  <version>1.0-RELEASE</version>
</dependency>

2.配置商品微服务的kafka信息(application.yml如下最后两行):

spring:
  application:
    name: taodong-shop-service-goods
  datasource:
    username: root
    password: 123456
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/taodong-goods?characterEncoding=utf8&useSSL=false
  redis:
    host: 127.0.0.1
    jedis:
      pool:
        max-active: 1000
        max-idle: 100
        max-wait: -1
        min-idle: 1
    port: 6379
  data:
    elasticsearch:
      cluster-name: elasticsearch-cluster
      cluster-nodes: 192.168.162.134:9301,192.168.162.134:9302
  kafka:
    bootstrap-servers: 192.168.162.134:9092

4. Logstash配置

1.新建配置文件product_log.conf,内容如下(里面有logstash订阅的主题):

input {
  kafka {
    bootstrap_servers => "192.168.162.134:9092"
    topics => ["product_log"]
  }
}
output {
    stdout { codec => rubydebug }
    elasticsearch {
       hosts => ["192.168.162.134:9201","192.168.162.134:9202"]
       index => "product_log"
    }
}

2.上传至logstash的config目录(/usr/local/logstash-5.6.12/config):

5. 测试

1.启动zookeeper、kafka(如果没有安装好,请看上一篇博客

):

docker start zookeeper
docker start kafka

2.启动es集群、kibana(如果没有安装好,请看前面的博客)

docker start ES01
docker start ES02
docker start kibana-cluster

3.启动logstash

cd /usr/local/logstash-5.6.12/bin/
./logstash -f ../config/product_log.conf

4.如果在kafka没有创建product_log主题,则需要创建,如下图创建成功:

docker exec -it kafka /bin/bash
/opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.162.134:2181 --replication-factor 1 --partitions 1 --topic product_log
/opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.162.134:2181

分割线 - - - - - - - - - - -上面的环境都启动好了,正式开始测试- - - - - - - - - - - -

5.启动商品服务

6.浏览器请求:http://localhost:8500/search?name=pg

7.在longstah的控制台,可以看到接收到kafka发送过来的内容:

请求内容:

响应内容:

8.在kibana查看,也可以查询到日志:

成功,本文完!

目录
相关文章
|
2月前
|
存储 监控 数据可视化
日志分析对决:揭示 ELK 与 GrayLog 的优势和差异
日志分析对决:揭示 ELK 与 GrayLog 的优势和差异
243 0
|
3月前
|
存储 Prometheus 监控
Prometheus vs. ELK Stack:容器监控与日志管理工具的较量
随着容器化技术的广泛应用,容器监控与日志管理成为了关键任务。本文将对两种常用工具进行比较与选择,分别是Prometheus和ELK Stack。Prometheus是一款开源的监控系统,专注于时序数据的收集和告警。而ELK Stack则是一套完整的日志管理解决方案,由Elasticsearch、Logstash和Kibana三个组件组成。通过比较它们的特点、优势和适用场景,读者可以更好地了解如何选择适合自己需求的工具。
|
3月前
|
Go 数据处理 Docker
elk stack部署自动化日志收集分析平台
elk stack部署自动化日志收集分析平台
80 0
|
3月前
|
存储 监控 安全
ELK7.x日志系统搭建 1. elk基础搭建
ELK7.x日志系统搭建 1. elk基础搭建
71 0
|
3月前
|
消息中间件 数据可视化 关系型数据库
ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统
ELK7.x日志系统搭建 4. 结合kafka集群完成日志系统
152 0
|
17天前
|
消息中间件 存储 运维
更优性能与性价比,从自建 ELK 迁移到 SLS 开始
本文介绍了 SLS 基本能力,并和开源自建 ELK 做了对比,可以看到 SLS 相比开源 ELK 有较大优势。
54837 138
|
2月前
|
存储 监控 关系型数据库
ELK架构监控MySQL慢日志
ELK架构监控MySQL慢日志
|
3月前
|
Prometheus 监控 Cloud Native
Prometheus VS ELK Stack:容器监控与日志管理工具的比较与选择
在容器化时代,有效的容器监控与日志管理工具对于确保应用程序的可靠性和可维护性至关重要。本文将比较两个主流工具,Prometheus和ELK Stack,探讨它们在容器监控和日志管理方面的特点、优势和适用场景,帮助读者做出明智的选择。
|
3月前
|
监控 NoSQL Redis
ELK7.x日志系统搭建 3. 采用轻量级日志收集Filebeat
ELK7.x日志系统搭建 3. 采用轻量级日志收集Filebeat
105 0
|
3月前
|
消息中间件 JSON 负载均衡
ELK7.x日志系统搭建 2. Nginx、Cluster等日志收集
ELK7.x日志系统搭建 2. Nginx、Cluster等日志收集
112 0

热门文章

最新文章