一、目标
上一节我们说了构建SpringBoot + WebSocket+STOMP指定推送消息。我们这一节对它进行测试。
我们预期的并发目标:
- 支持10000+ 用户
- 每个用户同时发布 500条数据
我们准备的环境:
- -Xms512m
- -Xmx4096m
- CPU 12核 20线程
- 内存 16G
二、服务端改动
之前的服务端我们是这样设置的 config.enableSimpleBroker()
,这种设置属于入门级使用,它非常简单但仅支持 STOMP 命令的子集(无确认、收据等)。
但我们测试时要求确认订阅状态,来确定订阅是否成功。所以必须对之前的配置类进行改动,来满足要求。
客户端订阅确认配置:
@Bean
public ApplicationListener<SessionSubscribeEvent> webSocketEventListener(
final AbstractSubscribableChannel clientOutboundChannel) {
return event -> {
Message<byte[]> message = event.getMessage();
StompHeaderAccessor stompHeaderAccessor = StompHeaderAccessor.wrap(message);
if (stompHeaderAccessor.getReceipt() != null) {
stompHeaderAccessor.setHeader("stompCommand", StompCommand.RECEIPT);
stompHeaderAccessor.setReceiptId(stompHeaderAccessor.getReceipt());
clientOutboundChannel.send(
MessageBuilder.createMessage(new byte[0], stompHeaderAccessor.getMessageHeaders()));
}
};
}
三、测试程序
3.1 依赖
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-client</artifactId>
<version>9.4.48.v20220622</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-server</artifactId>
<version>9.4.48.v20220622</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-webapp</artifactId>
<version>9.4.48.v20220622</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-client</artifactId>
<version>9.4.48.v20220622</version>
<scope>test</scope>
</dependency>
3.1 用户量和消息数
//用户量
private static final int NUMBER_OF_USERS = 10000;
//消息数
private static final int BROADCAST_MESSAGE_COUNT = 500;
3.2 测试端口是否可用
String host = "localhost";
if (args.length > 0) {
host = args[0];
}
int port = 6060;
if (args.length > 1) {
port = Integer.valueOf(args[1]);
}
String homeUrl = "http://localhost:6060";
logger.debug("Sending warm-up HTTP request to " + homeUrl);
HttpStatus status = new RestTemplate().getForEntity(homeUrl, Void.class, host, port).getStatusCode();
Assert.state(status == HttpStatus.OK);
3.3 客户端连接、订阅测试
/**
* CountDownLatch是一种通用的同步工具,可用于多种目的。
* 用计数1初始化的CountDownLatch用作简单的开/关锁存器或门:所有调用的线程都在门处等待,直到调用countDown的线程打开它。
* 初始化为N的CountDownLatch可以用来让一个线程等待,直到N个线程完成了某个操作,或者某个操作完成了N次
*/
//连接锁
final CountDownLatch connectLatch = new CountDownLatch(NUMBER_OF_USERS);
//订阅锁
final CountDownLatch subscribeLatch = new CountDownLatch(NUMBER_OF_USERS);
//消息锁
final CountDownLatch messageLatch = new CountDownLatch(NUMBER_OF_USERS);
//断开连接锁
final CountDownLatch disconnectLatch = new CountDownLatch(NUMBER_OF_USERS);
//失败信息监听
final AtomicReference<Throwable> failure = new AtomicReference<>();
//链接地址
String stompUrl = "ws://localhost:6060/pda-message-websocket";
//构建标准的WebSocket客户端
StandardWebSocketClient webSocketClient = new StandardWebSocketClient();
//使用 SockJsClient 模拟大量并发用户,需要配置底层 HTTP 客户端(用于 XHR 传输)以允许足够数量的连接和线程
HttpClient jettyHttpClient = new HttpClient();
jettyHttpClient.setMaxConnectionsPerDestination(1000);
jettyHttpClient.setExecutor(new QueuedThreadPool(1000));
jettyHttpClient.start();
//创建 SockJS 客户端
List<Transport> transports = new ArrayList<>();
transports.add(new WebSocketTransport(webSocketClient));
//JettyXhrTransport使用 JettyHttpClient进行 HTTP 请求
transports.add(new JettyXhrTransport(jettyHttpClient));
SockJsClient sockJsClient = new SockJsClient(transports);
try {
//创建任务线程池
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
//客户端设置
WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
//加入消息转换器
stompClient.setMessageConverter(new StringMessageConverter());
//设置线程池
stompClient.setTaskScheduler(taskScheduler);
//设置默认心跳
stompClient.setDefaultHeartbeat(new long[] {6000, 6000});
logger.debug("连接和订阅 " + NUMBER_OF_USERS + " 用户 ");
//记录每个任务执行时间
StopWatch stopWatch = new StopWatch("STOMP Broker 中转发 WebSocket 负载测试");
//开始记录
stopWatch.start();
List<ConsumerStompSessionHandler> consumers = new ArrayList<>();
//循环连接
for (int i=0; i < NUMBER_OF_USERS; i++) {
//StompSessionHandler 实现写入
consumers.add(new ConsumerStompSessionHandler(BROADCAST_MESSAGE_COUNT, connectLatch,
subscribeLatch, messageLatch, disconnectLatch, failure));
//开始连接并订阅
stompClient.connect(stompUrl,consumers.get(i), host, port);
}
if (failure.get() != null) {
throw new AssertionError("测试失败", failure.get());
}
//使当前线程等待,直到锁存器倒计时为零,除非线程被中断或指定的等待时间已过。如果当前计数为零,则此方法立即返回值true。
if (!connectLatch.await(5000, TimeUnit.MILLISECONDS)) {
fail("并非所有用户都已连接,其余用户: " + connectLatch.getCount());
}
if (!subscribeLatch.await(5000, TimeUnit.MILLISECONDS)) {
fail("并非所有用户都订阅了,其余用户: " + subscribeLatch.getCount());
}
stopWatch.stop();
logger.debug("已完成: " + stopWatch.getLastTaskTimeMillis() + " 毫秒");
logger.debug("广播 " + BROADCAST_MESSAGE_COUNT + " 发送的消息 " + NUMBER_OF_USERS + " 用户 ");
3.4 数据发布、断开连接测试
stopWatch.start();
ProducerStompSessionHandler producer = new ProducerStompSessionHandler(BROADCAST_MESSAGE_COUNT, failure);
//发布连接
stompClient.connect(stompUrl, producer, host, port);
stompClient.setTaskScheduler(taskScheduler);
if (failure.get() != null) {
throw new AssertionError("测试失败", failure.get());
}
if (!messageLatch.await(60 * 1000, TimeUnit.MILLISECONDS)) {
for (ConsumerStompSessionHandler consumer : consumers) {
if (consumer.messageCount.get() < consumer.expectedMessageCount) {
logger.debug(consumer);
}
}
}
if (!messageLatch.await(60 * 1000, TimeUnit.MILLISECONDS)) {
fail("并非所有处理程序都收到了每条消息,其余消息为:" + messageLatch.getCount());
}
producer.session.disconnect();
if (!disconnectLatch.await(5000, TimeUnit.MILLISECONDS)) {
fail("并非所有断开连接都已完成,剩余:" + disconnectLatch.getCount());
}
stopWatch.stop();
logger.debug("已完成: " + stopWatch.getLastTaskTimeMillis() + " 毫秒");
System.out.println("\n按任意键退出...");
System.in.read();
}
catch (Throwable t) {
t.printStackTrace();
}
finally {
jettyHttpClient.stop();
}
logger.debug("正在退出");
System.exit(0);
}
四、消费者会话处理程序
private static class ConsumerStompSessionHandler extends StompSessionHandlerAdapter {
private final int expectedMessageCount;
private final CountDownLatch connectLatch;
private final CountDownLatch subscribeLatch;
private final CountDownLatch messageLatch;
private final CountDownLatch disconnectLatch;
private final AtomicReference<Throwable> failure;
private AtomicInteger messageCount = new AtomicInteger(0);
//消费者
public ConsumerStompSessionHandler(int expectedMessageCount, CountDownLatch connectLatch,
CountDownLatch subscribeLatch, CountDownLatch messageLatch, CountDownLatch disconnectLatch,
AtomicReference<Throwable> failure) {
this.expectedMessageCount = expectedMessageCount;
this.connectLatch = connectLatch;
this.subscribeLatch = subscribeLatch;
this.messageLatch = messageLatch;
this.disconnectLatch = disconnectLatch;
this.failure = failure;
}
@Override
public void afterConnected(final StompSession session, StompHeaders connectedHeaders) {
this.connectLatch.countDown();
session.setAutoReceipt(true);
//订阅主题
session.subscribe("/topic/greeting", new StompFrameHandler() {
/**
*在{@link handleFrame(StompHeaders,Object)}之前调用,以确定应将有效负载转换为的Object的类型。
* @param headers 信息的标题
*/
@Override
public Type getPayloadType(StompHeaders headers) {
return String.class;
}
/**
* 处理STOMP帧,将有效载荷转换为返回的目标类型 from {@link #getPayloadType(StompHeaders)}.
* @param headers 帧的标头
* @param payload 有效载荷或{@code null}(如果没有有效载荷)
*/
@Override
public void handleFrame(StompHeaders headers, Object payload) {
if (messageCount.incrementAndGet() == expectedMessageCount) {
messageLatch.countDown();
disconnectLatch.countDown();
session.disconnect();
}
}
}).addReceiptTask(subscribeLatch::countDown);
}
@Override
public void handleTransportError(StompSession session, Throwable exception) {
logger.error("传输错误", exception);
this.failure.set(exception);
if (exception instanceof ConnectionLostException) {
this.disconnectLatch.countDown();
}
}
@Override
public void handleException(StompSession s, StompCommand c, StompHeaders h, byte[] p, Throwable ex) {
logger.error("处理异常", ex);
this.failure.set(ex);
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
Exception ex = new Exception(headers.toString());
logger.error("STOMP ERROR帧", ex);
this.failure.set(ex);
}
@Override
public String toString() {
return "使用者Stomp会话处理程序[消息计数=" + this.messageCount + "]";
}
}
五、生产者会话处理程序
private static class ProducerStompSessionHandler extends StompSessionHandlerAdapter {
private final int numberOfMessagesToBroadcast;
private final AtomicReference<Throwable> failure;
private StompSession session;
public ProducerStompSessionHandler(int numberOfMessagesToBroadcast, AtomicReference<Throwable> failure) {
this.numberOfMessagesToBroadcast = numberOfMessagesToBroadcast;
this.failure = failure;
}
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
this.session = session;
int i =0;
try {
for ( ; i < this.numberOfMessagesToBroadcast; i++) {
session.send("/app/greeting", "hello");
}
}
catch (Throwable t) {
logger.error("在发送消息失败 " + i, t);
failure.set(t);
}
}
@Override
public void handleTransportError(StompSession session, Throwable exception) {
logger.error("传输错误", exception);
this.failure.set(exception);
}
@Override
public void handleException(StompSession s, StompCommand c, StompHeaders h, byte[] p, Throwable ex) {
logger.error("处理异常", ex);
this.failure.set(ex);
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
Exception ex = new Exception(headers.toString());
logger.error("STOMP ERROR帧", ex);
this.failure.set(ex);
}
}
六、测试结果
可以看到在10000用户的并发下,客户端连接用时 5.5秒,500万条数据发送用时 55秒左右。