【消息队列开发】 实现 VirtualHostTests 类——测试虚拟主机操作

简介: 【消息队列开发】 实现 VirtualHostTests 类——测试虚拟主机操作

🍃前言

本次开发任务

  • 实现 VirtualHostTests 类,对虚拟主机进行测试

🎍准备与收尾工作

准备工作我们需要做的有

  1. 虚拟机操作后续涉及操作数据库,所以我们需要将spring上下文进行获取一下;
  2. 创建一个虚拟主机

收尾工作我们需要做的有:

  1. 关闭上下文
  2. 虚拟主机置为null
  3. 删除硬盘上的目录及目录下的文件

代码实现如下:

private VirtualHost virtualHost = null;
@BeforeEach
public void setUp() {
    MqApplication.context = SpringApplication.run(MqApplication.class);
    virtualHost = new VirtualHost("default");
}
@AfterEach
public void tearDown() throws IOException {
    MqApplication.context.close();
    virtualHost = null;
    // 把硬盘的目录删除掉
    File dataDir = new File("./data");
    FileUtils.deleteDirectory(dataDir);
}

🍀测试交换机与队列操作

测试交换机我们主要测试四方法:

  1. 增加交换机
  2. 删除交换机
  3. 增加队列
  4. 删除队列

由于虚拟机里面的方法,都是调用我们前面已经测试过了的方法,所以我们这里只用看该方法返回值知否未true就好

代码实现如下:

@Test
public void testExchangeDeclare() {
    boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
            true, false, null);
    Assertions.assertTrue(ok);
}
@Test
public void testExchangeDelete() {
    boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
            true, false, null);
    Assertions.assertTrue(ok);
    ok = virtualHost.exchangeDelete("testExchange");
    Assertions.assertTrue(ok);
}
@Test
public void testQueueDeclare() {
    boolean ok = virtualHost.queueDeclare("testQueue", true,
            false, false, null);
    Assertions.assertTrue(ok);
}
@Test
public void testQueueDelete() {
    boolean ok = virtualHost.queueDeclare("testQueue", true,
            false, false, null);
    Assertions.assertTrue(ok);
    ok = virtualHost.queueDelete("testQueue");
    Assertions.assertTrue(ok);
}

🎄测试绑定操作

创建相应的队列与交换机,然后将这些队列交换机添加绑定即可

删除也是如此:

代码实现如下:

@Test
public void testQueueBind() {
    boolean ok = virtualHost.queueDeclare("testQueue", true,
            false, false, null);
    Assertions.assertTrue(ok);
    ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
            true, false, null);
    Assertions.assertTrue(ok);
    ok = virtualHost.queueBind("testQueue", "testExchange", "testBindingKey");
    Assertions.assertTrue(ok);
}
@Test
public void testQueueUnbind() {
    boolean ok = virtualHost.queueDeclare("testQueue", true,
            false, false, null);
    Assertions.assertTrue(ok);
    ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
            true, false, null);
    Assertions.assertTrue(ok);
    ok = virtualHost.queueBind("testQueue", "testExchange", "testBindingKey");
    Assertions.assertTrue(ok);
    ok = virtualHost.queueUnbind("testQueue", "testExchange");
    Assertions.assertTrue(ok);
}

🌴测试将消息发入交换机

传参调用即可,代码实现如下:

@Test
public void testBasicPublish() {
    boolean ok = virtualHost.queueDeclare("testQueue", true,
            false, false, null);
    Assertions.assertTrue(ok);
    ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
            true, false, null);
    Assertions.assertTrue(ok);
    ok = virtualHost.basicPublish("testExchange", "testQueue", null,
            "hello".getBytes());
    Assertions.assertTrue(ok);
}

🌲测试订阅消息

订阅消息我们分为两种情况:

  1. 先订阅队列,再发送消息
  2. 先发送消息,再订阅队列

订阅队列操作,我们只需要出入相应的参数,再实现以下消费者自身的回调方法即可。

发送消息直径发送即可

代码实现如下:

// 先订阅队列, 后发送消息
@Test
public void testBasicConsume1() throws InterruptedException {
    boolean ok = virtualHost.queueDeclare("testQueue", true,
            false, false, null);
    Assertions.assertTrue(ok);
    ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
            true, false, null);
    Assertions.assertTrue(ok);
    // 先订阅队列
    ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() {
        @Override
        public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
            try {
                // 消费者自身设定的回调方法.
                System.out.println("messageId=" + basicProperties.getMessageId());
                System.out.println("body=" + new String(body, 0, body.length));
                Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());
                Assertions.assertEquals(1, basicProperties.getDeliverMode());
                Assertions.assertArrayEquals("hello".getBytes(), body);
            } catch (Error e) {
                // 断言如果失败, 抛出的是 Error, 而不是 Exception!
                e.printStackTrace();
                System.out.println("error");
            }
        }
    });
    Assertions.assertTrue(ok);
    Thread.sleep(500);
    // 再发送消息
    ok = virtualHost.basicPublish("testExchange", "testQueue", null,
            "hello".getBytes());
    Assertions.assertTrue(ok);
}
// 先发送消息, 后订阅队列.
@Test
public void testBasicConsume2() throws InterruptedException {
    boolean ok = virtualHost.queueDeclare("testQueue", true,
            false, false, null);
    Assertions.assertTrue(ok);
    ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
            true, false, null);
    Assertions.assertTrue(ok);
    // 先发送消息
    ok = virtualHost.basicPublish("testExchange", "testQueue", null,
            "hello".getBytes());
    Assertions.assertTrue(ok);
    // 再订阅队列
    ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() {
        @Override
        public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
            // 消费者自身设定的回调方法.
            System.out.println("messageId=" + basicProperties.getMessageId());
            System.out.println("body=" + new String(body, 0, body.length));
            Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());
            Assertions.assertEquals(1, basicProperties.getDeliverMode());
            Assertions.assertArrayEquals("hello".getBytes(), body);
        }
    });
    Assertions.assertTrue(ok);
    Thread.sleep(500);
}

🌳测试FANOUT与TOPIC类型的交换机

上面我们只测试了 DIRECT 类型的交换机,接下来我们测试一下FANOUT与TOPIC类型的交换机。

测试步骤与上述类似,准备数据传入,进行测试就好

代码实现如下:

@Test
public void testBasicConsumeFanout() throws InterruptedException {
    boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.FANOUT, false, false, null);
    Assertions.assertTrue(ok);
    ok = virtualHost.queueDeclare("testQueue1", false, false, false, null);
    Assertions.assertTrue(ok);
    ok = virtualHost.queueBind("testQueue1", "testExchange", "");
    Assertions.assertTrue(ok);
    ok = virtualHost.queueDeclare("testQueue2", false, false, false, null);
    Assertions.assertTrue(ok);
    ok = virtualHost.queueBind("testQueue2", "testExchange", "");
    Assertions.assertTrue(ok);
    // 往交换机中发布一个消息
    ok = virtualHost.basicPublish("testExchange", "", null, "hello".getBytes());
    Assertions.assertTrue(ok);
    Thread.sleep(500);
    // 两个消费者订阅上述的两个队列.
    ok = virtualHost.basicConsume("testConsumer1", "testQueue1", true, new Consumer() {
        @Override
        public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
            System.out.println("consumerTag=" + consumerTag);
            System.out.println("messageId=" + basicProperties.getMessageId());
            Assertions.assertArrayEquals("hello".getBytes(), body);
        }
    });
    Assertions.assertTrue(ok);
    ok = virtualHost.basicConsume("testConsumer2", "testQueue2", true, new Consumer() {
        @Override
        public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
            System.out.println("consumerTag=" + consumerTag);
            System.out.println("messageId=" + basicProperties.getMessageId());
            Assertions.assertArrayEquals("hello".getBytes(), body);
        }
    });
    Assertions.assertTrue(ok);
    Thread.sleep(500);
}
@Test
public void testBasicConsumeTopic() throws InterruptedException {
    boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.TOPIC, false, false, null);
    Assertions.assertTrue(ok);
    ok = virtualHost.queueDeclare("testQueue", false, false, false, null);
    Assertions.assertTrue(ok);
    ok = virtualHost.queueBind("testQueue", "testExchange", "aaa.*.bbb");
    Assertions.assertTrue(ok);
    ok = virtualHost.basicPublish("testExchange", "aaa.ccc.bbb", null, "hello".getBytes());
    Assertions.assertTrue(ok);
    ok = virtualHost.basicConsume("testConsumer", "testQueue", true, new Consumer() {
        @Override
        public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
            System.out.println("consumerTag=" + consumerTag);
            System.out.println("messageId=" + basicProperties.getMessageId());
            Assertions.assertArrayEquals("hello".getBytes(), body);
        }
    });
    Assertions.assertTrue(ok);
    Thread.sleep(500);
}

🎋测试手动应答

测试手动应答的时候。

我们需要在订阅队列时,将 autoAck 改成 false;

并且我们需要在消费者的回调方法里进行调用手动应答的方法。

代码实现如下:

@Test
public void testBasicAck() throws InterruptedException {
    boolean ok = virtualHost.queueDeclare("testQueue", true,
            false, false, null);
    Assertions.assertTrue(ok);
    ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
            true, false, null);
    Assertions.assertTrue(ok);
    // 先发送消息
    ok = virtualHost.basicPublish("testExchange", "testQueue", null,
            "hello".getBytes());
    Assertions.assertTrue(ok);
    // 再订阅队列 [要改的地方, 把 autoAck 改成 false]
    ok = virtualHost.basicConsume("testConsumerTag", "testQueue", false, new Consumer() {
        @Override
        public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
            // 消费者自身设定的回调方法.
            System.out.println("messageId=" + basicProperties.getMessageId());
            System.out.println("body=" + new String(body, 0, body.length));
            Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());
            Assertions.assertEquals(1, basicProperties.getDeliverMode());
            Assertions.assertArrayEquals("hello".getBytes(), body);
            // [要改的地方, 新增手动调用 basicAck]
            boolean ok = virtualHost.basicAck("testQueue", basicProperties.getMessageId());
            Assertions.assertTrue(ok);
        }
    });
    Assertions.assertTrue(ok);
    Thread.sleep(500);
}

⭕总结

关于《【消息队列开发】 实现 VirtualHostTests 类——测试虚拟主机操作》就讲解到这儿,感谢大家的支持,欢迎各位留言交流以及批评指正,如果文章对您有帮助或者觉得作者写的还不错可以点一下关注,点赞,收藏支持一下

相关文章
|
5月前
|
机器学习/深度学习 人工智能 并行计算
AI部署架构:A100、H100、A800、H800、H20的差异以及如何选型?开发、测试、生产环境如何进行AI大模型部署架构?
AI部署架构:A100、H100、A800、H800、H20的差异以及如何选型?开发、测试、生产环境如何进行AI大模型部署架构?
AI部署架构:A100、H100、A800、H800、H20的差异以及如何选型?开发、测试、生产环境如何进行AI大模型部署架构?
|
2月前
|
敏捷开发 运维 数据可视化
DevOps看板工具中的协作功能:如何打破开发、测试与运维之间的沟通壁垒
在DevOps实践中,看板工具通过可视化任务管理和自动化流程,提升开发与运维团队的协作效率。它支持敏捷开发、持续交付,助力团队高效应对需求变化,实现跨职能协作与流程优化。
|
3月前
|
传感器 人工智能 JavaScript
鸿蒙开发:DevEcoTesting中的稳定性测试
DevEcoTesting主要的目的也是用于软件的测试,可以让开发者无需复杂的配置,即可一键执行测试任务,同时提供了测试报告和分析,无论是对于开发者还是测试同学来说,都是一个非常方便的工具。
117 3
鸿蒙开发:DevEcoTesting中的稳定性测试
|
2月前
|
运维 jenkins 测试技术
"还在苦等开发部署环境?3步教你用Jenkins拿回测试主动权"
测试工程师最头疼的问题是什么?依赖开发部署环境! 开发延期→测试时间被压缩→紧急上线后BUG频出→测试背锅。传统流程中,测试被动等待部署,效率低下。而Jenkins自动化部署让测试人员自主搭建环境,实现: ✅ 随时触发测试,不再苦等开发 ✅ 部署效率提升10倍,抢回测试时间 ✅ 改善团队协作,减少互相甩锅 学习Jenkins部署能力,成为高效测试工程师,告别被动等待!
|
6月前
|
数据采集 算法 数据安全/隐私保护
【硬件测试】基于FPGA的4ASK调制解调通信系统开发与硬件片内测试,包含信道模块,误码统计模块,可设置SNR
本文介绍了基于FPGA的4ASK调制解调系统的硬件测试版本,该系统包括testbench、高斯信道模块和误码率统计模块,并新增了ILA在线数据采集和VIO在线SNR设置功能。通过VIO设置不同SNR(如15dB和25dB),实现了对系统性能的实时监测与调整。4ASK是一种通过改变载波幅度表示数据的数字调制方式,适用于多种通信场景。FPGA平台的高效性和灵活性使其成为构建高性能通信系统的理想选择。
147 17
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
10月前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
消息中间件
手撸MQ消息队列——循环数组
队列是一种常用的数据结构,类似于栈,但采用先进先出(FIFO)的原则。生活中常见的排队场景就是队列的应用实例。在数据结构中,队列通常用数组实现,包括入队(队尾插入元素)和出队(队头移除元素)两种基本操作。本文介绍了如何用数组实现队列,包括定义数组长度、维护队头和队尾下标(front 和 tail),并通过取模运算解决下标越界问题。此外,还讨论了队列的空与满状态判断,以及并发和等待机制的实现。通过示例代码展示了队列的基本操作及优化方法,确保多线程环境下的正确性和高效性。
162 0
手撸MQ消息队列——循环数组
|
消息中间件 存储 缓存
一个用过消息队列的人,竟不知为何要用 MQ?
一个用过消息队列的人,竟不知为何要用 MQ?
357 1

热门文章

最新文章