教你用纯Java实现一个即时通讯系统(附源码)(下)

简介: 教你用纯Java实现一个即时通讯系统(附源码)(下)

RocketMq的服务生产者Bean配置


package org.idea.web.socket.mq;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.idea.web.socket.config.MqProducerConfig;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
/**
 * @Author linhao
 * @Date created in 11:05 上午 2021/5/10
 */
@Configuration
@Slf4j
@EnableConfigurationProperties({MqProducerConfig.class})
public class MqProducerAutoConfig {
    @Resource
    private MqProducerConfig mqProducerConfig;
    @Bean
    @ConditionalOnMissingBean
    //意味着DefaultMQProducer的配置可以被覆盖
    public DefaultMQProducer defaultMQProducer() {
        DefaultMQProducer producer = new DefaultMQProducer(mqProducerConfig.getGroupName());
        producer.setNamesrvAddr(mqProducerConfig.getNameSrvAddr());
        //没有则自动创建topic的key
//        producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
        producer.setMaxMessageSize(mqProducerConfig.getMaxMessageSize());
        producer.setSendMsgTimeout(mqProducerConfig.getSendMsgTimeout());
        producer.setRetryTimesWhenSendFailed(mqProducerConfig.getRetryTimesWhenSendFailed());
        try {
            producer.start();
            log.info("【 MqProducerAutoConfig 】mq producer is started!");
        } catch (Exception e) {
            log.error("[MqProducerAutoConfig] start fail, e is ", e);
        }
        return producer;
    }
}


然后是对RocketMq内部发送消息事件的一层函数封装


package org.idea.web.socket.mq;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.idea.web.socket.config.MqProducerConfig;
import org.idea.web.socket.dto.BroadcastMqDTO;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;
/**
 * 消息广播发送端
 *
 * @Author linhao
 * @Date created in 10:43 下午 2021/5/9
 */
