基于Java对STOMP服务端进行测试

简介: 在10000用户的并发下,客户端连接用时 5.5秒,500万条数据发送用时 55秒左右

一、目标

上一节我们说了构建SpringBoot + WebSocket+STOMP指定推送消息。我们这一节对它进行测试。

我们预期的并发目标:

  • 支持10000+ 用户
  • 每个用户同时发布 500条数据

我们准备的环境:

  • -Xms512m
  • -Xmx4096m
  • CPU 12核 20线程
  • 内存 16G

二、服务端改动

之前的服务端我们是这样设置的 config.enableSimpleBroker(),这种设置属于入门级使用,它非常简单但仅支持 STOMP 命令的子集(无确认、收据等)。

但我们测试时要求确认订阅状态,来确定订阅是否成功。所以必须对之前的配置类进行改动,来满足要求。

客户端订阅确认配置:

 @Bean
    public ApplicationListener<SessionSubscribeEvent> webSocketEventListener(
            final AbstractSubscribableChannel clientOutboundChannel) {
        return event -> {
            Message<byte[]> message = event.getMessage();
            StompHeaderAccessor stompHeaderAccessor = StompHeaderAccessor.wrap(message);
            if (stompHeaderAccessor.getReceipt() != null) {
                stompHeaderAccessor.setHeader("stompCommand", StompCommand.RECEIPT);
                stompHeaderAccessor.setReceiptId(stompHeaderAccessor.getReceipt());
                clientOutboundChannel.send(
                        MessageBuilder.createMessage(new byte[0], stompHeaderAccessor.getMessageHeaders()));
            }
        };
    }

三、测试程序

3.1 依赖

  <dependency>
            <groupId>org.eclipse.jetty.websocket</groupId>
            <artifactId>websocket-client</artifactId>
            <version>9.4.48.v20220622</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.eclipse.jetty.websocket</groupId>
            <artifactId>websocket-server</artifactId>
            <version>9.4.48.v20220622</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>jetty-webapp</artifactId>
            <version>9.4.48.v20220622</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>jetty-client</artifactId>
            <version>9.4.48.v20220622</version>
            <scope>test</scope>
        </dependency>

3.1 用户量和消息数

    //用户量
    private static final int NUMBER_OF_USERS = 10000;

    //消息数
    private static final int BROADCAST_MESSAGE_COUNT = 500;

3.2 测试端口是否可用

        String host = "localhost";
        if (args.length > 0) {
            host = args[0];
        }

        int port = 6060;
        if (args.length > 1) {
            port = Integer.valueOf(args[1]);
        }

        String homeUrl = "http://localhost:6060";
        logger.debug("Sending warm-up HTTP request to " + homeUrl);
        HttpStatus status = new RestTemplate().getForEntity(homeUrl, Void.class, host, port).getStatusCode();
        Assert.state(status == HttpStatus.OK);

3.3 客户端连接、订阅测试

