Springboot 整合Websocket+Stomp协议+RabbitMQ做消息代理 实例教程

简介: Springboot 整合Websocket+Stomp协议+RabbitMQ做消息代理 实例教程

前言



如果你还没有了解过websocket,关于整合websocket的简单入门使用,可以先看看我这篇:


《SpringBoot 整合WebSocket 简单实战案例》


ok,若你是通过关键字搜索到该篇的,那么我清楚你想了解 什么,这也是我写该篇文章的目的。


我想让所有人对springboot整合 webSocket 再也不陌生,再也不怕去整合,包括使用rabbitmq等等,使用SendTo,SendToUser等等,你所迷惑的,在这篇文章,我都能给你解惑。


这是一篇结合实例以及代码简析的一篇教程文章,我个人非常啰嗦,所以没有耐心的读者可以跳跃性阅读。


正文


那么该篇内容包括什么?


1.使用stomp协议


2.使用rabbitmq作为消息代理


3.使用rabbitmq接收发送消息


4.推送消息的确认,消费消息的确认


5.简单的页面配合收发消息测试


6. 通过连接websocekt时传递个人信息参数,后台接收(拦截获取参数)


7.一对一发收,一对多发收


实例项目结构:


image.png


大致涉及内容的作用简略图,具体跟着案例代码了解:


image.png


开始进入到我们的实例教程,


首先pom文件引入使用到的依赖:


        <!--web-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!--test-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--websocket-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <!-- json -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>
        <!-- RabbitMQ Starter Dependency -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- Following additional dependency is required for Full Featured STOMP Broker Relay -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-reactor-netty</artifactId>
        </dependency>


ps:springboot该篇用的2.0.0.RELEASE,java1.8


然后创建WebSocketConfig.java :


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.*;
/**
 * @Author : JCccc
 * @CreateTime : 2020/8/26
 * @Description :
 **/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Autowired
GetHeaderParamInterceptor getHeaderParamInterceptor;
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws").setAllowedOrigins("*")
                .withSockJS();
        //暴露多个节点,就一样直接addEndpoint 就可以
