声明
本文提炼于个人练手项目,其中的实现逻辑不一定标准,实现思路没有参考权威的文档和教程,仅为个人思考得出,因此可能存在较多本人未考虑到的情况和漏洞,因此仅供参考,如果大家觉得有问题,恳请大家指出有问题的地方
如果对客户端的实现感兴趣,可以转身查看【UniApp开发小程序】私聊功能uniapp界面实现 (买家、卖家 沟通商品信息)【后端基于若依管理系统开发】
聊天数据查询管理
数据库设计
【私信表】
Vo
package com.ruoyi.common.core.domain.vo; import lombok.Data; import java.util.Date; /** * @Author dam * @create 2023/8/22 21:39 */ @Data public class ChatUserVo { private Long userId; private String userAvatar; private String userName; private String userNickname; /** * 最后一条消息的内容 */ private String lastChatContent; /** * 最后一次聊天的日期 */ private Date lastChatDate; /** * 未读消息数量 */ private Integer unReadChatNum; }
Controller
其中两个方法较为重要,介绍如下:
- listChatUserVo:当用户进入消息界面的时候,需要查询出最近聊天的用户,其中还需要展示一些信息,如
ChatUserVo
的属性 - listChat:该方法于查询对方最近和自己的私聊内容,当用户查询了这些私聊内容,默认用户已经看过了,将这些私聊内容设置为已读状态
package com.shm.controller; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; import javax.servlet.http.HttpServletResponse; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.ruoyi.common.core.domain.entity.Chat; import com.ruoyi.common.core.domain.vo.ChatUserVo; import com.shm.service.IChatService; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PutMapping; import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.ruoyi.common.annotation.Log; import com.ruoyi.common.core.controller.BaseController; import com.ruoyi.common.core.domain.AjaxResult; import com.ruoyi.common.enums.BusinessType; import com.ruoyi.common.utils.poi.ExcelUtil; import com.ruoyi.common.core.page.TableDataInfo; /** * 聊天数据Controller * * @author dam * @date 2023-08-19 */ @RestController @RequestMapping("/market/chat") @Api public class ChatController extends BaseController { @Autowired private IChatService chatService; /** * 查询聊天数据列表 */ @PreAuthorize("@ss.hasPermi('market:chat:list')") @GetMapping("/list") public TableDataInfo list(Chat chat) { startPage(); List<Chat> list = chatService.list(new QueryWrapper<Chat>(chat)); return getDataTable(list); } /** * 查询最近和自己聊天的用户 */ @ApiOperation("listChatUserVo") @PreAuthorize("@ss.hasPermi('market:chat:list')") @GetMapping("/listChatUserVo") public TableDataInfo listChatUserVo() { startPage(); String username = getLoginUser().getUsername(); List<ChatUserVo> list = chatService.listChatUserVo(username); return getDataTable(list); } /** * 查询用户和自己最近的聊天信息 */ @ApiOperation("listUsersChatWithMe") @PreAuthorize("@ss.hasPermi('market:chat:list')") @GetMapping("/listChat/{toUsername}") public TableDataInfo listChat(@PathVariable("toUsername") String toUsername) { String curUsername = getLoginUser().getUsername(); startPage(); List<Chat> list = chatService.listChat(curUsername, toUsername); for (Chat chat : list) { System.out.println("chat:"+chat.toString()); } System.out.println(); // 查出的数据,如果消息是对方发的,且是未读状态,重新设置为已读 List<Long> unReadIdList = list.stream().filter( (item1) -> { if (item1.getIsRead() == 0 && item1.getFromWho().equals(toUsername)) { return true; } else { return false; } } ) .map(item2 -> { return item2.getId(); }).collect(Collectors.toList()); System.out.println("将"+ unReadIdList.toString()+"设置为已读"); if (unReadIdList.size() > 0) { // 批量设置私聊为已读状态 chatService.batchRead(unReadIdList); } return getDataTable(list); } /** * 导出聊天数据列表 */ @PreAuthorize("@ss.hasPermi('market:chat:export')") @Log(title = "聊天数据", businessType = BusinessType.EXPORT) @PostMapping("/export") public void export(HttpServletResponse response, Chat chat) { List<Chat> list = chatService.list(new QueryWrapper<Chat>(chat)); ExcelUtil<Chat> util = new ExcelUtil<Chat>(Chat.class); util.exportExcel(response, list, "聊天数据数据"); } /** * 获取聊天数据详细信息 */ @PreAuthorize("@ss.hasPermi('market:chat:query')") @GetMapping(value = "/getInfo/{id}") public AjaxResult getInfo(@PathVariable("id") Long id) { return success(chatService.getById(id)); } /** * 新增聊天数据 */ @PreAuthorize("@ss.hasPermi('market:chat:add')") @Log(title = "聊天数据", businessType = BusinessType.INSERT) @PostMapping public AjaxResult add(@RequestBody Chat chat) { return toAjax(chatService.save(chat)); } /** * 修改聊天数据 */ @PreAuthorize("@ss.hasPermi('market:chat:edit')") @Log(title = "聊天数据", businessType = BusinessType.UPDATE) @PutMapping public AjaxResult edit(@RequestBody Chat chat) { return toAjax(chatService.updateById(chat)); } /** * 删除聊天数据 */ @PreAuthorize("@ss.hasPermi('market:chat:remove')") @Log(title = "聊天数据", businessType = BusinessType.DELETE) @DeleteMapping("/{ids}") public AjaxResult remove(@PathVariable List<Long> ids) { return toAjax(chatService.removeByIds(ids)); } }
Service
package com.shm.service.impl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.ruoyi.common.core.domain.entity.Chat; import com.ruoyi.common.core.domain.vo.ChatUserVo; import com.shm.mapper.ChatMapper; import com.shm.service.IChatService; import org.springframework.stereotype.Service; import java.util.List; /** * @author 17526 * @description 针对表【chat(聊天数据表)】的数据库操作Service实现 * @createDate 2023-08-19 21:12:49 */ @Service public class IChatServiceImpl extends ServiceImpl<ChatMapper, Chat> implements IChatService { /** * 查询最近和自己聊天的用户 * * @return */ @Override public List<ChatUserVo> listChatUserVo(String username) { return baseMapper.listChatUserVo(username); } /** * 查询用户和自己最近的聊天信息 * * @param curUsername * @param toUsername * @return */ @Override public List<Chat> listChat(String curUsername, String toUsername) { return baseMapper.listChat(curUsername, toUsername); } @Override public void batchRead(List<Long> unReadIdList) { baseMapper.batchRead(unReadIdList); } }
Mapper
package com.shm.mapper; import com.ruoyi.common.core.domain.entity.Chat; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.ruoyi.common.core.domain.vo.ChatUserVo; import org.apache.ibatis.annotations.Param; import java.util.List; /** * @author 17526 * @description 针对表【chat(聊天数据表)】的数据库操作Mapper * @createDate 2023-08-19 21:12:49 * @Entity com.ruoyi.common.core.domain.entity.Chat */ public interface ChatMapper extends BaseMapper<Chat> { List<ChatUserVo> listChatUserVo(@Param("username") String username); List<Chat> listChat(@Param("curUsername") String curUsername, @Param("toUsername") String toUsername); void batchRead(@Param("unReadIdList") List<Long> unReadIdList); }
【xml文件】
<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.shm.mapper.ChatMapper"> <resultMap id="BaseResultMap" type="com.ruoyi.common.core.domain.entity.Chat"> <id property="id" column="id" jdbcType="BIGINT"/> <result property="createTime" column="create_time" jdbcType="TIMESTAMP"/> <result property="updateTime" column="update_time" jdbcType="TIMESTAMP"/> <result property="isDeleted" column="is_deleted" jdbcType="TINYINT"/> <result property="fromWho" column="from_who" jdbcType="BIGINT"/> <result property="toWho" column="to_who" jdbcType="BIGINT"/> <result property="content" column="content" jdbcType="VARCHAR"/> <result property="picUrl" column="pic_url" jdbcType="VARCHAR"/> </resultMap> <sql id="Base_Column_List"> id,create_time,update_time, is_deleted,from,to, content,pic_url </sql> <update id="batchRead"> update chat set is_read = 1 where id in <foreach collection="unReadIdList" item="chatId" separator="," open="(" close=")"> #{chatId} </foreach> </update> <select id="listChatUserVo" resultType="com.ruoyi.common.core.domain.vo.ChatUserVo"> SELECT (CASE WHEN c.from_who=#{username} THEN c.to_who ELSE c.from_who END) AS `userName`, c.content AS `lastChatContent`, c.create_time AS lastChatDate, u.user_id AS userId, u.avatar AS userAvatar, u.nick_name AS userNickname, ur.unReadNum as unReadChatNum FROM (SELECT MAX(`id`) AS chatId, CASE WHEN `from_who` = #{username} THEN `to_who` ELSE `from_who` END AS uname FROM `chat` WHERE `from_who` = #{username} OR `to_who` = #{username} GROUP BY uname) AS t INNER JOIN `chat` c ON c.id = t.chatId LEFT JOIN `sys_user` u ON t.uname = u.user_name LEFT JOIN (SELECT from_who, SUM(CASE WHEN is_read=1 THEN 0 ELSE 1 END) AS unReadNum FROM chat WHERE is_deleted=0 AND to_who = #{username} GROUP BY from_who) ur ON ur.from_who = t.uname ORDER BY c.create_time DESC </select> <select id="listChat" resultType="com.ruoyi.common.core.domain.entity.Chat"> SELECT * FROM chat WHERE ( from_who = #{curUsername} AND to_who = #{toUsername} ) OR ( to_who = #{curUsername} AND from_who = #{toUsername} ) ORDER BY create_time DESC </select> </mapper>
【查询最近聊天的用户的用户名和那条消息的id】
因为id是自增的,所以最新的那条消息的id肯定最大,因此可以使用MAX(id)
来获取最近的消息
SELECT MAX(`id`) AS chatId, CASE WHEN `from_who` = 'admin' THEN `to_who` ELSE `from_who` END AS uname FROM `chat` WHERE `from_who` = 'admin' OR `to_who` = 'admin' GROUP BY uname
【内连接私信表获取消息的其他信息】
INNER JOIN `chat` c ON c.id = t.chatId
【左连接用户表获取用户的相关信息】
LEFT JOIN `sys_user` u ON t.uname = u.user_name
【左联接私信表获取未读对方消息的数量】
CASE WHEN is_read=1 THEN 0 ELSE 1 END
如果已读,说明未读数量为0;否则为1
LEFT JOIN (SELECT from_who, SUM(CASE WHEN is_read=1 THEN 0 ELSE 1 END) AS unReadNum FROM chat WHERE is_deleted=0 AND to_who = 'admin' GROUP BY from_who) ur ON ur.from_who = t.uname
【最后按照用户和自己最后聊天的时间来降序排序】
ORDER BY c.create_time DESC
WebSocket引入
为什么使用WebSocket
WebSocket不仅支持客户端向服务端发送消息,同时也支持服务端向客户端发送消息,这样才能完成私聊的功能。即
用户1-->服务端-->用户2
依赖
<!-- websocket --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
配置类
package com.shm.config; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; @Configuration public class WebSocketConfig { /** * 注入一个ServerEndpointExporter, * 该Bean会自动注册使用@ServerEndpoint注解 声明的websocket endpoint */ @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
WebSocket服务
需要注意的是,Websocket是多例模式,无法直接使用@Autowired
注解来注入rabbitTemplate,需要使用下面的方式,其中rabbitTemplate为静态变量
private static RabbitTemplate rabbitTemplate; @Autowired public void setRabbitTemplate(RabbitTemplate rabbitTemplate) { WebSocketServer.rabbitTemplate = rabbitTemplate; }
package com.shm.component; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.ruoyi.common.core.domain.entity.Chat; import com.shm.component.delay.DelayQueueManager; import com.shm.component.delay.DelayTask; import com.shm.constant.RabbitMqConstant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * @author websocket服务 */ @ServerEndpoint(value = "/websocket/{username}") @Component//将WebSocketServer注册为spring的一个bean public class WebSocketServer { private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class); /** * 记录当前在线连接的客户端的session */ public static final Map<String, Session> usernameAndSessionMap = new ConcurrentHashMap<>(); /** * 记录正在进行的聊天的发出者和接收者 */ public static final Map<String, Integer> fromToMap = new ConcurrentHashMap<>(); /** * 用户Session保留时间,如果超过该时间,用户还没有给服务端发送消息,认为用户下线,删除其Session * 注意:该时间需要比客户端的心跳时间更长 */ private static final long expire = 6000; // websocket为多例模式,无法直接注入,需要换成下面的方式 // @Autowired // RabbitTemplate rabbitTemplate; private static RabbitTemplate rabbitTemplate; @Autowired public void setRabbitTemplate(RabbitTemplate rabbitTemplate) { WebSocketServer.rabbitTemplate = rabbitTemplate; } @Autowired private static DelayQueueManager delayQueueManager; @Autowired public void setDelayQueueManager(DelayQueueManager delayQueueManager) { WebSocketServer.delayQueueManager = delayQueueManager; } /** * 浏览器和服务端连接建立成功之后会调用这个方法 */ @OnOpen public void onOpen(Session session, @PathParam("username") String username) { usernameAndSessionMap.put(username, session); // 建立延时任务,如果到expire时间,客户端还是没有和服务器有任何交互的话,就删除该用户的session,表示该用户下线 delayQueueManager.put(new DelayTask(username, expire)); log.info("有新用户加入,username={}, 当前在线人数为:{}", username, usernameAndSessionMap.size()); } /** * 连接关闭调用的方法 */ @OnClose public void onClose(Session session, @PathParam("username") String username) { usernameAndSessionMap.remove(username); log.info("有一连接关闭,移除username={}的用户session, 当前在线人数为:{}", username, usernameAndSessionMap.size()); } /** * 发生错误的时候会调用这个方法 */ @OnError public void onError(Session session, Throwable error) { log.error("发生错误"); error.printStackTrace(); } /** * 服务端发送消息给客户端 */ public void sendMessage(String message, Session toSession) { try { log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message); toSession.getBasicRemote().sendText(message); } catch (Exception e) { log.error("服务端发送消息给客户端失败", e); } } /** * onMessage方法是一个消息的中转站 * 1、首先接受浏览器端socket.send发送过来的json数据 * 2、然后解析其数据,找到消息要发送给谁 * 3、最后将数据发送给相应的人 * * @param message 客户端发送过来的消息 数据格式:{"from":"user1","to":"admin","text":"你好呀"} */ @OnMessage public void onMessage(String message, Session session, @PathParam("username") String username) { // log.info("服务端接收到 {} 的消息,消息内容是:{}", username, message); // 收到用户的信息,删除之前的延时任务,创建新的延时任务 delayQueueManager.put(new DelayTask(username, expire)); if (!usernameAndSessionMap.containsKey(username)) { // 可能用户挂机了一段时间,被下线了,后面又重新回来发信息了,需要重新将用户和session添加字典中 usernameAndSessionMap.put(username, session); } // 将json字符串转化为json对象 JSONObject obj = JSON.parseObject(message); String status = (String) obj.get("status"); // 获取消息的内容 String text = (String) obj.get("text"); // 查看消息要发送给哪个用户 String to = (String) obj.get("to"); String fromToKey = username + "-" + to; String toFromKey = to + "-" + username; if (status != null) { if (status.equals("start")) { fromToMap.put(fromToKey, 1); } else if (status.equals("end")) { System.out.println("移除销毁的fromToKey:" + fromToKey); fromToMap.remove(fromToKey); } else if (status.equals("ping")) { // 更新用户对应的时间戳 // usernameAndTimeStampMap.put(username, System.currentTimeMillis()); } } else { // 封装数据发送给消息队列 Chat chat = new Chat(); chat.setFromWho(username); chat.setToWho(to); chat.setContent(text); chat.setIsRead(0); // chat.setPicUrl(""); // 根据to来获取相应的session,然后通过session将消息内容转发给相应的用户 Session toSession = usernameAndSessionMap.get(to); if (toSession != null) { JSONObject jsonObject = new JSONObject(); // 设置消息来源的用户名 jsonObject.put("from", username); // 设置消息内容 jsonObject.put("text", text); // 服务端发送消息给目标客户端 this.sendMessage(jsonObject.toString(), toSession); log.info("发送消息给用户 {} ,消息内容是:{} ", toSession, jsonObject.toString()); if (fromToMap.containsKey(toFromKey)) { chat.setIsRead(1); } } else { log.info("发送失败,未找到用户 {} 的session", to); } rabbitTemplate.convertAndSend(RabbitMqConstant.CHAT_STORAGE_EXCHANGE, RabbitMqConstant.CHAT_STORAGE_ROUTER_KEY, chat); } } }
RabbitMQ引入
为什么使用消息队列
在用户之间进行聊天的时候,需要将用户的聊天数据存储到数据库中,但是如果大量用户同时在线的话,可能同一时间发送的消息数量太多,如果同时将这些消息存储到数据库中,会给数据库带来较大的压力,使用RabbitMQ可以先把要存储的数据放到消息队列,然后数据库服务器压力没这么大的时候,就会从消息队列中获取数据来存储,这样可以分散数据库的压力。但是如果用户是直接从数据库获取消息的话,消息可能有一定的延迟,如果用户之间正在聊天的话,消息则不会延迟,因为聊天内容会立刻通过WebSocket发送给对方。
依赖
<!-- rabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
启动类添加注解
在启动类的上方添加@EnableRabbit
注解
常量类
因为有多处会使用到队列命名等信息,创建一个常量类来保存相关信息
package com.shm.constant; public class RabbitMqConstant { public static final String CHAT_STORAGE_QUEUE = "shm.chat-storage.queue"; public static final String CHAT_STORAGE_EXCHANGE = "shm.chat-storage-event-exchange"; public static final String CHAT_STORAGE_ROUTER_KEY = "shm.chat-storage.register"; }
使用配置类创建队列、交换机、绑定关系
package com.shm.config; import com.shm.constant.RabbitMqConstant; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Exchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MyRabbitConfig { /** * 使用JSON序列化机制,进行消息转换 * @return */ @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } /** * 私信存储队列 * * @return */ @Bean public Queue chatStorageQueue() { Queue queue = new Queue(RabbitMqConstant.CHAT_STORAGE_QUEUE, true, false, false); return queue; } /** * 私信存储交换机 * 创建交换机,由于只需要一个队列,创建direct交换机 * * @return */ @Bean public Exchange chatStorageExchange() { //durable:持久化 return new DirectExchange(RabbitMqConstant.CHAT_STORAGE_EXCHANGE, true, false); } /** * 创建私信存储 交换机和队列的绑定关系 * * @return */ @Bean public Binding chatStorageBinding() { return new Binding(RabbitMqConstant.CHAT_STORAGE_QUEUE, Binding.DestinationType.QUEUE, RabbitMqConstant.CHAT_STORAGE_EXCHANGE, RabbitMqConstant.CHAT_STORAGE_ROUTER_KEY, null); } }
消息监听器
创建一个消息监听类来监听队列的消息,然后调用相关的逻辑来处理信息,本文主要的处理是将私信内容存储到数据库中
package com.shm.listener; import com.rabbitmq.client.Channel; import com.ruoyi.common.core.domain.entity.Chat; import com.shm.constant.RabbitMqConstant; import com.shm.service.IChatService; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.IOException; @Service /** * 注意,类上面需要RabbitListener注解 */ @RabbitListener(queues = RabbitMqConstant.CHAT_STORAGE_QUEUE) public class ChatStorageListener { @Autowired private IChatService chatService; @RabbitHandler public void handleStockLockedRelease(Chat chat, Message message, Channel channel) throws IOException { try { boolean save = chatService.save(chat); //解锁成功,手动确认,消息才从MQ中删除 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { //只要有异常,拒绝消息,让消息重新返回队列,让别的消费者继续解锁 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } } }
发送消息到消息队列
WebSocketServer
为Websocket后端服务代码,其中的onMessage方法会接受客户端发送过来的消息,当接收到消息的时候,将消息发送给消息队列
// 封装数据发送给消息队列 Chat chat = new Chat(); chat.setFromWho(username); chat.setToWho(to); chat.setContent(text); chat.setPicUrl(""); rabbitTemplate.convertAndSend(RabbitMqConstant.CHAT_STORAGE_EXCHANGE,RabbitMqConstant.CHAT_STORAGE_ROUTER_KEY,chat);
延时任务
为什么使用延时任务
为了更好地感知用户的在线状态,在用户连接了WebSocket或者发送消息之后,建立一个延时任务,如果到达了所设定的延时时间,就删除用户的Session,认为用户已经下线;如果在延时期间之内,用户发送了新消息,或者发送了心跳信号,证明该用户还处于在线状态,删除前面的延时任务,并创建新的延时任务
延时任务类
package com.shm.component.delay; import lombok.Data; import lombok.Getter; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; /** * @Author dam * @create 2023/8/25 15:12 */ @Getter public class DelayTask implements Delayed { /** * 用户名 */ private final String userName; /** * 任务的真正执行时间 */ private final long executeTime; /** * 任务延时多久执行 */ private final long expire; /** * @param expire 任务需要延时的时间 */ public DelayTask(String userName, long expire) { this.userName = userName; this.executeTime = expire + System.currentTimeMillis(); this.expire = expire; } /** * 根据给定的时间单位,返回与此对象关联的剩余延迟时间 * * @param unit the time unit 时间单位 * @return 返回剩余延迟,零值或负值表示延迟已经过去 */ @Override public long getDelay(TimeUnit unit) { return unit.convert(this.executeTime - System.currentTimeMillis(), unit); } @Override public int compareTo(Delayed o) { return 0; } }
延时任务管理
package com.shm.component.delay; import com.shm.component.WebSocketServer; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.DelayQueue; import java.util.concurrent.Executors; /** * @Author dam * @create 2023/8/25 15:12 */ @Component @Slf4j public class DelayQueueManager implements CommandLineRunner { private final DelayQueue<DelayTask> delayQueue = new DelayQueue<>(); private final Map<String, DelayTask> usernameAndDelayTaskMap = new ConcurrentHashMap<>(); /** * 加入到延时队列中 * * @param task */ public void put(DelayTask task) { // 因为一个用户只能对应一个延时任务,所以如果已经存在了延时任务,将其进行删除 if (usernameAndDelayTaskMap.containsKey(task.getUserName())) { this.remove(task.getUserName()); } delayQueue.put(task); usernameAndDelayTaskMap.put(task.getUserName(), task); } /** * 取消延时任务 * * @param username 要删除的任务的用户名 * @return */ public boolean remove(String username) { DelayTask remove = usernameAndDelayTaskMap.remove(username); return delayQueue.remove(remove); } @Override public void run(String... args) throws Exception { this.executeThread(); } /** * 延时任务执行线程 */ private void executeThread() { while (true) { try { DelayTask task = delayQueue.take(); //执行任务 processTask(task); } catch (InterruptedException e) { break; } } } /** * 执行延时任务 * * @param task */ private void processTask(DelayTask task) { // 删除该用户的session,表示用户下线 WebSocketServer.usernameAndSessionMap.remove(task.getUserName()); log.error("执行定时任务:{}下线", task.getUserName()); } }