淘东电商项目(50) -ELK+Kafka分布式日志收集(实现篇)

本文涉及的产品
云原生网关 MSE Higress,422元/月
日志服务 SLS,月写入数据量 50GB 1个月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 淘东电商项目(50) -ELK+Kafka分布式日志收集(实现篇)

引言

本文代码已提交至Github(版本号:da9e2df57a8650a18003bdbde1f24c3c4e21c8a4),有兴趣的同学可以下载来看看:https://github.com/ylw-github/taodong-shop

前面已经把分布式日志收集(ELK+Kafka)的原理以及环境搭建成功了,有兴趣的同学可以参阅下:

本文目录结构:

l____引言

l____ 1. 写在本文前

l____ 2. ELK+Kafka日志采集基础模块

l____ 3. 服务集成日志采集模块

l____ 4. Logstash配置

l____ 5. 测试

1. 写在本文前

在原理篇,ELK+Kafka实现分布式日志收集的原理图如下:

为了让大家更加清楚实际场景的应用,我修改了原理图:

流程:每个微服务以及logstash都有一个日志的主题topic,如上图商品搜索服务发布的日志主题为goods_logs,第一个logstash订阅的日志主题也为 logstash,当商品搜服务AOP拦截到日志时,会发布内容到Kafka集群,Kafka作为MQ服务器,会转发消息到订阅该主题的logstashlogstash接收到消息,会写入到ES集群,用户可以使用Kibana查看日志信息。

下文将以商品服务日志采集为案例来讲解:

2. ELK+Kafka日志采集基础模块

为了方便每个微服务使用ELK+kafka日志采集,所以把它做成一个基础设施模块,每个微服务只要依赖便可实现日志发送到Kafka进行采集。好,现在开始讲解整合过程。

1.创建taodong-shop-basics-elk-kafka基础设施模块:

2.maven添加kafka依赖:

<dependencies>
  <!-- springboot 整合Kafka -->
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
  </dependency>
  <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
  </dependency>
</dependencies>

3.定义Kafka发送工具类(注意主题在这里写死,为product_log):

/**
 * description: kafka发送工具类
 * create by: YangLinWei
 * create time: 2020/4/29 4:05 下午
 */
@Component
@Slf4j
public class KafkaSender<T> {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    /**
     * kafka 发送消息
     *
     * @param obj 消息对象
     */
    public void send(T obj) {
        String jsonObj = JSON.toJSONString(obj);
        log.info("send message = {}", jsonObj);
        // 发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("product_log", jsonObj);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.info("The message failed to be sent:" + throwable.getMessage());
            }
            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                // TODO 业务处理
                log.info("The message was sent successfully: " + stringObjectSendResult.toString());
            }
        });
    }
}

4.定义AOP切面类,拦截日志:

/**
 * description: ELK拦截日志信息
 * create by: YangLinWei
 * create time: 2020/4/29 4:07 下午
 */
@Aspect
@Component
public class AopLogAspect {
    @Autowired
    private KafkaSender<JSONObject> kafkaSender;
  /**
   * description: 申明一个切点(里面是 execution表达式)
   */
    @Pointcut("execution(* com.ylw.service.*.service.impl.*.*(..))")
    private void serviceAspect() {
    }
  /**
   * description: 请求method前打印内容
   */
    @Before(value = "serviceAspect()")
    public void methodBefore(JoinPoint joinPoint) {
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                .getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
        JSONObject jsonObject = new JSONObject();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式
        jsonObject.put("request_time", df.format(new Date()));
        jsonObject.put("request_url", request.getRequestURL().toString());
        jsonObject.put("request_method", request.getMethod());
        jsonObject.put("signature", joinPoint.getSignature());
        jsonObject.put("request_args", Arrays.toString(joinPoint.getArgs()));
        JSONObject requestJsonObject = new JSONObject();
        requestJsonObject.put("request", jsonObject);
        kafkaSender.send(requestJsonObject);
    }
  /**
   * description: 在方法执行完结后打印返回内容
   */
    @AfterReturning(returning = "o", pointcut = "serviceAspect()")
    public void methodAfterReturing(Object o) {
        JSONObject respJSONObject = new JSONObject();
        JSONObject jsonObject = new JSONObject();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式
        jsonObject.put("response_time", df.format(new Date()));
        jsonObject.put("response_content", JSONObject.toJSONString(o));
        respJSONObject.put("response", jsonObject);
        kafkaSender.send(respJSONObject);
    }
}

