通过WebSocket实现日志打印功能

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: 通过WebSocket实现日志打印功能

概述


项目中的需求,需要在浏览器前端页面输出执行日志,也就是说将后台的log中的info,error日志输,如下图,,帮助快速定位问题。

1671161323362.jpg

那么该如何实现呢?


整体设计


该功能的实现主要采用下面的技术:

  1. 通过websocket实现日志消息的传输
  2. 通过redis的发布、订阅模式实现消息的流转

image.png

上图是指标测试的整个时序流程图。

  1. 用户点击指标测试按钮,会同时建立两个请求,上图中的1和4,一个是http请求,rs/sd/feature/test/run,执行功能。 同时建立长连接websocket请求, sockjs/webSocketServer/info, 他们有个一个共同的参数websocketKey, 值须要保持一致,这样才能够识别此次指标测试的数据推送到哪个webSocket中。
  2. rs/sd/feature/test/run接口执行过程中的info, warn, error日志都会被推送redis中,这些日志中包含了websocketKey, 标记这个日志属于哪个指标测试的,见上图中的2。
  3. 应用订阅redis主题,获取日志消息,解析,根据websocketKey字段,从sdm推送数据回浏览器,见图7。


代码实现


建立websocket请求


  1. websocket配置类
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
    @Value("${websocket.num:100}")
    private int sessionNum;
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(webSocketHandler(), "/webSocketServer")
            .addInterceptors(new HttpSessionHandshakeInterceptor()).setAllowedOrigins("*");
        registry.addHandler(webSocketHandler(), "/sockjs/webSocketServer")
            .addInterceptors(new HttpSessionHandshakeInterceptor()).setAllowedOrigins("*").withSockJS();
    }
    @Bean
    public SpringWebSocketHandler webSocketHandler() {
        return new SpringWebSocketHandler(sessionNum);
    }
    @Bean
    public RequestUpgradeStrategy strategy() {
        return new TomcatRequestUpgradeStrategy();
    }
}
public class SpringWebSocketHandler extends TextWebSocketHandler {
    private static final String SESSION_ID = "HTTP.SESSION.ID";
    private Logger logger = LoggerFactory.getLogger(SpringWebSocketHandler.class);
    private Map<String, WebSocketSession> sessions;
    private Pattern URL_PATTERN = Pattern.compile("(\?|&+)(.+?)=([^&]*)");
    public SpringWebSocketHandler(int maxCacheSzie) {
        sessions = Collections.synchronizedMap(new LRUCache<String, WebSocketSession>(maxCacheSzie));
    }
    /**
     * 连接建立之后回调
     * 
     * @param session
     * @throws Exception
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        String key = getSessionKey(session);
        sessions.put(key, session);
        logger.info("session[{}] connected, current cached websocket session size: {}", key, sessions.size());
    }
    /**
     * 连接关闭后回调
     * 
     * @param session
     * @param closeStatus
     * @throws Exception
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        String key = getSessionKey(session);
        sessions.remove(key);
        logger.info("websocket session[{}] connection closed, current cached websocket session size: {}", key,
            sessions.size());
    }
    private String getSessionKey(WebSocketSession session) {
        Matcher matcher = URL_PATTERN.matcher(session.getUri().toString());
        String key = null;
        while (matcher.find()) {
            if(Objects.equals(matcher.group(2), "websocketKey")) {
                key = matcher.group(3);
            }
        }
        if (StringUtils.isEmpty(key)) {
            key = String.valueOf(session.getAttributes().get(SESSION_ID));
        }
        return key;
    }
    /**
     * 发送消息处理
     * 
     * @param session
     * @param message
     * @throws Exception
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        super.handleTextMessage(session, message);
    }
    /**
     * 传输错误处理
     * 
     * @param session
     * @param exception
     * @throws Exception
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        logger.warn("websocket handleTransportError: {}", exception.getMessage());
        if (session.isOpen()) {
            session.close();
        }
        String sessionId = (String)session.getAttributes().get("HTTP.SESSION.ID");
        sessions.remove(sessionId);
        logger.info("websocket session[{}] connection closed, current cached websocket session size: {}", sessionId,
            sessions.size());
    }
    /**
     * 是否支持处理部分的消息
     * 
     * @return
     */
    @Override
    public boolean supportsPartialMessages() {
        return false;
    }
    /**
     * 对指定session发送消息
     * 
     * @param sessionId
     * @param message
     */
    public void sendMessageToSession(String sessionId, TextMessage message) {
        WebSocketSession session = sessions.get(sessionId);
        if (session == null) {
            return;
        }
        try {
            if (session.isOpen()) {
                session.sendMessage(message);
            } else {
                sessions.remove(sessionId);
            }
        } catch (IOException e) {
            logger.error("sendMessageToSession error", e);
        }
    }
    /**
     * 群发消息
     * 
     * @param message
     */
    public void sendMessageToSessions(TextMessage message) {
        Iterator<Map.Entry<String, WebSocketSession>> it = sessions.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, WebSocketSession> entry = it.next();
            String sessionId = entry.getKey();
            WebSocketSession session = entry.getValue();
            try {
                if (session.isOpen()) {
                    session.sendMessage(message);
                } else {
                    sessions.remove(sessionId);
                }
            } catch (IOException e) {
                logger.error("sendMessageToSessions error", e);
            }
        }
    }
}


