【消息队列开发】 实现Router类——交换机的转发规则

简介: 【消息队列开发】 实现Router类——交换机的转发规则

🍃前言

本次开发任务

  • 实现Router类, 使用这个类, 来实现交换机的转发规则.
  • 同时也借助这个类验证 bindingKey 与 routingKey 是否合法。

🍀判断routingKey是否合法

routingKey:发布消息的时候,给消息上指定的特殊字符串,与bindingKey 做匹配的。

如果 rontingKey 是答案,那么bindingKey 就是题目

对于routingKey 的命名,我们做出以下规定

  1. 由数字、字母、下划线组成
  2. 使用.把整个 routingKey 分成多个部分
    形如:aaa.bbb.cc

根据以上规定,我们只需要对传入的 routingKey 进行一个一个字符进行判断即可。

如果符合上述规定,则 continue ,继续循环。若不符合,返回false即可

代码实现如下:

// routingKey 的构造规则:
// 1. 数字, 字母, 下划线
// 2. 使用 . 分割成若干部分
public boolean checkRoutingKey(String routingKey) {
    if (routingKey.length() == 0) {
        // 空字符串. 合法的情况. 比如在使用 fanout 交换机的时候, routingKey 用不上, 就可以设为 ""
        return true;
    }
    for (int i = 0; i < routingKey.length(); i++) {
        char ch = routingKey.charAt(i);
        // 判定该字符是否是大写字母
        if (ch >= 'A' && ch <= 'Z') {
            continue;
        }
        // 判定该字母是否是小写字母
        if (ch >= 'a' && ch <= 'z') {
            continue;
        }
        // 判定该字母是否是阿拉伯数字
        if (ch >= '0' && ch <= '9') {
            continue;
        }
        // 判定是否是 _ 或者 .
        if (ch == '_' || ch == '.') {
            continue;
        }
        // 该字符, 不是上述任何一种合法情况, 就直接返回 false
        return false;
    }
    // 把每个字符都检查过, 没有遇到非法情况. 此时直接返回 true
    return true;
}

🎄判断bindingKey是否合法

bindingKey:创建绑定的时候,给绑定指定的特殊字符串。

bindingKey 的命名规则如下:

  1. 由数字、字母、下划线组成
  2. 使用.把整个bindingKey分成多个部分
  3. 支持两种特殊的符号作为通配符,且*#必须作为被.分割出来的独立部分
  1. *:可以匹配任何一个独立部分
  2. #:可以匹配任何0个或者多个独立的部分

此外,为了后面方便匹配,博主这里规定,通配符之间有如下规定:

  1. aaa.#.#.bbb => 非法
  2. aaa.#.*.bbb => 非法
  3. aaa.*.#.bbb => 非法
  4. aaa...bbb => 合法

根据以上规定,我们首先先对传进来的 bindingKey 的每一个字符进行判断,若不和上述规定,直接返回false 即可;

若都符合,我们就以.将bindingKey 分为几个独立的部分,用数组进行接收,

然后我们进行判断,如果每个独立部分,长度大于1且含有*或者#,我们就说他是非法的

最后就是相邻的两个独立部分不能是:

  1. ##
  2. *#
  3. #*

代码实现如下:

// bindingKey 的构造规则:
// 1. 数字, 字母, 下划线
// 2. 使用 . 分割成若干部分
// 3. 允许存在 * 和 # 作为通配符. 但是通配符只能作为独立的分段.
public boolean checkBindingKey(String bindingKey) {
    if (bindingKey.length() == 0) {
        // 空字符串, 也是合法情况. 比如在使用 direct / fanout 交换机的时候, bindingKey 是用不上的.
        return true;
    }
    // 检查字符串中不能存在非法字符
    for (int i = 0; i < bindingKey.length(); i++) {
        char ch = bindingKey.charAt(i);
        if (ch >= 'A' && ch <= 'Z') {
            continue;
        }
        if (ch >= 'a' && ch <= 'z') {
            continue;
        }
        if (ch >= '0' && ch <= '9') {
            continue;
        }
        if (ch == '_' || ch == '.' || ch == '*' || ch == '#') {
            continue;
        }
        return false;
    }
    // 检查 * 或者 # 是否是独立的部分.
    // aaa.*.bbb 合法情况;  aaa.a*.bbb 非法情况.
    String[] words = bindingKey.split("\\.");
    for (String word : words) {
        // 检查 word 长度 > 1 并且包含了 * 或者 # , 就是非法的格式了.
        if (word.length() > 1 && (word.contains("*") || word.contains("#"))) {
            return false;
        }
    }
    // 约定一下, 通配符之间的相邻关系(人为(俺)约定的).
    // 为啥这么约定? 因为前三种相邻的时候, 实现匹配的逻辑会非常繁琐, 同时功能性提升不大~~
    // 1. aaa.#.#.bbb    => 非法
    // 2. aaa.#.*.bbb    => 非法
    // 3. aaa.*.#.bbb    => 非法
    // 4. aaa.*.*.bbb    => 合法
    for (int i = 0; i < words.length - 1; i++) {
        // 连续两个 ##
        if (words[i].equals("#") && words[i + 1].equals("#")) {
            return false;
        }
        // # 连着 *
        if (words[i].equals("#") && words[i + 1].equals("*")) {
            return false;
        }
        // * 连着 #
        if (words[i].equals("*") && words[i + 1].equals("#")) {
            return false;
        }
    }
    return true;
}