//        registry.addEndpoint("/alone") .setAllowedOrigins("*")
//               .withSockJS();
    }
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
   //如果不使用rabbitmq作为消息代理,那么只需要暴露简单节点即可
   // 默认的Simple Broker
   //   registry.enableSimpleBroker("/topic","/user");
   //  registry.setApplicationDestinationPrefixes("/app");
        // 使用RabbitMQ做为消息代理,替换默认的Simple Broker
        //定义了服务端接收地址的前缀,也即客户端给服务端发消息的地址前缀,@SendTo(XXX) 也可以重定向
         registry.setUserDestinationPrefix("/user"); //这是给sendToUser使用,前端订阅需要加上/user
         registry.setApplicationDestinationPrefixes("/app"); //这是给客户端推送消息到服务器使用 ,推送的接口加上/app
        // "STOMP broker relay"处理所有消息将消息发送到外部的消息代理
        registry.enableStompBrokerRelay("/exchange","/topic","/queue","/amq/queue")
                .setVirtualHost("JCChost") //对应自己rabbitmq里的虚拟host
                .setRelayHost("localhost")
                .setClientLogin("root")
                .setClientPasscode("root")
                .setSystemLogin("root")
                .setSystemPasscode("root")
                .setSystemHeartbeatSendInterval(5000)
                .setSystemHeartbeatReceiveInterval(4000);
    }
    /**
     * 采用自定义拦截器,获取connect时候传递的参数
     *
     * @param registration
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(getHeaderParamInterceptor);
    }
}


WebSocketConfig 代码简析及注意点 :


1.@EnableWebSocketMessageBroker


这个注解作用是为了 是程序能够识别出 连接了 暴露的stomp节点 发送过来的 请求。 配合 @MessageMapping 是用 。(后面有介绍)


2.registry.addEndpoint("/ws")


这就是暴露节点,就是可以让客户端连接到websocket的节点。 如果需要暴露多个,就一样使用registry.addEndpoint("/XXX")即可。


3.configureMessageBroker


配置消息代理,前言说到我们该篇会使用rabbitmq来作为消息代理,所以咱们配置的代码就是使用的rabbitmq。


如果不想整合rabbitmq的话,单纯开启简单的消息代理,两三行代码即可。


registry.enableSimpleBroker("/topic","/queue");
registry.setApplicationDestinationPrefixes("/app");
registry.setUserDestinationPrefix("/user"); 


registry.enableSimpleBroker("/topic","/queue") :


意思是在暴露的节点上,消息代理即将会处理 前缀为 /topic 和 /queue 的请求消息。 (可以理解为,服务端给客户端推送消息使用)


registry.setApplicationDestinationPrefixes("/app") :


意思是客户端程序访问服务器,需带有/app 前缀,那么这些带有/app的消息就会匹配到@MessageMapping注解的方法上。


(可以理解为客户端向服务端推送消息使用)

registry.setUserDestinationPrefix("/user") :


这个就比较好玩,就是服务端指定给用户一对一推送消息,使用 sendToUser 方法时,会帮我们默认拼接上 /user,所以客户端也需要订阅相关的/user前缀的主题才能正常接收。(后面代码还会介绍到)


而我们该篇是使用rabbitmq作为消息代理,所以咱们的代码是:


这些代码就没啥还解释了,上面都说了核心的几个代码意义了,其余就是连接rabbbitmq的一些相关信息,账号密码,虚拟host等。


         registry.setUserDestinationPrefix("/user"); //这是给sendToUser使用,前端订阅需要加上/user
         registry.setApplicationDestinationPrefixes("/app"); //这是给客户端推送消息到服务器使用 ,推送的接口加上/app
        // "STOMP broker relay"处理所有消息将消息发送到外部的消息代理
        registry.enableStompBrokerRelay("/exchange","/topic","/queue","/amq/queue")
                .setVirtualHost("JCChost")
                .setRelayHost("localhost")
                .setClientLogin("root")
                .setClientPasscode("root")
                .setSystemLogin("root")
                .setSystemPasscode("root")
                .setSystemHeartbeatSendInterval(5000)
                .setSystemHeartbeatReceiveInterval(4000);


再来看 configureClientInboundChannel ,


这个就比较关键,使用客户端绑定配置,咱们绑定了一个拦截器 getHeaderParamInterceptor ,从命名看就知道,就是为了拦截一些参数。


没错,为了识别客户端连接到咱们的websocket上,到底是谁? 我们该篇采取的就是,让客户端连接时携带自己的个人唯一识别信息过来,在建立连接时,取出该参数,为其实现用户信息认证设置。


需要创建GetHeaderParamInterceptor.java:


import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptorAdapter;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.stereotype.Component;
import java.util.LinkedList;
import java.util.Map;
/**
 * @Author : JCccc
 * @CreateTime : 2020/8/26
 * @Description :
 **/
@Component
public class GetHeaderParamInterceptor extends ChannelInterceptorAdapter {
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        if (StompCommand.CONNECT.equals(accessor.getCommand())) {
            Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);
            if (raw instanceof Map) {
                Object name = ((Map) raw).get("username"); //取出客户端携带的参数
                System.out.println(name);
                if (name instanceof LinkedList) {
                    // 设置当前访问的认证用户
                    accessor.setUser(new UserPrincipal(((LinkedList) name).get(0).toString()));
                }
            }
        }
        return message;
    }
}


为了完成用户信息的设置,需要使用到Principal ,所以我们还需要创建 UserPrincipal.java:


import java.security.Principal;
/**
 * @Author : JCccc
 * @CreateTime : 2020/8/26
 * @Description :
 **/
public class UserPrincipal implements Principal {
    private final String name;
    public UserPrincipal(String name) {
        this.name = name;
    }
    @Override
    public String getName() {
        return name;
    }
}



ok,也就是说关于websocket的配置,我们创建了上面的三个文件(继续敲代码前,先自己回顾一下这三个东西的作用):


image.png


接下来,我们需要为项目整合上 RabbitMQ。


创建MyRabbitConfig.java:


提前说下,这个关于rabbitmq的所有东西,我暂且都配置在这里。


import cn.jc.springbootwebsocketdemo.service.ChatService;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @Author : JCccc
 * @CreateTime : 2019/9/3
 * @Description :
 **/
