RabbmitMQ学习笔记-消费端ACK与重回队列机制

简介: RabbmitMQ学习笔记-消费端ACK与重回队列机制

一、消费端ack

二、重回队列

三、代码测试

3.1 producer 端代码

import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
  public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.11.76");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();
    String exchange = "test_ack_exchange";
    String routingKey = "ack.save";
    for(int i =0; i<5; i ++){
      Map<String, Object> headers = new HashMap<String, Object>();
      headers.put("num", i);
      AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
          .deliveryMode(2)
          .contentEncoding("UTF-8")
          .headers(headers)
          .build();
      String msg = "Hello RabbitMQ ACK Message " + i;
      channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
    }
  }
}

3.2 consumer端代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Consumer {
  public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.11.76");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();
    String exchangeName = "test_ack_exchange";
    String queueName = "test_ack_queue";
    String routingKey = "ack.#";
    channel.exchangeDeclare(exchangeName, "topic", true, false, null);
    channel.queueDeclare(queueName, true, false, false, null);
    channel.queueBind(queueName, exchangeName, routingKey);
    // 手工签收 必须要关闭 autoAck = false
    channel.basicConsume(queueName, false, new MyConsumer(channel));
  }
}

3.3 myconsumer 代码

           channel.basicNack(envelope.getDeliveryTag(), false, true);

第一个参数是deliverTag

第二参数是否批量

第三个测试是否重回队列

import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class MyConsumer extends DefaultConsumer {
  private Channel channel ;
  public MyConsumer(Channel channel) {
    super(channel);
    this.channel = channel;
  }
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.err.println("-----------consume message----------");
    System.err.println("body: " + new String(body));
    try {
      Thread.sleep(2000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    if((Integer)properties.getHeaders().get("num") == 0) {
      channel.basicNack(envelope.getDeliveryTag(), false, true);
    } else {
      channel.basicAck(envelope.getDeliveryTag(), false);
    }
  }
}

消息的确认类型:


1)channel.basicAck(deliveryTag, multiple);

consumer处理成功后,通知broker删除队列中的消息,如果设置multiple=true,表示支持批量确认机制以减少网络流量。

例如:有值为5,6,7,8 deliveryTag的投递

如果此时channel.basicAck(8, true);则表示前面未确认的5,6,7投递也一起确认处理完毕。

如果此时channel.basicAck(8, false);则仅表示deliveryTag=8的消息已经成功处理。


2)channel.basicNack(deliveryTag, multiple, requeue);

consumer处理失败后,例如:有值为5,6,7,8 deliveryTag的投递。

如果channel.basicNack(8, true, true);表示deliveryTag=8之前未确认的消息都处理失败且将这些消息重新放回队列中。

如果channel.basicNack(8, true, false);表示deliveryTag=8之前未确认的消息都处理失败且将这些消息直接丢弃。

如果channel.basicNack(8, false, true);表示deliveryTag=8的消息处理失败且将该消息重新放回队列。

如果channel.basicNack(8, false, false);表示deliveryTag=8的消息处理失败且将该消息直接丢弃。


3)channel.basicReject(deliveryTag, requeue);

相比channel.basicNack,除了没有multiple批量确认机制之外,其他语义完全一样。

如果channel.basicReject(8, true);表示deliveryTag=8的消息处理失败且将该消息重新放回队列。

如果channel.basicReject(8, false);表示deliveryTag=8的消息处理失败且将该消息直接丢弃。


