【RabbitMQ】——简单队列和work模式

简介: 【RabbitMQ】——简单队列和work模式

引言

 

在rabbitmq中大概有五种这种消费模式,简单来说是三种,因为后面三种都是基于路由的模式,在这小编就暂且分开来介绍吧。

第一种、简单队列

 

首先我们来看一下这种模式的图解


20170511222044516.png


P:消息的生产者


       C:消息的消费者

       红色:队列

 

生产者发送消息到队列中,消费中从队列中获取消息。每个消息只能被一个消费着消费,即:当消息被消费中获取后,该消息将会在队列中移除。这中模式也可以成为“阅后即焚”。

 

示例代码:

 

获得连接的工具类:

package cn.itcast.rabbitmq.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
  public static Connection getConnection() throws Exception {
    // 定义连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 设置服务地址
    factory.setHost("192.168.92.9");
    // 端口
    factory.setPort(5672);
    // 设置账号信息,用户名、密码、vhost
    factory.setVirtualHost("/taotao");
    factory.setUsername("taotao");
    factory.setPassword("taotao");
    // 通过工程获取连接
    Connection connection = factory.newConnection();
    return connection;
  }
}

生产者代码:

package cn.itcast.rabbitmq.simple;
import cn.itcast.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {
    private final static String QUEUE_NAME = "test_queue";
    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 从连接中创建通道
        Channel channel = connection.createChannel();
        // 声明(创建)队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 消息内容
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        //关闭通道和连接
        channel.close();
        connection.close();
    }
}


当生产者代码成功运行后,我们通过管理工具查看会发现一个队列,并且队列中有一条信息。

 

20170511223337865.png


我们点击队列名称可以查看消息的详细信息。

 

消费者代码:

package cn.itcast.rabbitmq.simple;
import cn.itcast.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
public class Recv {
  private final static String QUEUE_NAME = "test_queue";
  public static void main(String[] argv) throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明队列 因为消费者事先并不知道该队列的存在,所以在获取消息之前,自己先定义一个队列
    // 如果消费者事先非常确定,该队列的存在,则不需要在创建队列,创建已经存在的队列,盖伊队列会被忽略
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // 监听队列
    channel.basicConsume(QUEUE_NAME, true, consumer);
    // 获取消息
    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());
      System.out.println(" [x] Received '" + message + "'");
    }
  }
}

当消费者代码成功运行后,我们通过管理工具会发现队列中消息的数量从1变为0,这就说明消息被消费者获取以后,会被队列删除。

 

第二种、work模式


20170511223749398.png

一个生产者、两个消费者;一个消息只能被一个消费者获取。

 

在这种模式中又可以分为两种模式,一种是两个消费中平均消费队列中的消息。也就说无论两个消费者的消费能力如何,都会平均获取消息。另一种方式则是,能者多劳模式,处理消息能力强的消费者会获取更多的消息,这种模式似乎更符合一些。

 

生产者向队列中发送50条消息。


package cn.itcast.rabbitmq.work;
import cn.itcast.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {
    private final static String QUEUE_NAME = "test_queue_work";
    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for (int i = 0; i < 50; i++) {
            // 消息内容
            String message = "" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
            Thread.sleep(i * 10);
        }
        channel.close();
        connection.close();
    }
}


我们看到上面的代码中,生产者每生产一条消息后都会休眠一段时间,并且越往后面休眠的时间越长。

消费者1代码:

package cn.itcast.rabbitmq.work;
import cn.itcast.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
public class Recv {
    private final static String QUEUE_NAME = "test_queue_work";
    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 同一时刻服务器只会发一条消息给消费者
        //channel.basicQos(1);
        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);
        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            //休眠
            Thread.sleep(10);
            // 返回确认状态
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

消费者2和消费者1的代码一样,只是消费者2休眠的时间更长一些为1000ms.

上面的代码也就说,消费者1的处理能力比消费者2更强,但是当我们看到结果的时候会发现,两个消费者获得消息的数量却是一样的。这就是work模式中的平均消费模式。


在消费中的代码中红色部分是被注释掉了,现在我们将其打开,我们再次运行会发现,这时候消费者2仅仅获得了8条消息,而消费者1却获得了42条消息。这也就是我们说的work模式中“能者多劳”模式。

 

这句代码的意思大概是,服务器同一时刻只会发送一条消息给你消费者,但是并不指定发送给那个消费者,这时候那个消费者有能力获得谁就获取。

 

两种模式对比

 

通过上面的介绍我们会发现,这两种工作模式是有几处不相同的地方。在消费者数量上面:简单模式只有一个消费者,而work模式可以有多个消费者。认真的读者会发现,work模式中需要手动返回完成状态,而简单模式shiite自动完成的,这两种模式是通过监听队列中的一个参数设定的.false说明需要手动返回,true说明自动返回。


                 // 监听队列,手动返回完成状态
    channel.basicConsume(QUEUE_NAME, false, consumer);


小结

上面介绍了比较简单的两种,这两种模式比较简单,还没有用到exchange的概念,在下面的博客中将介绍三种路由模式。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4天前
|
消息中间件
RabbitMQ广播模式
RabbitMQ广播模式
14 1
|
20小时前
|
消息中间件 Java Maven
RabbitMQ通配符模式
RabbitMQ通配符模式
7 0
|
22天前
|
消息中间件 传感器 负载均衡
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
|
26天前
|
消息中间件 RocketMQ
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
|
26天前
|
消息中间件 存储 Kafka
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
|
1月前
|
消息中间件 Java Kafka
说说RabbitMQ延迟队列实现原理?
说说RabbitMQ延迟队列实现原理?
35 0
说说RabbitMQ延迟队列实现原理?
|
22天前
|
消息中间件 Java Apache
消息队列 MQ使用问题之如何在内外网环境下使用单组节点单副本模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
1月前
|
消息中间件 NoSQL 关系型数据库
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
43 1
|
26天前
|
消息中间件 负载均衡 RocketMQ
MetaQ/RocketMQ 原理问题之在广播模式下,RebalanceService工作的问题如何解决
MetaQ/RocketMQ 原理问题之在广播模式下,RebalanceService工作的问题如何解决
|
2月前
|
消息中间件
RabbitMQ配置单活模式队列
RabbitMQ配置单活模式队列
39 0