RabbmitMQ学习笔记-producer的return Listern机制

简介: retuen 主要处理message 不可达的问题,生产中也遇到过,例如exchanger 未建立、或者queue 和exchanger未绑定关系。这些消息应该让生产者知晓并做相应处理。

retuen 主要处理message 不可达的问题,生产中也遇到过,例如exchanger 未建立、或者queue 和exchanger未绑定关系。这些消息应该让生产者知晓并做相应处理。

一、return Listern机制概述

20200905222417308.png

二、最重要的参数

三、代码

3.1 prducer 代码

import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ReturnListener;
import com.rabbitmq.client.AMQP.BasicProperties;
public class Producer {
  public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.11.76");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();
    String exchange = "test_return_exchange";
    String routingKey = "return.save";
    String routingKeyError = "abc.save";
    String msg = "Hello RabbitMQ Return Message";
    channel.addReturnListener(new ReturnListener() {
      @Override
      public void handleReturn(int replyCode, String replyText, String exchange,
          String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("---------handle  return----------");
        System.err.println("replyCode: " + replyCode);
        System.err.println("replyText: " + replyText);
        System.err.println("exchange: " + exchange);
        System.err.println("routingKey: " + routingKey);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
      }
    });
    channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
    //channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
  }
}

3.2 consummer端代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Consumer {
  public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.11.76");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();
    String exchangeName = "test_return_exchange";
    String routingKey = "return.#";
    String queueName = "test_return_queue";
    channel.exchangeDeclare(exchangeName, "topic", true, false, null);
    channel.queueDeclare(queueName, true, false, false, null);
    channel.queueBind(queueName, exchangeName, routingKey);
    QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, queueingConsumer);
    while(true){
      Delivery delivery = queueingConsumer.nextDelivery();
      String msg = new String(delivery.getBody());
      System.err.println("消费者: " + msg);
    }
  }
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 缓存 人工智能
Kafka生产者客户端几种异常Case详解
1生产者UserCallBack异常 异常日志 ERROR Error executing user-provided callback on message for topic-partition 'Topic1-0' (org.apache.kafka.clients.producer.internals.ProducerBatch) 通常还会有具体的异常栈信息 异常源码 ProducerBatch#completeFutureAndFireCallbacks
Kafka生产者客户端几种异常Case详解
|
缓存 中间件 流计算
如何解决 Netty Channel.isWritable 返回 false
在 Netty 里,有4个方法用来查询 Channel 的状态:isOpen,isRegistered,isActive,isWritable,其中,isWritable 在并发量很高时会返回很多 false。 isWritable 是什么含义? isWritable:Returns true if and only if the I/O thread will perform the req
2825 0
如何解决 Netty Channel.isWritable 返回 false
|
6月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7月前
|
消息中间件 Java 开发工具
消息队列 MQ产品使用合集之topic相同,但是tag不同,这个类不能放入map中,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7月前
pulsar-client consume test-topic -s 'test-subscription' -p Earliest -n 0 这句命令的作用是什么
【6月更文挑战第27天】pulsar-client consume test-topic -s 'test-subscription' -p Earliest -n 0 这句命令的作用是什么
69 0
|
7月前
|
消息中间件 测试技术 开发工具
消息队列 MQ操作报错合集之收到"WARN RocketmqClient - consumeMessage Orderly return"警告,是什么原因
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
122 0
|
消息中间件 Java Kafka
聊聊 Kafka: Consumer 源码解析之 poll 模型
聊聊 Kafka: Consumer 源码解析之 poll 模型
1003 0
RabbmitMQ学习笔记-producer的Confirm确认机制
RabbmitMQ学习笔记-producer的Confirm确认机制
80 0
|
JSON 监控 测试技术
Rabbmit MQ 清空所有Queue及其message
在测试区,经常因为测试导致测试区很多队列里面堆积很多消息。这一方面对测试区MQ的性能造成问题,dashboard这个插件经常卡主。此外消息过多也不方便debug。
1317 0
|
存储 消息中间件 文件存储
RocketMQ中msg&tag的生命周期
RocketMQ中msg&tag的生命周期
109 0
RocketMQ中msg&tag的生命周期

热门文章

最新文章