Websocket Stomp+RabbitMQ实现消息推送

简介: Websocket Stomp+RabbitMQ实现消息推送


目录

1、技术栈

2、依赖

3、修改配置文件

4、RabbitConfig

5、消息包装类

6、利用STOMP实现前后端长连接

7、编写前端页面:

8、编写消息生产者和消费者

9、测试


1、技术栈

后端:springboot2.0.6

前端:html  js

2、依赖

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.6.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>
  <groupId>com.zj</groupId>
  <artifactId>online-test</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>online-test</name>
  <description>Demo project for Spring Boot</description>
  <properties>
    <java.version>1.8</java.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>
    <!--websocket 相关依赖-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <dependency>
      <groupId>org.webjars</groupId>
      <artifactId>webjars-locator-core</artifactId>
    </dependency>
    <dependency>
      <groupId>org.webjars</groupId>
      <artifactId>sockjs-client</artifactId>
      <version>1.0.2</version>
    </dependency>
    <dependency>
      <groupId>org.webjars</groupId>
      <artifactId>stomp-websocket</artifactId>
      <version>2.3.3</version>
    </dependency>
        <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

image.gif

3、修改配置文件

#rabbitmq
spring.rabbitmq.host=192.168.XXX
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

image.gif

本文省略rabbitmq的安装,这边将你安装的rabbitmq的相关信息填入即可

4、RabbitConfig

先创一个rabbitmq 的配置类,由于我们这边业务逻辑比较简单,就简单使用rabbitmq一下

这边创一个hello的消息队列。

@Configuration
public class RabbitConfig {
    @Bean
    public Queue helloQueue() {
        return new Queue("hello");
    }
}

image.gif

5、消息包装类

先讲一下消息包装类,前后端的消息都以这种格式来传递,大家可以根据自己的需求自定义。

public class RequestMessage {
    private String room;//频道号
    private String type;//消息类型('1':客户端到服务端   '2':客户端到服务端)
    private String content;//消息内容(即答案)
    private String userId;//用户id
    private String questionId;//题目id
    private String createTime;//时间
    public RequestMessage() {
    }
    public RequestMessage(String room, String type, String content, String userId, String questionId, String createTime) {
        this.room = room;
        this.type = type;
        this.content = content;
        this.userId = userId;
        this.questionId = questionId;
        this.createTime = createTime;
    }
    public String getRoom() {
        return room;
    }
    public String getType() {
        return type;
    }
    public String getContent() {
        return content;
    }
    public void setRoom(String room) {
        this.room = room;
    }
    public String getUserId() {
        return userId;
    }
    public void setUserId(String userId) {
        this.userId = userId;
    }
    public String getQuestionId() {
        return questionId;
    }
    public void setQuestionId(String questionId) {
        this.questionId = questionId;
    }
    public String getCreateTime() {
        return createTime;
    }
    public void setCreateTime(String createTime) {
        this.createTime = createTime;
    }
    public void setType(String type) {
        this.type = type;
    }
    public void setContent(String content) {
        this.content = content;
    }
}

image.gif

6、利用STOMP实现前后端长连接

这部分与我之前的文章类似:https://blog.csdn.net/qq_41603102/article/details/88351729

本文这边 省略了消息群发

首先编写websocket配置类:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        /**
         * 订阅来自"/topic"和"/user"的消息,
         * /topic 单聊
         * /all 群聊
         */
        config.enableSimpleBroker("/topic","/all");
        /**
         * 客户端发送过来的消息,需要以"/app"为前缀,再经过Broker转发给响应的Controller
         */
        config.setApplicationDestinationPrefixes("/app");
    }
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        /**
         * 路径"/websocket"被注册为STOMP端点,对外暴露,客户端通过该路径接入WebSocket服务
         */
        registry.addEndpoint("/websocket").setAllowedOrigins("*").withSockJS();
    }
}

image.gif

然后编写消息转发器(前端的消息通过这边,进行转发)