/**
         * CountDownLatch是一种通用的同步工具,可用于多种目的。
         * 用计数1初始化的CountDownLatch用作简单的开/关锁存器或门:所有调用的线程都在门处等待,直到调用countDown的线程打开它。
         * 初始化为N的CountDownLatch可以用来让一个线程等待,直到N个线程完成了某个操作,或者某个操作完成了N次
         */

        //连接锁
        final CountDownLatch connectLatch = new CountDownLatch(NUMBER_OF_USERS);
        //订阅锁
        final CountDownLatch subscribeLatch = new CountDownLatch(NUMBER_OF_USERS);
        //消息锁
        final CountDownLatch messageLatch = new CountDownLatch(NUMBER_OF_USERS);
        //断开连接锁
        final CountDownLatch disconnectLatch = new CountDownLatch(NUMBER_OF_USERS);

        //失败信息监听
        final AtomicReference<Throwable> failure = new AtomicReference<>();

        //链接地址
        String stompUrl = "ws://localhost:6060/pda-message-websocket";

        //构建标准的WebSocket客户端
        StandardWebSocketClient webSocketClient = new StandardWebSocketClient();
        //使用 SockJsClient 模拟大量并发用户,需要配置底层 HTTP 客户端(用于 XHR 传输)以允许足够数量的连接和线程
        HttpClient jettyHttpClient = new HttpClient();
        jettyHttpClient.setMaxConnectionsPerDestination(1000);
        jettyHttpClient.setExecutor(new QueuedThreadPool(1000));
        jettyHttpClient.start();

        //创建 SockJS 客户端
        List<Transport> transports = new ArrayList<>();
        transports.add(new WebSocketTransport(webSocketClient));
        //JettyXhrTransport使用 JettyHttpClient进行 HTTP 请求
        transports.add(new JettyXhrTransport(jettyHttpClient));
        SockJsClient sockJsClient = new SockJsClient(transports);

        try {
            //创建任务线程池
            ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
            taskScheduler.afterPropertiesSet();
            //客户端设置
            WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
            //加入消息转换器
            stompClient.setMessageConverter(new StringMessageConverter());
            //设置线程池
            stompClient.setTaskScheduler(taskScheduler);
            //设置默认心跳
            stompClient.setDefaultHeartbeat(new long[] {6000, 6000});

            logger.debug("连接和订阅 " + NUMBER_OF_USERS + " 用户 ");
            //记录每个任务执行时间
            StopWatch stopWatch = new StopWatch("STOMP Broker 中转发 WebSocket 负载测试");
            //开始记录
            stopWatch.start();

            List<ConsumerStompSessionHandler> consumers = new ArrayList<>();
            //循环连接
            for (int i=0; i < NUMBER_OF_USERS; i++) {
                //StompSessionHandler 实现写入
                consumers.add(new ConsumerStompSessionHandler(BROADCAST_MESSAGE_COUNT, connectLatch,
                        subscribeLatch, messageLatch, disconnectLatch, failure));
                //开始连接并订阅
                stompClient.connect(stompUrl,consumers.get(i), host, port);
            }

            if (failure.get() != null) {
                throw new AssertionError("测试失败", failure.get());
            }
            //使当前线程等待,直到锁存器倒计时为零,除非线程被中断或指定的等待时间已过。如果当前计数为零,则此方法立即返回值true。
            if (!connectLatch.await(5000, TimeUnit.MILLISECONDS)) {
                fail("并非所有用户都已连接,其余用户: " + connectLatch.getCount());
            }
            if (!subscribeLatch.await(5000, TimeUnit.MILLISECONDS)) {
                fail("并非所有用户都订阅了,其余用户: " + subscribeLatch.getCount());
            }
            stopWatch.stop();
            logger.debug("已完成: " + stopWatch.getLastTaskTimeMillis() + " 毫秒");
            logger.debug("广播 " + BROADCAST_MESSAGE_COUNT + " 发送的消息 " + NUMBER_OF_USERS + " 用户 ");

3.4 数据发布、断开连接测试

stopWatch.start();

            ProducerStompSessionHandler producer = new ProducerStompSessionHandler(BROADCAST_MESSAGE_COUNT, failure);

            //发布连接
            stompClient.connect(stompUrl, producer, host, port);
            stompClient.setTaskScheduler(taskScheduler);

            if (failure.get() != null) {
                throw new AssertionError("测试失败", failure.get());
            }
            if (!messageLatch.await(60 * 1000, TimeUnit.MILLISECONDS)) {
                for (ConsumerStompSessionHandler consumer : consumers) {
                    if (consumer.messageCount.get() < consumer.expectedMessageCount) {
                        logger.debug(consumer);
                    }
                }
            }

            if (!messageLatch.await(60 * 1000, TimeUnit.MILLISECONDS)) {
                fail("并非所有处理程序都收到了每条消息,其余消息为:" + messageLatch.getCount());
            }
            producer.session.disconnect();
           if (!disconnectLatch.await(5000, TimeUnit.MILLISECONDS)) {
                fail("并非所有断开连接都已完成,剩余:" + disconnectLatch.getCount());
            }

            stopWatch.stop();
            logger.debug("已完成: " + stopWatch.getLastTaskTimeMillis() + " 毫秒");

            System.out.println("\n按任意键退出...");
            System.in.read();
        }
        catch (Throwable t) {
            t.printStackTrace();
        }
        finally {
            jettyHttpClient.stop();
        }

        logger.debug("正在退出");
        System.exit(0);
    }

