【消息队列开发】 虚拟主机设计——放送消息到队列/交换机中

简介: 【消息队列开发】 虚拟主机设计——放送消息到队列/交换机中

🍃前言

本次开发任务

  • 实现放送消息到队列/交换机中

🎋实现思路

大致思路为根据交换机的类型与转发规则,将消息转发到相应的队列。

具体步骤如下:

  1. 首先我们需要对该交换机进行重命名
  2. 检查 routingKey 是否合法,该部分的逻辑统一放在 Router 里面进行处理,后面会进行开发的
  3. 查询当前交换机对象是否存在,不存在返回即可
  4. 查询当前交换机的类型,若为DIRECT(直接交换机)
  1. 按照直接交换机的方式来转发消息以 routingKey 作为队列的名字, 直接把消息写入指定的队列中.
  2. 构造相应的消息对象
  3. 查找该队列名对应的对象
  4. 存在,直接写入对象即可
  1. 若不为,按照 fanout 和topic的方式来转发
  1. 找到该交换机关联的所有绑定, 并遍历这些绑定对象
  2. 遍历时,我们需要获取到绑定对象, 判定对应的队列是否存在,此处队列不存在咱们就不抛出异常了.。可能此处有多个这样的队列。希望不要因为一个队列的失败, 影响到其他队列的消息的传输.
  3. 构造消息对象
  4. 判定这个消息是否能转发给该队列,该逻辑同样放在 Router 里面进行处理
    - 如果是 fanout, 所有绑定的队列都要转发的.
    - 如果是 topic, 还需要判定下, bindingKey 和 routingKey 是不是匹配
  1. 5.真正转发消息给队列
  • 此处我们先创建个方法,表示转发。但具体实现后面才会进行实现。
  1. 返回true

最后不要忘了进行异常处理

🎍代码实现

// 发送消息到指定的交换机/队列中.
public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {
    try {
        // 1. 转换交换机的名字
        exchangeName = virtualHostName + exchangeName;
        // 2. 检查 routingKey 是否合法.
        if (!router.checkRoutingKey(routingKey)) {
            throw new MqException("[VirtualHost] routingKey 非法! routingKey=" + routingKey);
        }
        // 3. 查找交换机对象
        Exchange exchange = memoryDataCenter.getExchange(exchangeName);
        if (exchange == null) {
            throw new MqException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName);
        }
        // 4. 判定交换机的类型
        if (exchange.getType() == ExchangeType.DIRECT) {
            // 按照直接交换机的方式来转发消息
            // 以 routingKey 作为队列的名字, 直接把消息写入指定的队列中.
            // 此时, 可以无视绑定关系.
            String queueName = virtualHostName + routingKey;
            // 5. 构造消息对象
            Message message = Message.createMessageWithId(routingKey, basicProperties, body);
            // 6. 查找该队列名对应的对象
            MSGQueue queue = memoryDataCenter.getQueue(queueName);
            if (queue == null) {
                throw new MqException("[VirtualHost] 队列不存在! queueName=" + queueName);
            }
            // 7. 队列存在, 直接给队列中写入消息
            sendMessage(queue, message);
        } else {
            // 按照 fanout 和 topic 的方式来转发.
            // 5. 找到该交换机关联的所有绑定, 并遍历这些绑定对象
            ConcurrentHashMap<String, Binding> bindingsMap = memoryDataCenter.getBindings(exchangeName);
            for (Map.Entry<String, Binding> entry : bindingsMap.entrySet()) {
                // 1) 获取到绑定对象, 判定对应的队列是否存在
                Binding binding = entry.getValue();
                MSGQueue queue = memoryDataCenter.getQueue(binding.getQueueName());
                if (queue == null) {
                    // 此处咱们就不抛出异常了. 可能此处有多个这样的队列.
                    // 希望不要因为一个队列的失败, 影响到其他队列的消息的传输.
                    System.out.println("[VirtualHost] basicPublish 发送消息时, 发现队列不存在! queueName=" + binding.getQueueName());
                    continue;
                }
                // 2) 构造消息对象
                Message message = Message.createMessageWithId(routingKey, basicProperties, body);
                // 3) 判定这个消息是否能转发给该队列.
                //    如果是 fanout, 所有绑定的队列都要转发的.
                //    如果是 topic, 还需要判定下, bindingKey 和 routingKey 是不是匹配.
                if (!router.route(exchange.getType(), binding, message)) {
                    continue;
                }
                // 4) 真正转发消息给队列
                sendMessage(queue, message);
            }
        }
        return true;
    } catch (Exception e) {
        System.out.println("[VirtualHost] 消息发送失败!");
        e.printStackTrace();
        return false;
    }
}
private void sendMessage(MSGQueue queue, Message message) throws IOException, MqException, InterruptedException {
   
}

⭕总结

关于《【消息队列开发】 虚拟主机设计——放送消息到队列/交换机中》就讲解到这儿,感谢大家的支持,欢迎各位留言交流以及批评指正,如果文章对您有帮助或者觉得作者写的还不错可以点一下关注,点赞,收藏支持一下

相关文章
|
4月前
|
消息中间件 存储 负载均衡
现代消息队列与云存储问题之消息队列支持定时消息和延迟队列的问题如何解决
现代消息队列与云存储问题之消息队列支持定时消息和延迟队列的问题如何解决
|
6月前
|
消息中间件 网络协议 Java
【消息队列开发】 实现BrokerServer类——本体服务器
【消息队列开发】 实现BrokerServer类——本体服务器
|
3月前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
93 0
|
4月前
|
消息中间件 存储 Kafka
现代消息队列与云存储问题之Kafka在海量队列场景下存在性能的问题如何解决
现代消息队列与云存储问题之Kafka在海量队列场景下存在性能的问题如何解决
|
6月前
|
消息中间件 Java Spring
JavaWeb后端开发Spring框架之消息 消息队列案例--订单短信通知
JavaWeb后端开发Spring框架之消息 消息队列案例--订单短信通知
51 0
|
6月前
|
消息中间件 存储 Java
消息队列-死信队列
消息队列-死信队列
58 0
|
6月前
|
消息中间件 存储 负载均衡
消息队列 MQ产品使用合集之如何排查是哪个队列导致的异常TPS增加
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
6月前
|
消息中间件 API
【消息队列开发】 实现 MqClientTests 类——测试客户端
【消息队列开发】 实现 MqClientTests 类——测试客户端
|
6月前
|
消息中间件 存储 网络协议
【消息队列开发】实现客户端
【消息队列开发】实现客户端
|
6月前
|
消息中间件 网络协议
【消息队列开发】 设计网络通信协议
【消息队列开发】 设计网络通信协议