使用WebSocket实现服务端和客户端的通信

简介: 使用WebSocket实现服务端和客户端的通信

开发中经常会有这样的使用场景.如某个用户在一个数据上做了xx操作, 与该数据相关的用户在线上的话,需要实时接收到一条信息. 这种可以使用WebSocket来实现. 另外,对于消息,可以定义一个类进行固化. 主要是消息内容,接收人,发送人,是否已发送等. 用户上线时, 通过方法去查询出来然后进行发送

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
@ServerEndpoint(value = "/websocket/{sessionId}")
public class MyWebSocket {
    //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
    private static AtomicInteger onlineCount = new AtomicInteger(0);
    //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识
    public static CopyOnWriteArraySet<MyWebSocket> webSocketSet = new CopyOnWriteArraySet<MyWebSocket>();
    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    public Session session;
    /**
     * 连接建立成功调用的方法
     *
     * @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        if (webSocketSet.add(this)) {
            System.out.println("有新连接加入!当前在线人数为" + onlineCount.incrementAndGet());
        }
    }
    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        if (webSocketSet.remove(this)) {
            System.out.println("有一连接关闭!当前在线人数为" + onlineCount.decrementAndGet());
        }
    }
    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息
     * @param session 可选的参数
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        System.out.println("来自客户端的消息:" + message);
        //群发消息
    /* for(MyWebSocket item: webSocketSet){
    try {
    item.sendMessage(message);
    } catch (IOException e) {
    e.printStackTrace();
    continue;
    }
    }*/
    }
    /**
     * 发生错误时调用
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        System.out.println("发生错误");
        error.printStackTrace();
    }
    private static ReentrantLock lock = new ReentrantLock(true);
    /**
     * 该方法是我们根据业务需要调用的.
     *
     * @param message
     * @throws IOException
     */
    public void sendMessage(String message) throws IOException {
        synchronized (this.session) {
            if (session.isOpen()) {
                this.session.getAsyncRemote().sendText(message);
            }
        }
    }
}

个人代码:

package com.oldlu.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * 开启WebSocket支持
 */
@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
package com.oldlu.websocket;
import com.alibaba.fastjson.JSONObject;
import com.oldlu.common.core.exception.ApiException;
import com.oldlu.entity.po.SysUser;
import com.oldlu.service.SysUserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * @author oldlu
 * 此注解相当于设置访问URL
 */