3. 服务集成日志采集模块

1.在商品微服务添加日志采集基础模块taodong-shop-basics-elk-kafk

<dependency>
  <groupId>com.ylw</groupId>
  <artifactId>taodong-shop-basics-elk-kafka</artifactId>
  <version>1.0-RELEASE</version>
</dependency>

2.配置商品微服务的kafka信息(application.yml如下最后两行):

spring:
  application:
    name: taodong-shop-service-goods
  datasource:
    username: root
    password: 123456
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/taodong-goods?characterEncoding=utf8&useSSL=false
  redis:
    host: 127.0.0.1
    jedis:
      pool:
        max-active: 1000
        max-idle: 100
        max-wait: -1
        min-idle: 1
    port: 6379
  data:
    elasticsearch:
      cluster-name: elasticsearch-cluster
      cluster-nodes: 192.168.162.134:9301,192.168.162.134:9302
  kafka:
    bootstrap-servers: 192.168.162.134:9092

4. Logstash配置

1.新建配置文件product_log.conf,内容如下(里面有logstash订阅的主题):

input {
  kafka {
    bootstrap_servers => "192.168.162.134:9092"
    topics => ["product_log"]
  }
}
output {
    stdout { codec => rubydebug }
    elasticsearch {
       hosts => ["192.168.162.134:9201","192.168.162.134:9202"]
       index => "product_log"
    }
}

2.上传至logstash的config目录(/usr/local/logstash-5.6.12/config):

5. 测试

1.启动zookeeper、kafka(如果没有安装好,请看上一篇博客

):

docker start zookeeper
docker start kafka

2.启动es集群、kibana(如果没有安装好,请看前面的博客)

docker start ES01
docker start ES02
docker start kibana-cluster

3.启动logstash

cd /usr/local/logstash-5.6.12/bin/
./logstash -f ../config/product_log.conf

4.如果在kafka没有创建product_log主题,则需要创建,如下图创建成功:

docker exec -it kafka /bin/bash
/opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.162.134:2181 --replication-factor 1 --partitions 1 --topic product_log
/opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.162.134:2181

分割线 - - - - - - - - - - -上面的环境都启动好了,正式开始测试- - - - - - - - - - - -

5.启动商品服务

6.浏览器请求:http://localhost:8500/search?name=pg

7.在longstah的控制台,可以看到接收到kafka发送过来的内容:

请求内容:

响应内容:

8.在kibana查看,也可以查询到日志:

成功,本文完!

目录
相关文章
|
3月前
|
存储 消息中间件 网络协议
日志平台-ELK实操系列(一)
日志平台-ELK实操系列(一)
|
24天前
|
存储 运维 数据可视化
如何为微服务实现分布式日志记录
如何为微服务实现分布式日志记录
43 1
|
1月前
|
存储 监控 安全
|
3月前
|
NoSQL Java Redis
面试官:项目中如何实现分布式锁?
面试官:项目中如何实现分布式锁?
102 6
面试官:项目中如何实现分布式锁?
|
2月前
|
存储 消息中间件 大数据
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
50 4
|
2月前
|
存储 消息中间件 大数据
大数据-70 Kafka 高级特性 物理存储 日志存储 日志清理: 日志删除与日志压缩
大数据-70 Kafka 高级特性 物理存储 日志存储 日志清理: 日志删除与日志压缩
52 1
|
2月前
|
存储 消息中间件 大数据
大数据-68 Kafka 高级特性 物理存储 日志存储概述
大数据-68 Kafka 高级特性 物理存储 日志存储概述
33 1
|
2月前
|
存储 数据采集 分布式计算
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
57 1
|
3月前
|
消息中间件 Kafka API
python之kafka日志
python之kafka日志
39 3
|
3月前
|
消息中间件 存储 监控
Kafka的logs目录下的文件都是什么日志?
Kafka的logs目录下的文件都是什么日志?
218 11