RabbitMQ高级特性

简介: RabbitMQ高级特性

Confirm确认消息

Confirm消息确认机制:了消息的确认,是指生产者投递消息后,如果 Broker 收到消息,则会给我们生产者一个应答。生产者进行接收应答,用来确定这条消息是否正常的发送到 Broker ,这种方式也是消息的可靠性投递的核心保障!

1.在 channel 上开启确认模式: channel . confirmselect ( )

2.在channel上添加监听: addConfirmListener ,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理!



生成者:

public class Producer {
  public static void main(String[] args) throws Exception {
    //1 创建ConnectionFactory
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.11.76");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    //2 获取Connection
    Connection connection = connectionFactory.newConnection();
    //3 通过Connection创建一个新的Channel
    Channel channel = connection.createChannel();
    //4 指定我们的消息投递模式: 消息的确认模式 
    channel.confirmSelect();
    String exchangeName = "test_confirm_exchange";
    String routingKey = "confirm.save";
    //5 发送一条消息
    String msg = "Hello RabbitMQ Send confirm message!";
    channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
    //6 添加一个确认监听
    channel.addConfirmListener(new ConfirmListener() {
      @Override
      public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.err.println("-------no ack!-----------");
      }
      @Override
      public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.err.println("-------ack!-----------");
      }
    });
  }
}

Return 消息机制

Return LiStener 用于处理一些不可路由的消息!

我们的消息生产者,通过指定一个 Exchange 和Routingkey ,把消息送达到某一个队列中去,然后我们的消费者监听队列,进行消费处理操作!


但是在某些情况下,如果我们在发送消息的时候,当前的Exchange不存在或者指定的路由 key 路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用 Return Listener !


Return 消息机制在基础 Api 中有一个关键的配置项: Mandatory :如果为 true ,则监听器会接收到路由不可达的消息然后进行后续处理,如果为 false ,那么 broker 端自动删除该消息!


消费者:

public class Consumer {
  public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.11.76");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();
    String exchangeName = "test_return_exchange";
    String routingKey = "return.#";
    String queueName = "test_return_queue";
    channel.exchangeDeclare(exchangeName, "topic", true, false, null);
    channel.queueDeclare(queueName, true, false, false, null);
    channel.queueBind(queueName, exchangeName, routingKey);
    QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, queueingConsumer);
    while(true){
      Delivery delivery = queueingConsumer.nextDelivery();
      String msg = new String(delivery.getBody());
      System.err.println("消费者: " + msg);
    }
  }
}

生产端:

public class Producer {
  public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.254.129");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();
    String exchange = "test_return_exchange";
    String routingKey = "return.save";
    String routingKeyError = "abc.save";
    String msg = "Hello RabbitMQ Return Message";
    channel.addReturnListener(new ReturnListener() {
      @Override
      public void handleReturn(int replyCode, String replyText, String exchange,
          String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("---------handle  return----------");
        System.err.println("replyCode: " + replyCode);
        System.err.println("replyText: " + replyText);
        System.err.println("exchange: " + exchange);
        System.err.println("routingKey: " + routingKey);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
      }
    });
        //第三个参数为mandatory
    channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
    //channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
  }
}

自定义消费者使用

继承DefaultConsumer重写handleDelivery方法。

public class MyConsumer extends DefaultConsumer {
  public MyConsumer(Channel channel) {
    super(channel);
  }
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.err.println("-----------consume message----------");
    System.err.println("consumerTag: " + consumerTag);
    System.err.println("envelope: " + envelope);
    System.err.println("properties: " + properties);
    System.err.println("body: " + new String(body));
  }
}
channel.basicConsume(queueName, true, new MyConsumer(channel));

消费端限流

当我们的RabbitMQ上有成千上万未处理的消息,单个客户端没有办法处理这么多消息。

所有我们一般都不会去使用自动签收(autoAck:true)

RabbitMQ提供了一种qos (服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置Qos的值)未被确认前,不进行消费新的消息。

void basicQos(int prefetchSize, int prefetchCount, boolean global)

prefetchSize: 0  消息的大小限制设置为0表示不做限制。

prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack。(每次消费几条数据,消费完就发送ack,在传过来几条)

global: true\false 是否将上面设置应用于channel

简单点说,就是上面限制是channel级别的还是consumer级别


每次签收一条

    //1 限流方式  第一件事就是 autoAck设置为 false
    channel.basicQos(0, 1, false);

自定义消费者ack

public class MyConsumer extends DefaultConsumer {
  private Channel channel ;
  public MyConsumer(Channel channel) {
    super(channel);
    this.channel = channel;
  }
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.err.println("-----------consume message----------");
    System.err.println("consumerTag: " + consumerTag);
    System.err.println("envelope: " + envelope);
    System.err.println("properties: " + properties);
    System.err.println("body: " + new String(body));
        //1.消息标签  2.是否批量签收
    channel.basicAck(envelope.getDeliveryTag(), false);
  }
}