@Configuration
public class MyRabbitConfig {
    @Autowired
    ChatService chatService;
    //绑定键
    public final static String msgTopicKey = "topic.public";
    //队列
    public final static String msgTopicQueue = "topicQueue";
    @Bean
    public Queue topicQueue() {
        return new Queue(MyRabbitConfig.msgTopicQueue,true);
    }
    @Bean
    TopicExchange exchange() {
        return new TopicExchange("topicWebSocketExchange",true,false);
    }
    //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
    //这样只要是消息携带的路由键是topic.man,才会分发到该队列
    @Bean
    Binding bindingExchangeMessage() {
        return BindingBuilder.bind(topicQueue()).to(exchange()).with(msgTopicKey);
    }
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1", 5672);
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");
        connectionFactory.setVirtualHost("JCChost");
        connectionFactory.setPublisherConfirms(true); // 发送消息回调,必须要设置
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }
    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("ConfirmCallback:     "+"相关数据:"+correlationData);
                System.out.println("ConfirmCallback:     "+"确认情况:"+ack);
                System.out.println("ConfirmCallback:     "+"原因:"+cause);
            }
        });
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("ReturnCallback:     "+"消息:"+message);
                System.out.println("ReturnCallback:     "+"回应码:"+replyCode);
                System.out.println("ReturnCallback:     "+"回应信息:"+replyText);
                System.out.println("ReturnCallback:     "+"交换机:"+exchange);
                System.out.println("ReturnCallback:     "+"路由键:"+routingKey);
            }
        });
        return rabbitTemplate;
    }
    /**
     * 接受消息的监听,这个监听会接受消息队列topicQueue的消息
     * 针对消费者配置
     * @return
     */
    @Bean
    public SimpleMessageListenerContainer messageContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
        container.setQueues(topicQueue());
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
        container.setMessageListener(new ChannelAwareMessageListener() {
            public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
                byte[] body = message.getBody();
                String msg = new String(body);
                System.out.println("rabbitmq收到消息 : " +msg);
                Boolean sendToWebsocket = chatService.sendMsg(msg);
                if (sendToWebsocket){
                    System.out.println("消息处理成功! 已经推送到websocket!");
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); //确认消息成功消费
                }
            }
        });
        return container;
    }
}


看到这么多代码是不是一下子有点懵,没事,看我的MyRabbitConfig代码简析 :


1.连接rabbitmq代码


image.png


2.简单使用一下主题交换机


image.png


