RabbmitMQ学习笔记-producer的Confirm确认机制

简介: RabbmitMQ学习笔记-producer的Confirm确认机制

2020090521104561.png

20200905211102607.png

20200905211138341.pngconfrim机制可以尽量的保证生产端消息可靠投递。

producer端代码

import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
  public static void main(String[] args) throws Exception {
    //1 创建ConnectionFactory
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.11.76");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    //2 获取C onnection
    Connection connection = connectionFactory.newConnection();
    //3 通过Connection创建一个新的Channel
    Channel channel = connection.createChannel();
    //4 指定我们的消息投递模式: 消息的确认模式 
    channel.confirmSelect();
    String exchangeName = "test_confirm_exchange";
    String routingKey = "confirm.save";
    //5 发送一条消息
    String msg = "Hello RabbitMQ Send confirm message!";
    channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
    //6 添加一个确认监听
    channel.addConfirmListener(new ConfirmListener() {
      @Override
      public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.err.println("-------no ack!-----------");
      }
      @Override
      public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.err.println("-------ack!-----------");
      }
    });
  }
}

couumer 端代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Consumer {
  public static void main(String[] args) throws Exception {
    //1 创建ConnectionFactory
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.11.76");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    //2 获取C onnection
    Connection connection = connectionFactory.newConnection();
    //3 通过Connection创建一个新的Channel
    Channel channel = connection.createChannel();
    String exchangeName = "test_confirm_exchange";
    String routingKey = "confirm.#";
    String queueName = "test_confirm_queue";
    //4 声明交换机和队列 然后进行绑定设置, 最后制定路由Key
    channel.exchangeDeclare(exchangeName, "topic", true);
    channel.queueDeclare(queueName, true, false, false, null);
    channel.queueBind(queueName, exchangeName, routingKey);
    //5 创建消费者 
    QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, queueingConsumer);
    while(true){
      Delivery delivery = queueingConsumer.nextDelivery();
      String msg = new String(delivery.getBody());
      System.err.println("消费端: " + msg);
    }
  }
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
ly~
|
4月前
|
消息中间件 存储 供应链
RocketMQ 消息的重试机制有什么优缺点?
RocketMQ 消息重试机制提高了消息处理的可靠性和系统的适应性,简化了错误处理,但也会增加系统延迟、可能导致消息重复处理并占用系统资源。适用于需要高可靠性的场景,如金融交易和电商订单处理。
ly~
96 5
|
9月前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
6月前
|
消息中间件 存储 缓存
RocketMQ发送消息原理(含事务消息)
本文深入探讨了RocketMQ发送消息的原理,包括生产者端的发送流程、Broker端接收和处理消息的流程,以及事务消息的特殊处理机制,提供了对RocketMQ消息发送机制全面的理解。
RocketMQ发送消息原理(含事务消息)
|
6月前
|
消息中间件 存储 负载均衡
RocketMQ消费者消费消息核心原理(含长轮询机制)
这篇文章深入探讨了Apache RocketMQ消息队列中消费者消费消息的核心原理,特别是长轮询机制。文章从消费者和Broker的交互流程出发,详细分析了Push和Pull两种消费模式的内部实现,以及它们是如何通过长轮询机制来优化消息消费的效率。文章还对RocketMQ的消费者启动流程、消息拉取请求的发起、Broker端处理消息拉取请求的流程进行了深入的源码分析,并总结了RocketMQ在设计上的优点,如单一职责化和线程池的使用等。
RocketMQ消费者消费消息核心原理(含长轮询机制)
|
7月前
|
消息中间件 存储 索引
MetaQ/RocketMQ 原理问题之Consumer在MetaQ中工作的问题如何解决
MetaQ/RocketMQ 原理问题之Consumer在MetaQ中工作的问题如何解决
|
7月前
|
消息中间件 NoSQL 关系型数据库
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
205 1
|
8月前
|
消息中间件 IDE 数据库
RocketMQ事务消息学习及刨坑过程
RocketMQ事务消息学习及刨坑过程
|
消息中间件 存储 canal
3分钟白话RocketMQ系列—— 如何保证消息顺序性
3分钟白话RocketMQ系列—— 如何保证消息顺序性
1726 1
3分钟白话RocketMQ系列—— 如何保证消息顺序性
RabbmitMQ学习笔记-producer的return Listern机制
retuen 主要处理message 不可达的问题,生产中也遇到过,例如exchanger 未建立、或者queue 和exchanger未绑定关系。这些消息应该让生产者知晓并做相应处理。
52 0
|
消息中间件 存储 Java
RocketMQ事务消息原理简析
在项目中,经常遇到这样一个场景,需要保证数据持久化和消息发送要么同时成功,要么同时失败。比如当用户在交易系统下了一个订单,购物车需要消费订单消息清除加购数据、积分系统需要变更用户积分、短信平台需要给买家发送提醒等。利用RocketMQ事务消息特性,可以轻松达到这个目的。本文将从RocketMQ事务消息使用方法说起,探究RocketMQ事务消息实现原理。
311 0