Socket.IO服务端与客户端消息通讯

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: Socket.IO服务端与客户端消息通讯

1 开始

在本指南中,我们将创建一个基本的聊天应用程序。它几乎不需要 Node.JS 或 Socket.IO 的基本先验知识,因此非常适合所有知识水平的用户。

2 介绍

使用流行的 Web 应用程序堆栈(如 LAMP (PHP))编写聊天应用程序通常非常困难。它涉及轮询服务器以获取更改、跟踪时间戳,并且它比应有的速度慢得多。


传统上,套接字一直是构建大多数实时聊天系统的解决方案,在客户端和服务器之间提供双向通信通道。


这意味着服务器可以向客户端推送消息。每当您编写聊天消息时,其想法是服务器将获取它并将其推送到所有其他连接的客户端。

3 网络框架

第一个目标是建立一个简单的 HTML 网页,提供一个表单和一个消息列表。为此,我们将使用 Node.JS Web 框架express。确保安装了Node.JS。

首先让我们创建一个package.json描述我们项目的清单文件。我建议你把它放在一个专门的空目录中(我称之为 mine chat-example)。

{  "name": "socket-chat-example", 
 "version": "0.0.1",  
 "description": "my first socket.io app", 
  "dependencies": {}
  }

复制

现在,为了dependencies用我们需要的东西轻松地填充属性,我们将使用npm install

npm install express@4

复制

安装后,我们可以创建一个index.js文件来设置我们的应用程序。

const express = require('express');
const app = express();const http = require('http');
const server = http.createServer(app);
app.get('/', (req, res) => {  res.send('<h1>Hello world</h1>');});
server.listen(3000, () => {  console.log('listening on *:3000');});

复制

这意味着它:

  • Express 初始化app为可以提供给 HTTP 服务器的函数处理程序(如第 4 行所示)。
  • 我们定义了一个路由处理程序/,当我们点击我们的网站主页时它会被调用。
  • 我们让 http 服务器监听 3000 端口。

如果您运行,node index.js您应该看到以下内容:

如果您将浏览器指向http://localhost:3000

4 服务 HTML

到目前为止,index.js我们调用res.send并传递了一个 HTML 字符串。如果我们只是将整个应用程序的 HTML 放在那里,我们的代码看起来会非常混乱,因此我们将创建一个index.html文件并提供它。

让我们重构我们的路由处理程序来sendFile代替使用。

app.get('/', (req, res) => {  res.sendFile(__dirname + '/index.html');});

复制

将以下内容放入您的index.html文件中:

<!DOCTYPE html>
<html>
<head><title>Socket.IO chat</title>
    <style>      body {
        margin: 0;
        padding-bottom: 3rem;
        font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial, sans-serif;
    }
    #form {
        background: rgba(0, 0, 0, 0.15);
        padding: 0.25rem;
        position: fixed;
        bottom: 0;
        left: 0;
        right: 0;
        display: flex;
        height: 3rem;
        box-sizing: border-box;
        backdrop-filter: blur(10px);
    }
    #input {
        border: none;
        padding: 0 1rem;
        flex-grow: 1;
        border-radius: 2rem;
        margin: 0.25rem;
    }
    #input:focus {
        outline: none;
    }
    #form > button {
        background: #333;
        border: none;
        padding: 0 1rem;
        margin: 0.25rem;
        border-radius: 3px;
        outline: none;
        color: #fff;
    }
    #messages {
        list-style-type: none;
        margin: 0;
        padding: 0;
    }
    #messages > li {
        padding: 0.5rem 1rem;
    }
    #messages > li:nth-child(odd) {
        background: #efefef;
    }    </style>
</head>
<body>
<ul id="messages"></ul>
<form id="form" action=""><input id="input" autocomplete="off"/>
    <button>Send</button>
</form>
</body>
</html>

复制

如果您重新启动进程(通过按 Control+C 并node index.js再次运行)并刷新页面,它应该如下所示:

5 集成 Socket.IO

Socket.IO 由两部分组成:

  • 与 Node.JS HTTP Server socket.io集成(或安装在其上)的服务器
  • 在浏览器端加载的客户端库socket.io-client