(其实可以配置更多的交换机,对于使用rabbitmq方面,完全没接触过的小伙伴,不妨先看看这篇文章,《Springboot 整合RabbitMq ,用心看完这一篇就够了》


3.配置生产者推送消息触发的回调函数


image.png


4.消费者监听器,采取开启手动确认


image.png


ok,到这里我们可以看到整合上rabbitmq的相关内容,完毕。


image.png


那么还有接收消息使用到的实体类,处理消息的业务类,也就是


ChatMessage.java:


这里实体类比较简单,大家可以自己扩展符合自己消息内容场景的。


/**
 * @Author : JCccc
 * @CreateTime : 2020/8/26
 * @Description :
 **/
public class ChatMessage {
    private MessageType type;
    private String content;
    private String sender;
    private String to;
    public enum MessageType {
        CHAT,
        JOIN,
        LEAVE
    }
    @Override
    public String toString() {
        return "ChatMessage{" +
                "type=" + type +
                ", content='" + content + '\'' +
                ", sender='" + sender + '\'' +
                ", to='" + to + '\'' +
                '}';
    }
    public MessageType getType() {
        return type;
    }
    public void setType(MessageType type) {
        this.type = type;
    }
    public String getContent() {
        return content;
    }
    public void setContent(String content) {
        this.content = content;
    }
    public String getSender() {
        return sender;
    }
    public void setSender(String sender) {
        this.sender = sender;
    }
    public String getTo() {
        return to;
    }
    public void setTo(String to) {
        this.to = to;
    }
}


然后是ChatService.java:


import cn.jc.springbootwebsocketdemo.model.ChatMessage;
import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.stereotype.Service;
/**
 * @Author : JCccc
 * @CreateTime : 2020/8/26
 * @Description :
 **/
@Service
public class ChatService {
    @Autowired
    private SimpMessageSendingOperations simpMessageSendingOperations;
    public Boolean sendMsg(String msg) {
        try {
            JSONObject msgJson = JSONObject.parseObject(msg);
            if (msgJson.getString("to").equals("all") && msgJson.getString("type").equals(ChatMessage.MessageType.CHAT.toString())){
                simpMessageSendingOperations.convertAndSend("/topic/public", msgJson);
            }else if (msgJson.getString("to").equals("all") && msgJson.getString("type").equals(ChatMessage.MessageType.JOIN.toString())) {
                simpMessageSendingOperations.convertAndSend("/topic/public", msgJson);
            }else if(msgJson.getString("to").equals("all") &&  msgJson.getString("type").equals(ChatMessage.MessageType.LEAVE.toString())) {
                simpMessageSendingOperations.convertAndSend("/topic/public", msgJson);
            }else if (!msgJson.getString("to").equals("all") &&  msgJson.getString("type").equals(ChatMessage.MessageType.CHAT.toString())){
                simpMessageSendingOperations.convertAndSendToUser(msgJson.getString("to"),"/topic/msg", msgJson);
            }
        } catch (MessagingException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }
}


这个很关键,该实例的点对点推送消息,群发的处理代码都在这个业务处理类里面了,

那么也是稍微做下代码简析:


image.png


简单点就是 convertAndSend 和 convertAndSendToUser的使用, 一个是群发,一个是单发(单发会在主题前拼接/user)


还有使用到的json转换工具类,其实也就是fastjosn的方法:


import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
/**
 * @Author : JCccc
 * @CreateTime : 2020/8/26
 * @Description :
 **/
public final class JsonUtil {
    /**
     * 把Java对象转换成json字符串
     *
     * @param object 待转化为JSON字符串的Java对象
     * @return json 串 or null
     */
    public static String parseObjToJson(Object object) {
        String string = null;
        try {
            string = JSONObject.toJSONString(object);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
        return string;
    }
    /**
     * 将Json字符串信息转换成对应的Java对象
     *
     * @param json json字符串对象
     * @param c    对应的类型
     */
    public static <T> T parseJsonToObj(String json, Class<T> c) {
        try {
            JSONObject jsonObject = JSON.parseObject(json);
            return JSON.toJavaObject(jsonObject, c);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
        return null;
    }
}



这一小段咱们完成了这几个文件:


image.png


好了,到这里,我们后端的代码也就完成了。


噢对了,还有我们的一个较为关键的,使用@MessageMapping 的 controller,


创建ChatController.java:


import cn.jc.springbootwebsocketdemo.model.ChatMessage;
import cn.jc.springbootwebsocketdemo.util.JsonUtil;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import java.security.Principal;
/**
 * @Author : JCccc
 * @CreateTime : 2020/8/26
 * @Description :
 **/
@Controller
public class ChatController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 服务端推送给单人的接口
     * @param uid
     * @param content
     */
    @ResponseBody
    @GetMapping("/sendToOne")
    public void sendToOne(@RequestParam("uid") String uid,@RequestParam("content") String content ){
        ChatMessage chatMessage=new ChatMessage();
        chatMessage.setType(ChatMessage.MessageType.CHAT);
        chatMessage.setContent(content);
        chatMessage.setTo(uid);
        chatMessage.setSender("系统消息");
        rabbitTemplate.convertAndSend("topicWebSocketExchange","topic.public", JsonUtil.parseObjToJson(chatMessage));
    }
    /**
     * 接收 客户端传过来的消息 通过setSender和type 来判别时单发还是群发
     * @param chatMessage
     * @param principal
     */
    @MessageMapping("/chat.sendMessageTest")
    public void sendMessageTest(@Payload ChatMessage chatMessage, Principal principal) {
        try {
            String name = principal.getName();
            chatMessage.setSender(name);
            rabbitTemplate.convertAndSend("topicWebSocketExchange","topic.public", JsonUtil.parseObjToJson(chatMessage));
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
    /**
     * 接收 客户端传过来的消息 上线消息
     * @param chatMessage
     */
    @MessageMapping("/chat.addUser")
    public void addUser(@Payload ChatMessage chatMessage) {
        System.out.println("有用户加入到了websocket 消息室" + chatMessage.getSender());
        try {
            System.out.println(chatMessage.toString());
            rabbitTemplate.convertAndSend("topicWebSocketExchange","topic.public", JsonUtil.parseObjToJson(chatMessage));
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}

我们也稍微做个代码简析:image.png那么为了做个简单的实力测试,我们也搞了点页面:image.pngmain.css:

* {
    -webkit-box-sizing: border-box;
    -moz-box-sizing: border-box;
    box-sizing: border-box;
}
html,body {
    height: 100%;
    overflow: hidden;
}
body {
    margin: 0;
    padding: 0;
    font-weight: 400;
    font-family: "Helvetica Neue", Helvetica, Arial, sans-serif;
    font-size: 1rem;
    line-height: 1.58;
    color: #333;
    background-color: #f4f4f4;
    height: 100%;
}
body:before {
    height: 50%;
    width: 100%;
    position: absolute;
    top: 0;
    left: 0;
    background: #128ff2;
    content: "";
    z-index: 0;
}
.clearfix:after {
    display: block;
    content: "";
    clear: both;
}
.hidden {
    display: none;
}
.form-control {
    width: 100%;
    min-height: 38px;
    font-size: 15px;
    border: 1px solid #c8c8c8;
}
.form-group {
    margin-bottom: 15px;
}
input {
    padding-left: 10px;
    outline: none;
}
h1, h2, h3, h4, h5, h6 {
    margin-top: 20px;
    margin-bottom: 20px;
}
h1 {
    font-size: 1.7em;
}
a {
    color: #128ff2;
}
button {
    box-shadow: none;
    border: 1px solid transparent;
    font-size: 14px;
    outline: none;
    line-height: 100%;
    white-space: nowrap;
    vertical-align: middle;
    padding: 0.6rem 1rem;
    border-radius: 2px;
    transition: all 0.2s ease-in-out;
    cursor: pointer;
    min-height: 38px;
}
button.default {
    background-color: #e8e8e8;
    color: #333;
    box-shadow: 0 2px 2px 0 rgba(0, 0, 0, 0.12);
}
button.primary {
    background-color: #128ff2;
    box-shadow: 0 2px 2px 0 rgba(0, 0, 0, 0.12);
    color: #fff;
}
button.accent {
    background-color: #ff4743;
    box-shadow: 0 2px 2px 0 rgba(0, 0, 0, 0.12);
    color: #fff;
}
#username-page {
    text-align: center;
}
.username-page-container {
    background: #fff;
    box-shadow: 0 1px 11px rgba(0, 0, 0, 0.27);
    border-radius: 2px;
    width: 100%;
    max-width: 500px;
    display: inline-block;
    margin-top: 42px;
    vertical-align: middle;
    position: relative;
    padding: 35px 55px 35px;
    min-height: 250px;
    position: absolute;
    top: 50%;
    left: 0;
    right: 0;
    margin: 0 auto;
    margin-top: -160px;
}
.username-page-container .username-submit {
    margin-top: 10px;
}
#chat-page {
    position: relative;
    height: 100%;
}
.chat-container {
    max-width: 700px;
    margin-left: auto;
    margin-right: auto;
    background-color: #fff;
    box-shadow: 0 1px 11px rgba(0, 0, 0, 0.27);
    margin-top: 30px;
    height: calc(100% - 60px);
    max-height: 600px;
    position: relative;
}
#chat-page ul {
    list-style-type: none;
    background-color: #FFF;
    margin: 0;
    overflow: auto;
    overflow-y: scroll;
    padding: 0 20px 0px 20px;
    height: calc(100% - 150px);
}
#chat-page #messageForm {
    padding: 20px;
}
#chat-page ul li {
    line-height: 1.5rem;
    padding: 10px 20px;
    margin: 0;
    border-bottom: 1px solid #f4f4f4;
}
#chat-page ul li p {
    margin: 0;
}
#chat-page .event-message {
    width: 100%;
    text-align: center;
    clear: both;
}
#chat-page .event-message p {
    color: #777;
    font-size: 14px;
    word-wrap: break-word;
}
#chat-page .chat-message {
    padding-left: 68px;
    position: relative;
}
#chat-page .chat-message i {
    position: absolute;
    width: 42px;
    height: 42px;
    overflow: hidden;
    left: 10px;
    display: inline-block;
    vertical-align: middle;
    font-size: 18px;
    line-height: 42px;
    color: #fff;
    text-align: center;
    border-radius: 50%;
    font-style: normal;
    text-transform: uppercase;
}
#chat-page .chat-message span {
    color: #333;
    font-weight: 600;
}
#chat-page .chat-message p {
    color: #43464b;
}
#messageForm .input-group input {
    float: left;
    width: calc(100% - 85px);
}
#messageForm .input-group button {
    float: left;
    width: 80px;
    height: 38px;
    margin-left: 5px;
}
.chat-header {
    text-align: center;
    padding: 15px;
    border-bottom: 1px solid #ececec;
}
.chat-header h2 {
    margin: 0;
    font-weight: 500;
}
.connecting {
    padding-top: 5px;
    text-align: center;
    color: #777;
    position: absolute;
    top: 65px;
    width: 100%;
}
@media screen and (max-width: 730px) {
    .chat-container {
        margin-left: 10px;
        margin-right: 10px;
        margin-top: 10px;
    }
}
@media screen and (max-width: 480px) {
    .chat-container {
        height: calc(100% - 30px);
    }
    .username-page-container {
        width: auto;
        margin-left: 15px;
        margin-right: 15px;
        padding: 25px;
    }
    #chat-page ul {
        height: calc(100% - 120px);
    }
    #messageForm .input-group button {
        width: 65px;
    }
    #messageForm .input-group input {
        width: calc(100% - 70px);
    }
    .chat-header {
        padding: 10px;
    }
    .connecting {
        top: 60px;
    }
    .chat-header h2 {
        font-size: 1.1em;
    }
}

