使用WebSocket实现服务端和客户端的通信

简介: 使用WebSocket实现服务端和客户端的通信

开发中经常会有这样的使用场景.如某个用户在一个数据上做了xx操作, 与该数据相关的用户在线上的话,需要实时接收到一条信息. 这种可以使用WebSocket来实现. 另外,对于消息,可以定义一个类进行固化. 主要是消息内容,接收人,发送人,是否已发送等. 用户上线时, 通过方法去查询出来然后进行发送

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
@ServerEndpoint(value = "/websocket/{sessionId}")
public class MyWebSocket {
    //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
    private static AtomicInteger onlineCount = new AtomicInteger(0);
    //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识
    public static CopyOnWriteArraySet<MyWebSocket> webSocketSet = new CopyOnWriteArraySet<MyWebSocket>();
    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    public Session session;
    /**
     * 连接建立成功调用的方法
     *
     * @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        if (webSocketSet.add(this)) {
            System.out.println("有新连接加入!当前在线人数为" + onlineCount.incrementAndGet());
        }
    }
    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        if (webSocketSet.remove(this)) {
            System.out.println("有一连接关闭!当前在线人数为" + onlineCount.decrementAndGet());
        }
    }
    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息
     * @param session 可选的参数
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        System.out.println("来自客户端的消息:" + message);
        //群发消息
    /* for(MyWebSocket item: webSocketSet){
    try {
    item.sendMessage(message);
    } catch (IOException e) {
    e.printStackTrace();
    continue;
    }
    }*/
    }
    /**
     * 发生错误时调用
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        System.out.println("发生错误");
        error.printStackTrace();
    }
    private static ReentrantLock lock = new ReentrantLock(true);
    /**
     * 该方法是我们根据业务需要调用的.
     *
     * @param message
     * @throws IOException
     */
    public void sendMessage(String message) throws IOException {
        synchronized (this.session) {
            if (session.isOpen()) {
                this.session.getAsyncRemote().sendText(message);
            }
        }
    }
}

个人代码:

package com.oldlu.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * 开启WebSocket支持
 */
@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
package com.oldlu.websocket;
import com.alibaba.fastjson.JSONObject;
import com.oldlu.common.core.exception.ApiException;
import com.oldlu.entity.po.SysUser;
import com.oldlu.service.SysUserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * @author oldlu
 * 此注解相当于设置访问URL
 */