在开发过程中,socket.io自动为我们服务客户端,正如我们将看到的,所以现在我们只需要安装一个模块:

npm install socket.io

复制

这将安装模块并将依赖项添加到package.json. 现在让我们编辑index.js添加它:

const express = require('express');
const app = express();
const http = require('http');
const server = http.createServer(app);
const {Server} = require("socket.io");
const io = new Server(server);
app.get('/', (req, res) => {
    res.sendFile(__dirname + '/index.html');
});
io.on('connection', (socket) => {
    console.log('a user connected');
});
server.listen(3000, () => {
    console.log('listening on *:3000');
});

复制

请注意,我socket.io通过传递server(HTTP 服务器)对象初始化了一个新实例。然后我监听connection传入套接字的事件并将其记录到控制台。

现在在 index.html </body>(end body 标签)之前添加以下代码段:

<script src="/socket.io/socket.io.js"></script><script>  var socket = io();</script>

复制

这就是加载socket.io-client公开io全局(和端点GET /socket.io/socket.io.js)然后连接所需的全部内容。

如果您想使用客户端 JS 文件的本地版本,您可以在node_modules/socket.io/client-dist/socket.io.js.

请注意,我在调用 时没有指定任何 URL io(),因为它默认尝试连接到为页面提供服务的主机。

如果您现在重新启动进程(通过按 Control+C 并node index.js再次运行)然后刷新网页,您应该会看到控制台打印“a user connected”。

尝试打开多个选项卡,您会看到几条消息。

每个套接字还会触发一个特殊disconnect事件:

io.on('connection', (socket) => {  console.log('a user connected');  socket.on('disconnect', () => {    console.log('user disconnected');  });});

复制

然后,如果您多次刷新选项卡,您可以看到它在运行。

6 发出事件

Socket.IO 背后的主要思想是您可以发送和接收您想要的任何事件,以及您想要的任何数据。任何可以编码为 JSON 的对象都可以,并且也支持二进制数据

让我们让它在用户输入消息时,服务器将其作为chat message事件获取。中的script部分index.html现在应如下所示:

<script src="/socket.io/socket.io.js"></script>
<script>  var socket = io();
var form = document.getElementById('form');
var input = document.getElementById('input');
form.addEventListener('submit', function (e) {
    e.preventDefault();
    if (input.value) {
        socket.emit('chat message', input.value);
        input.value = '';
    }
});</script>

复制


在index.js我们打印出chat message事件:

io.on('connection', (socket) =>
 {  socket.on('chat message', (msg) => 
{    console.log('message: ' + msg);  });});

复制

结果应类似于以下视频:

7 广播

我们的下一个目标是将事件从服务器发送给其他用户。

为了给大家发送一个事件,Socket.IO给了我们io.emit()方法。

io.emit('some event', { someProperty: 'some value', otherProperty: 'other value' }); // This will emit the event to all connected sockets

复制

如果你想向除了某个发射套接字之外的所有人发送消息,我们有broadcast从该套接字发射的标志:

io.on('connection', (socket) => {  socket.broadcast.emit('hi');});

复制

在这种情况下,为了简单起见,我们会将消息发送给所有人,包括发件人。

io.on('connection', (socket) => {  socket.on('chat message', (msg) => {    io.emit('chat message', msg);  });});

复制

在客户端,当我们捕获chat message事件时,我们会将其包含在页面中。在总的客户端JavaScript代码,现在金额为:

<script>  var socket = io();
var messages = document.getElementById('messages');
var form = document.getElementById('form');
var input = document.getElementById('input');
form.addEventListener('submit', function (e) {
    e.preventDefault();
    if (input.value) {
        socket.emit('chat message', input.value);
        input.value = '';
    }
});
socket.on('chat message', function (msg) {
    var item = document.createElement('li');
    item.textContent = msg;
    messages.appendChild(item);
    window.scrollTo(0, document.body.scrollHeight);
});</script>

复制

这样就完成了我们的聊天应用程序,大约用了 20 行代码!这是它的样子:

个人代码:

客户端缓存:

package com.oldlu.socket;
import com.corundumstudio.socketio.SocketIOClient;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class ClientCache {
    //本地缓存,服务终端消息通知
    private static Map<String, HashMap<UUID, SocketIOClient>> concurrentHashMap = new ConcurrentHashMap<>();
    // 存放客户端 链接数据
    private static Map<String, SocketIOClient> concurrentClientHashMap = new ConcurrentHashMap<>();
    /**
     * 存入本地缓存
     *
     * @param userId         用户ID
     * @param sessionId      页面sessionID
     * @param socketIOClient 页面对应的通道连接信息
     */
    public void saveClient(String userId, UUID sessionId, SocketIOClient socketIOClient) {
        HashMap<UUID, SocketIOClient> sessionIdClientCache = concurrentHashMap.get(userId);
        if (sessionIdClientCache == null) {
            sessionIdClientCache = new HashMap<>();
        }
        sessionIdClientCache.put(sessionId, socketIOClient);
        concurrentHashMap.put(userId, sessionIdClientCache);
    }
    /**
     * 根据用户ID获取所有通道信息
     *
     * @param userId 用户id
     * @return 返回信息
     */
    public HashMap<UUID, SocketIOClient> getUserClient(String userId) {
        return concurrentHashMap.get(userId);
    }
    /**
     * 获取所有的通讯端内容
     *
     * @return 返回信息
     */
    public Map<String, HashMap<UUID, SocketIOClient>> getAllClient() {
        return concurrentHashMap;
    }
    /**
     * 根据用户ID及页面sessionID删除页面链接信息
     *
     * @param userId    用户id
     * @param sessionId 本次sessionId
     */
    public void deleteSessionClient(String userId, UUID sessionId) {
        //先删除
        if (concurrentHashMap.containsKey(userId)) {
            concurrentHashMap.get(userId).remove(sessionId);
        }
        //查询
        HashMap<UUID, SocketIOClient> sessionIdClientCache = concurrentHashMap.get(userId);
        if (sessionIdClientCache == null) {
            //如果已为空,删除键
            concurrentHashMap.remove(userId);
        }
    }
    /**
     * 存入本地缓存
     *
     * @param userId         用户ID
     * @param socketIOClient 页面对应的通道连接信息
     */
    public void saveForClient(String userId, SocketIOClient socketIOClient) {
        if (StringUtils.isNotEmpty(userId)) {
            concurrentClientHashMap.put(userId, socketIOClient);
        }
    }
    /**
     * 根据用户ID获取所有通道信息
     *
     * @param userId 用户id
     * @return 返回信息
     */
    public SocketIOClient getUserForClient(String userId) {
        return concurrentClientHashMap.get(userId);
    }
    /**
     * 获取所有的通讯端内容
     *
     * @return 返回信息
     */
    public Map<String, SocketIOClient> getAllForClient() {
        return concurrentClientHashMap;
    }
    /**
     * 根据用户ID及页面sessionID删除页面链接信息
     *
     * @param userId 用户id
     */
    public void deleteSessionForClient(String userId) {
        concurrentClientHashMap.remove(userId);
    }
    /**
     * 根据用户ID及页面sessionID删除页面链接信息
     */
    public void clear() {
        concurrentHashMap.clear();
    }
}

socketIO配置

