创建队列、交换机以及绑定关系
@Configuration public class DirectRabbitMQConfig { public static final String CLUSTER_EXCHANGE_NAME = "cluster_direct_exchange"; public static final String CLUSTER_QUEUE_NAME = "cluster_direct_queue"; public static final String CLUSTER_ROUTING_KEY = "cluster"; @Bean public DirectExchange clusterDirectExchange() { return new DirectExchange(CLUSTER_EXCHANGE_NAME,true,false); } @Bean public Queue clusterDirectQueue() { return new Queue(CLUSTER_QUEUE_NAME,true,false,false); } @Bean public Binding clusterDirectBinding(@Qualifier("clusterDirectQueue") Queue clusterDirectQueue, @Qualifier("clusterDirectExchange") DirectExchange clusterDirectExchange) { return BindingBuilder.bind(clusterDirectQueue).to(clusterDirectExchange).with(CLUSTER_ROUTING_KEY); } }
在单元测试中进行消息发送测试
public class RabbitMQTest { @Autowired private RabbitTemplate rabbitTemplate; @Test void directQueueTest() { rabbitTemplate.convertAndSend(DirectRabbitMQConfig.CLUSTER_EXCHANGE_NAME,DirectRabbitMQConfig.CLUSTER_ROUTING_KEY,"Hello Rabbitmq cluster"); } }
消费者端代码
@Service public class DirectClusterConsumer { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @RabbitListener(queues = {"cluster_direct_queue"}) public void receiveClusterMessage(String message) { logger.info("接收到消息: " + message); } }
2.4、反向测试
2.4.1、停止主节点
在三个节点都启动的状态下,先发布一条消息,然后停掉主节点,也就是 mq01
然后动消费者服务,可以看到服务报错,无法获取到队列消息
2.4.2、停止从节点
此时,停止任意子节点,这里停止 mq02
节点
可以看到,消息队列的各项功能都不受影响。
三、配置镜像集群
镜像集群不需要额外搭建,只需要将队列配置为镜像队列即可。这个配置可以通过网页配置,也可以通过命令行配置。
3.1、网页配置镜像队列
点击 Admin 选项卡,然后点击右边的 Policies
,再点击 Add/update a policy
,如下图:
然后添加策略
各参数含义如下:
Name
:policy 的名称
Pattern
:queue 的匹配模式(正则表达式)
Definition
:镜像定义,主要由三个参数:ha-mode
,ha-params
,ha-sync-mode
。
ha-mode
:指明镜像队列的模式,有效值为 all、exactly、nodes。其中 all 表示在集群中所有的节点上进行镜像(默认即此);exactly 表示在指定个数的节点上进行镜像,节点的个数由 ha-params 指定;nodes 表示在指定的节点上进行镜像,节点名称通过 ha-params 指定。ha-params
:ha-mode 模式需要用到的参数ha-sync-mode
:进行队列中消息的同步方式,有效值为 automatic 和 manual。priority
为可选参数,表示 policy 的优先级
添加完效果如下
3.2、命令行配置
命令行的配置格式如下:
$ rabbitmqctl set_policy [-p vhost] [--priority priority] [--apply-to apply-to] {name} {pattern} {definition} $ docker exec -it rabbit01 bash $ rabbitmqctl set_policy -p / --apply-to queues my_queue_mirror "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
3.3、测试
停止 mq01
节点,可以看到此时消息队列集群没有任何影响