@RestController
public class WebSocketTestController {
    @Autowired
    private SimpMessagingTemplate messagingTemplate;
    @Autowired
    Sender senderMQ;
    /**聊天室(单聊+多聊)&&消息转发
     * @param requestMessage
     * @throws Exception
     */
    @CrossOrigin
    @MessageMapping("/chat")
    public void messageHandling(RequestMessage requestMessage) throws Exception {
        String destination = "/topic/" + HtmlUtils.htmlEscape(requestMessage.getRoom());
        String room = HtmlUtils.htmlEscape(requestMessage.getRoom());//htmlEscape  转换为HTML转义字符表示
        String type = HtmlUtils.htmlEscape(requestMessage.getType());
        String content = HtmlUtils.htmlEscape(requestMessage.getContent());
        String userId = HtmlUtils.htmlEscape(requestMessage.getUserId());
        String questionId = HtmlUtils.htmlEscape(requestMessage.getQuestionId());
        String createTime = HtmlUtils.htmlEscape(requestMessage.getCreateTime());
        System.out.println( requestMessage.getRoom() );
        System.out.println( content );
        messagingTemplate.convertAndSend(destination, requestMessage);
    }
}

image.gif

注意:

1、使用@MessageMapping注解来标识所有发送到“/chat”这个destination的消息,都会被路由到这个方法进行处理

2、使用@SendTo注解来标识这个方法返回的结果,都会被发送到它指定的destination,“/topic”

3、传入的参数RequestMessage requestMessage为客户端发送过来的消息,是自动绑定的。

 

如果觉得文章对你有帮助,欢迎关注微信公众号:小牛呼噜噜


将前端传过来的消息解析,然后再通过messagingTemplate.convertAndSend(“目的地”,“消息内容”)转发出去。

7、编写前端页面:

这边的前端页面仅仅用了html+js

<!DOCTYPE HTML>
<html>
<head>
    <meta charset="UTF-8">
    <title>My WebSocket</title>
    <script src="js/sockjs.min.js"></script>
    <script src="js/jquery.min.js"></script>
    <script src="js/stomp.min.js"></script>
    <!--<script type="text/javascript"></script>-->
    <style>
        #message22{
            margin-top:40px;
            border:1px solid gray;
            padding:20px;
        }
    </style>
    <style>
        #message{
            margin-top:40px;
            border:1px solid gray;
            padding:20px;
        }
    </style>
</head>
<body>
频道号:<input id="room" type="text"/>
<button onclick="conectWebSocket()">连接WebSocket</button>
<button onclick="disconnect()">断开连接</button>
<hr />
<div id="message22"></div>
<br />
做题区:<input id="text" type="text" />
<!-- 频道号:<input id="toUser" type="text" /> -->
<button onclick="sendMessage()">发送消息</button>
<div id="message"></div>
</body>
<script type="text/javascript">
    var stompClient;
    var serverUrl = "http://localhost:8080/websocket";
    var room;//频道号
    var websocket = null;
    //websocket连接
    function conectWebSocket(){
        this.room = document.getElementById('room').value;//频道号
        console.log(this.room);
        console.log(this.serverUrl);
        var socket = new SockJS(this.serverUrl);
        this.stompClient = Stomp.over(socket);
        var that = this;
          this.stompClient.connect({}, function (frame) {
              that.stompClient.subscribe('/topic/'+that.room ,function(txt) {
                  // console.log("websocket连接成功");
                  // console.log(txt);
                  document.getElementById('message').innerHTML += JSON.parse(txt.body)['content']+ '<br/>';
                  // const sender = JSON.parse(message.body)['sender'];
////                const language = JSON.parse(message.body)['language'];
////                const content = JSON.parse(message.body)['content'];
////                const type = JSON.parse(message.body)['type'];
              });
          });
     }
     //发送消息
    function sendMessage() {
        //获取输入的文本信息进行发送
        var message = document.getElementById('text').value;
        // var room = document.getElementById('toUser').value;
        var socketMsg = {msg:message,toUser:room};
        var that = this
        this.stompClient.send(
            '/app/chat',
            {},
            JSON.stringify({
                'room': that.room,
                'type': "1",//1,2
                'content': message,
                'userId':"566072523",//小明
                'questionId':"222299023",//题目1
                'createTime':"",
                 })
        );
    }
    function disconnect() {
         //断开连接的方法
        if (this.stompClient !== undefined) {
            this.stompClient.disconnect();
            alert("Disconnected");
        }else{
            alert("当前没有连接websocket")
        }
        this.stompClient = undefined;
    }