@Component
@Slf4j
public class BroadcastMqProducer {
    @Resource
    private DefaultMQProducer defaultMQProducer;
    @Resource
    private MqProducerConfig mqProducerConfig;
    private static String TOPIC = "ws-topic";
    private static String TAGS = "ws-tag";
    public static Integer ALL_USER_RECEIVE_TYPE = 1;
    public static Integer ONE_USER_RECEIVE_TYPE = 2;
    /**
     * 点对点之间的消息发送
     *
     * @param destSessionKey
     * @param msg
     * @return
     */
    public SendResult sendWebSocketToUser(String destSessionKey,String msg) {
        if (StringUtils.isEmpty(msg)) {
            log.error("[sendWebSocketToUser] msg can not be null!");
            return null;
        }
        Message message = null;
        SendResult sendResult = null;
        try {
            BroadcastMqDTO broadcastMqDTO = new BroadcastMqDTO();
            broadcastMqDTO.setEventType(ONE_USER_RECEIVE_TYPE);
            broadcastMqDTO.setMessage(msg);
            broadcastMqDTO.setSessionKey(destSessionKey);
            message = new Message(TOPIC, TAGS, (JSON.toJSONString(broadcastMqDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET));
            sendResult = defaultMQProducer.send(message);
        } catch (Exception e) {
            log.error("[sendWebSocketBroadcastMsg] e is ", e);
        }
        return sendResult;
    }
    /**
     * 广播消息发送
     *
     * @param msg
     * @return
     */
    public SendResult sendWebSocketBroadcastMsg(String msg) {
        if (StringUtils.isEmpty(msg)) {
            log.error("[sendWebSocketBroadcastMsg] msg can not be null!");
            return null;
        }
        Message message = null;
        SendResult sendResult = null;
        try {
            BroadcastMqDTO broadcastMqDTO = new BroadcastMqDTO();
            broadcastMqDTO.setEventType(ALL_USER_RECEIVE_TYPE);
            broadcastMqDTO.setMessage(msg);
            message = new Message(TOPIC, TAGS, (JSON.toJSONString(broadcastMqDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET));
            sendResult = defaultMQProducer.send(message);
        } catch (Exception e) {
            log.error("[sendWebSocketBroadcastMsg] e is ", e);
        }
        return sendResult;
    }
}


对消息的订阅模块实现代码如下:


package org.idea.web.socket.mq;
import com.alibaba.fastjson.JSON;
import com.oracle.tools.packager.Log;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.idea.web.socket.dto.BroadcastMqDTO;
import org.idea.web.socket.manager.SocketManager;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.web.socket.WebSocketSession;
import javax.annotation.Resource;
import java.util.List;
import static org.idea.web.socket.mq.BroadcastMqProducer.ALL_USER_RECEIVE_TYPE;
import static org.idea.web.socket.mq.BroadcastMqProducer.ONE_USER_RECEIVE_TYPE;
/**
 * @Author linhao
 * @Date created in 10:59 上午 2021/5/10
 */
@Component
@Slf4j
public class MessageListenerHandler implements MessageListenerConcurrently {
    @Resource
    private SocketManager socketManager;
    @Resource
    private SimpMessagingTemplate template;
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (CollectionUtils.isEmpty(list)) {
            Log.info("receive empty msg");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        MessageExt messageExt = list.get(0);
        byte[] bytes = messageExt.getBody();
        String json = new String(bytes);
        BroadcastMqDTO broadcastMqDTO = JSON.parseObject(json, BroadcastMqDTO.class);
        log.info("[MessageListenerHandler] broadcastMqDTO is " + broadcastMqDTO);
        if (ALL_USER_RECEIVE_TYPE.equals(broadcastMqDTO.getEventType())) {
            log.info("[consumeMessage] 广播发送消息:触发----》消息内容为:" + broadcastMqDTO);
            template.convertAndSend("/topic/sendTopic", broadcastMqDTO);
        } else if (ONE_USER_RECEIVE_TYPE.equals(broadcastMqDTO.getEventType())) {
            String sessionKey = broadcastMqDTO.getSessionKey();
            WebSocketSession webSocketSession = socketManager.get(sessionKey);
            if (webSocketSession != null) {
                template.convertAndSendToUser(sessionKey, "/queue/sendUser", broadcastMqDTO.getMessage());
                log.info("[consumeMessage] 点对点发送消息;触发----》消息内容为:" + broadcastMqDTO);
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}


整体设计结构如下图:


image.png


于是按照这个结构进行了一版本的紧急开发迭代,原先的单台服务器扩展为了服务集群。


业务拓展后续产品经理提出一个需求,要求支持在同一间房内的两个用户之间发送悄悄话功能。这就需要我们进行一个点对点之间传输通讯的功能了。因此需要在mq通知到每台机器的时候加一个本地Session遍历的逻辑,如果当前机器存有用户token对应的session变量,那么就单独针对那个Session进行WebSocket的发送通知。


image.png


设计弊端一旦某台机器出现了异常崩溃,那么就意味着这台机器上的所有语音连接可能会出现中断情况。目前这一块的问题也在考虑解决,计划是将WebSocketSession存入到分布式缓存的redis中保证数据可靠存储,但是在后续尝试的时候发现WebSocketSession对象没有实现序列化接口,在存储到Redis的时候会出现异常。目前这个问题还在寻找解决思路中,不知道各位读者朋友们有什么好的思路。


遇到的问题点用户请求直接访问到了我们的内部服务器,如果在请求的中间加入一台nginx做负载均衡则需要在nginx中配置一些额外信息。


项目的源代码比较多,这里我把核心部分的代码整理了一份,感兴趣的朋友可以到我的gitee上边去下载:


https://gitee.com/IdeaHome_admin/socket-framework


推荐好文


>>【练手项目】基于SpringBoot的ERP系统,自带进销存+财务+生产功能


>>分享一套基于SpringBoot和Vue的企业级中后台开源项目,代码很规范!

>>能挣钱的,开源 SpringBoot 商城系统,功能超全,超漂亮!

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4天前
|
数据采集 人工智能 Java
Java产科专科电子病历系统源码
产科专科电子病历系统,全结构化设计,实现产科专科电子病历与院内HIS、LIS、PACS信息系统、区域妇幼信息平台的三级互联互通,系统由门诊系统、住院系统、数据统计模块三部分组成,它管理了孕妇从怀孕开始到生产结束42天一系列医院保健服务信息。
18 4
|
10天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
39 2
|
15天前
|
人工智能 监控 数据可视化
Java智慧工地信息管理平台源码 智慧工地信息化解决方案SaaS源码 支持二次开发
智慧工地系统是依托物联网、互联网、AI、可视化建立的大数据管理平台,是一种全新的管理模式,能够实现劳务管理、安全施工、绿色施工的智能化和互联网化。围绕施工现场管理的人、机、料、法、环五大维度,以及施工过程管理的进度、质量、安全三大体系为基础应用,实现全面高效的工程管理需求,满足工地多角色、多视角的有效监管,实现工程建设管理的降本增效,为监管平台提供数据支撑。
32 3
|
20天前
|
运维 自然语言处理 供应链
Java云HIS医院管理系统源码 病案管理、医保业务、门诊、住院、电子病历编辑器
通过门诊的申请,或者直接住院登记,通过”护士工作站“分配患者,完成后,进入医生患者列表,医生对应开具”长期医嘱“和”临时医嘱“,并在电子病历中,记录病情。病人出院时,停止长期医嘱,开具出院医嘱。进入出院审核,审核医嘱与住院通过后,病人结清缴费,完成出院。
58 3
|
24天前
|
Java 数据库连接 数据库
深入探讨Java连接池技术如何通过复用数据库连接、减少连接建立和断开的开销,从而显著提升系统性能
在Java应用开发中,数据库操作常成为性能瓶颈。本文通过问题解答形式,深入探讨Java连接池技术如何通过复用数据库连接、减少连接建立和断开的开销,从而显著提升系统性能。文章介绍了连接池的优势、选择和使用方法,以及优化配置的技巧。
22 1
|
25天前
|
JavaScript Java 项目管理
Java毕设学习 基于SpringBoot + Vue 的医院管理系统 持续给大家寻找Java毕设学习项目(附源码)
基于SpringBoot + Vue的医院管理系统,涵盖医院、患者、挂号、药物、检查、病床、排班管理和数据分析等功能。开发工具为IDEA和HBuilder X,环境需配置jdk8、Node.js14、MySQL8。文末提供源码下载链接。
|
28天前
|
移动开发 前端开发 JavaScript
java家政系统成品源码的关键特点和技术应用
家政系统成品源码是已开发完成的家政服务管理软件,支持用户注册、登录、管理个人资料,家政人员信息管理,服务项目分类,订单与预约管理,支付集成,评价与反馈,地图定位等功能。适用于各种规模的家政服务公司,采用uniapp、SpringBoot、MySQL等技术栈,确保高效管理和优质用户体验。
时间轮-Java实现篇
在前面的文章《[时间轮-理论篇](https://developer.aliyun.com/article/910513)》讲了时间轮的一些理论知识,然后根据理论知识。我们自己来实现一个简单的时间轮。
|
8天前
|
Java 开发者
Java多线程编程中的常见误区与最佳实践####
本文深入剖析了Java多线程编程中开发者常遇到的几个典型误区,如对`start()`与`run()`方法的混淆使用、忽视线程安全问题、错误处理未同步的共享变量等,并针对这些问题提出了具体的解决方案和最佳实践。通过实例代码对比,直观展示了正确与错误的实现方式,旨在帮助读者构建更加健壮、高效的多线程应用程序。 ####
|
7天前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
下一篇
无影云桌面