RabbitMQ消息模型详解(2)

简介: RabbitMQ消息模型详解(2)

四、简单消息模型

RabbitMQ是一个消息代理:它接受和转发消息。 你可以把它想象成一个邮局:当你把邮件放在邮箱里时,你可以确定邮差先生最终会把邮件发送给你的收件人。 在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。


RabbitMQ与邮局的主要区别是它不处理纸张,而是接受,存储和转发数据消息的二进制数据块。

image.png

P(producer/ publisher):生产者,一个发送消息的用户应用程序。


C(consumer):消费者,消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序


队列(红色区域):rabbitmq内部类似于邮箱的一个概念。虽然消息流经rabbitmq和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。


总之:生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区。


代码演示

引入依赖

<dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-amqp</artifactId>
       <version>2.0.6.RELEASE</version>
</dependency>

配置文件

spring:
  rabbitmq:
    host: 192.168.99.99
    username: leyou
    password: leyou
    virtual-host: /leyou

获取连接

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
public class ConnectionUtil {
    /**
     * 建立与RabbitMQ的连接
     * @return
     * @throws Exception
     */
    public static Connection getConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("192.168.99.99");
        //端口
        factory.setPort(5672);
        //设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("/leyou");
        factory.setUsername("leyou");
        factory.setPassword("leyou");
        // 通过工程获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
}

生产者

import cn.itcast.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
 * 生产者
 */
public class Send {
    private final static String QUEUE_NAME = "simple_queue";
    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 从连接中创建通道,使用通道才能完成消息相关的操作
        Channel channel = connection.createChannel();
        // 声明(创建)队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 消息内容
        String message = "Hello World!";
        for (int i = 0; i < 10; i++) {
            // 向指定的队列中发送消息
            message=message+i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
        //关闭通道和连接
        channel.close();
        connection.close();
    }
}

消费者

import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import cn.itcast.rabbitmq.util.ConnectionUtil;
/**
 * 消费者
 */
public class Recv {
    private final static String QUEUE_NAME = "simple_queue";
    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 创建通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body);
                System.out.println(" [x] received : " + msg + "!");
            }
        };
        // 监听队列,第二个参数:是否自动进行消息确认。
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

上述代码中:消息一旦被消费者接收,队列中的消息就会被删除。


如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!


因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:


自动ACK:消息一旦被接收,消费者自动发送ACK


手动ACK:消息接收后,不会发送ACK,需要手动调用


手动ACK

import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import cn.itcast.rabbitmq.util.ConnectionUtil;
/**
 * 消费者,手动进行ACK
 */
public class Recv2 {
    private final static String QUEUE_NAME = "simple_queue";
    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 创建通道
        final Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即消息体
                //int i=1/0;
                String msg = new String(body);
                System.out.println(" [x] received : " + msg + "!");
                // 手动进行ACK
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 监听队列,第二个参数false,手动进行ACK
        // 如果第二个参数为true,则会自动进行ACK;如果为false,则需要手动ACK。方法的声明:
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

五、工作模式

工作队列或者竞争消费者模式

image.png

工作队列,又称任务队列。主要思想就是避免执行资源密集型任务时,必须等待它执行完成。相反我们稍后完成任务,我们将任务封装为消息并将其发送到队列。 在后台运行的工作进程将获取任务并最终执行作业。当你运行许多消费者时,任务将在他们之间共享,但是一个消息只能被一个消费者获取。


这个概念在Web应用程序中特别有用,因为在短的HTTP请求窗口中无法处理复杂的任务。


接下来我们来模拟这个流程:


P:生产者:任务的发布者


C1:消费者,领取任务并且完成任务,假设完成速度较快


C2:消费者2:领取任务并完成任务,假设完成速度慢


我们可以使用basicQos方法和prefetchCount = 1设置。 这告诉RabbitMQ一次不要向工作人员发送多于一条消息。 或者换句话说,不要向工作人员发送新消息,直到它处理并确认了前一个消息。 相反,它会将其分派给不是仍然忙碌的下一个工作人员。

image.png

代码演示:

生产者

import cn.itcast.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
// 生产者
public class Send {
    private final static String QUEUE_NAME = "test_work_queue";
    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 循环发布任务
        for (int i = 0; i < 50; i++) {
            // 消息内容
            String message = "task .. " + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
            Thread.sleep(i * 2);
        }
        // 关闭通道和连接
        channel.close();
        connection.close();
    }
}