添加日志的appender


  1. 定义webconsole appender
@Slf4j
@Plugin(name = "WebConsoleAppender", category = "core", elementType = "appender", printObject = true)
public class WebConsoleLogAppender extends AbstractAppender {
    public WebConsoleLogAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
        final boolean ignoreExceptions, Property[] properties) {
        super(name, filter, layout, ignoreExceptions, properties);
    }
    @Override
    public void append(LogEvent logEvent) {
        String websocketkey = ServiceContext.getWebsocketKey();
        if (StrUtil.isNotEmpty(websocketkey) && !websocketkey.toUpperCase().startsWith("SL")
            && !websocketkey.toUpperCase().startsWith("PKG")) {
            String content = new String(this.getLayout().toByteArray(logEvent));
            LoggerEvent loggerEvent =
                new LoggerEvent(websocketkey, logEvent.getLevel().toString(), "webconsole", content, 50000000);
            WebConsoleLogMsgManager.sendMsg(loggerEvent);
        }
    }
    @PluginFactory
    public static WebConsoleLogAppender createAppender(@PluginAttribute("name") String name,
        @PluginElement("Filter") final Filter filter, @PluginElement("Layout") Layout<? extends Serializable> layout,
        @PluginAttribute("ignoreExceptions") boolean ignoreExceptions) {
        if (name == null) {
            log.warn("No name provided for WebConsoleAppender");
            return null;
        }
        if (layout == null) {
            layout = PatternLayout.createDefaultLayout();
        }
        return new WebConsoleLogAppender(name, filter, layout, ignoreExceptions, null);
    }
}

任何日志打印,该类的append方法都会接受到,然后推送到redis中,详见append方法。

  1. 将appender添加到log4j.xml或者logback.xml中

1671162323228.jpg

1671162329949.jpg


通过redis的消费订阅模式推送日志消息


  1. redis发布订阅配置
@Configuration
@ConditionalOnBean(RedisConfig.class)
public class RedisSubcrbieConfig {
    /**
     * 订阅主题
     */
    public static final String SUB_KEY = "WEB_CONSOLE_LOG";
    @Bean(name = "stringRedisTemplate")
    public RedisTemplate stringRedisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
        RedisTemplate redisTemplate = new RedisTemplate();
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        redisTemplate.setConnectionFactory(lettuceConnectionFactory);
        return redisTemplate;
    }
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
        MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, new PatternTopic(SUB_KEY));
        return container;
    }
    @Bean
    MessageListenerAdapter listenerAdapter(WebConsoleLogMsgReceiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMsg");
    }
}
public interface WebConsoleLogMsgReceiver {
    /**
     * 接受日志消息
     * @param message 日志消息内容
     */
    void receiveMsg(String message);
}
@Component
public class RedisLogMsgReceiver implements WebConsoleLogMsgReceiver {
    @Autowired
    private SpringWebSocketHandler handler;
    @Override
    public void receiveMsg(String message) {
        LoggerEvent loggerEvent = JSON.parseObject(message, LoggerEvent.class);
        TextMessage textMessage = new TextMessage(JSON.toJSONBytes(loggerEvent, SerializerFeature.PrettyFormat));
        handler.sendMessageToSession(loggerEvent.getSessionId(), textMessage);
    }
}
  1. 发布日志消息
