1 开始
在本指南中,我们将创建一个基本的聊天应用程序。它几乎不需要 Node.JS 或 Socket.IO 的基本先验知识,因此非常适合所有知识水平的用户。
2 介绍
使用流行的 Web 应用程序堆栈(如 LAMP (PHP))编写聊天应用程序通常非常困难。它涉及轮询服务器以获取更改、跟踪时间戳,并且它比应有的速度慢得多。
传统上,套接字一直是构建大多数实时聊天系统的解决方案,在客户端和服务器之间提供双向通信通道。
这意味着服务器可以向客户端推送消息。每当您编写聊天消息时,其想法是服务器将获取它并将其推送到所有其他连接的客户端。
3 网络框架
第一个目标是建立一个简单的 HTML 网页,提供一个表单和一个消息列表。为此,我们将使用 Node.JS Web 框架express
。确保安装了Node.JS。
首先让我们创建一个package.json
描述我们项目的清单文件。我建议你把它放在一个专门的空目录中(我称之为 mine chat-example
)。
{ "name": "socket-chat-example", "version": "0.0.1", "description": "my first socket.io app", "dependencies": {} }
复制
现在,为了dependencies
用我们需要的东西轻松地填充属性,我们将使用npm install
:
npm install express@4
复制
安装后,我们可以创建一个index.js
文件来设置我们的应用程序。
const express = require('express'); const app = express();const http = require('http'); const server = http.createServer(app); app.get('/', (req, res) => { res.send('<h1>Hello world</h1>');}); server.listen(3000, () => { console.log('listening on *:3000');});
复制
这意味着它:
- Express 初始化
app
为可以提供给 HTTP 服务器的函数处理程序(如第 4 行所示)。 - 我们定义了一个路由处理程序
/
,当我们点击我们的网站主页时它会被调用。
- 我们让 http 服务器监听 3000 端口。
如果您运行,node index.js
您应该看到以下内容:
如果您将浏览器指向http://localhost:3000
:
4 服务 HTML
到目前为止,index.js
我们调用res.send
并传递了一个 HTML 字符串。如果我们只是将整个应用程序的 HTML 放在那里,我们的代码看起来会非常混乱,因此我们将创建一个index.html
文件并提供它。
让我们重构我们的路由处理程序来sendFile
代替使用。
app.get('/', (req, res) => { res.sendFile(__dirname + '/index.html');});
复制
将以下内容放入您的index.html
文件中:
<!DOCTYPE html> <html> <head><title>Socket.IO chat</title> <style> body { margin: 0; padding-bottom: 3rem; font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial, sans-serif; } #form { background: rgba(0, 0, 0, 0.15); padding: 0.25rem; position: fixed; bottom: 0; left: 0; right: 0; display: flex; height: 3rem; box-sizing: border-box; backdrop-filter: blur(10px); } #input { border: none; padding: 0 1rem; flex-grow: 1; border-radius: 2rem; margin: 0.25rem; } #input:focus { outline: none; } #form > button { background: #333; border: none; padding: 0 1rem; margin: 0.25rem; border-radius: 3px; outline: none; color: #fff; } #messages { list-style-type: none; margin: 0; padding: 0; } #messages > li { padding: 0.5rem 1rem; } #messages > li:nth-child(odd) { background: #efefef; } </style> </head> <body> <ul id="messages"></ul> <form id="form" action=""><input id="input" autocomplete="off"/> <button>Send</button> </form> </body> </html>
复制
如果您重新启动进程(通过按 Control+C 并node index.js
再次运行)并刷新页面,它应该如下所示:
5 集成 Socket.IO
Socket.IO 由两部分组成:
- 与 Node.JS HTTP Server socket.io集成(或安装在其上)的服务器
- 在浏览器端加载的客户端库socket.io-client
在开发过程中,socket.io
自动为我们服务客户端,正如我们将看到的,所以现在我们只需要安装一个模块:
npm install socket.io
复制
这将安装模块并将依赖项添加到package.json
. 现在让我们编辑index.js
添加它:
const express = require('express'); const app = express(); const http = require('http'); const server = http.createServer(app); const {Server} = require("socket.io"); const io = new Server(server); app.get('/', (req, res) => { res.sendFile(__dirname + '/index.html'); }); io.on('connection', (socket) => { console.log('a user connected'); }); server.listen(3000, () => { console.log('listening on *:3000'); });
复制
请注意,我socket.io
通过传递server
(HTTP 服务器)对象初始化了一个新实例。然后我监听connection
传入套接字的事件并将其记录到控制台。
现在在 index.html </body>
(end body 标签)之前添加以下代码段:
<script src="/socket.io/socket.io.js"></script><script> var socket = io();</script>
复制
这就是加载socket.io-client
公开io
全局(和端点GET /socket.io/socket.io.js
)然后连接所需的全部内容。
如果您想使用客户端 JS 文件的本地版本,您可以在node_modules/socket.io/client-dist/socket.io.js
.
请注意,我在调用 时没有指定任何 URL io()
,因为它默认尝试连接到为页面提供服务的主机。
如果您现在重新启动进程(通过按 Control+C 并node index.js
再次运行)然后刷新网页,您应该会看到控制台打印“a user connected”。
尝试打开多个选项卡,您会看到几条消息。
每个套接字还会触发一个特殊disconnect
事件:
io.on('connection', (socket) => { console.log('a user connected'); socket.on('disconnect', () => { console.log('user disconnected'); });});
复制
然后,如果您多次刷新选项卡,您可以看到它在运行。
6 发出事件
Socket.IO 背后的主要思想是您可以发送和接收您想要的任何事件,以及您想要的任何数据。任何可以编码为 JSON 的对象都可以,并且也支持二进制数据。
让我们让它在用户输入消息时,服务器将其作为chat message
事件获取。中的script
部分index.html
现在应如下所示:
<script src="/socket.io/socket.io.js"></script> <script> var socket = io(); var form = document.getElementById('form'); var input = document.getElementById('input'); form.addEventListener('submit', function (e) { e.preventDefault(); if (input.value) { socket.emit('chat message', input.value); input.value = ''; } });</script>
复制
在index.js我们打印出chat message事件:
io.on('connection', (socket) => { socket.on('chat message', (msg) => { console.log('message: ' + msg); });});
复制
结果应类似于以下视频:
7 广播
我们的下一个目标是将事件从服务器发送给其他用户。
为了给大家发送一个事件,Socket.IO给了我们io.emit()
方法。
io.emit('some event', { someProperty: 'some value', otherProperty: 'other value' }); // This will emit the event to all connected sockets
复制
如果你想向除了某个发射套接字之外的所有人发送消息,我们有broadcast
从该套接字发射的标志:
io.on('connection', (socket) => { socket.broadcast.emit('hi');});
复制
在这种情况下,为了简单起见,我们会将消息发送给所有人,包括发件人。
io.on('connection', (socket) => { socket.on('chat message', (msg) => { io.emit('chat message', msg); });});
复制
在客户端,当我们捕获chat message
事件时,我们会将其包含在页面中。在总的客户端JavaScript代码,现在金额为:
<script> var socket = io(); var messages = document.getElementById('messages'); var form = document.getElementById('form'); var input = document.getElementById('input'); form.addEventListener('submit', function (e) { e.preventDefault(); if (input.value) { socket.emit('chat message', input.value); input.value = ''; } }); socket.on('chat message', function (msg) { var item = document.createElement('li'); item.textContent = msg; messages.appendChild(item); window.scrollTo(0, document.body.scrollHeight); });</script>
复制
这样就完成了我们的聊天应用程序,大约用了 20 行代码!这是它的样子:
个人代码:
客户端缓存:
package com.oldlu.socket; import com.corundumstudio.socketio.SocketIOClient; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @Component public class ClientCache { //本地缓存,服务终端消息通知 private static Map<String, HashMap<UUID, SocketIOClient>> concurrentHashMap = new ConcurrentHashMap<>(); // 存放客户端 链接数据 private static Map<String, SocketIOClient> concurrentClientHashMap = new ConcurrentHashMap<>(); /** * 存入本地缓存 * * @param userId 用户ID * @param sessionId 页面sessionID * @param socketIOClient 页面对应的通道连接信息 */ public void saveClient(String userId, UUID sessionId, SocketIOClient socketIOClient) { HashMap<UUID, SocketIOClient> sessionIdClientCache = concurrentHashMap.get(userId); if (sessionIdClientCache == null) { sessionIdClientCache = new HashMap<>(); } sessionIdClientCache.put(sessionId, socketIOClient); concurrentHashMap.put(userId, sessionIdClientCache); } /** * 根据用户ID获取所有通道信息 * * @param userId 用户id * @return 返回信息 */ public HashMap<UUID, SocketIOClient> getUserClient(String userId) { return concurrentHashMap.get(userId); } /** * 获取所有的通讯端内容 * * @return 返回信息 */ public Map<String, HashMap<UUID, SocketIOClient>> getAllClient() { return concurrentHashMap; } /** * 根据用户ID及页面sessionID删除页面链接信息 * * @param userId 用户id * @param sessionId 本次sessionId */ public void deleteSessionClient(String userId, UUID sessionId) { //先删除 if (concurrentHashMap.containsKey(userId)) { concurrentHashMap.get(userId).remove(sessionId); } //查询 HashMap<UUID, SocketIOClient> sessionIdClientCache = concurrentHashMap.get(userId); if (sessionIdClientCache == null) { //如果已为空,删除键 concurrentHashMap.remove(userId); } } /** * 存入本地缓存 * * @param userId 用户ID * @param socketIOClient 页面对应的通道连接信息 */ public void saveForClient(String userId, SocketIOClient socketIOClient) { if (StringUtils.isNotEmpty(userId)) { concurrentClientHashMap.put(userId, socketIOClient); } } /** * 根据用户ID获取所有通道信息 * * @param userId 用户id * @return 返回信息 */ public SocketIOClient getUserForClient(String userId) { return concurrentClientHashMap.get(userId); } /** * 获取所有的通讯端内容 * * @return 返回信息 */ public Map<String, SocketIOClient> getAllForClient() { return concurrentClientHashMap; } /** * 根据用户ID及页面sessionID删除页面链接信息 * * @param userId 用户id */ public void deleteSessionForClient(String userId) { concurrentClientHashMap.remove(userId); } /** * 根据用户ID及页面sessionID删除页面链接信息 */ public void clear() { concurrentHashMap.clear(); } }
socketIO配置
package com.oldlu.socket; import com.corundumstudio.socketio.SocketConfig; import com.corundumstudio.socketio.SocketIOServer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; @Slf4j @Component public class SocketIoConfig { @Value("${socket.host}") private String host; @Value("${socket.port}") private Integer port; @Value("${socket.bossCount}") private int bossCount; @Value("${socket.workCount}") private int workCount; @Value("${socket.allowCustomRequests}") private boolean allowCustomRequests; @Value("${socket.upgradeTimeout}") private int upgradeTimeout; @Value("${socket.pingTimeout}") private int pingTimeout; @Value("${socket.pingInterval}") private int pingInterval; @Bean public SocketIOServer getSocketIOServer() { log.info("创建 SocketIOServer 开始"); SocketConfig socketConfig = new SocketConfig(); socketConfig.setTcpNoDelay(true); socketConfig.setSoLinger(0); com.corundumstudio.socketio.Configuration configuration = new com.corundumstudio.socketio.Configuration(); //configuration.setTransports(Transport.POLLING, Transport.WEBSOCKET); //configuration.setOrigin(":*:"); configuration.setHostname(host); configuration.setPort(port); configuration.setBossThreads(bossCount); configuration.setWorkerThreads(workCount); configuration.setAllowCustomRequests(allowCustomRequests); configuration.setUpgradeTimeout(upgradeTimeout); configuration.setPingTimeout(pingTimeout); configuration.setPingInterval(pingInterval); // 握手协议参数使用JWT的Token认证方案 认证方案 configuration.setAuthorizationListener(data -> { /* HttpHeaders httpHeaders = data.getHttpHeaders(); String token = httpHeaders.get("Authorization");*/ return true; }); configuration.setSocketConfig(socketConfig); log.info("创建 SocketIOServer 结束"); return new SocketIOServer(configuration); } }
socketIO封装
package com.oldlu.socket; import com.alibaba.fastjson.JSONObject; import com.corundumstudio.socketio.AckRequest; import com.corundumstudio.socketio.SocketIOClient; import com.corundumstudio.socketio.SocketIOServer; import com.oldlu.DAO.SysNoticeMapper; import com.oldlu.DAO.SysUserMapper; import com.oldlu.entity.form.SocketNoticeForm; import com.oldlu.entity.po.SysNoticeUser; import com.oldlu.entity.po.SysUser; import com.oldlu.entity.vo.Conference; import com.oldlu.entity.vo.ConferenceNode; import com.oldlu.entity.vo.SocketNotice; import com.oldlu.feign.MeetingFeignService; import io.netty.util.concurrent.Future; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.ValueOperations; import org.springframework.data.redis.serializer.StringRedisSerializer; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @Component @Slf4j public class SocketServer implements CommandLineRunner { private static final String USER_ID = "userId"; private static final String CONFERENCE_ID = "conferenceId"; private static final String IS_SYSTEM = "is_system"; @Value("${socket.host}") String socketHost; @Value("${oauth.token-url}") String tokenUrl; @Resource SysUserMapper sysUserMapper; @Resource MeetingFeignService meetingFeignService; @Resource RedisTemplate redisTemplate; @Resource SysNoticeMapper sysNoticeMapper; @Resource SocketIOServer socketIOServer; private ClientCache clientCache; @Autowired public void setClientCache(ClientCache clientCache) { this.clientCache = clientCache; } /** * 自定义事件,用于服务端与客户端的通信 */ private static final String RECEIVE_CLIENT_EVENT = "receive_client_event"; private static final String OFFLINE_NOTICE = "offline_notice"; private static final String ONLINE_NOTICE = "online_notice"; @Override public void run(String... args) { startSocketServer(); } public void startSocketServer() { log.info("启动socket服务成功"); log.info("输出参数" + socketIOServer.getConfiguration().toString()); clientCache.clear(); socketIOServer.addConnectListener(this::connected); socketIOServer.addDisconnectListener(this::disConnected); socketIOServer.startAsync().addListener(this::startFinished); socketIOServer.addEventListener(RECEIVE_CLIENT_EVENT, String.class, this::addClientListener); } private void startFinished(Future<? super Void> future) { if (!future.isSuccess()) { log.error("webSocket server start failed"); System.exit(-1); } } private void connected(SocketIOClient socketIOClient) { try { String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID); String isSystem = socketIOClient.getHandshakeData().getSingleUrlParam(IS_SYSTEM); if (StringUtils.isNotBlank(userId) && !userId.equals("undefined")) { UUID sessionId = socketIOClient.getSessionId(); clientCache.saveClient(userId, sessionId, socketIOClient); clientCache.saveForClient(userId, socketIOClient); log.error("{}登录", userId); if (StringUtils.isBlank(isSystem)) { String socketConferenceId = socketIOClient.getHandshakeData().getSingleUrlParam(CONFERENCE_ID); Long conferenceId = Long.valueOf(socketConferenceId); SysUser sysUser = sysUserMapper.selectById(userId); //上线 Map<String, Object> socketUser = sysNoticeMapper.getSocketUser(userId, socketConferenceId); if (socketUser == null) { //增加socket在线人员 sysNoticeMapper.insertSocketUser(userId, sysUser != null ? sysUser.getName() : "", socketConferenceId, "0"); } else { //重新登录 sysNoticeMapper.updateSocketUser("0", socketUser.get("id").toString()); } //推送上线 ResponseEntity<Set<String>> allUsersNoSelf = meetingFeignService.getAllUsersNoSelf(conferenceId, Long.valueOf(userId)); Set<String> sendFor = allUsersNoSelf != null && allUsersNoSelf.getStatusCode().is2xxSuccessful() ? allUsersNoSelf.getBody() : getOnlineUserIds(); JSONObject jsonObject = new JSONObject(); jsonObject.put("userName", sysUser != null ? sysUser.getName() : ""); sendSocketTask(userId, sendFor, jsonObject, ONLINE_NOTICE, conferenceId, "1", null); log.error("---------------------------------{}登录推送", userId); // 推送当前会议状态树 // 更新会议树任务栏信息 updateHeadNode(conferenceId); // 获取会议树 Set<String> online4ClientUserIds = getOnline4ClientUserIds(); JSONObject conferenceNote = getConferenceNote(conferenceId); // 推送会议树 sendSocketTask(userId, online4ClientUserIds, conferenceNote, "note_tree", conferenceId, "1", "上线通知"); } } } catch (Exception e) { e.printStackTrace(); disConnected(socketIOClient); } } private void disConnected(SocketIOClient socketIOClient) { log.info("有服务断开连接,连接" + socketIOClient.getRemoteAddress()); try { String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID); String isSystem = socketIOClient.getHandshakeData().getSingleUrlParam(IS_SYSTEM); JSONObject jsonObject = new JSONObject(); log.error("{}下线", userId); if (StringUtils.isNotEmpty(userId) && !userId.equals("undefined")) { List<SysNoticeUser> socketUser = sysNoticeMapper.findSocketUser(userId); clientCache.deleteSessionClient(userId, socketIOClient.getSessionId()); clientCache.deleteSessionForClient(userId); //当userId下所有sessionId为空时再下线 Map<String, HashMap<UUID, SocketIOClient>> allClient = clientCache.getAllClient(); if (allClient.containsKey(userId)) { HashMap<UUID, SocketIOClient> uuidSocketIOClientHashMap = allClient.get(userId); if (uuidSocketIOClientHashMap == null || uuidSocketIOClientHashMap.size() == 0) { sysNoticeMapper.socketUserDisConnected("1", userId, null); } else { String addr = socketIOClient.getRemoteAddress().toString(); Collection<SocketIOClient> values = uuidSocketIOClientHashMap.values(); for (SocketIOClient value : values) { if (value.getRemoteAddress() != null && value.getRemoteAddress().toString().contains(":")) { String[] split = value.getRemoteAddress().toString().split(":"); String[] split1 = addr.split(":"); if (split[0].equals(split1[0])) { clientCache.deleteSessionClient(userId, value.getSessionId()); HashMap<UUID, SocketIOClient> userClient2 = clientCache.getUserClient(userId); if (userClient2 == null || userClient2.size() == 0) { sysNoticeMapper.socketUserDisConnected("1", userId, null); } } } } } } if (StringUtils.isBlank(isSystem)) { //查询下线人员是否正在推流 List<Conference> conferences = sysNoticeMapper.findWebrtc(Long.valueOf(userId)); for (Conference conference : conferences) { if (conference.getIsShare().equals("1")) { //重置推流广播 SocketNoticeForm socketNoticeForm = new SocketNoticeForm(); socketNoticeForm.setConferenceId(conference.getId()); socketNoticeForm.setUserId(Long.valueOf(userId)); socketNoticeForm.setUserName(""); meetingFeignService.endSameScreen(socketNoticeForm); } updateHeadNode(conference.getId()); // 获取会议树 JSONObject conferenceNote = getConferenceNote(conference.getId()); // 获取会议树 Set<String> userIds = new HashSet<>(); userIds.add(userId); // 推送会议树 sendSocketTask(userId, getOnline4ClientUserIds(), conferenceNote, "note_tree", conference.getId(), "1", "下线通知:" + userId); } //推送时先从redis中获取key和过期时间/已过期,不在推送/未过期,再推送 String key = String.format("conference:%s:%s", userId, OFFLINE_NOTICE); if (!redisTemplate.hasKey(key)) { //取会议ID jsonObject.put("userName", sysUserMapper.findByName(userId) == null ? "" : sysUserMapper.findByName(userId)); socketUser.stream().forEach(su -> { this.sendSocketTask(userId, getOnlineUserIds(), jsonObject, OFFLINE_NOTICE, Long.valueOf(su.getConferenceId()), "1", "socket断开"); // 更新会议树任务栏信息 updateHeadNode(Long.valueOf(su.getConferenceId())); // 获取会议树 Set<String> online4ClientUserIds = getOnline4ClientUserIds(); JSONObject conferenceNote = getConferenceNote(Long.valueOf(su.getConferenceId())); // 推送会议树 sendSocketTask(userId, online4ClientUserIds, conferenceNote, "note_tree", Long.valueOf(su.getConferenceId()), "1", "下线通知"); }); log.error("{}下线推送", userId); } } } else { log.info("接收到用户id为空,无法从换从剔除socket信息"); } } catch (Exception e) { e.printStackTrace(); disConnected(socketIOClient); } } /** * 给指定的客户端发送socket 通知 */ public void sendSocketTask(String sendUserId, Set<String> userIds, JSONObject message, String destination, Long conferenceId, String sendType, String valueName) { Map<String, Object> toIPMap = new HashMap<>(); for (String userId : userIds) { log.info("给客户端id为{},推送socket监听接口消息,推送的消息内容{}", userId, message); if (getOnlineUserIds().contains(userId)) { ValueOperations<String, String> operations = redisTemplate.opsForValue(); redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setValueSerializer(new StringRedisSerializer()); if (destination.equals(OFFLINE_NOTICE)) { //key组成->conference:用户ID:事件 String key = String.format("conference:%s:%s", sendUserId, OFFLINE_NOTICE); //判断key是否存在 if (!redisTemplate.hasKey(key)) { //将发送事件和接收人作为key,存在redis中,设置过期时间 operations.set(key, "已下线", 10000, TimeUnit.MILLISECONDS); } } else if (destination.equals(ONLINE_NOTICE)) { //key组成->conference:用户ID:事件 String key = String.format("conference:%s:%s", sendUserId, ONLINE_NOTICE); //判断key是否存在 if (!redisTemplate.hasKey(key)) { //将发送事件和接收人作为key,存在redis中,设置过期时间 operations.set(key, "已上线", 10000, TimeUnit.MILLISECONDS); } } HashMap<UUID, SocketIOClient> userClient = clientCache.getUserClient(userId); if (userClient != null) { for (SocketIOClient socketIOClient : userClient.values()) { if (socketIOClient != null) { toIPMap.put(userId, socketIOClient.getRemoteAddress()); //socketIOClient.sendEvent(destination, message); // 在此进行比较当前会议ID与socket记录会议ID是否相等(相等进行推送,否则不推送) if (conferenceId == null || conferenceId == 0) { socketIOClient.sendEvent(destination, message); } else { List<SysNoticeUser> socketUser = sysNoticeMapper.findAllSocketUser(userId); if (socketUser.size() > 0) { socketUser.stream().filter(su -> su.getConferenceId().equals(String.valueOf(conferenceId))).forEach(su -> socketIOClient.sendEvent(destination, message) ); } } } else { log.info("该服务已经断开链接,无法推送消息"); } } } } } if (!destination.equals("attend_login")) { // 在此处执行推送信息记录 SocketNotice socketNotice = new SocketNotice(); socketNotice.setConferenceId(conferenceId == null ? 0 : conferenceId); socketNotice.setUserId(sendUserId); socketNotice.setUserIds(userIds.toString()); socketNotice.setMessage(message.toJSONString()); socketNotice.setDestination(destination); socketNotice.setSendType(sendType); socketNotice.setUserName(sysUserMapper.findByName(sendUserId) == null ? "" : sysUserMapper.findByName(sendUserId)); socketNotice.setFromIp(socketHost); socketNotice.setToIp(toIPMap.toString()); socketNotice.setValueName(valueName); socketNotice.setOnline(getOnlineUserIds().toString()); sysNoticeMapper.insertNotice(socketNotice); } } public void addClientListener(SocketIOClient socketIOClient, String request, AckRequest ackRequest) { String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID); log.info("监听到客户端发送给服务端的消息,消息内容{},发送的客户端为{}", request, userId); ackRequest.isAckRequested(); } /** * 给所有的客户端发送指定消息 */ public void sendSocketTaskToAll(JSONObject message, String destination) { log.info("目前共有在线客户端:{},给所有客户端推送的消息内容{}", clientCache, message); for (SocketIOClient socketIOClient : clientCache.getAllForClient().values()) { socketIOClient.sendEvent(destination, message); } } /** * 获取当前所有在线的UserIds * * @return 返回在线人员列表信息 */ public Integer getOnlineNum() { return clientCache.getAllClient().keySet().size(); } /** * 获取当前所有在线的UserIds * * @return 返回在线人员列表信息 */ public Set<String> getOnlineUserIds() { return clientCache.getAllClient().keySet(); } /** * 获取当前所有在线的基本信息 * * @return 返回在线人员列表信息 */ public Map<String, HashMap<UUID, SocketIOClient>> getOnline() { return clientCache.getAllClient(); } /** * 获取当前所有在线的UserIds * * @return 返回在线人员列表信息 */ public Set<String> getOnline4ClientUserIds() { return clientCache.getAllForClient().keySet(); } private JSONObject getConferenceNote(Long conferenceId) { JSONObject jsonObject = new JSONObject(new LinkedHashMap<>()); ConferenceNode conferenceNode = sysNoticeMapper.findByConferenceNodeByConferenceId(conferenceId); // 会议状态conference_status jsonObject.put("conferenceStatus", conferenceNode.getConferenceStatus()); if (StringUtils.isNotBlank(conferenceNode.getConferenceContent())) { conferenceNode.setConferenceContent(conferenceNode.getConferenceContent().replaceAll("\t", "").replaceAll("\n", "")); jsonObject.put("conferenceInfo", JSONObject.parseObject(conferenceNode.getConferenceContent())); } if (StringUtils.isNotBlank(conferenceNode.getTopicContent())) { jsonObject.put("topicInfo", JSONObject.parseObject(conferenceNode.getTopicContent())); } if (StringUtils.isNotBlank(conferenceNode.getCountersignData())) { jsonObject.put("countersignData", JSONObject.parseObject(conferenceNode.getCountersignData())); } if (StringUtils.isNotBlank(conferenceNode.getTimer())) { jsonObject.put("timer", JSONObject.parseObject(conferenceNode.getTimer())); } if (StringUtils.isNotBlank(conferenceNode.getSameScreen())) { jsonObject.put("sameScreen", JSONObject.parseObject(conferenceNode.getSameScreen())); } if (StringUtils.isNotBlank(conferenceNode.getHeadData())) { jsonObject.put("headData", JSONObject.parseObject(conferenceNode.getHeadData())); } if (StringUtils.isNotBlank(conferenceNode.getSignInData())) { jsonObject.put("signInData", JSONObject.parseObject(conferenceNode.getSignInData())); // 签到信息 } return jsonObject; } // 更新任务栏数据 private void updateHeadNode(Long conferenceId) { // 更新会议树任务栏信息 String headData = sysNoticeMapper.getHeadData(conferenceId); if (StringUtils.isNotBlank(headData)) { JSONObject jsonObject = JSONObject.parseObject(headData); jsonObject.put("onLineNumber", sysNoticeMapper.getOnLineNum(conferenceId.toString()) == null ? 0 : sysNoticeMapper.getOnLineNum(conferenceId.toString())); sysNoticeMapper.updateHedaData(jsonObject.toJSONString(), conferenceId); } } }