🍃前言
本次开发任务
- 实现 VirtualHostTests 类,对虚拟主机进行测试
🎍准备与收尾工作
准备工作我们需要做的有
- 虚拟机操作后续涉及操作数据库,所以我们需要将spring上下文进行获取一下;
- 创建一个虚拟主机
收尾工作我们需要做的有:
- 关闭上下文
- 虚拟主机置为null
- 删除硬盘上的目录及目录下的文件
代码实现如下:
private VirtualHost virtualHost = null; @BeforeEach public void setUp() { MqApplication.context = SpringApplication.run(MqApplication.class); virtualHost = new VirtualHost("default"); } @AfterEach public void tearDown() throws IOException { MqApplication.context.close(); virtualHost = null; // 把硬盘的目录删除掉 File dataDir = new File("./data"); FileUtils.deleteDirectory(dataDir); }
🍀测试交换机与队列操作
测试交换机我们主要测试四方法:
- 增加交换机
- 删除交换机
- 增加队列
- 删除队列
由于虚拟机里面的方法,都是调用我们前面已经测试过了的方法,所以我们这里只用看该方法返回值知否未true就好
代码实现如下:
@Test public void testExchangeDeclare() { boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null); Assertions.assertTrue(ok); } @Test public void testExchangeDelete() { boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null); Assertions.assertTrue(ok); ok = virtualHost.exchangeDelete("testExchange"); Assertions.assertTrue(ok); } @Test public void testQueueDeclare() { boolean ok = virtualHost.queueDeclare("testQueue", true, false, false, null); Assertions.assertTrue(ok); } @Test public void testQueueDelete() { boolean ok = virtualHost.queueDeclare("testQueue", true, false, false, null); Assertions.assertTrue(ok); ok = virtualHost.queueDelete("testQueue"); Assertions.assertTrue(ok); }
🎄测试绑定操作
创建相应的队列与交换机,然后将这些队列交换机添加绑定即可
删除也是如此:
代码实现如下:
@Test public void testQueueBind() { boolean ok = virtualHost.queueDeclare("testQueue", true, false, false, null); Assertions.assertTrue(ok); ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null); Assertions.assertTrue(ok); ok = virtualHost.queueBind("testQueue", "testExchange", "testBindingKey"); Assertions.assertTrue(ok); } @Test public void testQueueUnbind() { boolean ok = virtualHost.queueDeclare("testQueue", true, false, false, null); Assertions.assertTrue(ok); ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null); Assertions.assertTrue(ok); ok = virtualHost.queueBind("testQueue", "testExchange", "testBindingKey"); Assertions.assertTrue(ok); ok = virtualHost.queueUnbind("testQueue", "testExchange"); Assertions.assertTrue(ok); }
🌴测试将消息发入交换机
传参调用即可,代码实现如下:
@Test public void testBasicPublish() { boolean ok = virtualHost.queueDeclare("testQueue", true, false, false, null); Assertions.assertTrue(ok); ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null); Assertions.assertTrue(ok); ok = virtualHost.basicPublish("testExchange", "testQueue", null, "hello".getBytes()); Assertions.assertTrue(ok); }
🌲测试订阅消息
订阅消息我们分为两种情况:
- 先订阅队列,再发送消息
- 先发送消息,再订阅队列
订阅队列操作,我们只需要出入相应的参数,再实现以下消费者自身的回调方法即可。
发送消息直径发送即可
代码实现如下:
// 先订阅队列, 后发送消息 @Test public void testBasicConsume1() throws InterruptedException { boolean ok = virtualHost.queueDeclare("testQueue", true, false, false, null); Assertions.assertTrue(ok); ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null); Assertions.assertTrue(ok); // 先订阅队列 ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() { @Override public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) { try { // 消费者自身设定的回调方法. System.out.println("messageId=" + basicProperties.getMessageId()); System.out.println("body=" + new String(body, 0, body.length)); Assertions.assertEquals("testQueue", basicProperties.getRoutingKey()); Assertions.assertEquals(1, basicProperties.getDeliverMode()); Assertions.assertArrayEquals("hello".getBytes(), body); } catch (Error e) { // 断言如果失败, 抛出的是 Error, 而不是 Exception! e.printStackTrace(); System.out.println("error"); } } }); Assertions.assertTrue(ok); Thread.sleep(500); // 再发送消息 ok = virtualHost.basicPublish("testExchange", "testQueue", null, "hello".getBytes()); Assertions.assertTrue(ok); } // 先发送消息, 后订阅队列. @Test public void testBasicConsume2() throws InterruptedException { boolean ok = virtualHost.queueDeclare("testQueue", true, false, false, null); Assertions.assertTrue(ok); ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null); Assertions.assertTrue(ok); // 先发送消息 ok = virtualHost.basicPublish("testExchange", "testQueue", null, "hello".getBytes()); Assertions.assertTrue(ok); // 再订阅队列 ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() { @Override public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) { // 消费者自身设定的回调方法. System.out.println("messageId=" + basicProperties.getMessageId()); System.out.println("body=" + new String(body, 0, body.length)); Assertions.assertEquals("testQueue", basicProperties.getRoutingKey()); Assertions.assertEquals(1, basicProperties.getDeliverMode()); Assertions.assertArrayEquals("hello".getBytes(), body); } }); Assertions.assertTrue(ok); Thread.sleep(500); }
🌳测试FANOUT与TOPIC类型的交换机
上面我们只测试了 DIRECT 类型的交换机,接下来我们测试一下FANOUT与TOPIC类型的交换机。
测试步骤与上述类似,准备数据传入,进行测试就好
代码实现如下:
@Test public void testBasicConsumeFanout() throws InterruptedException { boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.FANOUT, false, false, null); Assertions.assertTrue(ok); ok = virtualHost.queueDeclare("testQueue1", false, false, false, null); Assertions.assertTrue(ok); ok = virtualHost.queueBind("testQueue1", "testExchange", ""); Assertions.assertTrue(ok); ok = virtualHost.queueDeclare("testQueue2", false, false, false, null); Assertions.assertTrue(ok); ok = virtualHost.queueBind("testQueue2", "testExchange", ""); Assertions.assertTrue(ok); // 往交换机中发布一个消息 ok = virtualHost.basicPublish("testExchange", "", null, "hello".getBytes()); Assertions.assertTrue(ok); Thread.sleep(500); // 两个消费者订阅上述的两个队列. ok = virtualHost.basicConsume("testConsumer1", "testQueue1", true, new Consumer() { @Override public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) { System.out.println("consumerTag=" + consumerTag); System.out.println("messageId=" + basicProperties.getMessageId()); Assertions.assertArrayEquals("hello".getBytes(), body); } }); Assertions.assertTrue(ok); ok = virtualHost.basicConsume("testConsumer2", "testQueue2", true, new Consumer() { @Override public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) { System.out.println("consumerTag=" + consumerTag); System.out.println("messageId=" + basicProperties.getMessageId()); Assertions.assertArrayEquals("hello".getBytes(), body); } }); Assertions.assertTrue(ok); Thread.sleep(500); } @Test public void testBasicConsumeTopic() throws InterruptedException { boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.TOPIC, false, false, null); Assertions.assertTrue(ok); ok = virtualHost.queueDeclare("testQueue", false, false, false, null); Assertions.assertTrue(ok); ok = virtualHost.queueBind("testQueue", "testExchange", "aaa.*.bbb"); Assertions.assertTrue(ok); ok = virtualHost.basicPublish("testExchange", "aaa.ccc.bbb", null, "hello".getBytes()); Assertions.assertTrue(ok); ok = virtualHost.basicConsume("testConsumer", "testQueue", true, new Consumer() { @Override public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) { System.out.println("consumerTag=" + consumerTag); System.out.println("messageId=" + basicProperties.getMessageId()); Assertions.assertArrayEquals("hello".getBytes(), body); } }); Assertions.assertTrue(ok); Thread.sleep(500); }
🎋测试手动应答
测试手动应答的时候。
我们需要在订阅队列时,将 autoAck 改成 false;
并且我们需要在消费者的回调方法里进行调用手动应答的方法。
代码实现如下:
@Test public void testBasicAck() throws InterruptedException { boolean ok = virtualHost.queueDeclare("testQueue", true, false, false, null); Assertions.assertTrue(ok); ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null); Assertions.assertTrue(ok); // 先发送消息 ok = virtualHost.basicPublish("testExchange", "testQueue", null, "hello".getBytes()); Assertions.assertTrue(ok); // 再订阅队列 [要改的地方, 把 autoAck 改成 false] ok = virtualHost.basicConsume("testConsumerTag", "testQueue", false, new Consumer() { @Override public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) { // 消费者自身设定的回调方法. System.out.println("messageId=" + basicProperties.getMessageId()); System.out.println("body=" + new String(body, 0, body.length)); Assertions.assertEquals("testQueue", basicProperties.getRoutingKey()); Assertions.assertEquals(1, basicProperties.getDeliverMode()); Assertions.assertArrayEquals("hello".getBytes(), body); // [要改的地方, 新增手动调用 basicAck] boolean ok = virtualHost.basicAck("testQueue", basicProperties.getMessageId()); Assertions.assertTrue(ok); } }); Assertions.assertTrue(ok); Thread.sleep(500); }
⭕总结
关于《【消息队列开发】 实现 VirtualHostTests 类——测试虚拟主机操作》就讲解到这儿,感谢大家的支持,欢迎各位留言交流以及批评指正,如果文章对您有帮助或者觉得作者写的还不错可以点一下关注,点赞,收藏支持一下