RabbmitMQ学习笔记-消费端ACK与重回队列机制

简介: RabbmitMQ学习笔记-消费端ACK与重回队列机制

一、消费端ack

二、重回队列

三、代码测试

3.1 producer 端代码

import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
  public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.11.76");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();
    String exchange = "test_ack_exchange";
    String routingKey = "ack.save";
    for(int i =0; i<5; i ++){
      Map<String, Object> headers = new HashMap<String, Object>();
      headers.put("num", i);
      AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
          .deliveryMode(2)
          .contentEncoding("UTF-8")
          .headers(headers)
          .build();
      String msg = "Hello RabbitMQ ACK Message " + i;
      channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
    }
  }
}

3.2 consumer端代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Consumer {
  public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.11.76");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();
    String exchangeName = "test_ack_exchange";
    String queueName = "test_ack_queue";
    String routingKey = "ack.#";
    channel.exchangeDeclare(exchangeName, "topic", true, false, null);
    channel.queueDeclare(queueName, true, false, false, null);
    channel.queueBind(queueName, exchangeName, routingKey);
    // 手工签收 必须要关闭 autoAck = false
    channel.basicConsume(queueName, false, new MyConsumer(channel));
  }
}

3.3 myconsumer 代码

           channel.basicNack(envelope.getDeliveryTag(), false, true);

第一个参数是deliverTag

第二参数是否批量

第三个测试是否重回队列

import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class MyConsumer extends DefaultConsumer {
  private Channel channel ;
  public MyConsumer(Channel channel) {
    super(channel);
    this.channel = channel;
  }
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.err.println("-----------consume message----------");
    System.err.println("body: " + new String(body));
    try {
      Thread.sleep(2000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    if((Integer)properties.getHeaders().get("num") == 0) {
      channel.basicNack(envelope.getDeliveryTag(), false, true);
    } else {
      channel.basicAck(envelope.getDeliveryTag(), false);
    }
  }
}

消息的确认类型:


1)channel.basicAck(deliveryTag, multiple);

consumer处理成功后,通知broker删除队列中的消息,如果设置multiple=true,表示支持批量确认机制以减少网络流量。

例如:有值为5,6,7,8 deliveryTag的投递

如果此时channel.basicAck(8, true);则表示前面未确认的5,6,7投递也一起确认处理完毕。

如果此时channel.basicAck(8, false);则仅表示deliveryTag=8的消息已经成功处理。


2)channel.basicNack(deliveryTag, multiple, requeue);

consumer处理失败后,例如:有值为5,6,7,8 deliveryTag的投递。

如果channel.basicNack(8, true, true);表示deliveryTag=8之前未确认的消息都处理失败且将这些消息重新放回队列中。

如果channel.basicNack(8, true, false);表示deliveryTag=8之前未确认的消息都处理失败且将这些消息直接丢弃。

如果channel.basicNack(8, false, true);表示deliveryTag=8的消息处理失败且将该消息重新放回队列。

如果channel.basicNack(8, false, false);表示deliveryTag=8的消息处理失败且将该消息直接丢弃。


3)channel.basicReject(deliveryTag, requeue);

相比channel.basicNack,除了没有multiple批量确认机制之外,其他语义完全一样。

如果channel.basicReject(8, true);表示deliveryTag=8的消息处理失败且将该消息重新放回队列。

如果channel.basicReject(8, false);表示deliveryTag=8的消息处理失败且将该消息直接丢弃。