四、消费者会话处理程序

    private static class ConsumerStompSessionHandler extends StompSessionHandlerAdapter {

        private final int expectedMessageCount;

        private final CountDownLatch connectLatch;

        private final CountDownLatch subscribeLatch;

        private final CountDownLatch messageLatch;

        private final CountDownLatch disconnectLatch;

        private final AtomicReference<Throwable> failure;

        private AtomicInteger messageCount = new AtomicInteger(0);

        //消费者
        public ConsumerStompSessionHandler(int expectedMessageCount, CountDownLatch connectLatch,
                CountDownLatch subscribeLatch, CountDownLatch messageLatch, CountDownLatch disconnectLatch,
                AtomicReference<Throwable> failure) {

            this.expectedMessageCount = expectedMessageCount;
            this.connectLatch = connectLatch;
            this.subscribeLatch = subscribeLatch;
            this.messageLatch = messageLatch;
            this.disconnectLatch = disconnectLatch;
            this.failure = failure;
        }

        @Override
        public void afterConnected(final StompSession session, StompHeaders connectedHeaders) {
            this.connectLatch.countDown();
            session.setAutoReceipt(true);
            //订阅主题
            session.subscribe("/topic/greeting", new StompFrameHandler() {

                /**
                 *在{@link handleFrame(StompHeaders,Object)}之前调用,以确定应将有效负载转换为的Object的类型。
                 * @param headers 信息的标题
                 */
                @Override
                public Type getPayloadType(StompHeaders headers) {
                    return String.class;
                }

                /**
                 * 处理STOMP帧,将有效载荷转换为返回的目标类型 from {@link #getPayloadType(StompHeaders)}.
                 * @param headers 帧的标头
                 * @param payload 有效载荷或{@code null}(如果没有有效载荷)
                 */
                @Override
                public void handleFrame(StompHeaders headers, Object payload) {
                    if (messageCount.incrementAndGet() == expectedMessageCount) {
                        messageLatch.countDown();
                        disconnectLatch.countDown();
                        session.disconnect();
                    }
                }
            }).addReceiptTask(subscribeLatch::countDown);
        }

        @Override
        public void handleTransportError(StompSession session, Throwable exception) {
            logger.error("传输错误", exception);
            this.failure.set(exception);
            if (exception instanceof ConnectionLostException) {
                this.disconnectLatch.countDown();
            }
        }

        @Override
        public void handleException(StompSession s, StompCommand c, StompHeaders h, byte[] p, Throwable ex) {
            logger.error("处理异常", ex);
            this.failure.set(ex);
        }

        @Override
        public void handleFrame(StompHeaders headers, Object payload) {
            Exception ex = new Exception(headers.toString());
            logger.error("STOMP ERROR帧", ex);
            this.failure.set(ex);
        }

        @Override
        public String toString() {
            return "使用者Stomp会话处理程序[消息计数=" + this.messageCount + "]";
        }
    }

五、生产者会话处理程序

    private static class ProducerStompSessionHandler extends StompSessionHandlerAdapter {

        private final int numberOfMessagesToBroadcast;

        private final AtomicReference<Throwable> failure;

        private StompSession session;


        public ProducerStompSessionHandler(int numberOfMessagesToBroadcast, AtomicReference<Throwable> failure) {
            this.numberOfMessagesToBroadcast = numberOfMessagesToBroadcast;
            this.failure = failure;
        }

        @Override
        public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
            this.session = session;
            int i =0;
            try {
                for ( ; i < this.numberOfMessagesToBroadcast; i++) {
                    session.send("/app/greeting", "hello");
                }
            }
            catch (Throwable t) {
                logger.error("在发送消息失败 " + i, t);
                failure.set(t);
            }
        }

        @Override
        public void handleTransportError(StompSession session, Throwable exception) {
            logger.error("传输错误", exception);
            this.failure.set(exception);
        }

        @Override
        public void handleException(StompSession s, StompCommand c, StompHeaders h, byte[] p, Throwable ex) {
            logger.error("处理异常", ex);
            this.failure.set(ex);
        }

        @Override
        public void handleFrame(StompHeaders headers, Object payload) {
            Exception ex = new Exception(headers.toString());
            logger.error("STOMP ERROR帧", ex);
            this.failure.set(ex);
        }
    }

