消息被拒MQ

简介: 消息被拒MQ

生产者


/**
 * 消息被拒的情况
 */
public class Produce0001 {
    private  static  final  String NORMAL_EXCHANGE="normal_exchange";
    public static void main(String[] args) throws  Exception{
        Channel channel = untils.getChannel();
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        //该消息用作队列的个数限制
        for(int i=0;i<10;i++)
        {
            String message="info"+i;
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生产者发送消息"+message);
        }
    }
}


消费者:


/**
 * 消息被拒的情况
 */
public class Consumer0001 {
        //普通交换机
        private  static  final  String NORMAL_EXCHANGE="normal_exchange";
        //死信交换机
        private  static  final  String DEAD_EXCHANGE="dead_exchange";
        public static void main(String[] args) throws  Exception{
            Channel channel = untils.getChannel();
            //声明死信交换机,类型为direct
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
            //声明死信队列
            String deadQueue="dead_queue";
            channel.queueDeclare(deadQueue,false,false,false,null);
            //死信队列绑定交换和routingKey值
            channel.queueBind(deadQueue,DEAD_EXCHANGE,"lisi");
            //正常队列绑定死信队列
            Map<String,Object> params=new HashMap<>();
            //正常队列设置死信交换机,参数key是固定值
            params.put("x-dead-letter-exchange",DEAD_EXCHANGE);
            //正常队列设置死信routing-key,参数key是固定值
            params.put("x-dead-letter-routing-key", "lisi");
            //正常队列设置的最大限制长度
            params.put("x-max-length",6);
            System.out.println("等待接收消息....");
            String normalQueue="normal_queue";
            channel.queueDeclare(normalQueue,false,false,false,params);
            channel.queueBind(normalQueue,NORMAL_EXCHANGE,"zhangsan");
            DeliverCallback deliverCallback=(consumerTag, message) -> {
                String s = new String(message.getBody(), StandardCharsets.UTF_8);
                if (s.equals("info5"))
                {
                    System.out.println("info5拒接");
                    channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
                }
                else
                {
                    System.out.println("01接收到消息"+s);
                    channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                }
            };
            channel.basicConsume(normalQueue,false,deliverCallback,consumerTag -> {});
        }
}


结果:


2dab69ad4ab947f9bcea85d85da4d9cd.png

bb3f99a75625411293d8aa03e0edf330.png003d8e5dcb674dbe8a5232ecb23859be.png


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
小程序 API
培训报名小程序-订阅消息发送
培训报名小程序-订阅消息发送
|
6月前
|
消息中间件 存储 缓存
3分钟白话RocketMQ系列—— 如何消费消息
3分钟白话RocketMQ系列—— 如何消费消息
313 0
|
3月前
|
消息中间件
【面试问题】如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费了消息?
【1月更文挑战第27天】【面试问题】如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费了消息?
|
3月前
|
消息中间件
【面试问题】MQ 消息怎么路由?
【1月更文挑战第27天】【面试问题】MQ 消息怎么路由?
|
4月前
|
消息中间件 存储 安全
mq 消费者监听经常断会出现丢消息的问题吗
在消息队列(MQ)系统中,消费者监听经常断开可能会导致消息丢失的问题,具体取决于消息队列系统的设计和配置,以及你的应用程序的处理方式。以下是一些可能导致消息丢失问题的情况: 1. **消费者断开连接:** 如果消费者监听过程中发生意外断开,例如网络故障、消费者应用程序崩溃等,那么在断开连接的瞬间,可能存在未被消费的消息。 2. **消息确认机制:** 消息队列通常提供消息确认机制,确保消息在被成功处理后才被从队列中移除。如果你的消费者应用程序在处理消息时没有发送确认,或者确认机制配置不正确,可能导致消息在被处理前被从队列中移除,从而丢失。 3. **持久化设置:** 消息队列通常提供持久
|
4月前
|
消息中间件 设计模式 Java
原来RocketMQ消息会重复消费是无奈的”Bug“
大家好,我是三友~~ 在众多关于MQ的面试八股文中有这么一道题,“如何保证MQ消息消费的幂等性”。 为什么需要保证幂等性呢?是因为消息会重复消费。 为什么消息会重复消费? 明明已经消费了,为什么消息会被再次被消费呢? 不同的MQ产生的原因可能不一样 本文就以RocketMQ为例,来扒一扒RocketMQ中会导致消息重复消息的原因,最终你会发现,其实消息重复消费算是RocketMQ无奈的“bug”。
原来RocketMQ消息会重复消费是无奈的”Bug“
|
消息中间件 存储 中间件
|
消息中间件 缓存 算法
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
209 0
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
|
消息中间件 NoSQL 关系型数据库
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
实战:如何防止mq消费方消息重复消费 如果因为网络延迟等原因,mq无法及时接收到消费方的应答,导致mq重试。(计算机网络)。在重试过程中造成重复消费的问题
2500 1
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
|
消息中间件 RocketMQ 开发者
消息消费初探|学习笔记
快速学习消息消费初探
63 0
消息消费初探|学习笔记