package com.oldlu.socket;
import com.corundumstudio.socketio.SocketConfig;
import com.corundumstudio.socketio.SocketIOServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class SocketIoConfig {
    @Value("${socket.host}")
    private String host;
    @Value("${socket.port}")
    private Integer port;
    @Value("${socket.bossCount}")
    private int bossCount;
    @Value("${socket.workCount}")
    private int workCount;
    @Value("${socket.allowCustomRequests}")
    private boolean allowCustomRequests;
    @Value("${socket.upgradeTimeout}")
    private int upgradeTimeout;
    @Value("${socket.pingTimeout}")
    private int pingTimeout;
    @Value("${socket.pingInterval}")
    private int pingInterval;
    @Bean
    public SocketIOServer getSocketIOServer() {
        log.info("创建 SocketIOServer 开始");
        SocketConfig socketConfig = new SocketConfig();
        socketConfig.setTcpNoDelay(true);
        socketConfig.setSoLinger(0);
        com.corundumstudio.socketio.Configuration configuration = new com.corundumstudio.socketio.Configuration();
        //configuration.setTransports(Transport.POLLING, Transport.WEBSOCKET);
        //configuration.setOrigin(":*:");
        configuration.setHostname(host);
        configuration.setPort(port);
        configuration.setBossThreads(bossCount);
        configuration.setWorkerThreads(workCount);
        configuration.setAllowCustomRequests(allowCustomRequests);
        configuration.setUpgradeTimeout(upgradeTimeout);
        configuration.setPingTimeout(pingTimeout);
        configuration.setPingInterval(pingInterval);
        // 握手协议参数使用JWT的Token认证方案 认证方案
        configuration.setAuthorizationListener(data -> {
           /* HttpHeaders httpHeaders = data.getHttpHeaders();
            String token = httpHeaders.get("Authorization");*/
            return true;
        });
        configuration.setSocketConfig(socketConfig);
        log.info("创建 SocketIOServer 结束");
        return new SocketIOServer(configuration);
    }
}

socketIO封装