消费者1和消费者2

import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import cn.itcast.rabbitmq.util.ConnectionUtil;
// 消费者1
public class Recv {
    private final static String QUEUE_NAME = "test_work_queue";
    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取通道
        final Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 设置每个消费者同时只能处理一条消息
        //channel.basicQos(1);
        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body);
                System.out.println(" [消费者1] received : " + msg + "!");
                try {
                    // 模拟完成任务的耗时:1000ms
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
                // 手动ACK
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 监听队列。
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import cn.itcast.rabbitmq.util.ConnectionUtil;
//消费者2
public class Recv2 {
    private final static String QUEUE_NAME = "test_work_queue";
    public static void main(String[] argv) throws Exception {
        // 获取到连接
        Connection connection = ConnectionUtil.getConnection();
        // 获取通道
        final Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 设置每个消费者同时只能处理一条消息
        //channel.basicQos(1);
        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body 即消息体
                String msg = new String(body);
                System.out.println(" [消费者2] received : " + msg + "!");
                // 手动ACK
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 监听队列。
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

面试题:避免消息堆积?


1)采用workqueue,多个消费者监听同一队列。


2)接收到消息以后,而是通过线程池,异步消费。


六、发布订阅模式

image.png

解读:


1、1个生产者,多个消费者


2、每一个消费者都有自己的一个队列


3、生产者没有将消息直接发送到队列,而是发送到了交换机


4、每个队列都要绑定到交换机


5、生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取的目的


X(Exchanges):交换机一方面:接收生产者发送的消息。另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!


Exchange类型有以下几种:


Fanout:广播,将消息交给所有绑定到交换机的队列


Direct:定向,把消息交给符合指定routing key 的队列


Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
11天前
|
消息中间件
RabbitMQ消息模型之Routing-Direct
RabbitMQ消息模型之Routing-Direct
19 1
|
11天前
|
消息中间件
RabbitMQ消息模型之发布订阅Publish-Subscribe
RabbitMQ消息模型之发布订阅Publish-Subscribe
16 0
RabbitMQ消息模型之发布订阅Publish-Subscribe
|
3月前
|
消息中间件 JSON 缓存
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(SpringBoot整合)
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(SpringBoot整合)
|
3月前
|
消息中间件 JSON 缓存
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(基于SpringBoot)
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(基于SpringBoot)
|
4月前
|
消息中间件 Java Kafka
RabbitMQ安装和5种不同的消息模型(BasicQueue,WorkQueue,Fanout Exchange,Direct Exchange,Topic Exchange)与SpringAMQP
RabbitMQ安装和5种不同的消息模型(BasicQueue,WorkQueue,Fanout Exchange,Direct Exchange,Topic Exchange)与SpringAMQP
|
9月前
|
消息中间件 算法 Java
RabbitMQ灵活运用,怎么理解五种消息模型
RabbitMQ灵活运用,怎么理解五种消息模型
83 0
|
消息中间件 Java Spring
【RabbitMQ】Basic Queue 简单队列模型与WorkQueue
【RabbitMQ】Basic Queue 简单队列模型与WorkQueue
75 0
【RabbitMQ】Basic Queue 简单队列模型与WorkQueue
|
消息中间件 Java Kafka
RabbitMQ安装以及消息模型使用攻略
🍅程序员小王的博客:程序员小王的博客 🍅 欢迎点赞 👍 收藏 ⭐留言 📝 🍅 如有编辑错误联系作者,如果有比较好的文章欢迎分享给我,我会取其精华去其糟粕 🍅java自学的学习路线:java自学的学习路线
200 0
RabbitMQ安装以及消息模型使用攻略
|
消息中间件 Java 中间件
【Alibaba中间件技术系列】「RocketMQ技术专题」带你一起去探索RocketMQ服务架构的线程模型分析
【Alibaba中间件技术系列】「RocketMQ技术专题」带你一起去探索RocketMQ服务架构的线程模型分析
233 0
【Alibaba中间件技术系列】「RocketMQ技术专题」带你一起去探索RocketMQ服务架构的线程模型分析
|
消息中间件 JavaScript 前端开发
JavaScript 连接消息(RabbitMQ)
JavaScript 连接消息(RabbitMQ)
JavaScript 连接消息(RabbitMQ)