创建远程访问用户
sudo rabbitmqctl add_user rabbitmq ****** sudo rabbitmqctl set_user_tags rabbitmq administrator sudo rabbitmqctl set_permissions -p "/" rabbitmq ".*" ".*" ".*" //查看新增加的用户 sudo rabbitmqctl list_users
注意:不用在启动后台管理插件了,使用systemctl start rabbitmq-server就已经启动了,端口是15672
Mirror Queue 镜像队列搭建
针对每一个镜像队列都包含一个master节点 和 多个slave节点,需求确保队列的master节点均匀分散的落在集群的各个broker中。如果master不工作,那么假如镜像队列最早的salve升级为master.
镜像队列的配置主要是通过添加相应的 Policy 来完成 :
rabbitmqctl set_policy [-p vhost) [--priority priority) [--apply- to apply- to) {name) {pattern) {definition)
definition 要包含 个部分 ha-mode、 ha-params、 ha-sync-mode
- ha-mode 指明镜像队列的模式,有效值为 all/exactly/nodes默认为 all
all 表示在集群中所有的节点上进行镜像
exactly 表示在指定个数的节点上进行镜像,节点个数由 ha-params 指定;
nodes 表示在指定节点上进行镜像,节点名称通ha-params 指定,节点的名称通常类似于 rabbit@hostname ,可以通过rabbitmqctl cluster status 命令查看到
- ha-params 不同的 hamode 配置中需要用到的参数。
- ha-sync-mode 队列中消息的同步方式,有效值为 automatic 、manual
命令样例
- 对队列名称以 queue_" 开头的所有队列进行镜像,并在集群的两个节点上完 成镜像
rabbitmqctl set_policy --priority 0 --apply-to queues mirror_queue " ^queue_" ' {"ha-mode ":"exactly","ha-params ":2, "ha-sync-mode ": "automatic" }'
- 对队列名称以 queue_" 开头的所有队列进行镜像,并在集群的所有节点上完 成镜像
rabbitmqctl set_policy --priority 0 --apply-to queues mirror_queue " ^queue_" ' {"ha-mode ":"all","ha-sync-mode ":"automatic" }'
- rabbitmqctl set_policy ha-all “^” ‘{“ha-mode”:“all”}’ 可以把队列设置为镜像队列
命令执行
sudo rabbitmqctl set_policy --priority 0 --apply-to queues mirror_queue " ^queue_" ' {"ha-mode ":"all","ha-sync-mode ":"automatic" }'
验证
使用新建的rabbitmq用户从本机登录远程的机器
lchod1392: 创建一个队列,以queue开头
队列知识
mandatory、 immediate 参数 channel.basicPublish 方法中的两个参数
- mandatory参数 mandatory 参数设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件 的队列,那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者 。当 mandatory 数设置为 false 时,出现上述情形,则消息直接被丢弃。那么生产者如何获取到没有被正确路由到合适队列的消息呢?这时候可以通过调用 channel.addReturnListener 来添加 ReturnListener 监昕器实现。
- immediate参数 immediate 参数设为 true 时,如果交换器在将消息路由到队列时发现队列上并不存在 任何消费者,那么这条消息将不会存入队列中。当与路由键匹配所有队列都没有消费者时,该消息会通过 Basic.Return 返回至生产者。
- 概括来说 mandatory 参数告诉服务器至少将该消息路由到一个队中, 将消息返 回给生产者。 imrnediate 参数告诉服务器 如果该消息关联的队列上有消费者, 立刻投递; 如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。
- RabbitMQ 3.0 版本开始去掉了对 immediate 参数的支持,对此RabbitMQ官方解释是 immediate 参数会影响镜像队列的性能,增加代码码复杂性,建议采用 TTL、 DLX 的方法
TTL time to live 过期时间
- 设置方式:通过队列属性设置,整个队列的消息都有同样的过期时间;也可以对单条消息单独设置,则一个队列中消息有不同的过期时间。如果两种都设置了,以时间小的为准
- 设置队列消息的TTL代码
Map<String,Object> argss = new HashMap<String, Object>(); argss.put("x-message-ttl " , 5000); channel.queueDeclare(queueName , durable , exclusive , autoDelete , argss) ;
- 这种方式, 一旦消息过期,就会从队列中抹去
针对每条消息设置 TTL 的方法是在 channel.basicPublish 方法中加入 expiration 的属性参数,单位为毫秒:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder deliveryMode(2); 持久化消息 builder expiration( 50000 );/ 设置 TTL=50000ms AMQP.BasicProperties properties = builder. build() ; channel.basicPublish(exchangeName , routingKey, mandatory, properties, "test ttl".getBytes());
- 这种方式, 即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期是在即将投递到消费者之前判定的
- 如果不设置 TTL.则表示此消息不会过期 ;如果将 TTL 设置为 0,则表示除非此时可以直 接将消息投递到消费者,否则该消息会被立即丢弃
- 设置队列的TTL
通过 channel.queueDeclare 方法中的 expires 参数可以控制队列被自动删除前处于未使用状态的时间。未使用的意思是队列上没有任何的消费者,队列也没有被重新声明,并 且在过期时间段内也未调用过 Basic.Get 命令。
Map<String , Object> args =口ew HashMap<String, Object>{) ; args . put( "x-expires" , 100000); channel . queueDeclare("queuesleb " , false , false , false , args) ;
死信队列 DLX(Dead Letter Message) 当 消息在一个队列中变成死信 (dea message) 之后,它能被重新被发送到另一个交换器中,这个 交换器就是 DLX ,绑定 DLX 的队列就称之为死信队列。
- 消息被拒绝 (Basic.Reject/Basic .Na ck) ,井且设置 requeue 参数为 false
- 消息过期
- 队列达到最大长度
- 可以创建消费者监听这个队列的消息进行处理
- 通过在 channel.queueDeclare 方法中设置 x-dead-letter-exchange 参数来为这 个队列添加 DLX
channel.exchangeDeclare("dlx_exchange " , "direct "); // 创建 DLX: dlx_exchange Map<String, Object> args = new HashMap<String, Object>(); args.put("x-dead-letter-exchange" , " dlx-exchange "); //为队列 myqueue 添加 DLX channel.queueDeclare("myqueue" , false , false , false , args); //也可以为这个 DLX 指定路由键,如果没有特殊指定,则使用原队列的路由键, 如果指定了,则消费者需要使用 //的路由键才能消费这个队列的消息: args.put("x-dead-letter-routing-key" , "dlx-routing-key");
延迟队列
- 场景:一个订单在30分钟内支付有效,否则自动取消
- 利用上面的TTL和DLX来达到延迟队列的功能
优先级队列
通过设置队列的 x-max-priority 参数来实现:
Map<String, Object> args = new HashMap<String, Object>() ; args.put( "x-max-priority" , 10) ; channel.queueDeclare( "queue.priority" , true , fa1se , false , args) ;
在生产者速度大于消费者速度且broker中有积压的消息的时候,才有效果
持久化
- 交换器的持久化、队列的持久化和消息的持久化 ,才能真正的持久化
- 交换器的持久化:设置durable = true
- 队列的持久化: durable = true
- 消息的持久化:通过将消息的投递模式 (BasicPropertes 中的 deliveryMode 属性)设置为2( DeliveryMode.PERSISTENT) 即可实现消息的持久化 )