相关实践学习
容器服务Serverless版ACK Serverless 快速入门:在线魔方应用部署和监控
通过本实验,您将了解到容器服务Serverless版ACK Serverless 的基本产品能力,即可以实现快速部署一个在线魔方应用,并借助阿里云容器服务成熟的产品生态,实现在线应用的企业级监控,提升应用稳定性。
云原生实践公开课
课程大纲 开篇:如何学习并实践云原生技术 基础篇: 5 步上手 Kubernetes 进阶篇:生产环境下的 K8s 实践 相关的阿里云产品:容器服务&nbsp;ACK 容器服务&nbsp;Kubernetes&nbsp;版(简称&nbsp;ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情:&nbsp;https://www.aliyun.com/product/kubernetes
目录
相关文章
|
12月前
|
消息中间件
RabbitMQ手动ACK与死信队列
RabbitMQ手动ACK与死信队列
156 0
|
消息中间件
RabbitMQ实战-消费端ACK、NACK及重回队列机制(下)
RabbitMQ实战-消费端ACK、NACK及重回队列机制(下)
150 0
RabbitMQ实战-消费端ACK、NACK及重回队列机制(下)
|
消息中间件 网络协议 Java
RabbitMQ实战-消费端ACK、NACK及重回队列机制(上)
RabbitMQ实战-消费端ACK、NACK及重回队列机制(上)
822 0
RabbitMQ实战-消费端ACK、NACK及重回队列机制(上)
|
2月前
|
存储 Kubernetes Docker
容器服务ACK常见问题之阿里云控制台进不去了如何解决
容器服务ACK(阿里云容器服务 Kubernetes 版)是阿里云提供的一种托管式Kubernetes服务,帮助用户轻松使用Kubernetes进行应用部署、管理和扩展。本汇总收集了容器服务ACK使用中的常见问题及答案,包括集群管理、应用部署、服务访问、网络配置、存储使用、安全保障等方面,旨在帮助用户快速解决使用过程中遇到的难题,提升容器管理和运维效率。
|
3月前
|
人工智能 运维 Kubernetes
阿里云容器服务ACK AI助手正式上线带来的便利性
作为开发者想必大家都知道,云原生容器技术的优势,尤其是近两年的随着容器技术的迅猛发展,Kubernetes(K8s)已成为广泛应用于容器编排和管理的领先解决方案,但是K8s的运维复杂度一直是挑战之一。为了应对这一问题,就在最近,阿里云容器服务团队正式发布了ACK AI助手,这是一款旨在通过大模型增强智能诊断的产品,旨在帮助企业和开发者降低Kubernetes(K8s)的运维复杂度。那么本文就来详细讲讲关于这款产品,让我们结合实际案例分享一下K8s的运维经验,探讨ACK AI助手能否有效降低K8s的运维复杂度,并展望ACK AI助手正式版上线后的新功能。
277 2
阿里云容器服务ACK AI助手正式上线带来的便利性
|
9月前
|
人工智能 Cloud Native 文件存储
阿里云容器服务ACK云原生AI套件测评
随着人工智能(AI)技术的快速发展,越来越多的企业开始在其业务中引入AI能力,以提高运营效率、优化用户体验,以及创造新的商业价值。像我们这种小型企业也不例外,希望通过集成先进的AI技术来提升业务运营的智能化水平。在这样的背景下,阿里云容器服务ACK推出了云原生AI套件,它能够帮助企业在Kubernetes容器平台上快速构建和运行AI应用,实现全栈优化。本次通过一次实验体验,简单对云原生AI套件进行测评。
96735 1
|
8月前
|
边缘计算 运维 Kubernetes
阿里云原生容器服务产品体系-阿里云边缘容器服务ACK@Edge介绍
阿里云原生容器服务产品体系-阿里云边缘容器服务ACK@Edge介绍
1022 0
阿里云原生容器服务产品体系-阿里云边缘容器服务ACK@Edge介绍
|
4月前
|
Kubernetes 监控 调度
阿里云容器服务ACK
阿里云容器服务ACK(Alibaba Cloud Container Service for Kubernetes)提供高性能、可伸缩的容器应用管理服务,支持企业级Kubernetes容器化应用的生命周期管理。在ACK中,利用cGPU(Containerized GPU)技术可以实现GPU资源的共享,提高GPU利用率,降低整体成本。
68 6
|
6月前
|
JSON Kubernetes 监控
阿里云容器服务 ACK 产品技术动态(202310)
容器服务 Kubernetes 版 ACK Feature:ACK 支持服务网格场景 Sidecar 加速 ACK 支持服务网格场景 Sidecar 加速,即服务网格 Sidecar 模式下,业务注入 Sidecar 之后,可通过开启 Sidecar Acceleration using eBPF 组件来实现同节点下 Sidecar 和 Sidecar 之间、同 Pod 下业务容器和 Sidecar之间的 TCP 网络通信加速。
298 1
|
9月前
阿里云容器服务 ACK 产品技术动态(202307)
阿里云容器服务 ACK 产品技术动态(202307)