</script>
</html>

image.gif

页面都很简单,就不细讲了,但要引入sockjs.min.js   jquery.min.js   stomp.min.js这3个js

大家可以从我github上拉取:online-test/src/main/resources/static/js at master · zj827622690/online-test · GitHub

这样就前后端长连接就完成了,赶紧来试试~

image.png

 可以看出长连接成功

 下面我们要来讲讲 消息推送如何实现,当我们前端发一个http请求,在这个请求结束前,后端会新开一个线程。当原本的请求结束时,后端新开的那个线程还在运行,当它结束时,后端应该要返回消息给前端,但之前的请求已经结束,http握手已经早结束了,消息怎么传递。所以我们需要用到rabbitmq(用来消息的解耦)  websocket(用来保证前后端通信通道不关闭),来让后端主动推送的消息,能到前端页面显示。    可以这么笼统地理解:}。

8、编写消息生产者和消费者

 现在我们讲讲 rabbitmq 消息2个重要的组成部分:消息生产者和消息消费者

sender:

@Component
public class Sender {
    @Autowired
    private AmqpTemplate rabbitTemplate;
    public void send(String context) { //注意因为是AmqpTemplate,所有这里只接受String,byte[],Seriz..
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("hello", context);
    }
}

image.gif

这里注意一下,发送消息采用了AmqpTemplate 模板 。AmqpTemplate接口已经定义了发送和接收消息的基本操作。我们直接使用即可,但要注意的是 必须符合它的类型,这里只支持   String,byte[],Seriz..类型的消息

convertAndSend("hello", context);      hello就是我们前面创的消息队列,context是消息的内容

但我们有时候消息特别复杂,一般用对象来储存消息。这个怎么办呢?,这边我们先不讲,到我们下面再讲。我们继续来讲消息消费者

receiver:

@Component
@RabbitListener(queues = "hello")
public class Receiver {
    @Autowired
    private SimpMessagingTemplate messagingTemplate;
    @RabbitHandler
    public void process(String context) throws IOException {
        System.out.println("Receiver : " + context);
        RequestMessage mqTask = new RequestMessage(  );
        BeanUtils.copyProperties( JsonUtils.jsonToObject( context,RequestMessage.class ),mqTask );
        if (Objects.equals( mqTask.getType(), "2" )) {
            String destination = "/topic/" +mqTask.getRoom();
            messagingTemplate.convertAndSend( destination, mqTask);
        }
    }
}

image.gif

这里注意:我们前面说过,由于消息一般比较复杂,所以发送过来的消息一般是对象类型,但AmqpTemplate不支持,所以我们需要在消息生产者这边把它先转化成String型,传到消息消费者这边,再转化成对象。然后对消息进行处理。

最后通过            messagingTemplate.convertAndSend( destination, mqTask); 将消息发送到前端页面上。

我们这边用到的工具类:

public class JsonUtils {
    private static final ObjectMapper MAPPER = new ObjectMapper();
    /**
     * 对象-->Json字符串
     * @version 创建时间:2018年4月17日 下午3:39:35
     */
    public static String objectToJson(Object data) {
        try {
            return MAPPER.writeValueAsString(data);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return null;
    }
    /**
     * Json字符串-->对象
     * @version 创建时间:2018年4月17日 下午3:39:45
     */
    public static <T> T jsonToObject(String jsonData, Class<T> beanType) {
        try {
            return MAPPER.readValue(jsonData, beanType);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
    /**
     * Json字符串--> List<对象>
     * @version 创建时间:2018年4月17日 下午3:40:09
     */
    public static <T> List<T> jsonToList(String jsonData, Class<T> beanType) {
        JavaType javaType = MAPPER.getTypeFactory().constructParametricType(List.class, beanType);
        try {
            return MAPPER.readValue(jsonData, javaType);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
    public static <T> Map<String, T> jsonToMap(String jsonData) {
        ObjectMapper mapper = new ObjectMapper();
        try {
            return mapper.readValue(jsonData, Map.class);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
    public static String mapToJson(Map map) {
        try {
            return MAPPER.writeValueAsString(map);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return null;
    }
    public static <T> Set<T> jsonToSet(String jsonData) {
        ObjectMapper mapper = new ObjectMapper();
        try {
            return mapper.readValue(jsonData, Set.class);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
    public static String setToJson(Set set) {
        try {
            return MAPPER.writeValueAsString(set);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return null;
    }
}

image.gif

9、测试

我们先定义一个sevice方法,来实现异步。

public interface CommonService {
    void testAsync();
}

image.gif

@Service
public class CommonServiceImpl implements CommonService {
    @Autowired
    Sender sender;
    @Async
    @Override
    public void testAsync() {
        RequestMessage mqTask = new RequestMessage(  );
        for(int i=0;i<6;i++) {
            mqTask.setRoom( "123");
            mqTask.setUserId("000");
            mqTask.setType( "2" );
            mqTask.setQuestionId( "0000");
            mqTask.setCreateTime( "0000");
            mqTask.setContent("this:"+i);
            sender.send( JsonUtils.objectToJson( mqTask ) );
            try {
                Thread.sleep( 1000 );
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

image.gif

这里实现异步,为了简单就只用了@Async,关于线程池有兴趣可以看看我之前的一篇文章:P

Springboot @Async及线程池的使用和扩展_小牛呼噜噜的博客-CSDN博客_@async和线程池的区别

再写一个测试controller

@RestController
public class TestController {
    @Autowired
    CommonService commonService;
    @GetMapping("/test/testAsync")
    public String testAsync() {
        commonService.testAsync();
        return "http请求已结束";
    }
}

image.gif

打开浏览器,先连接websocket,然后再开一个窗口,输入请求localhost:8080/test/testAsync

发现如下:

image.png

表明成功,本文暂且就到这了 = - =


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
前端开发 Java 测试技术
SpringBoot整合WebSocket实现定时任务消息推送
SpringBoot整合WebSocket实现定时任务消息推送
139 0
|
4月前
|
Java
SpringBoot:第七篇 websocket(消息推送)
SpringBoot:第七篇 websocket(消息推送)
45 0
|
8月前
|
JavaScript 网络协议 安全
基于若依(SpringBoot前后分离版-vue)的WebSocket消息推送实现
基于若依(SpringBoot前后分离版-vue)的WebSocket消息推送实现
2482 1
|
9月前
|
前端开发 安全 Java
SpringBoot + WebSocket+STOMP指定推送消息
本文将简单的描述SpringBoot + WebSocket+STOMP指定推送消息场景,不包含信息安全加密等,请勿用在生产环境。
199 0
|
9月前
|
消息中间件 Java Maven
WebSocket 基于OkHttps搭配Stomp实现客户端的监听
WebSocket 基于OkHttps搭配Stomp实现客户端的监听
|
9月前
|
Android开发
Android WebSocket长链接使用Stomp协议【精品】
Android WebSocket长链接使用Stomp协议【精品】
|
10月前
|
NoSQL Go Redis
Go WebSocket + Redis 实现轻量级的订阅和实时消息推送
Go WebSocket + Redis 实现轻量级的订阅和实时消息推送
|
移动开发 网络协议 前端开发
SpringBoot整合WebSocket实现消息推送
SpringBoot整合WebSocket实现消息推送
388 0
|
网络协议 前端开发 安全
websocket和http的瓜葛以及websocket协议实现
websocket和http的瓜葛以及websocket协议实现
websocket和http的瓜葛以及websocket协议实现
|
JavaScript
js实现websocket实例
js实现websocket实例
193 0