public interface WebConsoleLogMsgSender {
    /**
     * 发送webconsole日志数据
     * @param loggerEvent 日志数据
     */
    void send(LoggerEvent loggerEvent);
}
@Component
@Slf4j
public class RedisLogMsgSender implements WebConsoleLogMsgSender {
    @Autowired
    @Qualifier("stringRedisTemplate")
    private RedisTemplate redisTemplate;
    @Override
    public void send(LoggerEvent loggerEvent) {
        redisTemplate.convertAndSend(RedisSubcrbieConfig.SUB_KEY, JSON.toJSONString(loggerEvent));
    }
}
@Slf4j
public class WebConsoleLogMsgManager {
    /**
     * 发送消息
     * @param loggerEvent 日志对象
     */
    public static void sendMsg(LoggerEvent loggerEvent) {
        if(loggerEvent == null) {
            log.warn("logger event is empty");
            return;
        }
        WebConsoleLogMsgSender sender = SdBeanFactory.getBean(WebConsoleLogMsgSender.class);
        sender.send(loggerEvent);
    }
}
  1. 在appender中发布日志消息

1671162359293.jpg


总结


大家想过,这里采用了redis作为发布订阅模式,为什么不是通过kafka的队列呢?

因为应用为了高可用,需要部署多个节点,每个节点去kafkfa消费的group必须不一样,也就是说,sdm每个节点必须都要消费全部的日志。为什么这样?

试想如果应用是多个节点,会存在什么问题呢?比如有两个应用节点, A节点, B节点。 用户点击指标测试按钮,如果sockjs/webSocketServer/info被转发到A节点,如果AB节点共享同一个group的话,此时正好B节点消费了kafka数据,那么A节点就消费不到,转发到A节点的websocket也就获取不到日志数据,无法展示了,所以要保证不同的节点都要消费到kafkfa数据,sdm节点的group要不一样,但是实际情况下,group一般都是比较固定的。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
2月前
使用uniapp实现websocket聊天功能
使用uniapp实现websocket聊天功能
|
9月前
|
移动开发 监控 网络协议
基于Socket通讯(C#)和WebSocket协议(net)编写的两种聊天功能(文末附源码下载地址)
基于Socket通讯(C#)和WebSocket协议(net)编写的两种聊天功能(文末附源码下载地址)
|
6月前
|
前端开发 Cloud Native Java
使用Spring WebSocket实现实时通信功能
使用Spring WebSocket实现实时通信功能
61 0
|
8月前
|
存储 JavaScript 前端开发
SpringBoot集成WebSocket实现及时通讯聊天功能!!!
SpringBoot集成WebSocket实现及时通讯聊天功能!!!
243 0
|
8月前
|
开发框架 JavaScript 前端开发
如何使用SpringBoot和Netty实现一个WebSocket服务器,并配合Vue前端实现聊天功能?
如何使用SpringBoot和Netty实现一个WebSocket服务器,并配合Vue前端实现聊天功能?
196 0
|
10月前
|
JSON 中间件 Go
给Go的Gin web框架增加 WebSocket 功能,让WebSocket 更好用
给Go的Gin web框架增加 WebSocket 功能,让WebSocket 更好用
|
网络协议 前端开发 安全
websocket和http的瓜葛以及websocket协议实现
websocket和http的瓜葛以及websocket协议实现
websocket和http的瓜葛以及websocket协议实现
|
前端开发 Java 应用服务中间件
基于websocket的实时通告功能,推送在线用户,新登录用户
SpringBoot 部署与Spring部署都有一些差别,但现在用Srpingboot的公司多,SpringBoot创建项目快,所以使用该方式来讲解,有一个问题就是开发WebSocket时发现无法通过@Autowired注入bean,一直为空。
基于websocket的实时通告功能,推送在线用户,新登录用户
|
JavaScript
js实现websocket实例
js实现websocket实例
193 0
|
消息中间件 网络协议 前端开发
SpringBoot轻松整合WebSocket,实现Web在线聊天室
前面为大家讲述了 Spring Boot的整合Redis、RabbitMQ、Elasticsearch等各种框架组件;随着移动互联网的发展,服务端消息数据推送已经是一个非常重要、非常普遍的基础功能。今天就和大家聊聊在SpringBoot轻松整合WebSocket,实现Web在线聊天室,希望能对大家有所帮助。
SpringBoot轻松整合WebSocket,实现Web在线聊天室