Introduction
The code for this article has been committed to GitHub (commit: da9e2df57a8650a18003bdbde1f24c3c4e21c8a4); interested readers can check it out: https://github.com/ylw-github/taodong-shop
The previous posts covered the principles of distributed log collection with ELK + Kafka and walked through the environment setup; interested readers can refer back to them.
1. Before We Begin
In the principles post, the architecture of ELK + Kafka distributed log collection looked like this:
To make the real-world usage clearer, I have revised that diagram:
Flow: each microservice has its own log topic, and each Logstash pipeline subscribes to one. In the diagram above, the goods search service publishes its logs under the topic goods_logs, and the first Logstash instance subscribes to that same topic, goods_logs. When the AOP aspect in the goods search service intercepts a log event, it publishes the content to the Kafka cluster. Kafka, acting as the MQ broker, forwards the message to whichever Logstash instance subscribes to that topic. Logstash then writes the message into the ES cluster, where users can inspect the logs through Kibana.
The rest of this article uses log collection for the goods service as the worked example.
2. The ELK + Kafka Log Collection Base Module
To make ELK + Kafka log collection easy for every microservice to adopt, it is packaged as an infrastructure module: a microservice only has to declare the dependency and its logs are sent to Kafka for collection. Now let's walk through the integration step by step.
1. Create the taodong-shop-basics-elk-kafka infrastructure module:
2. Add the Kafka-related dependencies to the module's Maven POM:
<dependencies>
    <!-- Spring Boot + Kafka integration -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>
</dependencies>
3. Define the Kafka sender utility class (note that the topic is hard-coded here as product_log):
/**
 * description: Kafka sender utility class
 * create by: YangLinWei
 * create time: 2020/4/29 4:05 PM
 */
@Component
@Slf4j
public class KafkaSender<T> {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Send a message to Kafka.
     *
     * @param obj the message object
     */
    public void send(T obj) {
        String jsonObj = JSON.toJSONString(obj);
        log.info("send message = {}", jsonObj);

        // Send the message asynchronously to the hard-coded product_log topic
        ListenableFuture<SendResult<String, Object>> future =
                kafkaTemplate.send("product_log", jsonObj);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.info("The message failed to be sent: " + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                // TODO business handling
                log.info("The message was sent successfully: " + stringObjectSendResult.toString());
            }
        });
    }
}
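As an aside: besides being driven by the AOP aspect defined in the next step, any bean can publish to the pipeline by injecting this sender directly. A minimal sketch (the class and method names here are hypothetical, not part of the repo):

import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Hypothetical example: publish a custom log event through KafkaSender.
 */
@Component
public class ManualLogExample {

    @Autowired
    private KafkaSender<JSONObject> kafkaSender;

    public void reportStartup() {
        JSONObject event = new JSONObject();
        event.put("event", "service_started");
        // Serialized to JSON and published to the hard-coded product_log topic
        kafkaSender.send(event);
    }
}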
4. Define the AOP aspect class that intercepts the calls to be logged:
/**
 * description: ELK log interception aspect
 * create by: YangLinWei
 * create time: 2020/4/29 4:07 PM
 */
@Aspect
@Component
public class AopLogAspect {

    @Autowired
    private KafkaSender<JSONObject> kafkaSender;

    /**
     * Declare a pointcut (an execution expression).
     */
    @Pointcut("execution(* com.ylw.service.*.service.impl.*.*(..))")
    private void serviceAspect() {
    }

    /**
     * Publish the request details before the method runs.
     */
    @Before(value = "serviceAspect()")
    public void methodBefore(JoinPoint joinPoint) {
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                .getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();

        JSONObject jsonObject = new JSONObject();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // date format
        jsonObject.put("request_time", df.format(new Date()));
        jsonObject.put("request_url", request.getRequestURL().toString());
        jsonObject.put("request_method", request.getMethod());
        jsonObject.put("signature", joinPoint.getSignature());
        jsonObject.put("request_args", Arrays.toString(joinPoint.getArgs()));

        JSONObject requestJsonObject = new JSONObject();
        requestJsonObject.put("request", jsonObject);
        kafkaSender.send(requestJsonObject);
    }

    /**
     * Publish the response content after the method returns.
     */
    @AfterReturning(returning = "o", pointcut = "serviceAspect()")
    public void methodAfterReturing(Object o) {
        JSONObject respJSONObject = new JSONObject();
        JSONObject jsonObject = new JSONObject();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // date format
        jsonObject.put("response_time", df.format(new Date()));
        jsonObject.put("response_content", JSONObject.toJSONString(o));
        respJSONObject.put("response", jsonObject);
        kafkaSender.send(respJSONObject);
    }
}
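The pointcut expression execution(* com.ylw.service.*.service.impl.*.*(..)) matches every method of every class under a com.ylw.service.<module>.service.impl package, so service implementations get logged with no extra code. A hypothetical implementation that would be intercepted (the real ones live in the repo):

package com.ylw.service.goods.service.impl;

import org.springframework.stereotype.Service;

/**
 * Hypothetical service implementation: its package matches the
 * serviceAspect() pointcut, so every call to search() is intercepted
 * and its request/response published to Kafka automatically.
 */
@Service
public class GoodsSearchServiceImpl {

    public String search(String name) {
        // ... real lookup logic would live in the repository layer
        return "results for " + name;
    }
}

One caveat worth noting: methodBefore pulls the current HttpServletRequest from RequestContextHolder, so the aspect assumes service methods are invoked inside a web request; a call from a scheduled task or background thread would find no request attributes and fail with a NullPointerException.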
3. Integrating the Log Collection Module into a Service
1. Add the log collection base module taodong-shop-basics-elk-kafka to the goods microservice:
<dependency>
    <groupId>com.ylw</groupId>
    <artifactId>taodong-shop-basics-elk-kafka</artifactId>
    <version>1.0-RELEASE</version>
</dependency>
2. Configure the Kafka connection for the goods microservice (see the last two lines of the application.yml below):
spring:
  application:
    name: taodong-shop-service-goods
  datasource:
    username: root
    password: 123456
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/taodong-goods?characterEncoding=utf8&useSSL=false
  redis:
    host: 127.0.0.1
    jedis:
      pool:
        max-active: 1000
        max-idle: 100
        max-wait: -1
        min-idle: 1
    port: 6379
  data:
    elasticsearch:
      cluster-name: elasticsearch-cluster
      cluster-nodes: 192.168.162.134:9301,192.168.162.134:9302
  kafka:
    bootstrap-servers: 192.168.162.134:9092
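With spring.kafka.bootstrap-servers set, Spring Boot auto-configures the KafkaTemplate that KafkaSender injects; keys and values default to StringSerializer, which is exactly what the JSON string payload needs. If you ever need to tune the producer, the explicit equivalent of that auto-configuration would look roughly like the following sketch (not code from the repo):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

/**
 * Hypothetical explicit producer configuration, equivalent to the
 * auto-configuration derived from spring.kafka.bootstrap-servers.
 */
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.162.134:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}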
4. Logstash Configuration
1. Create a configuration file named product_log.conf with the following content (this is where the topic that Logstash subscribes to is declared):
input {
  kafka {
    bootstrap_servers => "192.168.162.134:9092"
    topics => ["product_log"]
  }
}
output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => ["192.168.162.134:9201","192.168.162.134:9202"]
    index => "product_log"
  }
}
2. Upload it to Logstash's config directory (/usr/local/logstash-5.6.12/config).
5. Testing
1. Start ZooKeeper and Kafka (if they are not installed yet, see the previous post):
docker start zookeeper
docker start kafka
2. Start the ES cluster and Kibana (if they are not installed yet, see the earlier posts):
docker start ES01
docker start ES02
docker start kibana-cluster
3. Start Logstash:
cd /usr/local/logstash-5.6.12/bin/
./logstash -f ../config/product_log.conf
4. If the product_log topic has not been created in Kafka yet, create it now; successful creation looks like this:
docker exec -it kafka /bin/bash
/opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.162.134:2181 --replication-factor 1 --partitions 1 --topic product_log
/opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.162.134:2181
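Beyond listing the topic, you can also confirm that messages actually arrive on product_log with a throwaway consumer. A minimal sketch (assumes kafka-clients 2.0+ for poll(Duration); on older clients use poll(long); the group id is arbitrary):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ProductLogTopicCheck {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.162.134:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "product-log-check"); // arbitrary group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // read from the start

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("product_log"));
            // Print whatever the microservice has published so far
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(5))) {
                System.out.println(record.value());
            }
        }
    }
}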
- - - - - - - - - - - The environment above is now fully up; the actual test begins - - - - - - - - - - - -
5. Start the goods service.
6. Request from a browser: http://localhost:8500/search?name=pg
7. In the Logstash console you can see the content received from Kafka.
Request content:
Response content:
8. In Kibana, the logs can be queried as well:
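If you prefer to bypass Kibana, the same data can be pulled straight from the ES REST API's _search endpoint. A minimal sketch using only the JDK (the size=5 parameter just limits the sample):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class ProductLogIndexCheck {

    public static void main(String[] args) throws Exception {
        // _search on the product_log index returns the stored log documents
        URL url = new URL("http://192.168.162.134:9201/product_log/_search?size=5");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}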
Success. That concludes this article!