概述
项目中的需求,需要在浏览器前端页面输出执行日志,也就是说将后台的log中的info,error日志输,如下图,,帮助快速定位问题。
那么该如何实现呢?
整体设计
该功能的实现主要采用下面的技术:
- 通过websocket实现日志消息的传输
- 通过redis的发布、订阅模式实现消息的流转
上图是指标测试的整个时序流程图。
- 用户点击指标测试按钮,会同时建立两个请求,上图中的1和4,一个是http请求,
rs/sd/feature/test/run
,执行功能。 同时建立长连接websocket请求,sockjs/webSocketServer/info
, 他们有个一个共同的参数websocketKey, 值须要保持一致,这样才能够识别此次指标测试的数据推送到哪个webSocket中。 - rs/sd/feature/test/run接口执行过程中的info, warn, error日志都会被推送redis中,这些日志中包含了websocketKey, 标记这个日志属于哪个指标测试的,见上图中的2。
- 应用订阅redis主题,获取日志消息,解析,根据websocketKey字段,从sdm推送数据回浏览器,见图7。
代码实现
建立websocket请求
- websocket配置类
@Configuration @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer { @Value("${websocket.num:100}") private int sessionNum; @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(webSocketHandler(), "/webSocketServer") .addInterceptors(new HttpSessionHandshakeInterceptor()).setAllowedOrigins("*"); registry.addHandler(webSocketHandler(), "/sockjs/webSocketServer") .addInterceptors(new HttpSessionHandshakeInterceptor()).setAllowedOrigins("*").withSockJS(); } @Bean public SpringWebSocketHandler webSocketHandler() { return new SpringWebSocketHandler(sessionNum); } @Bean public RequestUpgradeStrategy strategy() { return new TomcatRequestUpgradeStrategy(); } }
public class SpringWebSocketHandler extends TextWebSocketHandler { private static final String SESSION_ID = "HTTP.SESSION.ID"; private Logger logger = LoggerFactory.getLogger(SpringWebSocketHandler.class); private Map<String, WebSocketSession> sessions; private Pattern URL_PATTERN = Pattern.compile("(\?|&+)(.+?)=([^&]*)"); public SpringWebSocketHandler(int maxCacheSzie) { sessions = Collections.synchronizedMap(new LRUCache<String, WebSocketSession>(maxCacheSzie)); } /** * 连接建立之后回调 * * @param session * @throws Exception */ @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { String key = getSessionKey(session); sessions.put(key, session); logger.info("session[{}] connected, current cached websocket session size: {}", key, sessions.size()); } /** * 连接关闭后回调 * * @param session * @param closeStatus * @throws Exception */ @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { String key = getSessionKey(session); sessions.remove(key); logger.info("websocket session[{}] connection closed, current cached websocket session size: {}", key, sessions.size()); } private String getSessionKey(WebSocketSession session) { Matcher matcher = URL_PATTERN.matcher(session.getUri().toString()); String key = null; while (matcher.find()) { if(Objects.equals(matcher.group(2), "websocketKey")) { key = matcher.group(3); } } if (StringUtils.isEmpty(key)) { key = String.valueOf(session.getAttributes().get(SESSION_ID)); } return key; } /** * 发送消息处理 * * @param session * @param message * @throws Exception */ @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { super.handleTextMessage(session, message); } /** * 传输错误处理 * * @param session * @param exception * @throws Exception */ @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { logger.warn("websocket handleTransportError: {}", exception.getMessage()); if (session.isOpen()) { session.close(); } String sessionId = (String)session.getAttributes().get("HTTP.SESSION.ID"); sessions.remove(sessionId); logger.info("websocket session[{}] connection closed, current cached websocket session size: {}", sessionId, sessions.size()); } /** * 是否支持处理部分的消息 * * @return */ @Override public boolean supportsPartialMessages() { return false; } /** * 对指定session发送消息 * * @param sessionId * @param message */ public void sendMessageToSession(String sessionId, TextMessage message) { WebSocketSession session = sessions.get(sessionId); if (session == null) { return; } try { if (session.isOpen()) { session.sendMessage(message); } else { sessions.remove(sessionId); } } catch (IOException e) { logger.error("sendMessageToSession error", e); } } /** * 群发消息 * * @param message */ public void sendMessageToSessions(TextMessage message) { Iterator<Map.Entry<String, WebSocketSession>> it = sessions.entrySet().iterator(); while (it.hasNext()) { Map.Entry<String, WebSocketSession> entry = it.next(); String sessionId = entry.getKey(); WebSocketSession session = entry.getValue(); try { if (session.isOpen()) { session.sendMessage(message); } else { sessions.remove(sessionId); } } catch (IOException e) { logger.error("sendMessageToSessions error", e); } } } }
添加日志的appender
- 定义webconsole appender
@Slf4j @Plugin(name = "WebConsoleAppender", category = "core", elementType = "appender", printObject = true) public class WebConsoleLogAppender extends AbstractAppender { public WebConsoleLogAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout, final boolean ignoreExceptions, Property[] properties) { super(name, filter, layout, ignoreExceptions, properties); } @Override public void append(LogEvent logEvent) { String websocketkey = ServiceContext.getWebsocketKey(); if (StrUtil.isNotEmpty(websocketkey) && !websocketkey.toUpperCase().startsWith("SL") && !websocketkey.toUpperCase().startsWith("PKG")) { String content = new String(this.getLayout().toByteArray(logEvent)); LoggerEvent loggerEvent = new LoggerEvent(websocketkey, logEvent.getLevel().toString(), "webconsole", content, 50000000); WebConsoleLogMsgManager.sendMsg(loggerEvent); } } @PluginFactory public static WebConsoleLogAppender createAppender(@PluginAttribute("name") String name, @PluginElement("Filter") final Filter filter, @PluginElement("Layout") Layout<? extends Serializable> layout, @PluginAttribute("ignoreExceptions") boolean ignoreExceptions) { if (name == null) { log.warn("No name provided for WebConsoleAppender"); return null; } if (layout == null) { layout = PatternLayout.createDefaultLayout(); } return new WebConsoleLogAppender(name, filter, layout, ignoreExceptions, null); } }
任何日志打印,该类的append方法都会接受到,然后推送到redis中,详见append方法。
- 将appender添加到log4j.xml或者logback.xml中
通过redis的消费订阅模式推送日志消息
- redis发布订阅配置
@Configuration @ConditionalOnBean(RedisConfig.class) public class RedisSubcrbieConfig { /** * 订阅主题 */ public static final String SUB_KEY = "WEB_CONSOLE_LOG"; @Bean(name = "stringRedisTemplate") public RedisTemplate stringRedisTemplate(LettuceConnectionFactory lettuceConnectionFactory) { RedisTemplate redisTemplate = new RedisTemplate(); redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setValueSerializer(new StringRedisSerializer()); redisTemplate.setConnectionFactory(lettuceConnectionFactory); return redisTemplate; } @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.addMessageListener(listenerAdapter, new PatternTopic(SUB_KEY)); return container; } @Bean MessageListenerAdapter listenerAdapter(WebConsoleLogMsgReceiver receiver) { return new MessageListenerAdapter(receiver, "receiveMsg"); } }
public interface WebConsoleLogMsgReceiver { /** * 接受日志消息 * @param message 日志消息内容 */ void receiveMsg(String message); }
@Component public class RedisLogMsgReceiver implements WebConsoleLogMsgReceiver { @Autowired private SpringWebSocketHandler handler; @Override public void receiveMsg(String message) { LoggerEvent loggerEvent = JSON.parseObject(message, LoggerEvent.class); TextMessage textMessage = new TextMessage(JSON.toJSONBytes(loggerEvent, SerializerFeature.PrettyFormat)); handler.sendMessageToSession(loggerEvent.getSessionId(), textMessage); } }
- 发布日志消息
public interface WebConsoleLogMsgSender { /** * 发送webconsole日志数据 * @param loggerEvent 日志数据 */ void send(LoggerEvent loggerEvent); }
@Component @Slf4j public class RedisLogMsgSender implements WebConsoleLogMsgSender { @Autowired @Qualifier("stringRedisTemplate") private RedisTemplate redisTemplate; @Override public void send(LoggerEvent loggerEvent) { redisTemplate.convertAndSend(RedisSubcrbieConfig.SUB_KEY, JSON.toJSONString(loggerEvent)); } }
@Slf4j public class WebConsoleLogMsgManager { /** * 发送消息 * @param loggerEvent 日志对象 */ public static void sendMsg(LoggerEvent loggerEvent) { if(loggerEvent == null) { log.warn("logger event is empty"); return; } WebConsoleLogMsgSender sender = SdBeanFactory.getBean(WebConsoleLogMsgSender.class); sender.send(loggerEvent); } }
- 在appender中发布日志消息
总结
大家想过,这里采用了redis作为发布订阅模式,为什么不是通过kafka的队列呢?
因为应用为了高可用,需要部署多个节点,每个节点去kafkfa消费的group必须不一样,也就是说,sdm每个节点必须都要消费全部的日志。为什么这样?
试想如果应用是多个节点,会存在什么问题呢?比如有两个应用节点, A节点, B节点。 用户点击指标测试按钮,如果sockjs/webSocketServer/info被转发到A节点,如果AB节点共享同一个group的话,此时正好B节点消费了kafka数据,那么A节点就消费不到,转发到A节点的websocket也就获取不到日志数据,无法展示了,所以要保证不同的节点都要消费到kafkfa数据,sdm节点的group要不一样,但是实际情况下,group一般都是比较固定的。