通过WebSocket实现日志打印功能

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
日志服务 SLS,月写入数据量 50GB 1个月
简介: 通过WebSocket实现日志打印功能

概述


项目中的需求,需要在浏览器前端页面输出执行日志,也就是说将后台的log中的info,error日志输,如下图,,帮助快速定位问题。

1671161323362.jpg

那么该如何实现呢?


整体设计


该功能的实现主要采用下面的技术:

  1. 通过websocket实现日志消息的传输
  2. 通过redis的发布、订阅模式实现消息的流转

image.png

上图是指标测试的整个时序流程图。

  1. 用户点击指标测试按钮,会同时建立两个请求,上图中的1和4,一个是http请求,rs/sd/feature/test/run,执行功能。 同时建立长连接websocket请求, sockjs/webSocketServer/info, 他们有个一个共同的参数websocketKey, 值须要保持一致,这样才能够识别此次指标测试的数据推送到哪个webSocket中。
  2. rs/sd/feature/test/run接口执行过程中的info, warn, error日志都会被推送redis中,这些日志中包含了websocketKey, 标记这个日志属于哪个指标测试的,见上图中的2。
  3. 应用订阅redis主题,获取日志消息,解析,根据websocketKey字段,从sdm推送数据回浏览器,见图7。


代码实现


建立websocket请求


  1. websocket配置类
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
    @Value("${websocket.num:100}")
    private int sessionNum;
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(webSocketHandler(), "/webSocketServer")
            .addInterceptors(new HttpSessionHandshakeInterceptor()).setAllowedOrigins("*");
        registry.addHandler(webSocketHandler(), "/sockjs/webSocketServer")
            .addInterceptors(new HttpSessionHandshakeInterceptor()).setAllowedOrigins("*").withSockJS();
    }
    @Bean
    public SpringWebSocketHandler webSocketHandler() {
        return new SpringWebSocketHandler(sessionNum);
    }
    @Bean
    public RequestUpgradeStrategy strategy() {
        return new TomcatRequestUpgradeStrategy();
    }
}
public class SpringWebSocketHandler extends TextWebSocketHandler {
    private static final String SESSION_ID = "HTTP.SESSION.ID";
    private Logger logger = LoggerFactory.getLogger(SpringWebSocketHandler.class);
    private Map<String, WebSocketSession> sessions;
    private Pattern URL_PATTERN = Pattern.compile("(\?|&+)(.+?)=([^&]*)");
    public SpringWebSocketHandler(int maxCacheSzie) {
        sessions = Collections.synchronizedMap(new LRUCache<String, WebSocketSession>(maxCacheSzie));
    }
    /**
     * 连接建立之后回调
     * 
     * @param session
     * @throws Exception
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        String key = getSessionKey(session);
        sessions.put(key, session);
        logger.info("session[{}] connected, current cached websocket session size: {}", key, sessions.size());
    }
    /**
     * 连接关闭后回调
     * 
     * @param session
     * @param closeStatus
     * @throws Exception
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        String key = getSessionKey(session);
        sessions.remove(key);
        logger.info("websocket session[{}] connection closed, current cached websocket session size: {}", key,
            sessions.size());
    }
    private String getSessionKey(WebSocketSession session) {
        Matcher matcher = URL_PATTERN.matcher(session.getUri().toString());
        String key = null;
        while (matcher.find()) {
            if(Objects.equals(matcher.group(2), "websocketKey")) {
                key = matcher.group(3);
            }
        }
        if (StringUtils.isEmpty(key)) {
            key = String.valueOf(session.getAttributes().get(SESSION_ID));
        }
        return key;
    }
    /**
     * 发送消息处理
     * 
     * @param session
     * @param message
     * @throws Exception
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        super.handleTextMessage(session, message);
    }
    /**
     * 传输错误处理
     * 
     * @param session
     * @param exception
     * @throws Exception
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        logger.warn("websocket handleTransportError: {}", exception.getMessage());
        if (session.isOpen()) {
            session.close();
        }
        String sessionId = (String)session.getAttributes().get("HTTP.SESSION.ID");
        sessions.remove(sessionId);
        logger.info("websocket session[{}] connection closed, current cached websocket session size: {}", sessionId,
            sessions.size());
    }
    /**
     * 是否支持处理部分的消息
     * 
     * @return
     */
    @Override
    public boolean supportsPartialMessages() {
        return false;
    }
    /**
     * 对指定session发送消息
     * 
     * @param sessionId
     * @param message
     */
    public void sendMessageToSession(String sessionId, TextMessage message) {
        WebSocketSession session = sessions.get(sessionId);
        if (session == null) {
            return;
        }
        try {
            if (session.isOpen()) {
                session.sendMessage(message);
            } else {
                sessions.remove(sessionId);
            }
        } catch (IOException e) {
            logger.error("sendMessageToSession error", e);
        }
    }
    /**
     * 群发消息
     * 
     * @param message
     */
    public void sendMessageToSessions(TextMessage message) {
        Iterator<Map.Entry<String, WebSocketSession>> it = sessions.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, WebSocketSession> entry = it.next();
            String sessionId = entry.getKey();
            WebSocketSession session = entry.getValue();
            try {
                if (session.isOpen()) {
                    session.sendMessage(message);
                } else {
                    sessions.remove(sessionId);
                }
            } catch (IOException e) {
                logger.error("sendMessageToSessions error", e);
            }
        }
    }
}