package com.oldlu.socket;
import com.alibaba.fastjson.JSONObject;
import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import com.oldlu.DAO.SysNoticeMapper;
import com.oldlu.DAO.SysUserMapper;
import com.oldlu.entity.form.SocketNoticeForm;
import com.oldlu.entity.po.SysNoticeUser;
import com.oldlu.entity.po.SysUser;
import com.oldlu.entity.vo.Conference;
import com.oldlu.entity.vo.ConferenceNode;
import com.oldlu.entity.vo.SocketNotice;
import com.oldlu.feign.MeetingFeignService;
import io.netty.util.concurrent.Future;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class SocketServer implements CommandLineRunner {
    private static final String USER_ID = "userId";
    private static final String CONFERENCE_ID = "conferenceId";
    private static final String IS_SYSTEM = "is_system";
    @Value("${socket.host}")
    String socketHost;
    @Value("${oauth.token-url}")
    String tokenUrl;
    @Resource
    SysUserMapper sysUserMapper;
    @Resource
    MeetingFeignService meetingFeignService;
    @Resource
    RedisTemplate redisTemplate;
    @Resource
    SysNoticeMapper sysNoticeMapper;
    @Resource
    SocketIOServer socketIOServer;
    private ClientCache clientCache;
    @Autowired
    public void setClientCache(ClientCache clientCache) {
        this.clientCache = clientCache;
    }
    /**
     * 自定义事件,用于服务端与客户端的通信
     */
    private static final String RECEIVE_CLIENT_EVENT = "receive_client_event";
    private static final String OFFLINE_NOTICE = "offline_notice";
    private static final String ONLINE_NOTICE = "online_notice";
    @Override
    public void run(String... args) {
        startSocketServer();
    }
    public void startSocketServer() {
        log.info("启动socket服务成功");
        log.info("输出参数" + socketIOServer.getConfiguration().toString());
        clientCache.clear();
        socketIOServer.addConnectListener(this::connected);
        socketIOServer.addDisconnectListener(this::disConnected);
        socketIOServer.startAsync().addListener(this::startFinished);
        socketIOServer.addEventListener(RECEIVE_CLIENT_EVENT, String.class, this::addClientListener);
    }
    private void startFinished(Future<? super Void> future) {
        if (!future.isSuccess()) {
            log.error("webSocket server start failed");
            System.exit(-1);
        }
    }
    private void connected(SocketIOClient socketIOClient) {
        try {
            String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
            String isSystem = socketIOClient.getHandshakeData().getSingleUrlParam(IS_SYSTEM);
            if (StringUtils.isNotBlank(userId) && !userId.equals("undefined")) {
                UUID sessionId = socketIOClient.getSessionId();
                clientCache.saveClient(userId, sessionId, socketIOClient);
                clientCache.saveForClient(userId, socketIOClient);
                log.error("{}登录", userId);
                if (StringUtils.isBlank(isSystem)) {
                    String socketConferenceId = socketIOClient.getHandshakeData().getSingleUrlParam(CONFERENCE_ID);
                    Long conferenceId = Long.valueOf(socketConferenceId);
                    SysUser sysUser = sysUserMapper.selectById(userId);
                    //上线
                    Map<String, Object> socketUser = sysNoticeMapper.getSocketUser(userId, socketConferenceId);
                    if (socketUser == null) {
                        //增加socket在线人员
                        sysNoticeMapper.insertSocketUser(userId, sysUser != null ? sysUser.getName() : "", socketConferenceId, "0");
                    } else {
                        //重新登录
                        sysNoticeMapper.updateSocketUser("0", socketUser.get("id").toString());
                    }
                    //推送上线
                    ResponseEntity<Set<String>> allUsersNoSelf = meetingFeignService.getAllUsersNoSelf(conferenceId, Long.valueOf(userId));
                    Set<String> sendFor = allUsersNoSelf != null && allUsersNoSelf.getStatusCode().is2xxSuccessful() ? allUsersNoSelf.getBody() : getOnlineUserIds();
                    JSONObject jsonObject = new JSONObject();
                    jsonObject.put("userName", sysUser != null ? sysUser.getName() : "");
                    sendSocketTask(userId, sendFor, jsonObject, ONLINE_NOTICE, conferenceId, "1", null);
                    log.error("---------------------------------{}登录推送", userId);
                    // 推送当前会议状态树
                    // 更新会议树任务栏信息
                    updateHeadNode(conferenceId);
                    // 获取会议树
                    Set<String> online4ClientUserIds = getOnline4ClientUserIds();
                    JSONObject conferenceNote = getConferenceNote(conferenceId);
                    // 推送会议树
                    sendSocketTask(userId, online4ClientUserIds, conferenceNote, "note_tree", conferenceId, "1", "上线通知");
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            disConnected(socketIOClient);
        }
    }
    private void disConnected(SocketIOClient socketIOClient) {
        log.info("有服务断开连接,连接" + socketIOClient.getRemoteAddress());
        try {
            String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
            String isSystem = socketIOClient.getHandshakeData().getSingleUrlParam(IS_SYSTEM);
            JSONObject jsonObject = new JSONObject();
            log.error("{}下线", userId);
            if (StringUtils.isNotEmpty(userId) && !userId.equals("undefined")) {
                List<SysNoticeUser> socketUser = sysNoticeMapper.findSocketUser(userId);
                clientCache.deleteSessionClient(userId, socketIOClient.getSessionId());
                clientCache.deleteSessionForClient(userId);
                //当userId下所有sessionId为空时再下线
                Map<String, HashMap<UUID, SocketIOClient>> allClient = clientCache.getAllClient();
                if (allClient.containsKey(userId)) {
                    HashMap<UUID, SocketIOClient> uuidSocketIOClientHashMap = allClient.get(userId);
                    if (uuidSocketIOClientHashMap == null || uuidSocketIOClientHashMap.size() == 0) {
                        sysNoticeMapper.socketUserDisConnected("1", userId, null);
                    } else {
                        String addr = socketIOClient.getRemoteAddress().toString();
                        Collection<SocketIOClient> values = uuidSocketIOClientHashMap.values();
                        for (SocketIOClient value : values) {
                            if (value.getRemoteAddress() != null && value.getRemoteAddress().toString().contains(":")) {
                                String[] split = value.getRemoteAddress().toString().split(":");
                                String[] split1 = addr.split(":");
                                if (split[0].equals(split1[0])) {
                                    clientCache.deleteSessionClient(userId, value.getSessionId());
                                    HashMap<UUID, SocketIOClient> userClient2 = clientCache.getUserClient(userId);
                                    if (userClient2 == null || userClient2.size() == 0) {
                                        sysNoticeMapper.socketUserDisConnected("1", userId, null);
                                    }
                                }
                            }
                        }
                    }
                }
                if (StringUtils.isBlank(isSystem)) {
                    //查询下线人员是否正在推流
                    List<Conference> conferences = sysNoticeMapper.findWebrtc(Long.valueOf(userId));
                    for (Conference conference : conferences) {
                        if (conference.getIsShare().equals("1")) {
                            //重置推流广播
                            SocketNoticeForm socketNoticeForm = new SocketNoticeForm();
                            socketNoticeForm.setConferenceId(conference.getId());
                            socketNoticeForm.setUserId(Long.valueOf(userId));
                            socketNoticeForm.setUserName("");
                            meetingFeignService.endSameScreen(socketNoticeForm);
                        }
                        updateHeadNode(conference.getId());
                        // 获取会议树
                        JSONObject conferenceNote = getConferenceNote(conference.getId());
                        // 获取会议树
                        Set<String> userIds = new HashSet<>();
                        userIds.add(userId);
                        // 推送会议树
                        sendSocketTask(userId, getOnline4ClientUserIds(), conferenceNote, "note_tree", conference.getId(), "1", "下线通知:" + userId);
                    }
                    //推送时先从redis中获取key和过期时间/已过期,不在推送/未过期,再推送
                    String key = String.format("conference:%s:%s", userId, OFFLINE_NOTICE);
                    if (!redisTemplate.hasKey(key)) {
                        //取会议ID
                        jsonObject.put("userName", sysUserMapper.findByName(userId) == null ? "" : sysUserMapper.findByName(userId));
                        socketUser.stream().forEach(su -> {
                            this.sendSocketTask(userId, getOnlineUserIds(), jsonObject, OFFLINE_NOTICE, Long.valueOf(su.getConferenceId()), "1", "socket断开");
                            // 更新会议树任务栏信息
                            updateHeadNode(Long.valueOf(su.getConferenceId()));
                            // 获取会议树
                            Set<String> online4ClientUserIds = getOnline4ClientUserIds();
                            JSONObject conferenceNote = getConferenceNote(Long.valueOf(su.getConferenceId()));
                            // 推送会议树
                            sendSocketTask(userId, online4ClientUserIds, conferenceNote, "note_tree", Long.valueOf(su.getConferenceId()), "1", "下线通知");
                        });
                        log.error("{}下线推送", userId);
                    }
                }
            } else {
                log.info("接收到用户id为空,无法从换从剔除socket信息");
            }
        } catch (Exception e) {
            e.printStackTrace();
            disConnected(socketIOClient);
        }
    }
    /**
     * 给指定的客户端发送socket 通知
     */
    public void sendSocketTask(String sendUserId, Set<String> userIds, JSONObject message, String
            destination, Long conferenceId, String sendType, String valueName) {
        Map<String, Object> toIPMap = new HashMap<>();
        for (String userId : userIds) {
            log.info("给客户端id为{},推送socket监听接口消息,推送的消息内容{}", userId, message);
            if (getOnlineUserIds().contains(userId)) {
                ValueOperations<String, String> operations = redisTemplate.opsForValue();
                redisTemplate.setKeySerializer(new StringRedisSerializer());
                redisTemplate.setValueSerializer(new StringRedisSerializer());
                if (destination.equals(OFFLINE_NOTICE)) {
                    //key组成->conference:用户ID:事件
                    String key = String.format("conference:%s:%s", sendUserId, OFFLINE_NOTICE);
                    //判断key是否存在
                    if (!redisTemplate.hasKey(key)) {
                        //将发送事件和接收人作为key,存在redis中,设置过期时间
                        operations.set(key, "已下线", 10000, TimeUnit.MILLISECONDS);
                    }
                } else if (destination.equals(ONLINE_NOTICE)) {
                    //key组成->conference:用户ID:事件
                    String key = String.format("conference:%s:%s", sendUserId, ONLINE_NOTICE);
                    //判断key是否存在
                    if (!redisTemplate.hasKey(key)) {
                        //将发送事件和接收人作为key,存在redis中,设置过期时间
                        operations.set(key, "已上线", 10000, TimeUnit.MILLISECONDS);
                    }
                }
                HashMap<UUID, SocketIOClient> userClient = clientCache.getUserClient(userId);
                if (userClient != null) {
                    for (SocketIOClient socketIOClient : userClient.values()) {
                        if (socketIOClient != null) {
                            toIPMap.put(userId, socketIOClient.getRemoteAddress());
                            //socketIOClient.sendEvent(destination, message);
                            // 在此进行比较当前会议ID与socket记录会议ID是否相等(相等进行推送,否则不推送)
                            if (conferenceId == null || conferenceId == 0) {
                                socketIOClient.sendEvent(destination, message);
                            } else {
                                List<SysNoticeUser> socketUser = sysNoticeMapper.findAllSocketUser(userId);
                                if (socketUser.size() > 0) {
                                    socketUser.stream().filter(su -> su.getConferenceId().equals(String.valueOf(conferenceId))).forEach(su ->
                                            socketIOClient.sendEvent(destination, message)
                                    );
                                }
                            }
                        } else {
                            log.info("该服务已经断开链接,无法推送消息");
                        }
                    }
                }
            }
        }
        if (!destination.equals("attend_login")) {
            // 在此处执行推送信息记录
            SocketNotice socketNotice = new SocketNotice();
            socketNotice.setConferenceId(conferenceId == null ? 0 : conferenceId);
            socketNotice.setUserId(sendUserId);
            socketNotice.setUserIds(userIds.toString());
            socketNotice.setMessage(message.toJSONString());
            socketNotice.setDestination(destination);
            socketNotice.setSendType(sendType);
            socketNotice.setUserName(sysUserMapper.findByName(sendUserId) == null ? "" : sysUserMapper.findByName(sendUserId));
            socketNotice.setFromIp(socketHost);
            socketNotice.setToIp(toIPMap.toString());
            socketNotice.setValueName(valueName);
            socketNotice.setOnline(getOnlineUserIds().toString());
            sysNoticeMapper.insertNotice(socketNotice);
        }
    }
    public void addClientListener(SocketIOClient socketIOClient, String request, AckRequest ackRequest) {
        String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
        log.info("监听到客户端发送给服务端的消息,消息内容{},发送的客户端为{}", request, userId);
        ackRequest.isAckRequested();
    }
    /**
     * 给所有的客户端发送指定消息
     */
    public void sendSocketTaskToAll(JSONObject message, String destination) {
        log.info("目前共有在线客户端:{},给所有客户端推送的消息内容{}", clientCache, message);
        for (SocketIOClient socketIOClient : clientCache.getAllForClient().values()) {
            socketIOClient.sendEvent(destination, message);
        }
    }
    /**
     * 获取当前所有在线的UserIds
     *
     * @return 返回在线人员列表信息
     */
    public Integer getOnlineNum() {
        return clientCache.getAllClient().keySet().size();
    }
    /**
     * 获取当前所有在线的UserIds
     *
     * @return 返回在线人员列表信息
     */
    public Set<String> getOnlineUserIds() {
        return clientCache.getAllClient().keySet();
    }
    /**
     * 获取当前所有在线的基本信息
     *
     * @return 返回在线人员列表信息
     */
    public Map<String, HashMap<UUID, SocketIOClient>> getOnline() {
        return clientCache.getAllClient();
    }
    /**
     * 获取当前所有在线的UserIds
     *
     * @return 返回在线人员列表信息
     */
    public Set<String> getOnline4ClientUserIds() {
        return clientCache.getAllForClient().keySet();
    }
    private JSONObject getConferenceNote(Long conferenceId) {
        JSONObject jsonObject = new JSONObject(new LinkedHashMap<>());
        ConferenceNode conferenceNode = sysNoticeMapper.findByConferenceNodeByConferenceId(conferenceId);
        // 会议状态conference_status
        jsonObject.put("conferenceStatus", conferenceNode.getConferenceStatus());
        if (StringUtils.isNotBlank(conferenceNode.getConferenceContent())) {
            conferenceNode.setConferenceContent(conferenceNode.getConferenceContent().replaceAll("\t", "").replaceAll("\n", ""));
            jsonObject.put("conferenceInfo", JSONObject.parseObject(conferenceNode.getConferenceContent()));
        }
        if (StringUtils.isNotBlank(conferenceNode.getTopicContent())) {
            jsonObject.put("topicInfo", JSONObject.parseObject(conferenceNode.getTopicContent()));
        }
        if (StringUtils.isNotBlank(conferenceNode.getCountersignData())) {
            jsonObject.put("countersignData", JSONObject.parseObject(conferenceNode.getCountersignData()));
        }
        if (StringUtils.isNotBlank(conferenceNode.getTimer())) {
            jsonObject.put("timer", JSONObject.parseObject(conferenceNode.getTimer()));
        }
        if (StringUtils.isNotBlank(conferenceNode.getSameScreen())) {
            jsonObject.put("sameScreen", JSONObject.parseObject(conferenceNode.getSameScreen()));
        }
        if (StringUtils.isNotBlank(conferenceNode.getHeadData())) {
            jsonObject.put("headData", JSONObject.parseObject(conferenceNode.getHeadData()));
        }
        if (StringUtils.isNotBlank(conferenceNode.getSignInData())) {
            jsonObject.put("signInData", JSONObject.parseObject(conferenceNode.getSignInData())); // 签到信息
        }
        return jsonObject;
    }
    // 更新任务栏数据
    private void updateHeadNode(Long conferenceId) {
        // 更新会议树任务栏信息
        String headData = sysNoticeMapper.getHeadData(conferenceId);
        if (StringUtils.isNotBlank(headData)) {
            JSONObject jsonObject = JSONObject.parseObject(headData);
            jsonObject.put("onLineNumber", sysNoticeMapper.getOnLineNum(conferenceId.toString()) == null ? 0 : sysNoticeMapper.getOnLineNum(conferenceId.toString()));
            sysNoticeMapper.updateHedaData(jsonObject.toJSONString(), conferenceId);
        }
    }
}


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
21天前
|
网络协议 程序员 Python
pythonTCP客户端编程创建Socket对象
【4月更文挑战第6天】本教程介绍了TCP客户端如何创建Socket对象。Socket作为网络通信的基础单元,包含协议、IP地址和端口等信息。在TCP/IP中,Socket分为流式(TCP)、数据报(UDP)和原始套接字。以Python为例,创建TCP Socket对象需调用`socket.socket(AF_INET, SOCK_STREAM)`。为确保健壮性,应使用异常处理处理可能的`socket.error`。学习本教程将帮助你掌握TCP客户端创建Socket对象的技能。
|
3月前
Socket网络编程练习题四:客户端上传文件(多线程版)
Socket网络编程练习题四:客户端上传文件(多线程版)
|
3月前
Socket网络编程练习题三:客户端上传文件到服务器
Socket网络编程练习题三:客户端上传文件到服务器
|
3月前
|
Java
Socket网络编程练习题五:客户端多用户上传文件(多线程版)并使用线程池管理线程
Socket网络编程练习题五:客户端多用户上传文件(多线程版)并使用线程池管理线程
|
12天前
|
网络协议 Ubuntu Unix
Linux 下使用 socket 实现 TCP 客户端
Linux 下使用 socket 实现 TCP 客户端
|
13天前
|
安全 程序员
|
3月前
C++socket客户端select异步连接发送接收数据
C++socket客户端select异步连接发送接收数据
24 0
|
3月前
Socket网络编程练习题二:客户端发送一条数据,接收服务端反馈的消息并打印;服务端接收数据并打印,再给客户端反馈消息
Socket网络编程练习题二:客户端发送一条数据,接收服务端反馈的消息并打印;服务端接收数据并打印,再给客户端反馈消息
|
3月前
Socket网络编程练习题一:客户端多次发送数据,服务端多次接收数据并打印
Socket网络编程练习题一:客户端多次发送数据,服务端多次接收数据并打印
|
4月前
|
网络协议 安全 Python
socket客户端和服务端,文件的传输
socket 实现,客户端和服务端,文件的传输
43 1