@Component
@Slf4j
@ServerEndpoint("/websocket/{sid}") //此注解相当于设置访问URL
public class WebSocket {
    private Session session;
    //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
    private static AtomicInteger onlineNum = new AtomicInteger();
    //concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象。
    public static ConcurrentHashMap<String, Session> sessionPools = new ConcurrentHashMap<>();
    //发送消息
    public void sendMessage(Session session, String message) throws IOException {
        if(session != null) {
            synchronized (session) {
                session.getBasicRemote().sendText(message);
            }
        }
    }
    //给指定用户发送信息
    public void sendInfo(String userId, String message){
        Session session = sessionPools.get(userId);
        try {
            sendMessage(session, message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    // 指定多用户推送
    public void sendInfoByUsers(String[] userIds, String message) {
        for (String userId : userIds) {
            sendInfo(userId,message);
        }
    }
    // 群发消息
    public void sendAll(String message) {
        sessionPools.values().forEach(sessionitem -> {
            try {
                sendMessage(sessionitem,message);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
    }
    //建立连接成功调用
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "sid") String userName){
        // TODO 客户端连接限制
        this.session = session;
        if (sessionPools.get(userName) == null) addOnlineCount();
        sessionPools.put(userName, session);
        System.out.println(userName + "加入webSocket!当前链接为" + onlineNum);
    }
    //关闭连接时调用
    @OnClose
    public void onClose(@PathParam(value = "sid") String userName) {
        sessionPools.remove(userName);
        subOnlineCount();
        System.out.println(userName + "断开webSocket连接!当前链接为" + onlineNum);
    }
    //收到客户端信息
    @OnMessage
    public void onMessage(String message) throws IOException{
        JSONObject jsonObject = JSONObject.parseObject(message);
        if (jsonObject.get("cmd").equals("heart")){
            try {
                sendMessage(session, message);
            } catch(Exception e){
                e.printStackTrace();
            }
        }
    }
    //错误时调用
    @OnError
    public void onError(Session session, Throwable throwable){
        System.out.println("发生错误");
        throwable.printStackTrace();
    }
    public static void addOnlineCount(){
        onlineNum.incrementAndGet();
    }
    public static void subOnlineCount() {
        onlineNum.decrementAndGet();
    }
}

Controller:

package com.oldlu.controller;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.oldlu.common.core.annotation.CheckEnum;
import com.oldlu.common.core.annotation.CheckValidatorEnum;
import com.oldlu.common.core.exception.ApiException;
import com.oldlu.common.core.utils.JwtSecurityUtil;
import com.oldlu.entity.form.SysNoticeForm;
import com.oldlu.entity.form.SysNoticeQueryForm;
import com.oldlu.entity.param.SysNoticeQueryParam;
import com.oldlu.entity.po.SysNotice;
import com.oldlu.entity.po.SysUser;
import com.oldlu.entity.vo.MassageTemplateSendVO;
import com.oldlu.entity.vo.SysNoticeVO;
import com.oldlu.service.SysNoticeService;
import com.oldlu.service.SysUserService;
import com.oldlu.websocket.WebSocket;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import javax.validation.Valid;
import java.util.List;
/**
 * <p>
 * 前端控制器
 * </p>
 *
 * @author oldlu
 * @since 2021-05-06
 */
@RestController
@Api(tags = "消息通知")
@RequestMapping("/sysNotice")
public class SysNoticeController {
    @Autowired
    private SysNoticeService sysNoticeService;
    @Autowired
    private SysUserService sysUserService;
    @Autowired
    private WebSocket webSocket;
    @ApiOperation(value = "管理员通知公告进行上传", notes = "管理员通知公告进行上传")
    @PostMapping
    public ResponseEntity<HttpStatus> insertTSysNotice(@RequestBody SysNoticeForm tSysNoticeForm) throws ApiException {
        sysNoticeService.insertSysNotice(tSysNoticeForm.toPo(SysNotice.class));
        return ResponseEntity.ok().build();
    }
    @ApiOperation(value = "短信发送", notes = "短信发送")
    @PostMapping("/sendMessage")
    public ResponseEntity<SysNotice> sendMessage(@RequestBody MassageTemplateSendVO massageTemplateSendVO) throws ApiException {
        return ResponseEntity.ok(sysNoticeService.sendMassage(massageTemplateSendVO));
    }
    @ApiOperation(value = "指定用户添加消息", notes = "指定用户添加消息")
    @PostMapping("/pushByUserId")
    public ResponseEntity pushByUserId(String noticeType, String userId, String message, HttpServletRequest request) {
        SysUser sendUser = getLoginUser(request);
        if (sendUser == null) {
            throw new ApiException("未登录,或重新登录。", HttpStatus.BAD_REQUEST);
        }
        SysUser receiveUser = sysUserService.getById(userId);
        if (receiveUser == null) {
            throw new ApiException("未找到消息接受用户信息,请重新选择。", HttpStatus.BAD_REQUEST);
        }
        SysNotice sysNotice = new SysNotice();
        sysNotice.setIsReadFlag("1").setNoticeContext(message).setNoticeWay("SYS").setReceiveUserId(Long.valueOf(userId)).setReceiveUserName(receiveUser.getName())
                .setSendUserId(sendUser.getId()).setSendUserName(sendUser.getName()).setNoticeType(noticeType);
//        webSocket.sendInfo(userId, message);
        sysNoticeService.insertSysNotice(sysNotice);
        return ResponseEntity.ok("添加成功!");
    }
    @ApiOperation(value = "指定多用户添加消息", notes = "给多个用户添加消息")
    @PostMapping(value = "/pushByUserIds")
    public ResponseEntity pushByUserIds(String noticeType, String[] userIds, String message, HttpServletRequest request) {
        SysUser sendUser = getLoginUser(request);
        if (sendUser == null) {
            throw new ApiException("未登录,或重新登录。", HttpStatus.BAD_REQUEST);
        }
        for (String userId : userIds) {
            SysUser receiveUser = sysUserService.getById(userId);
            if (receiveUser != null) {
                SysNotice sysNotice = new SysNotice();
                sysNotice.setIsReadFlag("1").setNoticeContext(message).setNoticeWay("SYS").setReceiveUserId(Long.valueOf(userId)).setReceiveUserName(receiveUser.getName())
                        .setSendUserId(sendUser.getId()).setSendUserName(sendUser.getName()).setNoticeType(noticeType);
                sysNoticeService.insertSysNotice(sysNotice);
            }
        }
//        webSocket.sendInfoByUsers(userIds, message);
        return ResponseEntity.ok("添加成功!");
    }
    @ApiOperation(value = "系统所有用户群发添加消息", notes = "系统所有用户群发添加消息")
    @PostMapping(value = "/pushAll")
    public ResponseEntity pushAll(String message, HttpServletRequest request, String noticeType) {
        SysUser sendUser = getLoginUser(request);
        if (sendUser == null) {
            throw new ApiException("未登录,或重新登录。", HttpStatus.BAD_REQUEST);
        }
        sysUserService.list().forEach(user -> {
            if (user.getId().equals(sendUser.getId())) {
                return;
            }
            SysNotice sysNotice = new SysNotice();
            sysNotice.setIsReadFlag("1").setNoticeContext(message).setNoticeWay("SYS").setReceiveUserId(user.getId()).setReceiveUserName(user.getName())
                    .setSendUserId(sendUser.getId()).setSendUserName(sendUser.getName()).setNoticeType(noticeType);
            sysNoticeService.insertSysNotice(sysNotice);
        });
//        webSocket.sendAll(message);
        return ResponseEntity.ok("添加成功!");
    }
    @ApiOperation(value = "设置未已读/未读")
    @PostMapping("/updateReadFlag")
    public ResponseEntity updateReadFlag(String noticeId, String readFlag) {
        sysNoticeService.updateReadFlag(noticeId, readFlag);
        return ResponseEntity.ok("设置成功!");
    }
    @ApiOperation(value = "多条消息设置已读/未读", notes = "多条消息设置已读/未读")
    @PostMapping("/updateReadFlags")
    public ResponseEntity updateReadFlags(String[] noticeIds, String readFlag) {
        for (String noticeId : noticeIds) {
            sysNoticeService.updateReadFlag(noticeId, readFlag);
        }
        return ResponseEntity.ok("设置成功");
    }
    @ApiOperation(value = "批量删除公告", notes = "批量删除公告")
    @DeleteMapping(value = "/deletes")
    public ResponseEntity<HttpStatus> deleteSysNotices(String[] noticeIds) {
        for (String noticeId : noticeIds) {
            sysNoticeService.deleteSysNotice(noticeId);
        }
        return ResponseEntity.ok().build();
    }
    @ApiOperation(value = "删除公告", notes = "删除公告")
    @ApiImplicitParam(paramType = "path", name = "id", value = "编号", required = true, dataType = "long")
    @DeleteMapping(value = "/{id}")
    public ResponseEntity<HttpStatus> deleteSysNotice(@PathVariable String id) throws ApiException {
        sysNoticeService.deleteSysNotice(id);
        return ResponseEntity.ok().build();
    }
    @ApiOperation(value = "获取公告", notes = "获取指定公告")
    @ApiImplicitParam(paramType = "path", name = "id", value = "ID", required = true, dataType = "long")
    @GetMapping(value = "/{id}")
    public ResponseEntity<SysNoticeVO> getSysNoticeById(@CheckEnum(type = CheckValidatorEnum.Num) @PathVariable String id) {
        return ResponseEntity.ok(sysNoticeService.getSysNoticeById(id));
    }
    @ApiOperation(value = "条件组合查询", notes = "条件组合查询")
    @PostMapping(value = "/list")
    public ResponseEntity<List<SysNoticeVO>> listSysNotices(@RequestBody SysNotice sysNotice) {
        return ResponseEntity.ok(sysNoticeService.listSysNotices(sysNotice));
    }
    @ApiOperation(value = "搜索(分页)", notes = "根据条件搜索")
    @ApiImplicitParam(name = "sysNoticeQueryForm", value = "查询参数", required = true, dataType = "SysNoticeQueryForm")
    @PostMapping("/page")
    public ResponseEntity<IPage<SysNoticeVO>> getSysNoticePage(@Valid @RequestBody SysNoticeQueryForm sysNoticeQueryForm) {
        return ResponseEntity.ok(sysNoticeService.getSysNoticePage(sysNoticeQueryForm.getPage(), sysNoticeQueryForm.toParam(SysNoticeQueryParam.class)));
    }
    @ApiOperation(value = "根据用户ID查询消息", notes = "根据用户ID查询消息")
    @ApiImplicitParam(name = "userId", value = "用户ID", required = true, dataType = "string")
    @GetMapping("/findAllByUserId/{userId}")
    public ResponseEntity<List<SysNoticeVO>> findAllByUserId(@PathVariable("userId") Long userId) {
        return ResponseEntity.ok(sysNoticeService.findAllByUserId(userId));
    }
    @ApiOperation(value = "根据用户ID已读全部消息", notes = "根据用户ID已读全部消息")
    @ApiImplicitParam(name = "userId", value = "用户ID", required = true, dataType = "string")
    @GetMapping("/dealAllByUserId/{userId}")
    public ResponseEntity<HttpStatus> dealAllByUserId(@PathVariable("userId") Long userId) {
        sysNoticeService.dealAllByUserId(userId);
        return ResponseEntity.ok().build();
    }
    /**
     * 获取登录用户
     *
     * @param request 请求头
     */
    public SysUser getLoginUser(HttpServletRequest request) {
        String loginUserId = JwtSecurityUtil.getLoginUserId(request);
        if (StringUtils.isBlank(loginUserId)) {
            return null;
        }
        return sysUserService.getById(loginUserId);
    }
}

页面中的调用.每个客户都要初始化一个websocket示例.其中我们用用户的userId作为标识的一部分.//页面加载完成. 初始化一个webSocket对象.然后可以根据需要调一个来发信息

    window.onload = function () {
    initWebSocket();
    setTimeout(function () {
        $.post('<%=basePath %>xxx.do', function (r) {
            //alert(0);
        });
    }, 2000);
};
function initWebSocket() {
    webSocket = new WebSocket(requestUrl.replace("http", "ws")
        + 'websocket/${userId}');
    webSocket.onerror = function (event) {
        onError(event)
    };
    //连接建立成功事件
    webSocket.onopen = function (event) {
        onOpen(event)
    };
    //接收到服务端消息事件
    webSocket.onmessage = function (event) {
        onMessage(event)
    };
}

简易版:(使用时注入即可)

package com.oldlu.socket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
/**
 * @author oldlu
 * 此注解相当于设置访问URL
 */
@Component
@Slf4j
@ServerEndpoint("/websocket") //此注解相当于设置访问URL
public class WebSocket {
    private static Set<WebSocket> webSocketSet = new HashSet<>();
    private Session session;
    @OnMessage
    public void onMessage(String message, Session session) throws IOException {
        System.out.println("接收的消息是:" + message);
        System.out.println(session);
        //将消息发送给其他的用户
    }
    //发送消息
    public void sendMessage(String message) throws IOException {
        for (WebSocket webSocket : webSocketSet) {
            if (webSocket.session.isOpen())
                webSocket.session.getBasicRemote().sendText(message);
        }
    }
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        webSocketSet.add(this);
    }
    @OnClose
    public void onClose(Session seesion) {
        System.out.println("连接关闭了。。。");
    }
    @OnError
    public void onError(Session session,Throwable error) {
        System.out.println("出错了。。。。" + error.getMessage());
    }
}


目录
相关文章
|
14天前
|
网络协议 Java Go
【Go语言专栏】Go语言中的WebSocket实时通信应用
【4月更文挑战第30天】Go语言(Golang)是Google开发的编程语言,适用于云计算、微服务等领域。本文介绍了WebSocket,一种实现浏览器与服务器全双工通信的协议,其特点是实时性、全双工和轻量级。在Go中实现WebSocket,可以使用gorilla/websocket库。示例展示了如何创建服务器端和客户端,实现消息的收发。WebSocket广泛应用于聊天、游戏、通知推送和实时数据同步等场景。学习Go语言中的WebSocket对于开发实时通信应用至关重要。
|
12天前
|
监控 安全 API
WebSocket通过建立一个持久的连接实现实时双向通信
【5月更文挑战第2天】WebSocket通过建立一个持久的连接实现实时双向通信
24 4
|
14天前
|
缓存 监控 前端开发
【Go 语言专栏】Go 语言中的 WebSocket 实时通信应用
【4月更文挑战第30天】本文探讨了Go语言在WebSocket实时通信中的应用。WebSocket作为全双工通信协议,允许持续的双向通信。Go语言凭借其高效和并发特性,适合构建实时应用。文中概述了在Go中实现WebSocket的基本步骤,包括服务器和客户端的建立与通信,并列举了实时聊天、数据监控和在线协作等应用案例。同时,强调了消息格式、并发处理、错误处理和安全性的注意事项。通过数据压缩、缓存管理和连接管理等策略可优化性能。Go语言还能与数据库和前端框架结合,提升用户体验。总之,Go语言为WebSocket实时通信提供了强大支持,有望在更多领域发挥作用。
|
14天前
|
JavaScript PHP UED
【PHP开发专栏】PHP与WebSocket实时通信
【4月更文挑战第30天】本文介绍了PHP实现WebSocket实时通信的原理、使用方法和实际案例。WebSocket是基于HTTP的全双工通信协议,PHP 5.4以上版本支持WebSocket,可通过内置函数或第三方库如Socket.io、PHP-WebSocket来实现。文章详细展示了创建WebSocket服务器和客户端的PHP代码示例,并提及在PHP中使用Socket.io库进行实时通信。
|
20天前
|
JSON JavaScript 前端开发
服务器通信:使用WebSocket与后端实时交互
【4月更文挑战第24天】WebSocket为解决服务器与客户端实时双向通信问题而生,常用于聊天、游戏和实时通知等场景。本文通过4步教你实现WebSocket通信:1) 客户端使用JavaScript创建WebSocket连接;2) 监听`open`、`message`和`close`事件;3) 使用`send`方法发送数据;4) 使用`close`方法关闭连接。服务器端则需处理连接和数据发送,具体实现依后端技术而定。WebSocket为现代Web应用的实时交互提供了强大支持。
|
25天前
|
网络协议 Java 关系型数据库
如何公网远程访问本地WebSocket服务端
如何公网远程访问本地WebSocket服务端
|
28天前
|
JavaScript 前端开发 Java
Java WebSocket编程:实现实时通信
【4月更文挑战第16天】本文介绍了Java如何利用WebSocket API实现实时通信。WebSocket协议提供全双工通信,减少延迟,提高效率。Java EE的WebSocket API让开发者能轻松创建WebSocket端点,示例代码展示了端点的生命周期方法。客户端可使用JavaScript的WebSocket API进行连接和通信。安全性是关键,应启用WSS加密并过滤客户端数据。通过学习和实践,开发者能构建出满足现代Web应用实时需求的系统。
|
1月前
|
网络协议 JavaScript 前端开发
深入了解WebSocket:实时双向通信的魔法
深入了解WebSocket:实时双向通信的魔法
|
1月前
|
监控 小程序 前端开发
小程序全栈开发中的WebSocket实时通信实践
【4月更文挑战第12天】本文探讨了小程序全栈开发中WebSocket实时通信的实践,WebSocket作为实现双向实时通信的协议,其长连接特性和双向通信能力在实时聊天、推送、游戏和监控等场景中发挥关键作用。开发者需注意安全性、性能和兼容性问题,以优化用户体验并确保小程序稳定运行。通过掌握WebSocket,开发者能提升小程序的功能性和用户体验。
|
6月前
|
Go API 开发者
Golang Websocket框架:实时通信的新选择
Golang Websocket框架:实时通信的新选择