main.js :

'use strict';
var usernamePage = document.querySelector('#username-page');
var chatPage = document.querySelector('#chat-page');
var usernameForm = document.querySelector('#usernameForm');
var messageForm = document.querySelector('#messageForm');
var messageInput = document.querySelector('#message');
var messageArea = document.querySelector('#messageArea');
var connectingElement = document.querySelector('.connecting');
var stompClient = null;
var username = null;
var colors = [
    '#2196F3', '#32c787', '#00BCD4', '#ff5652',
    '#ffc107', '#ff85af', '#FF9800', '#39bbb0'
];
function connect(event) {
    username = document.querySelector('#name').value.trim();
    if(username) {
        usernamePage.classList.add('hidden');
        chatPage.classList.remove('hidden');
        var socket = new SockJS('/ws');
        stompClient = Stomp.over(socket);
        stompClient.connect({
            username:username
        }, onConnected, onError);
    }
    event.preventDefault();
}
function onConnected() {
    // Subscribe to the Public Topic
    stompClient.subscribe('/topic/public', onMessageReceived);
    //额外订阅了mike频道,暂时当做自己的频道,别人需要特意往这个频道发送消息,才能完成 ‘单对单’
    stompClient.subscribe('/user/topic/msg', onMessageReceived);
    // Tell your username to the server
    stompClient.send("/app/chat.addUser",
        {},
        JSON.stringify({sender: username, type: 'JOIN',to:'all'})
    )
    connectingElement.classList.add('hidden');
}
function onError(error) {
    connectingElement.textContent = 'Could not connect to WebSocket server. Please refresh this page to try again!';
    connectingElement.style.color = 'red';
}
function sendMessage(event) {
    var messageContent = messageInput.value.trim();
    if(messageContent && stompClient) {
        var chatMessage = {
            sender: username,
            content: messageInput.value,
            type: 'CHAT',
            to:'all'
        };
        console.log("----- :"+ messageInput.value);
        stompClient.send("/app/chat.sendMessageTest", {}, JSON.stringify(chatMessage));
        messageInput.value = '';
    }
    event.preventDefault();
}
function onMessageReceived(payload) {
    var message = JSON.parse(payload.body);
    var messageElement = document.createElement('li');
    if(message.type === 'JOIN') {
        messageElement.classList.add('event-message');
        message.content = message.sender + ' 上线~!';
    } else if (message.type === 'LEAVE') {
        messageElement.classList.add('event-message');
        message.content = message.sender + ' 离线了!';
    } else {
        messageElement.classList.add('chat-message');
        var avatarElement = document.createElement('i');
        var avatarText = document.createTextNode(message.sender[0]);
        avatarElement.appendChild(avatarText);
        avatarElement.style['background-color'] = getAvatarColor(message.sender);
        messageElement.appendChild(avatarElement);
        var usernameElement = document.createElement('span');
        var usernameText = document.createTextNode(message.sender);
        usernameElement.appendChild(usernameText);
        messageElement.appendChild(usernameElement);
    }
    var textElement = document.createElement('p');
    var messageText = document.createTextNode('消息:'+message.content);
    textElement.appendChild(messageText);
    messageElement.appendChild(textElement);
    messageArea.appendChild(messageElement);
    messageArea.scrollTop = messageArea.scrollHeight;
}
function getAvatarColor(messageSender) {
    var hash = 0;
    for (var i = 0; i < messageSender.length; i++) {
        hash = 31 * hash + messageSender.charCodeAt(i);
    }
    var index = Math.abs(hash % colors.length);
    return colors[index];
}
usernameForm.addEventListener('submit', connect, true)
messageForm.addEventListener('submit', sendMessage, true)