添加日志的appender


  1. 定义webconsole appender
@Slf4j
@Plugin(name = "WebConsoleAppender", category = "core", elementType = "appender", printObject = true)
public class WebConsoleLogAppender extends AbstractAppender {
    public WebConsoleLogAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
        final boolean ignoreExceptions, Property[] properties) {
        super(name, filter, layout, ignoreExceptions, properties);
    }
    @Override
    public void append(LogEvent logEvent) {
        String websocketkey = ServiceContext.getWebsocketKey();
        if (StrUtil.isNotEmpty(websocketkey) && !websocketkey.toUpperCase().startsWith("SL")
            && !websocketkey.toUpperCase().startsWith("PKG")) {
            String content = new String(this.getLayout().toByteArray(logEvent));
            LoggerEvent loggerEvent =
                new LoggerEvent(websocketkey, logEvent.getLevel().toString(), "webconsole", content, 50000000);
            WebConsoleLogMsgManager.sendMsg(loggerEvent);
        }
    }
    @PluginFactory
    public static WebConsoleLogAppender createAppender(@PluginAttribute("name") String name,
        @PluginElement("Filter") final Filter filter, @PluginElement("Layout") Layout<? extends Serializable> layout,
        @PluginAttribute("ignoreExceptions") boolean ignoreExceptions) {
        if (name == null) {
            log.warn("No name provided for WebConsoleAppender");
            return null;
        }
        if (layout == null) {
            layout = PatternLayout.createDefaultLayout();
        }
        return new WebConsoleLogAppender(name, filter, layout, ignoreExceptions, null);
    }
}

任何日志打印,该类的append方法都会接受到,然后推送到redis中,详见append方法。

  1. 将appender添加到log4j.xml或者logback.xml中

1671162323228.jpg

1671162329949.jpg


通过redis的消费订阅模式推送日志消息


  1. redis发布订阅配置
@Configuration
@ConditionalOnBean(RedisConfig.class)
public class RedisSubcrbieConfig {
    /**
     * 订阅主题
     */
    public static final String SUB_KEY = "WEB_CONSOLE_LOG";
    @Bean(name = "stringRedisTemplate")
    public RedisTemplate stringRedisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
        RedisTemplate redisTemplate = new RedisTemplate();
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        redisTemplate.setConnectionFactory(lettuceConnectionFactory);
        return redisTemplate;
    }
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
        MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, new PatternTopic(SUB_KEY));
        return container;
    }
    @Bean
    MessageListenerAdapter listenerAdapter(WebConsoleLogMsgReceiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMsg");
    }
}
public interface WebConsoleLogMsgReceiver {
    /**
     * 接受日志消息
     * @param message 日志消息内容
     */
    void receiveMsg(String message);
}
@Component
public class RedisLogMsgReceiver implements WebConsoleLogMsgReceiver {
    @Autowired
    private SpringWebSocketHandler handler;
    @Override
    public void receiveMsg(String message) {
        LoggerEvent loggerEvent = JSON.parseObject(message, LoggerEvent.class);
        TextMessage textMessage = new TextMessage(JSON.toJSONBytes(loggerEvent, SerializerFeature.PrettyFormat));
        handler.sendMessageToSession(loggerEvent.getSessionId(), textMessage);
    }
}
  1. 发布日志消息