Dead-Letters-Queue “死信队列”

来自一个队列的消息可以被当做‘死信’,即被重新发布到另外一个“exchange”去,这样的情况有:

  • 消息被拒绝 (basic.reject or basic.nack) 且带 requeue=false 参数
  • 消息的TTL-存活时间已经过期
  • 队列长度限制被超越(队列满)

Dead Letter Pattern


    “死信”模式指的是,当消费者不能处理接收到的消息时,将这个消息重新发布到另外一个队列中,等待重试或者人工干预。


这个过程中的exchange和queue就是所谓的"Dead Letter Exchange 和 Queue"。


    关键是如何区分“消费失败”和“处理失败”?消费失败需要送到死信队列,而处理失败不需要。大部分情况都属于“处理失败”。


20190224221905869.png

生产端

public class Producer {
  public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.254.129");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();
    String exchange = "test_dlx_exchange";
    String routingKey = "dlx.save";
    String msg = "Hello RabbitMQ DLX Message";
    for(int i =0; i<1; i ++){
      //设置过期时间
      AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
          .deliveryMode(2)
          .contentEncoding("UTF-8")
          .expiration("10000")
          .build();
      channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
    }
  }
}

消费端

public class Consumer {
  public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.254.129");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();
    // 这就是一个普通的交换机 和 队列 以及路由
    String exchangeName = "test_dlx_exchange";
    String routingKey = "dlx.#";
    String queueName = "test_dlx_queue";
    channel.exchangeDeclare(exchangeName, "topic", true, false, null);
    Map<String, Object> agruments = new HashMap<String, Object>();
    agruments.put("x-dead-letter-exchange", "dlx.exchange");
    //这个agruments属性,要设置到声明队列上
    channel.queueDeclare(queueName, true, false, false, agruments);
    channel.queueBind(queueName, exchangeName, routingKey);
    //要进行死信队列的声明:
    channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
    channel.queueDeclare("dlx.queue", true, false, false, null);
    channel.queueBind("dlx.queue", "dlx.exchange", "#");
    channel.basicConsume(queueName, true, new MyConsumer(channel));
  }
}


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 存储 负载均衡
RocketMQ高级特性
RocketMQ高级特性
120 1
|
消息中间件 Java 程序员
SpringBoot整合RocketMQ,尝尝几大高级特性!
作为一名程序员,您一定熟悉RocketMQ的功能,包括支持事务、顺序和延迟消息等。在程序员界有一句名言,“Talk is cheap. Show me the code” 。本文将通过实际案例来引出解决方案,并通过代码实现,让您在学习本节的过程中能够确切地掌握实际编码技能
237 0
SpringBoot整合RocketMQ,尝尝几大高级特性!
|
消息中间件 缓存 NoSQL
Springboot 整合 RabbitMQ高级特性 & 真实业务应用
🐇🐇前言:我们的RabbitMQ经常被用来做⚡秒杀类业务⚡,所以在商城类项目中充当着一个很重要的中间件,关于它的高级特性和企业级项目中的一些重点问题的解决方案在这里我会进行详细的总结, 并在最后展示一部分。
501 1
|
消息中间件 缓存 监控
RabbitMQ之高级特性
RabbitMQ,高级特性
215 0
|
消息中间件 存储 SQL
RabbitMQ精讲4:深入RabbitMQ高级特性-可靠性投递、幂等性消费、Confirm确认消息、Return返回消息
RabbitMQ精讲4:深入RabbitMQ高级特性-可靠性投递、幂等性消费、Confirm确认消息、Return返回消息
671 0
RabbitMQ精讲4:深入RabbitMQ高级特性-可靠性投递、幂等性消费、Confirm确认消息、Return返回消息
|
消息中间件
RabbitMQ高级特性-死信队列(DLX,Dead-Letter-Exchange)(下)
RabbitMQ高级特性-死信队列(DLX,Dead-Letter-Exchange)
142 0
RabbitMQ高级特性-死信队列(DLX,Dead-Letter-Exchange)(下)
|
消息中间件
RabbitMQ高级特性-死信队列(DLX,Dead-Letter-Exchange)(上)
RabbitMQ高级特性-死信队列(DLX,Dead-Letter-Exchange)
185 0
RabbitMQ高级特性-死信队列(DLX,Dead-Letter-Exchange)(上)
|
消息中间件
RabbitMQ高级特性-TTL(Time-To-Live 过期时间)
RabbitMQ高级特性-TTL(Time-To-Live 过期时间)
139 0
RabbitMQ高级特性-TTL(Time-To-Live 过期时间)
|
消息中间件
RabbitMQ高级特性之延时消息/队列
RabbitMQ高级特性之延时消息/队列
116 0
|
消息中间件 安全 调度
RabbitMQ高级特性之-优先级队列(Priority Queue)
RabbitMQ高级特性之-优先级队列(Priority Queue)
682 0