@Component
@Slf4j
@ServerEndpoint("/websocket/{sid}") //此注解相当于设置访问URL
public class WebSocket {
    private Session session;
    //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
    private static AtomicInteger onlineNum = new AtomicInteger();
    //concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象。
    public static ConcurrentHashMap<String, Session> sessionPools = new ConcurrentHashMap<>();
    //发送消息
    public void sendMessage(Session session, String message) throws IOException {
        if(session != null) {
            synchronized (session) {
                session.getBasicRemote().sendText(message);
            }
        }
    }
    //给指定用户发送信息
    public void sendInfo(String userId, String message){
        Session session = sessionPools.get(userId);
        try {
            sendMessage(session, message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    // 指定多用户推送
    public void sendInfoByUsers(String[] userIds, String message) {
        for (String userId : userIds) {
            sendInfo(userId,message);
        }
    }
    // 群发消息
    public void sendAll(String message) {
        sessionPools.values().forEach(sessionitem -> {
            try {
                sendMessage(sessionitem,message);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
    }
    //建立连接成功调用
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "sid") String userName){
        // TODO 客户端连接限制
        this.session = session;
        if (sessionPools.get(userName) == null) addOnlineCount();
        sessionPools.put(userName, session);
        System.out.println(userName + "加入webSocket!当前链接为" + onlineNum);
    }
    //关闭连接时调用
    @OnClose
    public void onClose(@PathParam(value = "sid") String userName) {
        sessionPools.remove(userName);
        subOnlineCount();
        System.out.println(userName + "断开webSocket连接!当前链接为" + onlineNum);
    }
    //收到客户端信息
    @OnMessage
    public void onMessage(String message) throws IOException{
        JSONObject jsonObject = JSONObject.parseObject(message);
        if (jsonObject.get("cmd").equals("heart")){
            try {
                sendMessage(session, message);
            } catch(Exception e){
                e.printStackTrace();
            }
        }
    }
    //错误时调用
    @OnError
    public void onError(Session session, Throwable throwable){
        System.out.println("发生错误");
        throwable.printStackTrace();
    }
    public static void addOnlineCount(){
        onlineNum.incrementAndGet();
    }
    public static void subOnlineCount() {
        onlineNum.decrementAndGet();
    }
}

Controller:

package com.oldlu.controller;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.oldlu.common.core.annotation.CheckEnum;
import com.oldlu.common.core.annotation.CheckValidatorEnum;
import com.oldlu.common.core.exception.ApiException;
import com.oldlu.common.core.utils.JwtSecurityUtil;
import com.oldlu.entity.form.SysNoticeForm;
import com.oldlu.entity.form.SysNoticeQueryForm;
import com.oldlu.entity.param.SysNoticeQueryParam;
import com.oldlu.entity.po.SysNotice;
import com.oldlu.entity.po.SysUser;
import com.oldlu.entity.vo.MassageTemplateSendVO;
import com.oldlu.entity.vo.SysNoticeVO;
import com.oldlu.service.SysNoticeService;
import com.oldlu.service.SysUserService;
import com.oldlu.websocket.WebSocket;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import javax.validation.Valid;
import java.util.List;
/**
 * <p>
 * 前端控制器
 * </p>
 *
 * @author oldlu
 * @since 2021-05-06
 */
@RestController
@Api(tags = "消息通知")
@RequestMapping("/sysNotice")
public class SysNoticeController {
    @Autowired
    private SysNoticeService sysNoticeService;
    @Autowired
    private SysUserService sysUserService;
    @Autowired
    private WebSocket webSocket;
    @ApiOperation(value = "管理员通知公告进行上传", notes = "管理员通知公告进行上传")
    @PostMapping
    public ResponseEntity<HttpStatus> insertTSysNotice(@RequestBody SysNoticeForm tSysNoticeForm) throws ApiException {
        sysNoticeService.insertSysNotice(tSysNoticeForm.toPo(SysNotice.class));
        return ResponseEntity.ok().build();
    }
    @ApiOperation(value = "短信发送", notes = "短信发送")
    @PostMapping("/sendMessage")
    public ResponseEntity<SysNotice> sendMessage(@RequestBody MassageTemplateSendVO massageTemplateSendVO) throws ApiException {
        return ResponseEntity.ok(sysNoticeService.sendMassage(massageTemplateSendVO));
    }
    @ApiOperation(value = "指定用户添加消息", notes = "指定用户添加消息")
    @PostMapping("/pushByUserId")
    public ResponseEntity pushByUserId(String noticeType, String userId, String message, HttpServletRequest request) {
        SysUser sendUser = getLoginUser(request);
        if (sendUser == null) {
            throw new ApiException("未登录,或重新登录。", HttpStatus.BAD_REQUEST);
        }
        SysUser receiveUser = sysUserService.getById(userId);
        if (receiveUser == null) {
            throw new ApiException("未找到消息接受用户信息,请重新选择。", HttpStatus.BAD_REQUEST);
        }
        SysNotice sysNotice = new SysNotice();
        sysNotice.setIsReadFlag("1").setNoticeContext(message).setNoticeWay("SYS").setReceiveUserId(Long.valueOf(userId)).setReceiveUserName(receiveUser.getName())
                .setSendUserId(sendUser.getId()).setSendUserName(sendUser.getName()).setNoticeType(noticeType);
//        webSocket.sendInfo(userId, message);
        sysNoticeService.insertSysNotice(sysNotice);
        return ResponseEntity.ok("添加成功!");
    }
    @ApiOperation(value = "指定多用户添加消息", notes = "给多个用户添加消息")
    @PostMapping(value = "/pushByUserIds")
    public ResponseEntity pushByUserIds(String noticeType, String[] userIds, String message, HttpServletRequest request) {
        SysUser sendUser = getLoginUser(request);
        if (sendUser == null) {
            throw new ApiException("未登录,或重新登录。", HttpStatus.BAD_REQUEST);
        }
        for (String userId : userIds) {
            SysUser receiveUser = sysUserService.getById(userId);
            if (receiveUser != null) {
                SysNotice sysNotice = new SysNotice();
                sysNotice.setIsReadFlag("1").setNoticeContext(message).setNoticeWay("SYS").setReceiveUserId(Long.valueOf(userId)).setReceiveUserName(receiveUser.getName())
                        .setSendUserId(sendUser.getId()).setSendUserName(sendUser.getName()).setNoticeType(noticeType);
                sysNoticeService.insertSysNotice(sysNotice);
            }
        }
//        webSocket.sendInfoByUsers(userIds, message);
        return ResponseEntity.ok("添加成功!");
    }
    @ApiOperation(value = "系统所有用户群发添加消息", notes = "系统所有用户群发添加消息")
    @PostMapping(value = "/pushAll")
    public ResponseEntity pushAll(String message, HttpServletRequest request, String noticeType) {
        SysUser sendUser = getLoginUser(request);
        if (sendUser == null) {
            throw new ApiException("未登录,或重新登录。", HttpStatus.BAD_REQUEST);
        }
        sysUserService.list().forEach(user -> {
            if (user.getId().equals(sendUser.getId())) {
                return;
            }
            SysNotice sysNotice = new SysNotice();
            sysNotice.setIsReadFlag("1").setNoticeContext(message).setNoticeWay("SYS").setReceiveUserId(user.getId()).setReceiveUserName(user.getName())
                    .setSendUserId(sendUser.getId()).setSendUserName(sendUser.getName()).setNoticeType(noticeType);
            sysNoticeService.insertSysNotice(sysNotice);
        });
//        webSocket.sendAll(message);
        return ResponseEntity.ok("添加成功!");
    }
    @ApiOperation(value = "设置未已读/未读")
    @PostMapping("/updateReadFlag")
    public ResponseEntity updateReadFlag(String noticeId, String readFlag) {
        sysNoticeService.updateReadFlag(noticeId, readFlag);
        return ResponseEntity.ok("设置成功!");
    }
    @ApiOperation(value = "多条消息设置已读/未读", notes = "多条消息设置已读/未读")
    @PostMapping("/updateReadFlags")
    public ResponseEntity updateReadFlags(String[] noticeIds, String readFlag) {
        for (String noticeId : noticeIds) {
            sysNoticeService.updateReadFlag(noticeId, readFlag);
        }
        return ResponseEntity.ok("设置成功");
    }
    @ApiOperation(value = "批量删除公告", notes = "批量删除公告")
    @DeleteMapping(value = "/deletes")
    public ResponseEntity<HttpStatus> deleteSysNotices(String[] noticeIds) {
        for (String noticeId : noticeIds) {
            sysNoticeService.deleteSysNotice(noticeId);
        }
        return ResponseEntity.ok().build();
    }
    @ApiOperation(value = "删除公告", notes = "删除公告")
    @ApiImplicitParam(paramType = "path", name = "id", value = "编号", required = true, dataType = "long")
    @DeleteMapping(value = "/{id}")
    public ResponseEntity<HttpStatus> deleteSysNotice(@PathVariable String id) throws ApiException {
        sysNoticeService.deleteSysNotice(id);
        return ResponseEntity.ok().build();
    }
    @ApiOperation(value = "获取公告", notes = "获取指定公告")
    @ApiImplicitParam(paramType = "path", name = "id", value = "ID", required = true, dataType = "long")
    @GetMapping(value = "/{id}")
    public ResponseEntity<SysNoticeVO> getSysNoticeById(@CheckEnum(type = CheckValidatorEnum.Num) @PathVariable String id) {
        return ResponseEntity.ok(sysNoticeService.getSysNoticeById(id));
    }
    @ApiOperation(value = "条件组合查询", notes = "条件组合查询")
    @PostMapping(value = "/list")
    public ResponseEntity<List<SysNoticeVO>> listSysNotices(@RequestBody SysNotice sysNotice) {
        return ResponseEntity.ok(sysNoticeService.listSysNotices(sysNotice));
    }
    @ApiOperation(value = "搜索(分页)", notes = "根据条件搜索")
    @ApiImplicitParam(name = "sysNoticeQueryForm", value = "查询参数", required = true, dataType = "SysNoticeQueryForm")
    @PostMapping("/page")
    public ResponseEntity<IPage<SysNoticeVO>> getSysNoticePage(@Valid @RequestBody SysNoticeQueryForm sysNoticeQueryForm) {
        return ResponseEntity.ok(sysNoticeService.getSysNoticePage(sysNoticeQueryForm.getPage(), sysNoticeQueryForm.toParam(SysNoticeQueryParam.class)));
    }
    @ApiOperation(value = "根据用户ID查询消息", notes = "根据用户ID查询消息")
    @ApiImplicitParam(name = "userId", value = "用户ID", required = true, dataType = "string")
    @GetMapping("/findAllByUserId/{userId}")
    public ResponseEntity<List<SysNoticeVO>> findAllByUserId(@PathVariable("userId") Long userId) {
        return ResponseEntity.ok(sysNoticeService.findAllByUserId(userId));
    }
    @ApiOperation(value = "根据用户ID已读全部消息", notes = "根据用户ID已读全部消息")
    @ApiImplicitParam(name = "userId", value = "用户ID", required = true, dataType = "string")
    @GetMapping("/dealAllByUserId/{userId}")
    public ResponseEntity<HttpStatus> dealAllByUserId(@PathVariable("userId") Long userId) {
        sysNoticeService.dealAllByUserId(userId);
        return ResponseEntity.ok().build();
    }
    /**
     * 获取登录用户
     *
     * @param request 请求头
     */
    public SysUser getLoginUser(HttpServletRequest request) {
        String loginUserId = JwtSecurityUtil.getLoginUserId(request);
        if (StringUtils.isBlank(loginUserId)) {
            return null;
        }
        return sysUserService.getById(loginUserId);
    }
}

页面中的调用.每个客户都要初始化一个websocket示例.其中我们用用户的userId作为标识的一部分.//页面加载完成. 初始化一个webSocket对象.然后可以根据需要调一个来发信息

    window.onload = function () {
    initWebSocket();
    setTimeout(function () {
        $.post('<%=basePath %>xxx.do', function (r) {
            //alert(0);
        });
    }, 2000);
};
function initWebSocket() {
    webSocket = new WebSocket(requestUrl.replace("http", "ws")
        + 'websocket/${userId}');
    webSocket.onerror = function (event) {
        onError(event)
    };
    //连接建立成功事件
    webSocket.onopen = function (event) {
        onOpen(event)
    };
    //接收到服务端消息事件
    webSocket.onmessage = function (event) {
        onMessage(event)
    };
}

简易版:(使用时注入即可)

package com.oldlu.socket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
/**
 * @author oldlu
 * 此注解相当于设置访问URL
 */
@Component
@Slf4j
@ServerEndpoint("/websocket") //此注解相当于设置访问URL
public class WebSocket {
    private static Set<WebSocket> webSocketSet = new HashSet<>();
    private Session session;
    @OnMessage
    public void onMessage(String message, Session session) throws IOException {
        System.out.println("接收的消息是:" + message);
        System.out.println(session);
        //将消息发送给其他的用户
    }
    //发送消息
    public void sendMessage(String message) throws IOException {
        for (WebSocket webSocket : webSocketSet) {
            if (webSocket.session.isOpen())
                webSocket.session.getBasicRemote().sendText(message);
        }
    }
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        webSocketSet.add(this);
    }
    @OnClose
    public void onClose(Session seesion) {
        System.out.println("连接关闭了。。。");
    }
    @OnError
    public void onError(Session session,Throwable error) {
        System.out.println("出错了。。。。" + error.getMessage());
    }
}


目录
相关文章
|
4月前
|
前端开发 网络协议 JavaScript
在Spring Boot中实现基于WebSocket的实时通信
在Spring Boot中实现基于WebSocket的实时通信
|
22天前
|
JavaScript 前端开发 测试技术
前端全栈之路Deno篇(五):如何快速创建 WebSocket 服务端应用 + 客户端应用 - 可能是2025最佳的Websocket全栈实时应用框架
本文介绍了如何使用Deno 2.0快速构建WebSocket全栈应用,包括服务端和客户端的创建。通过一个简单的代码示例,展示了Deno在WebSocket实现中的便捷与强大,无需额外依赖,即可轻松搭建具备基本功能的WebSocket应用。Deno 2.0被认为是最佳的WebSocket全栈应用JS运行时,适合全栈开发者学习和使用。
|
20天前
|
Kubernetes Cloud Native JavaScript
为使用WebSocket构建的双向通信应用带来基于服务网格的全链路灰度
介绍如何使用为基于WebSocket的云原生应用构建全链路灰度方案。
|
6月前
|
网络协议 JavaScript 前端开发
WebSocket:实现客户端与服务器实时通信的技术
WebSocket:实现客户端与服务器实时通信的技术
|
4月前
|
前端开发 JavaScript API
探索Python Django中的WebSocket集成:为前后端分离应用添加实时通信功能
【7月更文挑战第17天】现代Web开发趋势中,前后端分离配合WebSocket满足实时通信需求。Django Channels扩展了Django,支持WebSocket连接和异步功能。通过安装Channels、配置设置、定义路由和消费者,能在Django中实现WebSocket交互。前端使用WebSocket API连接后端,实现双向数据流,如在线聊天功能。集成Channels提升Web应用的实时性和用户体验,适应实时交互场景的需求。**
186 6
|
4月前
|
安全 数据安全/隐私保护 UED
优化用户体验:前后端分离架构下Python WebSocket实时通信的性能考量
【7月更文挑战第17天】前后端分离趋势下,WebSocket成为实时通信的关键,Python有`websockets`等库支持WebSocket服务。与HTTP轮询相比,WebSocket减少延迟,提高响应。连接管理、消息传输效率、并发处理及安全性是性能考量重点。使用WebSocket能优化用户体验,尤其适合社交、游戏等实时场景。开发应考虑场景需求,充分利用WebSocket优势。
143 3
|
4月前
|
前端开发 Python
前后端分离的进化:Python Web项目中的WebSocket实时通信解决方案
【7月更文挑战第18天】在Python的Flask框架中,结合Flask-SocketIO库可轻松实现WebSocket实时通信,促进前后端分离项目中的高效交互。示例展示了一个简单的聊天应用:Flask路由渲染HTML,客户端通过Socket.IO库连接服务器,发送消息并监听广播。此方法支持多种实时通信协议,适应不同环境,提供流畅的实时体验。
91 3
|
4月前
|
JavaScript 前端开发 UED
WebSocket在Python Web开发中的革新应用:解锁实时通信的新可能
【7月更文挑战第16天】WebSocket是实现Web实时通信的协议,与HTTP不同,它提供持久双向连接,允许服务器主动推送数据。Python有多种库如websockets和Flask-SocketIO支持WebSocket开发。使用Flask-SocketIO的简单示例包括定义路由、监听消息事件,并在HTML中用JavaScript建立连接。WebSocket提高了实时性、减少了服务器压力,广泛应用于聊天、游戏等场景。
46 1
|
4月前
|
移动开发 前端开发 网络协议
Python Web实时通信新纪元:基于WebSocket的前后端分离技术探索
【7月更文挑战第16天】WebSocket增强Web实时性,Python借助Flask-SocketIO简化实现。安装`flask`和`flask-socketio`,示例展示服务器端接收连接及消息并广播响应,前端HTML用Socket.IO库连接并监听事件。WebSocket开启双向通信新时代,助力动态Web应用开发。
52 1
|
3月前
|
网络协议 Go
[golang]gin框架接收websocket通信
[golang]gin框架接收websocket通信
100 0