RabbitMQ (HelloWord 消息应答 持久化 不公平分发 预取值)1

简介: RabbitMQ (HelloWord 消息应答 持久化 不公平分发 预取值)1



HelloWord

在下图中,“P”是我们的生产者,“C”是我们的消费者。中间的框是一个队列-RabbitMO.代表使用者保留的消息缓冲区


第一步:导入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.yc</groupId>
  <artifactId>rabbitmq-hello</artifactId>
  <version>1.0-SNAPSHOT</version>
  <!--指定jdk编译版本-->
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>8</source>
          <target>8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <!--rabbitmq依赖客户端-->
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.0.0</version>
    </dependency>
    <!--操作文件源的一个依赖-->
    <dependency>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
      <version>2.6</version>
    </dependency>
  </dependencies>
</project>

第二步:创建生产者

//生产者:发消息
public class Producer {
    //队列名称
    public static final String QUEUE_NAME = "hello";
    //发消息
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //工厂IP 连接RabbitMQ的队列
        factory.setHost("192.168.80.128");
        //用户名
        factory.setUsername("admin");
        //密码
        factory.setPassword("123");
        //创建连接
        Connection connection = factory.newConnection();
        //获取信道
        Channel channel = connection.createChannel();
        //生成一个队列
        /*
        * 1.队列名称
        * 2.队列里面的消息是否持久化(磁盘)默认情况消息存储在内存中
        * 3.该队列是否只供一个消费者进行消费,是否进行消息共享,true可以多个消费者消费false:只能一个消费者消费
         *4.是否自动剧除最后一个消贫者端开连接以后该队列是否自动鹏除 true自动鹏除false不自动翮除
         * 5.其它参数*/
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //发消息
        String message = "hello world";//初次使用
        /*发送一个消息
        * 1.发送到哪个交换机
        * 2.路由的Key值是哪个,本次是队列的名称
        * 3.其它参数信息
        * 4.发送消息的消息体*/
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("消息发送完毕");
    }
}

第三步:创建消费者

//消费者 接收消息的
public class Consumer {
    //队列名称
    public static final String QUEUE_NAME="hello";
    //接收消息
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.80.128");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明
        DeliverCallback deliverCallback=(consumerTag, message)->{
            System.out.println(new String(message.getBody()));
        };
        //取消消息时的回调
        CancelCallback cancelCallback = consumerTag->{
            System.out.println("消费消息被中断");
        };
        //消费者消费消息
        //1.消费哪个队列
        //2.消费成功之后是否要自动应答
        //3.消费者未成功消费的回调
        //4.消费者取消消费的回调
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

工作队列


因为你为了确保同一条消息被其中一个工作线程接收到了之后,其它工作就不能消费的到了
三者之间的关系必须是竞争的关系
因为


这部分代码来来回回都是重复的,所以我们可以抽取连接工厂工具类


public class RabbitMqUtils {
    //得到一个连接的channel
    public static Channel getChannel() throws Exception{
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.80.128");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}

工作线程代码

//这是一个工作线程(相当于之前的消费者)
public class Worker01 {
    //队列的名称
    public  static final String QUEUE_NAME = "hello";
    //接收消息
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback=(consumerTag, message)->{
            System.out.println(new String(message.getBody()));
        };
        //取消消息时的回调
        CancelCallback cancelCallback = consumerTag->{
            System.out.println("消费消息被中断");
        };
        //消息的接收
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

启动两个工作线程




工作队列(生产者代码)

public class Task01 {
    //队列名称
    public static final String QUEUE_NAME = "hello";
    //发送大量消息
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //从控制台当中接受信息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("发送消息完成");
        }
    }
}

工作队列(结果成功)





消息应答

我们都知道消费者它完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况?
为了保证消息在发送过程中不丢失,rabbitmq_引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉rabbitmq它已经处理了,rabbitmq可以把该消息删除了。

自动应答

消息发送后,立即被认为已经传送成功了,这种模式需要在高吞吐量和数据传输安全性方面做权衡,使得内存耗尽,最终这些消费者线程被操作系统杀死,这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

手动消息应答

A.Channel. basicAck(用于肯定确认)
RabbitMQ已知道该消息并且成功的处理消息,可以将其丢弃了
B.Channel. basicNack(用于否定确认)

C.Channel. basicReject(用于否定确认)

与Channel. basicNack相比少一个参数不处理该消息了直接拒绝,可以将其丢弃了

multiple的解释

multiple 的 true和 false 代表不同意思
rue 代表批量应答channel上未应答的消息

比如说channel上有传送tag 的消息5,6,7,8 当前tag是8那么此时5-8的这些还未应答的消息都会被确认收到消息应答
false同上面相比

只会应答 tag=8的消息5,6,7这三个消息依然不会被确认收到消息应答

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
7月前
|
消息中间件 网络协议
RabbitMQ消息的应答
RabbitMQ消息的应答
62 0
|
7月前
|
消息中间件 存储 缓存
RabbitMQ之消息应答和持久化
【1月更文挑战第11天】 一、消息应答 1.概念 2.自动应答 3.消息应答方法 4.Multiple 的解释 5.消息自动重新入队 6.消息手动应答代码 7.手动应答效果演示 二、RabbitMQ持久化 1.概念 2.队列如何实现持久化 3.消息实现持久化 4.不公平分发 5.预取值
330 8
|
消息中间件 存储 Kubernetes
k8s1.20版本部署RabbitMQ集群(持久化)——2023.05
k8s1.20版本部署RabbitMQ集群(持久化)——2023.05
754 1
|
3月前
|
消息中间件 存储 JSON
RocketMQ 消费进度持久化
本文介绍了RocketMQ中消费进度的持久化机制,包括普通消息和延迟消息的消费偏移量是如何存储的。普通消息的消费进度存储于`consumerOffset.json`文件,格式为`{Topic}@{ConsumerGroup}`,而延迟消息则存储于`delayOffset.json`文件,以`{delayLevel:offset}`的形式记录。文章详细分析了相关文件内容及代码实现,并指出Broker分别以5秒和10秒的间隔进行持久化操作。
|
消息中间件
消息中间件系列教程(16) -RabbitMQ-应答模式
消息中间件系列教程(16) -RabbitMQ-应答模式
74 0
|
7月前
|
消息中间件 存储 Java
RabbitMQ中的消息持久化是如何实现的?
RabbitMQ中的消息持久化是如何实现的?
131 0
|
消息中间件
我们一起来学RabbitMQ 三:RabbiMQ 死信队列,延迟队列,持久化等知识点
我们一起来学RabbitMQ 三:RabbiMQ 死信队列,延迟队列,持久化等知识点
|
消息中间件 Linux
centos7 yum快速安装rabbitmq服务
centos7 yum快速安装rabbitmq服务
233 0
|
消息中间件 中间件 微服务
RabbitMQ 入门简介及安装
RabbitMQ 入门简介及安装
126 0
|
消息中间件 Ubuntu Shell
ubuntu安装rabbitmq教程 避坑
ubuntu安装rabbitmq教程 避坑
520 0
下一篇
DataWorks