教你用纯Java实现一个即时通讯系统(附源码)(下)

简介: 教你用纯Java实现一个即时通讯系统(附源码)(下)

RocketMq的服务生产者Bean配置


package org.idea.web.socket.mq;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.idea.web.socket.config.MqProducerConfig;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
/**
 * @Author linhao
 * @Date created in 11:05 上午 2021/5/10
 */
@Configuration
@Slf4j
@EnableConfigurationProperties({MqProducerConfig.class})
public class MqProducerAutoConfig {
    @Resource
    private MqProducerConfig mqProducerConfig;
    @Bean
    @ConditionalOnMissingBean
    //意味着DefaultMQProducer的配置可以被覆盖
    public DefaultMQProducer defaultMQProducer() {
        DefaultMQProducer producer = new DefaultMQProducer(mqProducerConfig.getGroupName());
        producer.setNamesrvAddr(mqProducerConfig.getNameSrvAddr());
        //没有则自动创建topic的key
//        producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
        producer.setMaxMessageSize(mqProducerConfig.getMaxMessageSize());
        producer.setSendMsgTimeout(mqProducerConfig.getSendMsgTimeout());
        producer.setRetryTimesWhenSendFailed(mqProducerConfig.getRetryTimesWhenSendFailed());
        try {
            producer.start();
            log.info("【 MqProducerAutoConfig 】mq producer is started!");
        } catch (Exception e) {
            log.error("[MqProducerAutoConfig] start fail, e is ", e);
        }
        return producer;
    }
}


然后是对RocketMq内部发送消息事件的一层函数封装


package org.idea.web.socket.mq;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.idea.web.socket.config.MqProducerConfig;
import org.idea.web.socket.dto.BroadcastMqDTO;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;
/**
 * 消息广播发送端
 *
 * @Author linhao
 * @Date created in 10:43 下午 2021/5/9
 */
