开发中经常会有这样的使用场景.如某个用户在一个数据上做了xx操作, 与该数据相关的用户在线上的话,需要实时接收到一条信息. 这种可以使用WebSocket来实现. 另外,对于消息,可以定义一个类进行固化. 主要是消息内容,接收人,发送人,是否已发送等. 用户上线时, 通过方法去查询出来然后进行发送
import javax.websocket.OnClose; import javax.websocket.OnError; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; @ServerEndpoint(value = "/websocket/{sessionId}") public class MyWebSocket { //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。 private static AtomicInteger onlineCount = new AtomicInteger(0); //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识 public static CopyOnWriteArraySet<MyWebSocket> webSocketSet = new CopyOnWriteArraySet<MyWebSocket>(); //与某个客户端的连接会话,需要通过它来给客户端发送数据 public Session session; /** * 连接建立成功调用的方法 * * @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据 */ @OnOpen public void onOpen(Session session) { this.session = session; if (webSocketSet.add(this)) { System.out.println("有新连接加入!当前在线人数为" + onlineCount.incrementAndGet()); } } /** * 连接关闭调用的方法 */ @OnClose public void onClose() { if (webSocketSet.remove(this)) { System.out.println("有一连接关闭!当前在线人数为" + onlineCount.decrementAndGet()); } } /** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息 * @param session 可选的参数 */ @OnMessage public void onMessage(String message, Session session) { System.out.println("来自客户端的消息:" + message); //群发消息 /* for(MyWebSocket item: webSocketSet){ try { item.sendMessage(message); } catch (IOException e) { e.printStackTrace(); continue; } }*/ } /** * 发生错误时调用 * * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { System.out.println("发生错误"); error.printStackTrace(); } private static ReentrantLock lock = new ReentrantLock(true); /** * 该方法是我们根据业务需要调用的. * * @param message * @throws IOException */ public void sendMessage(String message) throws IOException { synchronized (this.session) { if (session.isOpen()) { this.session.getAsyncRemote().sendText(message); } } } }
个人代码:
package com.oldlu.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; /** * 开启WebSocket支持 */ @Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
package com.oldlu.websocket; import com.alibaba.fastjson.JSONObject; import com.oldlu.common.core.exception.ApiException; import com.oldlu.entity.po.SysUser; import com.oldlu.service.SysUserService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Component; import javax.annotation.Resource; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; /** * @author oldlu * 此注解相当于设置访问URL */ @Component @Slf4j @ServerEndpoint("/websocket/{sid}") //此注解相当于设置访问URL public class WebSocket { private Session session; //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。 private static AtomicInteger onlineNum = new AtomicInteger(); //concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象。 public static ConcurrentHashMap<String, Session> sessionPools = new ConcurrentHashMap<>(); //发送消息 public void sendMessage(Session session, String message) throws IOException { if(session != null) { synchronized (session) { session.getBasicRemote().sendText(message); } } } //给指定用户发送信息 public void sendInfo(String userId, String message){ Session session = sessionPools.get(userId); try { sendMessage(session, message); } catch (Exception e) { e.printStackTrace(); } } // 指定多用户推送 public void sendInfoByUsers(String[] userIds, String message) { for (String userId : userIds) { sendInfo(userId,message); } } // 群发消息 public void sendAll(String message) { sessionPools.values().forEach(sessionitem -> { try { sendMessage(sessionitem,message); } catch (IOException e) { e.printStackTrace(); } }); } //建立连接成功调用 @OnOpen public void onOpen(Session session, @PathParam(value = "sid") String userName){ // TODO 客户端连接限制 this.session = session; if (sessionPools.get(userName) == null) addOnlineCount(); sessionPools.put(userName, session); System.out.println(userName + "加入webSocket!当前链接为" + onlineNum); } //关闭连接时调用 @OnClose public void onClose(@PathParam(value = "sid") String userName) { sessionPools.remove(userName); subOnlineCount(); System.out.println(userName + "断开webSocket连接!当前链接为" + onlineNum); } //收到客户端信息 @OnMessage public void onMessage(String message) throws IOException{ JSONObject jsonObject = JSONObject.parseObject(message); if (jsonObject.get("cmd").equals("heart")){ try { sendMessage(session, message); } catch(Exception e){ e.printStackTrace(); } } } //错误时调用 @OnError public void onError(Session session, Throwable throwable){ System.out.println("发生错误"); throwable.printStackTrace(); } public static void addOnlineCount(){ onlineNum.incrementAndGet(); } public static void subOnlineCount() { onlineNum.decrementAndGet(); } }
Controller:
package com.oldlu.controller; import com.baomidou.mybatisplus.core.metadata.IPage; import com.oldlu.common.core.annotation.CheckEnum; import com.oldlu.common.core.annotation.CheckValidatorEnum; import com.oldlu.common.core.exception.ApiException; import com.oldlu.common.core.utils.JwtSecurityUtil; import com.oldlu.entity.form.SysNoticeForm; import com.oldlu.entity.form.SysNoticeQueryForm; import com.oldlu.entity.param.SysNoticeQueryParam; import com.oldlu.entity.po.SysNotice; import com.oldlu.entity.po.SysUser; import com.oldlu.entity.vo.MassageTemplateSendVO; import com.oldlu.entity.vo.SysNoticeVO; import com.oldlu.service.SysNoticeService; import com.oldlu.service.SysUserService; import com.oldlu.websocket.WebSocket; import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiOperation; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; import javax.servlet.http.HttpServletRequest; import javax.validation.Valid; import java.util.List; /** * <p> * 前端控制器 * </p> * * @author oldlu * @since 2021-05-06 */ @RestController @Api(tags = "消息通知") @RequestMapping("/sysNotice") public class SysNoticeController { @Autowired private SysNoticeService sysNoticeService; @Autowired private SysUserService sysUserService; @Autowired private WebSocket webSocket; @ApiOperation(value = "管理员通知公告进行上传", notes = "管理员通知公告进行上传") @PostMapping public ResponseEntity<HttpStatus> insertTSysNotice(@RequestBody SysNoticeForm tSysNoticeForm) throws ApiException { sysNoticeService.insertSysNotice(tSysNoticeForm.toPo(SysNotice.class)); return ResponseEntity.ok().build(); } @ApiOperation(value = "短信发送", notes = "短信发送") @PostMapping("/sendMessage") public ResponseEntity<SysNotice> sendMessage(@RequestBody MassageTemplateSendVO massageTemplateSendVO) throws ApiException { return ResponseEntity.ok(sysNoticeService.sendMassage(massageTemplateSendVO)); } @ApiOperation(value = "指定用户添加消息", notes = "指定用户添加消息") @PostMapping("/pushByUserId") public ResponseEntity pushByUserId(String noticeType, String userId, String message, HttpServletRequest request) { SysUser sendUser = getLoginUser(request); if (sendUser == null) { throw new ApiException("未登录,或重新登录。", HttpStatus.BAD_REQUEST); } SysUser receiveUser = sysUserService.getById(userId); if (receiveUser == null) { throw new ApiException("未找到消息接受用户信息,请重新选择。", HttpStatus.BAD_REQUEST); } SysNotice sysNotice = new SysNotice(); sysNotice.setIsReadFlag("1").setNoticeContext(message).setNoticeWay("SYS").setReceiveUserId(Long.valueOf(userId)).setReceiveUserName(receiveUser.getName()) .setSendUserId(sendUser.getId()).setSendUserName(sendUser.getName()).setNoticeType(noticeType); // webSocket.sendInfo(userId, message); sysNoticeService.insertSysNotice(sysNotice); return ResponseEntity.ok("添加成功!"); } @ApiOperation(value = "指定多用户添加消息", notes = "给多个用户添加消息") @PostMapping(value = "/pushByUserIds") public ResponseEntity pushByUserIds(String noticeType, String[] userIds, String message, HttpServletRequest request) { SysUser sendUser = getLoginUser(request); if (sendUser == null) { throw new ApiException("未登录,或重新登录。", HttpStatus.BAD_REQUEST); } for (String userId : userIds) { SysUser receiveUser = sysUserService.getById(userId); if (receiveUser != null) { SysNotice sysNotice = new SysNotice(); sysNotice.setIsReadFlag("1").setNoticeContext(message).setNoticeWay("SYS").setReceiveUserId(Long.valueOf(userId)).setReceiveUserName(receiveUser.getName()) .setSendUserId(sendUser.getId()).setSendUserName(sendUser.getName()).setNoticeType(noticeType); sysNoticeService.insertSysNotice(sysNotice); } } // webSocket.sendInfoByUsers(userIds, message); return ResponseEntity.ok("添加成功!"); } @ApiOperation(value = "系统所有用户群发添加消息", notes = "系统所有用户群发添加消息") @PostMapping(value = "/pushAll") public ResponseEntity pushAll(String message, HttpServletRequest request, String noticeType) { SysUser sendUser = getLoginUser(request); if (sendUser == null) { throw new ApiException("未登录,或重新登录。", HttpStatus.BAD_REQUEST); } sysUserService.list().forEach(user -> { if (user.getId().equals(sendUser.getId())) { return; } SysNotice sysNotice = new SysNotice(); sysNotice.setIsReadFlag("1").setNoticeContext(message).setNoticeWay("SYS").setReceiveUserId(user.getId()).setReceiveUserName(user.getName()) .setSendUserId(sendUser.getId()).setSendUserName(sendUser.getName()).setNoticeType(noticeType); sysNoticeService.insertSysNotice(sysNotice); }); // webSocket.sendAll(message); return ResponseEntity.ok("添加成功!"); } @ApiOperation(value = "设置未已读/未读") @PostMapping("/updateReadFlag") public ResponseEntity updateReadFlag(String noticeId, String readFlag) { sysNoticeService.updateReadFlag(noticeId, readFlag); return ResponseEntity.ok("设置成功!"); } @ApiOperation(value = "多条消息设置已读/未读", notes = "多条消息设置已读/未读") @PostMapping("/updateReadFlags") public ResponseEntity updateReadFlags(String[] noticeIds, String readFlag) { for (String noticeId : noticeIds) { sysNoticeService.updateReadFlag(noticeId, readFlag); } return ResponseEntity.ok("设置成功"); } @ApiOperation(value = "批量删除公告", notes = "批量删除公告") @DeleteMapping(value = "/deletes") public ResponseEntity<HttpStatus> deleteSysNotices(String[] noticeIds) { for (String noticeId : noticeIds) { sysNoticeService.deleteSysNotice(noticeId); } return ResponseEntity.ok().build(); } @ApiOperation(value = "删除公告", notes = "删除公告") @ApiImplicitParam(paramType = "path", name = "id", value = "编号", required = true, dataType = "long") @DeleteMapping(value = "/{id}") public ResponseEntity<HttpStatus> deleteSysNotice(@PathVariable String id) throws ApiException { sysNoticeService.deleteSysNotice(id); return ResponseEntity.ok().build(); } @ApiOperation(value = "获取公告", notes = "获取指定公告") @ApiImplicitParam(paramType = "path", name = "id", value = "ID", required = true, dataType = "long") @GetMapping(value = "/{id}") public ResponseEntity<SysNoticeVO> getSysNoticeById(@CheckEnum(type = CheckValidatorEnum.Num) @PathVariable String id) { return ResponseEntity.ok(sysNoticeService.getSysNoticeById(id)); } @ApiOperation(value = "条件组合查询", notes = "条件组合查询") @PostMapping(value = "/list") public ResponseEntity<List<SysNoticeVO>> listSysNotices(@RequestBody SysNotice sysNotice) { return ResponseEntity.ok(sysNoticeService.listSysNotices(sysNotice)); } @ApiOperation(value = "搜索(分页)", notes = "根据条件搜索") @ApiImplicitParam(name = "sysNoticeQueryForm", value = "查询参数", required = true, dataType = "SysNoticeQueryForm") @PostMapping("/page") public ResponseEntity<IPage<SysNoticeVO>> getSysNoticePage(@Valid @RequestBody SysNoticeQueryForm sysNoticeQueryForm) { return ResponseEntity.ok(sysNoticeService.getSysNoticePage(sysNoticeQueryForm.getPage(), sysNoticeQueryForm.toParam(SysNoticeQueryParam.class))); } @ApiOperation(value = "根据用户ID查询消息", notes = "根据用户ID查询消息") @ApiImplicitParam(name = "userId", value = "用户ID", required = true, dataType = "string") @GetMapping("/findAllByUserId/{userId}") public ResponseEntity<List<SysNoticeVO>> findAllByUserId(@PathVariable("userId") Long userId) { return ResponseEntity.ok(sysNoticeService.findAllByUserId(userId)); } @ApiOperation(value = "根据用户ID已读全部消息", notes = "根据用户ID已读全部消息") @ApiImplicitParam(name = "userId", value = "用户ID", required = true, dataType = "string") @GetMapping("/dealAllByUserId/{userId}") public ResponseEntity<HttpStatus> dealAllByUserId(@PathVariable("userId") Long userId) { sysNoticeService.dealAllByUserId(userId); return ResponseEntity.ok().build(); } /** * 获取登录用户 * * @param request 请求头 */ public SysUser getLoginUser(HttpServletRequest request) { String loginUserId = JwtSecurityUtil.getLoginUserId(request); if (StringUtils.isBlank(loginUserId)) { return null; } return sysUserService.getById(loginUserId); } }
页面中的调用.每个客户都要初始化一个websocket示例.其中我们用用户的userId作为标识的一部分.//页面加载完成. 初始化一个webSocket对象.然后可以根据需要调一个来发信息
window.onload = function () { initWebSocket(); setTimeout(function () { $.post('<%=basePath %>xxx.do', function (r) { //alert(0); }); }, 2000); }; function initWebSocket() { webSocket = new WebSocket(requestUrl.replace("http", "ws") + 'websocket/${userId}'); webSocket.onerror = function (event) { onError(event) }; //连接建立成功事件 webSocket.onopen = function (event) { onOpen(event) }; //接收到服务端消息事件 webSocket.onmessage = function (event) { onMessage(event) }; }
简易版:(使用时注入即可)
package com.oldlu.socket; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.HashSet; import java.util.Set; /** * @author oldlu * 此注解相当于设置访问URL */ @Component @Slf4j @ServerEndpoint("/websocket") //此注解相当于设置访问URL public class WebSocket { private static Set<WebSocket> webSocketSet = new HashSet<>(); private Session session; @OnMessage public void onMessage(String message, Session session) throws IOException { System.out.println("接收的消息是:" + message); System.out.println(session); //将消息发送给其他的用户 } //发送消息 public void sendMessage(String message) throws IOException { for (WebSocket webSocket : webSocketSet) { if (webSocket.session.isOpen()) webSocket.session.getBasicRemote().sendText(message); } } @OnOpen public void onOpen(Session session) { this.session = session; webSocketSet.add(this); } @OnClose public void onClose(Session seesion) { System.out.println("连接关闭了。。。"); } @OnError public void onError(Session session,Throwable error) { System.out.println("出错了。。。。" + error.getMessage()); } }