🍃前言
本次开发任务
- 实现放送消息到队列/交换机中
🎋实现思路
大致思路为根据交换机的类型与转发规则,将消息转发到相应的队列。
具体步骤如下:
- 首先我们需要对该交换机进行重命名
- 检查 routingKey 是否合法,该部分的逻辑统一放在 Router 里面进行处理,后面会进行开发的
- 查询当前交换机对象是否存在,不存在返回即可
- 查询当前交换机的类型,若为DIRECT(直接交换机)
- 按照直接交换机的方式来转发消息以 routingKey 作为队列的名字, 直接把消息写入指定的队列中.
- 构造相应的消息对象
- 查找该队列名对应的对象
- 存在,直接写入对象即可
- 若不为,按照 fanout 和topic的方式来转发
- 找到该交换机关联的所有绑定, 并遍历这些绑定对象
- 遍历时,我们需要获取到绑定对象, 判定对应的队列是否存在,此处队列不存在咱们就不抛出异常了.。可能此处有多个这样的队列。希望不要因为一个队列的失败, 影响到其他队列的消息的传输.
- 构造消息对象
- 判定这个消息是否能转发给该队列,该逻辑同样放在 Router 里面进行处理
- 如果是 fanout, 所有绑定的队列都要转发的.
- 如果是 topic, 还需要判定下, bindingKey 和 routingKey 是不是匹配
- 5.真正转发消息给队列
- 此处我们先创建个方法,表示转发。但具体实现后面才会进行实现。
- 返回true
最后不要忘了进行异常处理
🎍代码实现
// 发送消息到指定的交换机/队列中. public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) { try { // 1. 转换交换机的名字 exchangeName = virtualHostName + exchangeName; // 2. 检查 routingKey 是否合法. if (!router.checkRoutingKey(routingKey)) { throw new MqException("[VirtualHost] routingKey 非法! routingKey=" + routingKey); } // 3. 查找交换机对象 Exchange exchange = memoryDataCenter.getExchange(exchangeName); if (exchange == null) { throw new MqException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName); } // 4. 判定交换机的类型 if (exchange.getType() == ExchangeType.DIRECT) { // 按照直接交换机的方式来转发消息 // 以 routingKey 作为队列的名字, 直接把消息写入指定的队列中. // 此时, 可以无视绑定关系. String queueName = virtualHostName + routingKey; // 5. 构造消息对象 Message message = Message.createMessageWithId(routingKey, basicProperties, body); // 6. 查找该队列名对应的对象 MSGQueue queue = memoryDataCenter.getQueue(queueName); if (queue == null) { throw new MqException("[VirtualHost] 队列不存在! queueName=" + queueName); } // 7. 队列存在, 直接给队列中写入消息 sendMessage(queue, message); } else { // 按照 fanout 和 topic 的方式来转发. // 5. 找到该交换机关联的所有绑定, 并遍历这些绑定对象 ConcurrentHashMap<String, Binding> bindingsMap = memoryDataCenter.getBindings(exchangeName); for (Map.Entry<String, Binding> entry : bindingsMap.entrySet()) { // 1) 获取到绑定对象, 判定对应的队列是否存在 Binding binding = entry.getValue(); MSGQueue queue = memoryDataCenter.getQueue(binding.getQueueName()); if (queue == null) { // 此处咱们就不抛出异常了. 可能此处有多个这样的队列. // 希望不要因为一个队列的失败, 影响到其他队列的消息的传输. System.out.println("[VirtualHost] basicPublish 发送消息时, 发现队列不存在! queueName=" + binding.getQueueName()); continue; } // 2) 构造消息对象 Message message = Message.createMessageWithId(routingKey, basicProperties, body); // 3) 判定这个消息是否能转发给该队列. // 如果是 fanout, 所有绑定的队列都要转发的. // 如果是 topic, 还需要判定下, bindingKey 和 routingKey 是不是匹配. if (!router.route(exchange.getType(), binding, message)) { continue; } // 4) 真正转发消息给队列 sendMessage(queue, message); } } return true; } catch (Exception e) { System.out.println("[VirtualHost] 消息发送失败!"); e.printStackTrace(); return false; } } private void sendMessage(MSGQueue queue, Message message) throws IOException, MqException, InterruptedException { }
⭕总结
关于《【消息队列开发】 虚拟主机设计——放送消息到队列/交换机中》就讲解到这儿,感谢大家的支持,欢迎各位留言交流以及批评指正,如果文章对您有帮助或者觉得作者写的还不错可以点一下关注,点赞,收藏支持一下