@Component
@Slf4j
public class BroadcastMqProducer {
    @Resource
    private DefaultMQProducer defaultMQProducer;
    @Resource
    private MqProducerConfig mqProducerConfig;
    private static String TOPIC = "ws-topic";
    private static String TAGS = "ws-tag";
    public static Integer ALL_USER_RECEIVE_TYPE = 1;
    public static Integer ONE_USER_RECEIVE_TYPE = 2;
    /**
     * 点对点之间的消息发送
     *
     * @param destSessionKey
     * @param msg
     * @return
     */
    public SendResult sendWebSocketToUser(String destSessionKey,String msg) {
        if (StringUtils.isEmpty(msg)) {
            log.error("[sendWebSocketToUser] msg can not be null!");
            return null;
        }
        Message message = null;
        SendResult sendResult = null;
        try {
            BroadcastMqDTO broadcastMqDTO = new BroadcastMqDTO();
            broadcastMqDTO.setEventType(ONE_USER_RECEIVE_TYPE);
            broadcastMqDTO.setMessage(msg);
            broadcastMqDTO.setSessionKey(destSessionKey);
            message = new Message(TOPIC, TAGS, (JSON.toJSONString(broadcastMqDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET));
            sendResult = defaultMQProducer.send(message);
        } catch (Exception e) {
            log.error("[sendWebSocketBroadcastMsg] e is ", e);
        }
        return sendResult;
    }
    /**
     * 广播消息发送
     *
     * @param msg
     * @return
     */
    public SendResult sendWebSocketBroadcastMsg(String msg) {
        if (StringUtils.isEmpty(msg)) {
            log.error("[sendWebSocketBroadcastMsg] msg can not be null!");
            return null;
        }
        Message message = null;
        SendResult sendResult = null;
        try {
            BroadcastMqDTO broadcastMqDTO = new BroadcastMqDTO();
            broadcastMqDTO.setEventType(ALL_USER_RECEIVE_TYPE);
            broadcastMqDTO.setMessage(msg);
            message = new Message(TOPIC, TAGS, (JSON.toJSONString(broadcastMqDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET));
            sendResult = defaultMQProducer.send(message);
        } catch (Exception e) {
            log.error("[sendWebSocketBroadcastMsg] e is ", e);
        }
        return sendResult;
    }
}


对消息的订阅模块实现代码如下:


package org.idea.web.socket.mq;
import com.alibaba.fastjson.JSON;
import com.oracle.tools.packager.Log;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.idea.web.socket.dto.BroadcastMqDTO;
import org.idea.web.socket.manager.SocketManager;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.web.socket.WebSocketSession;
import javax.annotation.Resource;
import java.util.List;
import static org.idea.web.socket.mq.BroadcastMqProducer.ALL_USER_RECEIVE_TYPE;
import static org.idea.web.socket.mq.BroadcastMqProducer.ONE_USER_RECEIVE_TYPE;
/**
 * @Author linhao
 * @Date created in 10:59 上午 2021/5/10
 */
@Component
@Slf4j
public class MessageListenerHandler implements MessageListenerConcurrently {
    @Resource
    private SocketManager socketManager;
    @Resource
    private SimpMessagingTemplate template;
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (CollectionUtils.isEmpty(list)) {
            Log.info("receive empty msg");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        MessageExt messageExt = list.get(0);
        byte[] bytes = messageExt.getBody();
        String json = new String(bytes);
        BroadcastMqDTO broadcastMqDTO = JSON.parseObject(json, BroadcastMqDTO.class);
        log.info("[MessageListenerHandler] broadcastMqDTO is " + broadcastMqDTO);
        if (ALL_USER_RECEIVE_TYPE.equals(broadcastMqDTO.getEventType())) {
            log.info("[consumeMessage] 广播发送消息:触发----》消息内容为:" + broadcastMqDTO);
            template.convertAndSend("/topic/sendTopic", broadcastMqDTO);
        } else if (ONE_USER_RECEIVE_TYPE.equals(broadcastMqDTO.getEventType())) {
            String sessionKey = broadcastMqDTO.getSessionKey();
            WebSocketSession webSocketSession = socketManager.get(sessionKey);
            if (webSocketSession != null) {
                template.convertAndSendToUser(sessionKey, "/queue/sendUser", broadcastMqDTO.getMessage());
                log.info("[consumeMessage] 点对点发送消息;触发----》消息内容为:" + broadcastMqDTO);
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}


整体设计结构如下图:


image.png


于是按照这个结构进行了一版本的紧急开发迭代,原先的单台服务器扩展为了服务集群。


业务拓展后续产品经理提出一个需求,要求支持在同一间房内的两个用户之间发送悄悄话功能。这就需要我们进行一个点对点之间传输通讯的功能了。因此需要在mq通知到每台机器的时候加一个本地Session遍历的逻辑,如果当前机器存有用户token对应的session变量,那么就单独针对那个Session进行WebSocket的发送通知。


image.png


设计弊端一旦某台机器出现了异常崩溃,那么就意味着这台机器上的所有语音连接可能会出现中断情况。目前这一块的问题也在考虑解决,计划是将WebSocketSession存入到分布式缓存的redis中保证数据可靠存储,但是在后续尝试的时候发现WebSocketSession对象没有实现序列化接口,在存储到Redis的时候会出现异常。目前这个问题还在寻找解决思路中,不知道各位读者朋友们有什么好的思路。


遇到的问题点用户请求直接访问到了我们的内部服务器,如果在请求的中间加入一台nginx做负载均衡则需要在nginx中配置一些额外信息。


项目的源代码比较多,这里我把核心部分的代码整理了一份,感兴趣的朋友可以到我的gitee上边去下载:


https://gitee.com/IdeaHome_admin/socket-framework


推荐好文


>>【练手项目】基于SpringBoot的ERP系统,自带进销存+财务+生产功能


>>分享一套基于SpringBoot和Vue的企业级中后台开源项目,代码很规范!

>>能挣钱的,开源 SpringBoot 商城系统,功能超全,超漂亮!

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
8天前
|
JavaScript NoSQL Java
接替此文【下篇-服务端+后台管理】优雅草蜻蜓z系统JAVA版暗影版为例-【蜻蜓z系列通用】-2025年全新项目整合搭建方式-这是独立吃透代码以后首次改变-独立PC版本vue版搭建教程-优雅草卓伊凡
接替此文【下篇-服务端+后台管理】优雅草蜻蜓z系统JAVA版暗影版为例-【蜻蜓z系列通用】-2025年全新项目整合搭建方式-这是独立吃透代码以后首次改变-独立PC版本vue版搭建教程-优雅草卓伊凡
152 96
接替此文【下篇-服务端+后台管理】优雅草蜻蜓z系统JAVA版暗影版为例-【蜻蜓z系列通用】-2025年全新项目整合搭建方式-这是独立吃透代码以后首次改变-独立PC版本vue版搭建教程-优雅草卓伊凡
|
10天前
|
前端开发 JavaScript Java
【03】Java+若依+vue.js技术栈实现钱包积分管理系统项目-若依框架搭建-服务端-后台管理-整体搭建-优雅草卓伊凡商业项目实战
【03】Java+若依+vue.js技术栈实现钱包积分管理系统项目-若依框架搭建-服务端-后台管理-整体搭建-优雅草卓伊凡商业项目实战
57 13
【03】Java+若依+vue.js技术栈实现钱包积分管理系统项目-若依框架搭建-服务端-后台管理-整体搭建-优雅草卓伊凡商业项目实战
|
11天前
|
人工智能 JavaScript 关系型数据库
【02】Java+若依+vue.js技术栈实现钱包积分管理系统项目-商业级电玩城积分系统商业项目实战-ui设计图figmaUI设计准备-figma汉化插件-mysql数据库设计-优雅草卓伊凡商业项目实战
【02】Java+若依+vue.js技术栈实现钱包积分管理系统项目-商业级电玩城积分系统商业项目实战-ui设计图figmaUI设计准备-figma汉化插件-mysql数据库设计-优雅草卓伊凡商业项目实战
57 14
【02】Java+若依+vue.js技术栈实现钱包积分管理系统项目-商业级电玩城积分系统商业项目实战-ui设计图figmaUI设计准备-figma汉化插件-mysql数据库设计-优雅草卓伊凡商业项目实战
|
9天前
|
SQL JavaScript 安全
【04】Java+若依+vue.js技术栈实现钱包积分管理系统项目-若依框架二次开发准备工作-以及建立初步后端目录菜单列-优雅草卓伊凡商业项目实战
【04】Java+若依+vue.js技术栈实现钱包积分管理系统项目-若依框架二次开发准备工作-以及建立初步后端目录菜单列-优雅草卓伊凡商业项目实战
47 11
【04】Java+若依+vue.js技术栈实现钱包积分管理系统项目-若依框架二次开发准备工作-以及建立初步后端目录菜单列-优雅草卓伊凡商业项目实战
|
13天前
|
人工智能 JavaScript 安全
【01】Java+若依+vue.js技术栈实现钱包积分管理系统项目-商业级电玩城积分系统商业项目实战-需求改为思维导图-设计数据库-确定基础架构和设计-优雅草卓伊凡商业项目实战
【01】Java+若依+vue.js技术栈实现钱包积分管理系统项目-商业级电玩城积分系统商业项目实战-需求改为思维导图-设计数据库-确定基础架构和设计-优雅草卓伊凡商业项目实战
57 13
【01】Java+若依+vue.js技术栈实现钱包积分管理系统项目-商业级电玩城积分系统商业项目实战-需求改为思维导图-设计数据库-确定基础架构和设计-优雅草卓伊凡商业项目实战
|
4天前
|
JavaScript 安全 Java
智慧产科一体化管理平台源码,基于Java,Vue,ElementUI技术开发,二开快捷
智慧产科一体化管理平台覆盖从备孕到产后42天的全流程管理,构建科室协同、医患沟通及智能设备互联平台。通过移动端扫码建卡、自助报道、智能采集数据等手段优化就诊流程,提升孕妇就诊体验,并实现高危孕产妇五色管理和孕妇学校三位一体化管理,全面提升妇幼健康宣教质量。
33 12
|
7天前
|
缓存 Java 应用服务中间件
java语言后台管理若依框架-登录提示404-接口异常-系统接口404异常如何处理-登录验证码不显示prod-api/captchaImage 404 (Not Found) 如何处理-解决方案优雅草卓伊凡
java语言后台管理若依框架-登录提示404-接口异常-系统接口404异常如何处理-登录验证码不显示prod-api/captchaImage 404 (Not Found) 如何处理-解决方案优雅草卓伊凡
35 5
|
8天前
|
人工智能 监控 安全
Java智慧工地(源码):数字化管理提升施工安全与质量
随着科技的发展,智慧工地已成为建筑行业转型升级的重要手段。依托智能感知设备和云物互联技术,智慧工地为工程管理带来了革命性的变革,实现了项目管理的简单化、远程化和智能化。
29 4
|
26天前
|
JavaScript Java 测试技术
基于Java+SpringBoot+Vue实现的车辆充电桩系统设计与实现(系统源码+文档+部署讲解等)
面向大学生毕业选题、开题、任务书、程序设计开发、论文辅导提供一站式服务。主要服务:程序设计开发、代码修改、成品部署、支持定制、论文辅导,助力毕设!
57 6
|
消息中间件 存储 小程序
教你用纯Java实现一个即时通讯系统(附源码)(上)
教你用纯Java实现一个即时通讯系统(附源码)(上)
教你用纯Java实现一个即时通讯系统(附源码)(上)