public interface WebConsoleLogMsgSender {
    /**
     * 发送webconsole日志数据
     * @param loggerEvent 日志数据
     */
    void send(LoggerEvent loggerEvent);
}
@Component
@Slf4j
public class RedisLogMsgSender implements WebConsoleLogMsgSender {
    @Autowired
    @Qualifier("stringRedisTemplate")
    private RedisTemplate redisTemplate;
    @Override
    public void send(LoggerEvent loggerEvent) {
        redisTemplate.convertAndSend(RedisSubcrbieConfig.SUB_KEY, JSON.toJSONString(loggerEvent));
    }
}
@Slf4j
public class WebConsoleLogMsgManager {
    /**
     * 发送消息
     * @param loggerEvent 日志对象
     */
    public static void sendMsg(LoggerEvent loggerEvent) {
        if(loggerEvent == null) {
            log.warn("logger event is empty");
            return;
        }
        WebConsoleLogMsgSender sender = SdBeanFactory.getBean(WebConsoleLogMsgSender.class);
        sender.send(loggerEvent);
    }
}
  1. 在appender中发布日志消息

1671162359293.jpg


总结


大家想过,这里采用了redis作为发布订阅模式,为什么不是通过kafka的队列呢?

因为应用为了高可用,需要部署多个节点,每个节点去kafkfa消费的group必须不一样,也就是说,sdm每个节点必须都要消费全部的日志。为什么这样?

试想如果应用是多个节点,会存在什么问题呢?比如有两个应用节点, A节点, B节点。 用户点击指标测试按钮,如果sockjs/webSocketServer/info被转发到A节点,如果AB节点共享同一个group的话,此时正好B节点消费了kafka数据,那么A节点就消费不到,转发到A节点的websocket也就获取不到日志数据,无法展示了,所以要保证不同的节点都要消费到kafkfa数据,sdm节点的group要不一样,但是实际情况下,group一般都是比较固定的。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
8月前
使用uniapp实现websocket聊天功能
使用uniapp实现websocket聊天功能
|
3月前
|
前端开发 JavaScript UED
探索Python Django中的WebSocket集成:为前后端分离应用添加实时通信功能
通过在Django项目中集成Channels和WebSocket,我们能够为前后端分离的应用添加实时通信功能,实现诸如在线聊天、实时数据更新等交互式场景。这不仅增强了应用的功能性,也提升了用户体验。随着实时Web应用的日益普及,掌握Django Channels和WebSocket的集成将为开发者开启新的可能性,推动Web应用的发展迈向更高层次的实时性和交互性。
109 1
|
5月前
|
人工智能 数据库连接 Go
Golang 搭建 WebSocket 应用(五) - 消息推送日志
Golang 搭建 WebSocket 应用(五) - 消息推送日志
49 1
|
5月前
|
Linux C++ Docker
【Azure 应用服务】App Service for Linux 中实现 WebSocket 功能 (Python SocketIO)
【Azure 应用服务】App Service for Linux 中实现 WebSocket 功能 (Python SocketIO)
|
6月前
|
前端开发 JavaScript API
探索Python Django中的WebSocket集成:为前后端分离应用添加实时通信功能
【7月更文挑战第17天】现代Web开发趋势中,前后端分离配合WebSocket满足实时通信需求。Django Channels扩展了Django,支持WebSocket连接和异步功能。通过安装Channels、配置设置、定义路由和消费者,能在Django中实现WebSocket交互。前端使用WebSocket API连接后端,实现双向数据流,如在线聊天功能。集成Channels提升Web应用的实时性和用户体验,适应实时交互场景的需求。**
236 6
|
5月前
|
JavaScript 前端开发 网络协议
WebSocket在Java Spring Boot+Vue框架中实现消息推送功能
在现代Web应用中,实时消息提醒是一项非常重要的功能,能够极大地提升用户体验。WebSocket作为一种在单个TCP连接上进行全双工通信的协议,为实现实时消息提醒提供了高效且低延迟的解决方案。本文将详细介绍如何在Java Spring Boot后端和Vue前端框架中利用WebSocket实现消息提醒功能。
246 0
|
8月前
|
前端开发
t-io websocket的聊天功能学习记录(二)
t-io websocket的聊天功能学习记录(二)
116 0
|
8月前
t-io websocket的聊天功能学习记录(一)
t-io websocket的聊天功能学习记录(一)
133 0
|
移动开发 监控 网络协议
基于Socket通讯(C#)和WebSocket协议(net)编写的两种聊天功能(文末附源码下载地址)
基于Socket通讯(C#)和WebSocket协议(net)编写的两种聊天功能(文末附源码下载地址)
|
设计模式 前端开发 Java
用WebSocket实现一个简易的群聊功能,so easy
本文主要来讲解如何使用WebSocket来实现一个简易的群聊功能。