🍃前言
本次开发目标
- 实现消费消息的核心逻辑
🌴扫描线程的实现
我们先给ConsumerManager类注入一些基础的属性
我们定义一个构造方法,里面就是我们的扫描线程,确保我们程序启动时,就可以一直对我们的阻塞队列进行扫描。
传入的对象为我们前面所构造的 虚拟机类的对象,我们只需要在VirtualHost 类里面添加实例化并传参即可
扫描线程逻辑如下:
- 阻塞队列里元素就取出,取出来的元素我们称它为令牌
- 根据该令牌我们可以找到需要被消费消息的队列
- 若该队列存在,我们便可以调用相应的消费方法进行消费
我们还需要将该线程设为后台线程
为了线程安全,必要的地方我们进行加锁操作。
并让这个线程永远的扫描下去,代码实现如下:
public ConsumerManager(VirtualHost p) { parent = p; scannerThread = new Thread(() -> { while (true) { try { // 1. 拿到令牌 String queueName = tokenQueue.take(); // 2. 根据令牌, 找到队列 MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName); if (queue == null) { throw new MqException("[ConsumerManager] 取令牌后发现, 该队列名不存在! queueName=" + queueName); } // 3. 从这个队列中消费一个消息. synchronized (queue) { consumeMessage(queue); } } catch (InterruptedException | MqException e) { e.printStackTrace(); } } }); // 把线程设为后台线程. scannerThread.setDaemon(true); scannerThread.start(); }
🌲实现消费消息
扫描后,就需要交给线程池进行负责执行回调函数消费消息
流程如下:
- 按照轮询的方式找出消费者
- 若消费者存在,则从队列中取出一个消息
- 若该队列中有消息,我们则交给线程池来执行回调函数消费消息
在线程池内我们需要做的操作有:
- 把消息放到待确认的集合中. 这个操作势必在执行回调之前.
- 执行回调操作,若出现问题,则不会执行后续应答操作,若正常执行,我们则需要进行应答
- 应答又分为自动应答与手动应答如果当前是 “自动应答” , 就可以直接把消息删除了。如果当前是 “手动应答” , 则先不处理, 交给后续消费者调用VirtualHost类 basicAck 方法来处理.
我们先来看一下自动应答的情况:
删除消息时,我们需要删除三处的存储:硬盘、待确认集合、内存中心的消息
需要注意的是:在删除硬盘上的数据时,我们需要查看该消息是否要持久化,若需要持久化,则不要删除.
实现代码如下:
private void consumeMessage(MSGQueue queue) { // 1. 按照轮询的方式, 找个消费者出来. ConsumerEnv luckyDog = queue.chooseConsumer(); if (luckyDog == null) { // 当前队列没有消费者, 暂时不消费. 等后面有消费者出现再说. return; } // 2. 从队列中取出一个消息 Message message = parent.getMemoryDataCenter().pollMessage(queue.getName()); if (message == null) { // 当前队列中还没有消息, 也不需要消费. return; } // 3. 把消息带入到消费者的回调方法中, 丢给线程池执行. workerPool.submit(() -> { try { // 1. 把消息放到待确认的集合中. 这个操作势必在执行回调之前. parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message); // 2. 真正执行回调操作 luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(), message.getBody()); // 3. 如果当前是 "自动应答" , 就可以直接把消息删除了. // 如果当前是 "手动应答" , 则先不处理, 交给后续消费者调用 basicAck 方法来处理. if (luckyDog.isAutoAck()) { // 1) 删除硬盘上的消息 if (message.getDeliverMode() == 2) { parent.getDiskDataCenter().deleteMessage(queue, message); } // 2) 删除上面的待确认集合中的消息 parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId()); // 3) 删除内存中消息中心里的消息 parent.getMemoryDataCenter().removeMessage(message.getMessageId()); System.out.println("[ConsumerManager] 消息被成功消费! queueName=" + queue.getName()); } } catch (Exception e) { e.printStackTrace(); } }); }
关于手动应答的实现步骤我们分为以下四步:
- 获取到消息和队列
- 删除硬盘上的数据
- 删除消息中心中的数据
- 删除待确认的集合中的数据
代码实现如下:
public boolean basicAck(String queueName, String messageId) { queueName = virtualHostName + queueName; try { // 1. 获取到消息和队列 Message message = memoryDataCenter.getMessage(messageId); if (message == null) { throw new MqException("[VirtualHost] 要确认的消息不存在! messageId=" + messageId); } MSGQueue queue = memoryDataCenter.getQueue(queueName); if (queue == null) { throw new MqException("[VirtualHost] 要确认的队列不存在! queueName=" + queueName); } // 2. 删除硬盘上的数据 if (message.getDeliverMode() == 2) { diskDataCenter.deleteMessage(queue, message); } // 3. 删除消息中心中的数据 memoryDataCenter.removeMessage(messageId); // 4. 删除待确认的集合中的数据 memoryDataCenter.removeMessageWaitAck(queueName, messageId); System.out.println("[VirtualHost] basicAck 成功! 消息被成功确认! queueName=" + queueName + ", messageId=" + messageId); return true; } catch (Exception e) { System.out.println("[VirtualHost] basicAck 失败! 消息确认失败! queueName=" + queueName + ", messageId=" + messageId); e.printStackTrace(); return false; } }
🌳实现addConsumer()方法
该方法是为了实现前面我们添加订阅者的方法
在这个方法里,我们需要实现的是:
- 根据队列名查找该队列是否存在
- 若存在,构造相应的消费者进行添加
- 并且若该队列里存在未消费的消息,我们就直接进行消费掉
代码实现如下:
public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException { // 找到对应的队列. MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName); if (queue == null) { throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName); } ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag,queueName,autoAck,consumer); synchronized (queue) { queue.addConsumerEnv(consumerEnv); // 如果当前队列中已经有了一些消息了, 需要立即就消费掉. int n = parent.getMemoryDataCenter().getMessageCount(queueName); for (int i = 0; i < n; i++) { // 这个方法调用一次就消费一条消息. consumeMessage(queue); } } }
🎋VirtualHost类订阅消息的完善
直接调用即可,代码实现如下:
// 订阅消息. // 添加一个队列的订阅者, 当队列收到消息之后, 就要把消息推送给对应的订阅者. // consumerTag: 消费者的身份标识 // autoAck: 消息被消费完成后, 应答的方式. 为 true 自动应答. 为 false 手动应答. // consumer: 是一个回调函数. 此处类型设定成函数式接口. 这样后续调用 basicConsume 并且传实参的时候, 就可以写作 lambda 样子了. public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) { // 构造一个 ConsumerEnv 对象, 把这个对应的队列找到, 再把这个 Consumer 对象添加到该队列中. queueName = virtualHostName + queueName; try { consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer); System.out.println("[VirtualHost] basicConsume 成功! queueName=" + queueName); return true; } catch (Exception e) { System.out.println("[VirtualHost] basicConsume 失败! queueName=" + queueName); e.printStackTrace(); return false; } }
⭕总结
关于《【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑》就讲解到这儿,感谢大家的支持,欢迎各位留言交流以及批评指正,如果文章对您有帮助或者觉得作者写的还不错可以点一下关注,点赞,收藏支持一下