🌴判断bindingKey 与 routingKey 是否匹配

我们首先将 bindingKey 与 routingKey 进行切分,用数组进行接收。

引入两个下标, 指向上述两个数组. 初始情况下都为 0

然后进行一一比对,我们分为五种情况进行判断:

  • 情况一:如果遇到普通字符串(不是*,也不是#), 要求两边的内容是一样的.
  • 情况二:如果遇到 * , 直接进入下一轮. * 可以匹配到任意一个部分!!
  • 情况三:遇到#,该 # 后面没东西了, 说明此时一定能匹配成功了!
  • 情况四:# 后面还有东西, 拿着后面的内容, 去 routingKey 中往后找, 找到对应的位置.
  • 情况五:判定是否是双方同时到达末尾

根据上述五种情况,代码书写如下:

// 这个方法用来判定该消息是否可以转发给这个绑定对应的队列.
public boolean route(ExchangeType exchangeType, Binding binding, Message message) throws MqException {
    // 根据不同的 exchangeType 使用不同的判定转发规则.
    if (exchangeType == ExchangeType.FANOUT) {
        // 如果是 FANOUT 类型, 则该交换机上绑定的所有队列都需要转发
        return true;
    } else if (exchangeType == ExchangeType.TOPIC) {
        // 如果是 TOPIC 主题交换机, 规则就要更复杂一些.
        return routeTopic(binding, message);
    } else {
        // 其他情况是不应该存在的.
        throw new MqException("[Router] 交换机类型非法! exchangeType=" + exchangeType);
    }
}
private boolean routeTopic(Binding binding, Message message) {
    // 先把这两个 key 进行切分
    String[] bindingTokens = binding.getBindingKey().split("\\.");
    String[] routingTokens = message.getRoutingKey().split("\\.");
    // 引入两个下标, 指向上述两个数组. 初始情况下都为 0
    int bindingIndex = 0;
    int routingIndex = 0;
    // 此处使用 while 更合适, 每次循环, 下标不一定就是 + 1, 不适合使用 for
    while (bindingIndex < bindingTokens.length && routingIndex < routingTokens.length) {
        if (bindingTokens[bindingIndex].equals("*")) {
            // [情况二] 如果遇到 * , 直接进入下一轮. * 可以匹配到任意一个部分!!
            bindingIndex++;
            routingIndex++;
            continue;
        } else if (bindingTokens[bindingIndex].equals("#")) {
            // 如果遇到 #, 需要先看看有没有下一个位置.
            bindingIndex++;
            if (bindingIndex == bindingTokens.length) {
                // [情况三] 该 # 后面没东西了, 说明此时一定能匹配成功了!
                return true;
            }
            // [情况四] # 后面还有东西, 拿着这个内容, 去 routingKey 中往后找, 找到对应的位置.
            // findNextMatch 这个方法用来查找该部分在 routingKey 的位置. 返回该下标. 没找到, 就返回 -1
            routingIndex = findNextMatch(routingTokens, routingIndex, bindingTokens[bindingIndex]);
            if (routingIndex == -1) {
                // 没找到匹配的结果. 匹配失败
                return false;
            }
            // 找到的匹配的情况, 继续往后匹配.
            bindingIndex++;
            routingIndex++;
        } else {
            // [情况一] 如果遇到普通字符串, 要求两边的内容是一样的.
            if (!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])) {
                return false;
            }
            bindingIndex++;
            routingIndex++;
        }
    }
    // [情况五] 判定是否是双方同时到达末尾
    // 比如 aaa.bbb.ccc  和  aaa.bbb 是要匹配失败的.
    if (bindingIndex == bindingTokens.length && routingIndex == routingTokens.length) {
        return true;
    }
    return false;
}
private int findNextMatch(String[] routingTokens, int routingIndex, String bindingToken) {
    for (int i = routingIndex; i < routingTokens.length; i++) {
        if (routingTokens[i].equals(bindingToken)) {
            return i;
        }
    }
    return -1;
}

🌲测试匹配方法

我们对上述所书写的匹配方法进行测试一下。

测试数据如下:

测试代码如下:

@SpringBootTest
public class RouterTests {
    private Router router = new Router();
    private Binding binding = null;
    private Message message = null;
    @BeforeEach
    public void setUp() {
        binding = new Binding();
        message = new Message();
    }
    @AfterEach
    public void tearDown() {
        binding = null;
        message = null;
    }
    // [测试用例]
    // binding key          routing key         result
    // aaa                  aaa                 true
    // aaa.bbb              aaa.bbb             true
    // aaa.bbb              aaa.bbb.ccc         false
    // aaa.bbb              aaa.ccc             false
    // aaa.bbb.ccc          aaa.bbb.ccc         true
    // aaa.*                aaa.bbb             true
    // aaa.*.bbb            aaa.bbb.ccc         false
    // *.aaa.bbb            aaa.bbb             false
    // #                    aaa.bbb.ccc         true
    // aaa.#                aaa.bbb             true
    // aaa.#                aaa.bbb.ccc         true
    // aaa.#.ccc            aaa.ccc             true
    // aaa.#.ccc            aaa.bbb.ccc         true
    // aaa.#.ccc            aaa.aaa.bbb.ccc     true
    // #.ccc                ccc                 true
    // #.ccc                aaa.bbb.ccc         true
    @Test
    public void test1() throws MqException {
        binding.setBindingKey("aaa");
        message.setRoutingKey("aaa");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }
    @Test
    public void test2() throws MqException {
        binding.setBindingKey("aaa.bbb");
        message.setRoutingKey("aaa.bbb");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }
    @Test
    public void test3() throws MqException {
        binding.setBindingKey("aaa.bbb");
        message.setRoutingKey("aaa.bbb.ccc");
        Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
    }
    @Test
    public void test4() throws MqException {
        binding.setBindingKey("aaa.bbb");
        message.setRoutingKey("aaa.ccc");
        Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
    }
    @Test
    public void test5() throws MqException {
        binding.setBindingKey("aaa.bbb.ccc");
        message.setRoutingKey("aaa.bbb.ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }
    @Test
    public void test6() throws MqException {
        binding.setBindingKey("aaa.*");
        message.setRoutingKey("aaa.bbb");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }
    @Test
    public void test7() throws MqException {
        binding.setBindingKey("aaa.*.bbb");
        message.setRoutingKey("aaa.bbb.ccc");
        Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
    }
    @Test
    public void test8() throws MqException {
        binding.setBindingKey("*.aaa.bbb");
        message.setRoutingKey("aaa.bbb");
        Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
    }
    @Test
    public void test9() throws MqException {
        binding.setBindingKey("#");
        message.setRoutingKey("aaa.bbb.ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }
    @Test
    public void test10() throws MqException {
        binding.setBindingKey("aaa.#");
        message.setRoutingKey("aaa.bbb");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }
    @Test
    public void test11() throws MqException {
        binding.setBindingKey("aaa.#");
        message.setRoutingKey("aaa.bbb.ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }
    @Test
    public void test12() throws MqException {
        binding.setBindingKey("aaa.#.ccc");
        message.setRoutingKey("aaa.ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }
    @Test
    public void test13() throws MqException {
        binding.setBindingKey("aaa.#.ccc");
        message.setRoutingKey("aaa.bbb.ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }
    @Test
    public void test14() throws MqException {
        binding.setBindingKey("aaa.#.ccc");
        message.setRoutingKey("aaa.aaa.bbb.ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }
    @Test
    public void test15() throws MqException {
        binding.setBindingKey("#.ccc");
        message.setRoutingKey("ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }
    @Test
    public void test16() throws MqException {
        binding.setBindingKey("#.ccc");
        message.setRoutingKey("aaa.bbb.ccc");
        Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
    }
}

测试结果如下:

⭕总结

关于《【消息队列开发】 实现Router类——交换机的转发规则》就讲解到这儿,感谢大家的支持,欢迎各位留言交流以及批评指正,如果文章对您有帮助或者觉得作者写的还不错可以点一下关注,点赞,收藏支持一下

相关文章
|
5月前
|
消息中间件 Java 数据库
【消息队列开发】 实现 VirtualHostTests 类——测试虚拟主机操作
【消息队列开发】 实现 VirtualHostTests 类——测试虚拟主机操作
|
4月前
|
消息中间件 Java 测试技术
消息队列 MQ使用问题之数据流出规则是否支持平台的云RabbitMQ
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 存储 安全
【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑
【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑
|
5月前
|
消息中间件 网络协议 Java
【消息队列开发】 实现BrokerServer类——本体服务器
【消息队列开发】 实现BrokerServer类——本体服务器
|
5月前
|
消息中间件 Java 开发工具
消息队列 MQ产品使用合集之topic相同,但是tag不同,这个类不能放入map中,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 Java Spring
JavaWeb后端开发Spring框架之消息 消息队列案例--订单短信通知
JavaWeb后端开发Spring框架之消息 消息队列案例--订单短信通知
50 0
|
5月前
|
消息中间件 API
【消息队列开发】 实现 MqClientTests 类——测试客户端
【消息队列开发】 实现 MqClientTests 类——测试客户端
|
5月前
|
消息中间件 存储 网络协议
【消息队列开发】实现客户端
【消息队列开发】实现客户端
|
5月前
|
消息中间件 网络协议
【消息队列开发】 设计网络通信协议
【消息队列开发】 设计网络通信协议
|
4月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
下一篇
无影云桌面