相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
目录
相关文章
|
消息中间件
RabbitMQ手动ACK与死信队列
RabbitMQ手动ACK与死信队列
275 0
|
消息中间件 网络协议 Java
RabbitMQ实战-消费端ACK、NACK及重回队列机制
RabbitMQ实战-消费端ACK、NACK及重回队列机制
1089 0
RabbitMQ实战-消费端ACK、NACK及重回队列机制
|
8月前
|
人工智能 运维 Kubernetes
阿里云容器服务ACK AI助手正式上线带来的便利性
作为开发者想必大家都知道,云原生容器技术的优势,尤其是近两年的随着容器技术的迅猛发展,Kubernetes(K8s)已成为广泛应用于容器编排和管理的领先解决方案,但是K8s的运维复杂度一直是挑战之一。为了应对这一问题,就在最近,阿里云容器服务团队正式发布了ACK AI助手,这是一款旨在通过大模型增强智能诊断的产品,旨在帮助企业和开发者降低Kubernetes(K8s)的运维复杂度。那么本文就来详细讲讲关于这款产品,让我们结合实际案例分享一下K8s的运维经验,探讨ACK AI助手能否有效降低K8s的运维复杂度,并展望ACK AI助手正式版上线后的新功能。
376 2
阿里云容器服务ACK AI助手正式上线带来的便利性
|
8月前
|
存储 Kubernetes Docker
容器服务ACK常见问题之阿里云控制台进不去了如何解决
容器服务ACK(阿里云容器服务 Kubernetes 版)是阿里云提供的一种托管式Kubernetes服务,帮助用户轻松使用Kubernetes进行应用部署、管理和扩展。本汇总收集了容器服务ACK使用中的常见问题及答案,包括集群管理、应用部署、服务访问、网络配置、存储使用、安全保障等方面,旨在帮助用户快速解决使用过程中遇到的难题,提升容器管理和运维效率。
|
8月前
|
人工智能 弹性计算 调度
阿里云容器服务 ACK 产品技术动态(202312)
容器服务 Kubernetes 版 ACK 【新功能】 Feature:支持基于机密虚拟机的 AI 模型推理保护 ACK 现已支持将基于 Intel® Trusted Domain Extension(Intel® TDX)技术的 ECS 实例加入 TDX 机密虚拟机计算节点池,使集群具备 TDX 机密计算能力,实现 AI 模型的可信推理和微调,保障模型数据的机密性与完整性。结合 PyTorch 与 Intel® AMX指令集,您可以在 32 核实例上实现秒级出图的推理能力。
513 1
|
25天前
|
运维 Kubernetes 调度
阿里云容器服务 ACK One 分布式云容器企业落地实践
阿里云容器服务ACK提供强大的产品能力,支持弹性、调度、可观测、成本治理和安全合规。针对拥有IDC或三方资源的企业,ACK One分布式云容器平台能够有效解决资源管理、多云多集群管理及边缘计算等挑战,实现云上云下统一管理,提升业务效率与稳定性。
|
4月前
|
运维 Kubernetes 调度
阿里云容器服务 ACK One 分布式云容器企业落地实践
3年前的云栖大会,我们发布分布式云容器平台ACK One,随着3年的发展,很高兴看到ACK One在混合云,分布式云领域帮助到越来越多的客户,今天给大家汇报下ACK One 3年来的发展演进,以及如何帮助客户解决分布式领域多云多集群管理的挑战。
阿里云容器服务 ACK One 分布式云容器企业落地实践
|
4月前
|
人工智能 Prometheus 监控
使用 NVIDIA NIM 在阿里云容器服务(ACK)中加速 LLM 推理
本文介绍了在阿里云容器服务 ACK 上部署 NVIDIA NIM,结合云原生 AI 套件和 KServe 快速构建高性能模型推理服务的方法。通过阿里云 Prometheus 和 Grafana 实现实时监控,并基于排队请求数配置弹性扩缩容策略,提升服务稳定性和效率。文章提供了详细的部署步骤和示例,帮助读者快速搭建和优化模型推理服务。
215 7
使用 NVIDIA NIM 在阿里云容器服务(ACK)中加速 LLM 推理
|
5月前
|
弹性计算 运维 负载均衡
基于阿里云容器服务Kubernetes版(ACK)| 容器化管理云上应用
【8月更文挑战第3天】基于阿里云容器服务Kubernetes版(ACK)| 容器化管理云上应用
|
6月前
|
人工智能 运维 安全
阿里云容器服务ACK:高效管理云上应用的容器化解决方案
阿里云容器服务ACK(Alibaba Cloud Container Service for Kubernetes)为开发者提供了一套全面的容器化管理解决方案,旨在简化云上应用的部署、运维和管理。本文将深入探讨ACK的功能、优势及应用场景,为开发者展现容器化技术在云环境下的强大能力。
394 0

热门文章

最新文章