一. 前言
其实早前就想计划出这篇文章,但是最近主要精力在完善微服务、系统权限设计、微信小程序和管理前端的功能,不过好在有群里小伙伴的一起帮忙反馈问题,基础版的功能已经差不多,也在此谢过,希望今后大家还是能够相互学习,一起进步~
OK,回正题,ELK是Elasticsearch、Logstash、Kibana三个开源软件的组合,相信很多童鞋使用ELK有去做过分布式日志收集。流程概括为:微服务应用把Logback输出的日志通过HTTP传输至LogStash,然后经过分析过滤,转发至ES,再由Kibana提供检索和统计可视化界面。
在本实战案例中,使用Spring AOP、Logback横切认证接口来记录用户登录日志,收集到ELK,通过SpringBoot整合RestHighLevelClient实现对ElasticSearch数据检索和统计。从日志搜集到数据统计,一次性的走个完整,快速入门ElasticSearch。
本篇涉及的前后端全部源码已上传gitee和github,熟悉有来项目的童鞋快速过一下步骤即可。
项目名称 Github 码云
后台 youlai-mall youlai-mall
前端 youlai-mall-admin youlai-mall-admin
二. 需求
基于ELK的日志搜集的功能,本篇实现的需求如下:
记录系统用户登录日志,信息包括用户IP、登录耗时、认证令牌JWT
统计十天内用户登录次数、今日访问IP和总访问IP
充分利用记录的JWT信息,通过黑名单的方式让JWT失效实现强制下线
实现效果:
Kibana日志可视化统计
登录次数统计、今日访问IP统计、总访问IP统计
登录信息,强制用户下线,演示的是自己强制自己下线的效果
三. Docker快速搭建ELK环境
1. 拉取镜像
docker pull elasticsearch:7.10.1
docker pull kibana:7.10.1
docker pull logstash:7.10.1
2. elasticsearch部署
1. 环境准备
# 创建文件
mkdir -p /opt/elasticsearch/{plugins,data} /etc/elasticsearch
touch /etc/elasticsearch/elasticsearch.yml
chmod -R 777 /opt/elasticsearch/data/
vim /etc/elasticsearch/elasticsearch.yml
# 写入
cluster.name: elasticsearch
http.cors.enabled: true
http.cors.allow-origin: "*"
http.host: 0.0.0.0
node.max_local_storage_nodes: 100
2. 启动容器
docker run -d --name=elasticsearch --restart=always \
-e discovery.type=single-node \
-e ES_JAVA_OPTS="-Xms256m -Xmx256m" \
-p 9200:9200 \
-p 9300:9300 \
-v /etc/elasticsearch/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
-v /opt/elasticsearch/data:/usr/share/elasticsearch/data \
-v /opt/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
elasticsearch:7.10.1
3. 验证和查看ElasticSearch版本
curl -XGET localhost:9200
1
2. kibana部署
1. 环境准备
# 创建文件
mkdir -p /etc/kibana
vim /etc/kibana/kibana.yml
# 写入
server.name: kibana
server.host: "0"
elasticsearch.hosts: [ "http://elasticsearch:9200" ]
i18n.locale: "zh-CN"
2. 启动容器
docker run -d --restart always -p 5601:5601 --link elasticsearch \
-e ELASTICSEARCH_URL=http://elasticsearch:9200 \
-v /etc/kibana/kibana.yml:/usr/share/kibana/config/kibana.yml \
kibana:7.10.1
3. logstash部署
1. 环境准备
配置 logstash.yml
# 创建文件
mkdir -p /etc/logstash/config
vim /etc/logstash/config/logstash.yml
# 写入
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "http://elasticsearch:9200" ]
xpack.management.pipeline.id: ["main"]
配置pipeline.yml
# 创建文件
vim /etc/logstash/config/pipeline.yml
# 写入(注意空格)
- pipeline.id: main
path.config: "/usr/share/logstash/pipeline/logstash.config"
配置logstash.conf
# 创建文件
mkdir -p /etc/logstash/pipeline
vim /etc/logstash/pipeline/logstash.conf
# 写入
input {
tcp {
port => 5044
mode => "server"
host => "0.0.0.0"
codec => json_lines
}
}
filter{
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
# 索引名称,没有会自动创建
index => "%{[project]}-%{[action]}-%{+YYYY-MM-dd}"
}
}
2. 启动容器
docker run -d --restart always -p 5044:5044 -p 9600:9600 --name logstash --link elasticsearch \
-v /etc/logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml \
-v /etc/logstash/config/pipeline.yml:/usr/share/logstash/config/pipeline.yml \
-v /etc/logstash/pipeline/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
logstash:7.10.1
4. 测试
访问: http://localhost:5601/
四. Spring AOP + Logback 横切打印登录日志
1. Spring AOP横切认证接口添加日志
代码坐标: common-web#LoginLogAspect
@Aspect
@Component
@AllArgsConstructor
@Slf4j
@ConditionalOnProperty(value = "spring.application.name", havingValue = "youlai-auth")
public class LoginLogAspect {
@Pointcut("execution(public * com.youlai.auth.controller.AuthController.postAccessToken(..))")
public void Log() {
}
@Around("Log()")
public Object doAround(ProceedingJoinPoint joinPoint) throws Throwable {
LocalDateTime startTime = LocalDateTime.now();
Object result = joinPoint.proceed();
// 获取请求信息
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
HttpServletRequest request = attributes.getRequest();
// 刷新token不记录
String grantType=request.getParameter(AuthConstants.GRANT_TYPE_KEY);
if(grantType.equals(AuthConstants.REFRESH_TOKEN)){
return result;
}
// 时间统计
LocalDateTime endTime = LocalDateTime.now();
long elapsedTime = Duration.between(startTime, endTime).toMillis(); // 请求耗时(毫秒)
// 获取接口描述信息
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
String description = signature.getMethod().getAnnotation(ApiOperation.class).value();// 方法描述
String username = request.getParameter(AuthConstants.USER_NAME_KEY); // 登录用户名
String date = startTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")); // 索引名需要,因为默认生成索引的date时区不一致
// 获取token
String token = Strings.EMPTY;
if (request != null) {
JSONObject jsonObject = JSONUtil.parseObj(result);
token = jsonObject.getStr("value");
}
String clientIP = IPUtils.getIpAddr(request); // 客户端请求IP(注意:如果使用Nginx代理需配置)
String region = IPUtils.getCityInfo(clientIP); // IP对应的城市信息
// MDC 扩展logback字段,具体请看logback-spring.xml的自定义日志输出格式
MDC.put("elapsedTime", StrUtil.toString(elapsedTime));
MDC.put("description", description);
MDC.put("region", region);
MDC.put("username", username);
MDC.put("date", date);
MDC.put("token", token);
MDC.put("clientIP", clientIP);
log.info("{} 登录,耗费时间 {} 毫秒", username, elapsedTime); // 收集日志这里必须打印一条日志,内容随便吧,记录在message字段,具体看logback-spring.xml文件
return result;
}
}
2. Logback日志上传至LogStash
代码坐标:common-web#logback-spring.xml
localhost:5044
Asia/Shanghai
{
"project": "${APP_NAME}",
"date": "%X{date}",
"action":"login",
"pid": "${PID:-}",
"thread": "%thread",
"message": "%message",
"elapsedTime": "%X{elapsedTime}",
"username":"%X{username}",
"clientIP": "%X{clientIP}",
"region":"%X{region}",
"token":"%X{token}",
"loginTime": "%date{\"yyyy-MM-dd HH:mm:ss\"}",
"description":"%X{description}"
}
5 minutes
localhost:5044 Logstash配置的input收集数据的监听
%X{username} 输出MDC添加的username的值