这里我们需要注意一些用法:


连接方法:


image.png


同时在连接时,触发的方法OnConnected:


image.png


发送消息方法sendMessage:


image.png


最后时咱们的客户端测试页面,index.html:


<!DOCTYPE html>
<html>
  <head>
      <meta http-equiv="content-type" content="text/html; charset=UTF-8" />
      <meta name="viewport" content="width=device-width, initial-scale=1.0, minimum-scale=1.0">
      <title>Spring Boot WebSocket Chat Application | CalliCoder</title>
      <link rel="stylesheet" href="/css/main.css" />
  </head>
  <body>
    <noscript>
      <h2>Sorry! Your browser doesn't support Javascript</h2>
    </noscript>
    <div id="username-page">
        <div class="username-page-container">
            <h1 class="title">接收消息测试</h1>
            <form id="usernameForm" name="usernameForm">
                <div class="form-group">
                    <input type="text" id="name" placeholder="Username" autocomplete="off" class="form-control" />
                </div>
                <div class="form-group">
                    <button type="submit" class="accent username-submit">进入消息接收室</button>
                </div>
            </form>
        </div>
    </div>
    <div id="chat-page" class="hidden">
        <div class="chat-container">
            <div class="chat-header">
                <h2>消息测试</h2>
            </div>
            <div class="connecting">
                Connecting...
            </div>
            <ul id="messageArea">
            </ul>
            <form id="messageForm" name="messageForm" nameForm="messageForm">
                <div class="form-group">
                    <div class="input-group clearfix">
                        <input type="text" id="message" placeholder="Type a message..." autocomplete="off" class="form-control"/>
                        <button type="submit" class="primary">Send</button>
                    </div>
                </div>
            </form>
        </div>
    </div>
    <script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script>
    <script src="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script>
    <script src="/js/main.js"></script>
  </body>