六、测试结果

4009ac1a34054fc8a56a3356847393d8.jpeg

可以看到在10000用户的并发下,客户端连接用时 5.5秒,500万条数据发送用时 55秒左右。

目录
相关文章
|
14天前
|
Java 测试技术 Maven
Java一分钟之-PowerMock:静态方法与私有方法测试
通过本文的详细介绍,您可以使用PowerMock轻松地测试Java代码中的静态方法和私有方法。PowerMock通过扩展Mockito,提供了强大的功能,帮助开发者在复杂的测试场景中保持高效和准确的单元测试。希望本文对您的Java单元测试有所帮助。
27 2
|
22天前
|
Java 程序员 测试技术
Java|让 JUnit4 测试类自动注入 logger 和被测 Service
本文介绍如何通过自定义 IDEA 的 JUnit4 Test Class 模板,实现生成测试类时自动注入 logger 和被测 Service。
21 5
|
1月前
|
Java 流计算
Flink-03 Flink Java 3分钟上手 Stream 给 Flink-02 DataStreamSource Socket写一个测试的工具!
Flink-03 Flink Java 3分钟上手 Stream 给 Flink-02 DataStreamSource Socket写一个测试的工具!
37 1
Flink-03 Flink Java 3分钟上手 Stream 给 Flink-02 DataStreamSource Socket写一个测试的工具!
|
23天前
|
JSON Java 开发工具
Java服务端集成Google FCM推送的注意事项和实际经验
本文分享了作者在公司APP海外发布过程中,选择Google FCM进行消息推送的集成经验。文章详细解析了Java集成FCM推送的多种实现方式,包括HTTP请求和SDK集成,并指出了通知栏消息和透传消息的区别与应用场景。同时,作者还探讨了Firebase项目的创建、配置和服务端集成的注意事项,帮助读者解决文档混乱和选择困难的问题。
43 1
|
27天前
|
存储 人工智能 Java
将 Spring AI 与 LLM 结合使用以生成 Java 测试
AIDocumentLibraryChat 项目通过 GitHub URL 为指定的 Java 类生成测试代码,支持 granite-code 和 deepseek-coder-v2 模型。项目包括控制器、服务和配置,能处理源代码解析、依赖加载及测试代码生成,旨在评估 LLM 对开发测试的支持能力。
35 1
|
1月前
|
XML Java Maven
在 Cucumber 测试中自动将 Cucumber 数据表映射到 Java 对象
在 Cucumber 测试中自动将 Cucumber 数据表映射到 Java 对象
53 7
|
1月前
|
JSON Java 开发工具
Java服务端集成Google FCM推送的注意事项和实际经验
公司的app要上海外,涉及到推送功能,经过综合考虑,选择Google FCM进行消息推送。 查看一些集成博客和官方文档,看的似懂非懂,迷迷惑惑。本篇文章除了将我实际集成的经验分享出来,也会对看到的博客及其中产生的疑惑、注意事项一一评论。 从官方文档和众多博客中,你会发现Java集成FCM推送有多种实现方式,会让生产生文档很乱,不知作何选择的困惑。
77 0
|
1月前
|
分布式计算 Java 大数据
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
31 0
|
2月前
|
SQL JavaScript 前端开发
基于Java访问Hive的JUnit5测试代码实现
根据《用Java、Python来开发Hive应用》一文,建立了使用Java、来开发Hive应用的方法,产生的代码如下
71 6
|
1月前
|
算法 Java 测试技术
数据结构 —— Java自定义代码实现顺序表,包含测试用例以及ArrayList的使用以及相关算法题
文章详细介绍了如何用Java自定义实现一个顺序表类,包括插入、删除、获取数据元素、求数据个数等功能,并对顺序表进行了测试,最后还提及了Java中自带的顺序表实现类ArrayList。
19 0