开发者社区> 问答> 正文

Rabbitmq怎么用java代码控制对列大小

Rabbitmq怎么用java代码控制对列大小,当对列满了停止生产,当对列小于对列存放的最大值,则继续生产

展开
收起
ivioc 2018-09-06 10:41:19 3401 0
2 条回答
写回答
取消 提交回答
  • java 数据分析 数据可视化 大数据

    RabbitMQ有两种对队列长度的限制方式

    对队列中消息的条数进行限制 x-max-length
    对队列中消息的总量进行限制 x-max-length-bytes

    2019-07-17 23:03:51
    赞同 展开评论 打赏
  • 乐于学习与分析

    RabbitMQ有两种对队列长度的限制方式

    对队列中消息的条数进行限制 x-max-length
    对队列中消息的总量进行限制 x-max-length-bytes
    对消息总条数进行限制(总条数包括未被消费的消息+被消费但未被确认的消息):

    public class QueueLengthLimit {

    private static final String QUEUE_NAME = "queueLengthLimit";
    
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel senderChannel = connection.createChannel();
        Channel consumerChannel = connection.createChannel();
    
        // 设置队列最大消息数量为5
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-max-length", 5);
        args.put("x-dead-letter-exchange","normal_exchange");
        args.put("x-dead-letter-routing-key","normal");
        senderChannel.queueDeclare(QUEUE_NAME, false, false, true, args);
        // 发布6个消息
        for (int i = 0; i < 6;) {
            String message = "NO. " + ++i;
            senderChannel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        }
    
        // 获取的消息为 NO. 2,说明队列头部第一条消息被抛弃
        Thread.sleep(1000);
        GetResponse resp = consumerChannel.basicGet(QUEUE_NAME, false);
        String message = new String(resp.getBody(), "UTF-8");
        System.out.printf("consume: %s\n", message);
        System.out.printf("queue size: %d\n", resp.getMessageCount());
    
        // 现在队列中有4个 Ready消息,1个 Unacked消息。此时再发布两条消息,应该只有 NO. 3 被抛弃。
        senderChannel.basicPublish("", QUEUE_NAME, null, "NO. 7".getBytes("UTF-8"));
        senderChannel.basicPublish("", QUEUE_NAME, null, "NO. 8".getBytes("UTF-8"));
        Thread.sleep(100);
        GetResponse resp2 = consumerChannel.basicGet(QUEUE_NAME, false);
        message = new String(resp2.getBody(), "UTF-8");
        System.out.printf("consume: %s\n\n", message);
    
        // 现在队列中有4个 Ready消息,2个 Unacked消息。
        // 此时Nack,消息2、4取消退回队列头导致队列消息数量超过设定值,谁能留下?
        consumerChannel.basicNack(resp2.getEnvelope().getDeliveryTag(), true, true);
        Thread.sleep(5000);
        System.out.println("======================================");
        while (true) {
            resp = consumerChannel.basicGet(QUEUE_NAME, true);
            if (resp == null) {
                break;
            } else {
                message = new String(resp.getBody(), "UTF-8");
                System.out.printf("consume: %s\n", message);
            }
        }
    }

    }
    GetResponse resp.getMessageCount() 队列中未被消费的消息的数量,其中不包含被消费未确认的消息。

    当队列中的消息要超过队列限制时,将失效队首元素,

    这是接收死信的队列,可知被失效的消息是NO.1(队首) 试验验证结果真是上面结论。

    第二种是对队列中消息总字节数进行限制:

    Map args = new HashMap();
    args.put("x-max-length-bytes ", 1000);
    senderChannel.queueDeclare(QUEUE_NAME, false, false, true, args);
    只计算消息体的字节数,不算消息投,消息属性等字节数。

    RabbitMQ可以设置队列的最大优先级,也可以设置消息的优先级,优先级高的队列中的消息具有更高的被优先消费的权限。

    可以通过如下参数:

    队列的最大优先级:x-max-priority
    消息的优先级:priority
    队列优先级设置方式:

    也可以通过代码去实现:

    Map queueParam = new HashMap<>();
    queueParam.put("x-max-priority",10);
    channel.queueDeclare("queue_priority",true,false,false,queueParam);
    配置了队列优先级之后,会在管理后台界面看到如下Pri的标记:

    上面我们设置了队列的最大优先级,之后我们发送消息的时候便可以设置消息自身的优先级别,来调整消息被消费的优先级顺序。

    AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
    builder.priority(5);
    AMQP.BasicProperties build = builder.build();
    channel.basicPublish("exchange_priority","rk_priority",build,("message-"+i).getBytes());
    接下来我们看一个实现;

    public class PriorityQueue {

    public static void main(String[] args) throws Exception {
    
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
    
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    
        channel.exchangeDeclare("exchange_priority","direct",true);
    
        Map<String,Object> queueParam = new HashMap<>();
        queueParam.put("x-max-priority",10);
        channel.queueDeclare("queue_priority",true,false,false,queueParam);
        channel.queueBind("queue_priority","exchange_priority","rk_priority");
    
        for(int i = 0 ; i < 10 ; i++){
            AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
            if(i % 2 == 0){
                builder.priority(5);
            }
            AMQP.BasicProperties build = builder.build();
            channel.basicPublish("exchange_priority","rk_priority",build,("message-"+i).getBytes());
        }
    
        channel.close();
        connection.close();
    
    }

    }
    消费者代码:

    public class PriorityConsumer {

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
    
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    
        while(true) {
            GetResponse response = channel.basicGet("queue_priority", false);
            System.out.println(new String(response.getBody()));
            channel.basicAck(response.getEnvelope().getDeliveryTag(),false);
            TimeUnit.MILLISECONDS.sleep(1000);
        }
    }

    }
    ==》

    message-0
    message-2
    message-4
    message-6
    message-8
    message-1
    message-3
    message-5
    message-7
    message-9

    2019-07-17 23:03:51
    赞同 1 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
RocketMQ Client-GO 介绍 立即下载
RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载
基于 RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载