</html>


到这里,这个实例的前后端咱们总算是完毕了。


ps:  application.properties 项目配置文件咱们就开了个端口


image.png


进入实操演练环节, 运行项目:


先不着急,我们登录rabbitmq的管理页面看一眼(http://localhost:15672):


image.png


其余的队列啥的,自己看看咯,不多介绍了。


打开第一个页面index.html:


我们输入一个客户端用户识别信息, A1001  连接后端weboscket节点。


image.png


这时候,先看一眼控制台,


流程:


没错在连接上后端服务节点的时候携带的A1001已经被我们的拦截器拦截,也设置了用户认证信息;


而且也执行了onConnected方法,往/app/chat.addUser 推送了一条加入消息,对应到的就是ChatController里的@MessageMapping("/chat.sendMessageTest") 方法;


然后推送成功触发了回调函数,且与此同时消费者监听器也消费了下来,并且调用chatService里的方法,将消息判断时群发或者单发后进行最终的推送:


image.png


事不宜迟,我们再打开一个index.html,以 B2002  连接 进入:


image.png


可以看到,A1001 和 B2002 都能收到上线消息:


image.png

OK,接下来,A1001  与 B2002 随便发送点消息 (其实就是调用js里的sendMessage方法):


image.png


ok,群发功能也就这样测试完了。


接下来我们来看看单独推送,我们以接口方法来模拟下我们服务端给指定用户推送消息,单发(点对点):


那么我们调用的接口就是,


image.pngimage.png


可以看到,我们给 B2002 单独推送了一条消息:


image.png


ps: 系统给用户单独推送消息学会了,那么用户直接互相推送也就自然明白了。只是把发送者从系统改为具体的用户id.


ok,到此。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
27天前
|
消息中间件 Java RocketMQ
RocketMQ实战教程之RocketMQ安装
这是一篇关于RocketMQ安装的实战教程,主要介绍了在CentOS系统上使用传统安装和Docker两种方式安装RocketMQ。首先,系统需要是64位,并且已经安装了JDK 1.8。传统安装包括下载安装包,解压并启动NameServer和Broker。Docker安装则涉及安装docker和docker-compose,然后通过docker-compose.yaml文件配置并启动服务。教程还提供了启动命令和解决问题的提示。
|
1月前
|
消息中间件 前端开发 数据库
RocketMQ实战教程之MQ简介与应用场景
RocketMQ实战教程介绍了MQ的基本概念和应用场景。MQ(消息队列)是生产者和消费者模型,用于异步传输数据,实现系统解耦。消息中间件在生产者发送消息和消费者接收消息之间起到邮箱作用,简化通信。主要应用场景包括:1)应用解耦,如订单系统与库存系统的非直接交互;2)异步处理,如用户注册后的邮件和短信发送延迟处理,提高响应速度;3)流量削峰,如秒杀活动限制并发流量,防止系统崩溃。
|
3天前
|
前端开发 JavaScript 安全
集成WebSocket在Spring Boot中可以用于实现实时的双向通信
集成WebSocket在Spring Boot中可以用于实现实时的双向通信
16 4
|
3天前
|
消息中间件 存储 Java
RabbitMQ 是一个开源的消息代理软件
RabbitMQ 是一个开源的消息代理软件
11 2
|
24天前
|
消息中间件 存储 Apache
RocketMQ实战教程之常见概念和模型
Apache RocketMQ 实战教程介绍了其核心概念和模型。消息是基本的数据传输单元,主题是消息的分类容器,支持字节、数字和短划线命名,最长64个字符。消息类型包括普通、顺序、事务和定时/延时消息。消息队列是实际存储和传输消息的容器,是主题的分区。消费者分组是一组行为一致的消费者的逻辑集合,也有命名限制。此外,文档还提到了一些使用约束和建议,如主题和消费者组名的命名规则,消息大小限制,请求超时时间等。RocketMQ 提供了多种消息模型,包括发布/订阅模型,有助于理解和优化消息处理。
|
25天前
|
消息中间件 存储 Java
RocketMQ实战教程之NameServer与BrokerServer
这是一个关于RocketMQ实战教程的概要,主要讨论NameServer和BrokerServer的角色。NameServer负责管理所有BrokerServer,而BrokerServer存储和传输消息。生产者和消费者通过NameServer找到合适的Broker进行交互,不需要直接知道Broker的具体信息。工作流程包括生产者向NameServer查询后发送消息到Broker,以及消费者同样通过NameServer获取消息进行消费。这种设计类似于服务注册中心的概念,便于系统扩展和集群管理。
|
3天前
|
消息中间件 Java Spring
最新spingboot整合rabbitmq详细教程
最新spingboot整合rabbitmq详细教程
|
5天前
|
消息中间件
RabbitMQ是一个功能强大的开源消息代理软件,用于处理消息队列
RabbitMQ是一个功能强大的开源消息代理软件,用于处理消息队列
8 0
|
30天前
|
消息中间件 Cloud Native 自动驾驶
RocketMQ实战教程之MQ简介
Apache RocketMQ 是一个云原生的消息流平台,支持消息、事件和流处理,适用于云边端一体化场景。官网提供详细文档和下载资源:[RocketMQ官网](https://rocketmq.apache.org/zh/)。示例中提到了RocketMQ在物联网(如小米台灯)和自动驾驶等领域的应用。要开始使用,可从[下载页面](https://rocketmq.apache.org/zh/download)获取软件。
|
1月前
|
消息中间件 中间件 Java
RocketMQ实战教程之几种MQ优缺点以及选型
该文介绍了几种主流消息中间件,包括ActiveMQ、RabbitMQ、RocketMQ和Kafka。ActiveMQ和RabbitMQ是较老牌的选择,前者在中小企业中常见,后者因强大的并发能力和活跃社区而流行。RocketMQ是阿里巴巴的开源产品,适用于大规模分布式系统,尤其在数据可靠性方面进行了优化。Kafka最初设计用于大数据日志处理,强调高吞吐量。在选择MQ时,考虑因素包括性能、功能、开发语言、社区支持、学习难度、稳定性和集群功能。小型公司推荐使用RabbitMQ,而大型公司则可在RocketMQ和Kafka之间根据具体需求抉择。