RabbitMQ学习笔记 02、生产者与消费者、多消费者平均压力

简介: RabbitMQ学习笔记 02、生产者与消费者、多消费者平均压力

一、第一个生产者与消费者


前提准备条件

首先我们连接使用xshell连接服务器,紧接着启动mq以及管理后台:


systemctl start rabbitmq-server  # 启动
rabbitmqctl status  # 查看状态
rabbitmq-plugins enable rabbitmq_management  # 启动管理后台 http://192.168.118.128:15672/



若是没有分配虚拟主机的最好进行配置一下!


下面是本次示例的目录:



引入依赖:rabbitmq的客户端以及日志实现


<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.8.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-nop</artifactId>
    <version>1.7.27</version>
</dependency>



①生产者(发送信息)

下面的功能:连接远程rabbitmq,指定消息队列并发送一条数据。


说明:发送给rabbitmq的消息会暂时存储在队列中,直到对应消费者连接即可进行消费,也就说发送的记录能够被暂存!!!


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @ClassName Send
 * @Author ChangLu
 * @Date 2021/9/23 21:15
 * @Description 发送给RabbitMQ一条消息
 */
public class Send {
    //队列名称
    private static String QUEUE_NAME = "Hello";
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2、设置rabbitmq地址
        connectionFactory.setHost("192.168.118.128");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("password");
        //3、建立连接
        Connection connection = connectionFactory.newConnection();
        //4、获取信道
        Channel channel = connection.createChannel();
        //5、声明队列
        //①队列名称;
        //②是否需要持久,例如服务器重启该队列是否需要存在,测试环境一般不需要所以设置为false,生产环境根据需求来定
        //③是否独有的意思,表明该队列是否只能给我这个连接使用。
        //④是否需要自动删除,在该队列没有使用的情况下自动删除
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //6、发布消息(发送出去!)
        String message = "Hello,world!";
        //①设置交换机,这里设置为空表示不以交换机形式进行匹配,而是直接匹配队列的形式
        //②routingkey:就是我们队列名称
        //③props:除了消息体之外还有该属性作为配置,这两者组成消息
        //④消息体:是字节数组形式的
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
        System.out.println("发送了消息:"+message);
        //7、关闭连接
        channel.close();
        connection.close();
    }
}



我们运行该程序,若是连接没有问题并且成功发送,程序是不会有报错提示:



给出连接失败的报错提示:




②消费者(接收信息)

其中1、2、3、4、5部分与生产者配置几乎一致,仅仅第6步是调用了接收方法,配置了三个参数,最后一个参数是回调函数是用来拿到指定队列的消息可用来进行消费的!!!


注意:接收消息后不要直接就进行资源的关闭了,因为该接收消息方法是异步的,一旦你进行连接关闭那么就收不到指定队列的消息了!!!


import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @ClassName Recv
 * @Author ChangLu
 * @Date 2021/9/23 21:27
 * @Description 接收消息,并打印,持续运行(持续消费)   除了第6部分是接收,其他操作都是相同的
 */
public class Recv {
    //队列名称
    private static String QUEUE_NAME = "Hello";
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2、设置rabbitmq地址
        connectionFactory.setHost("192.168.118.128");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("password");
        //3、建立连接
        Connection connection = connectionFactory.newConnection();
        //4、获取信道
        Channel channel = connection.createChannel();
        //5、声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //6、核心:接收消息。额外说明:由于发送方发送的消息没有消费者接受就会一直存储在队列里,一旦有消费方就会进行消费对应队列中的内容!
        //①队列的名称:指定队列
        //②是否是自动消息确认,意思就是我拿到这个消息之后告诉你发送的人我收到了,有签收的机制
        //③回调函数:获取到这个消息之后进行消费
        channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //获取到消息,body就是对应传输过来的消息体
                String message = new String(body,"UTF-8");
                System.out.println("收到消息:"+message);
            }
        });
        //注意:这里就不进行关闭了,因为这里直接关闭连接,上面的回调函数很有可能在进行回调前就结束程序,导致还没有消费程序就over了。
        //上面的回调函数是异步的!!!
    }
}



运行一下:




二、对消息内容进行处理(一中扩展)


其实就是在一中进行改进测试,我们下面贴出生产者与消费者的核心代码:


生产者:发布了10条消息出去


//6、发布消息(发送出去!),指定队列task
for (int i = 0; i < 10; i++) {
    String message = "消息任务" + i + "!";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
    System.out.println("已发送信息:" + message);
}


消费者:不断的接收指定队列task的信息


//6、接收消息,指定队列task
channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        //获取到消息,body就是对应传输过来的消息体
        String message = new String(body,"UTF-8");
        execTask(message);
        System.out.println("消息:"+message+"处理完毕!");
    }
});
//做任务处理的
public static void execTask(String msg){
    char[] chars = msg.toCharArray();
    System.out.println("开始处理消息:"+msg);
    for (char aChar : chars) {
        //由于发送方发送的信息结尾都有!我们就可以对其来进行作为判断来进行睡眠操作
        if(aChar == '!'){
            try {
                //睡眠一秒,表示用来执行业务
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}



我们可以对发送到指定队列的对应信息来进行相应的处理操作!!!



三、多个消费者平均压力(三步骤)


3.1、前者一、二示例可能会出现问题


之前直接进行处理消息的问题


示例:


//生产者  省略核心代码
for (int i = 0; i < 15; i++) {
    String message = "消息任务"+i;
    if(i%2==0){
        message+= "!";
    }
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
    System.out.println("已发送信息:" + message);
}
//消费者  根据!情况来进行睡眠模拟实际执行业务时间
channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        //获取到消息,body就是对应传输过来的消息体
        String message = new String(body,"UTF-8");
        execTask(message);
        System.out.println("消息:"+message+"处理完毕!");
    }
});
//做任务处理的
public static void execTask(String msg){
    char[] chars = msg.toCharArray();
    System.out.println("开始处理消息:"+msg);
    for (char aChar : chars) {
        //由于发送方发送的信息结尾都有!我们就可以对其来进行作为判断来进行睡眠操作
        if(aChar == '!'){
            try {
                //睡眠一秒,表示用来执行业务
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}



根据示例处理方式实际上都是按照消息的数量分配,这就造成了可能一个服务早早的接收了对应数量的任务,并且这些任务执行的很快就全部处理好了;另一个服务也接收到相对等数量的任务,但是由于任务度复杂,耗费时间很长。这就形成了服务1早早结束不再接收消息,服务2一直在很繁忙的处理,服务1也不会进行帮其分担,因为它已经完成了自己的所有任务就不会再接收!


目标方案:


实现公平派遣,不按照消息的数量分配,而是按照压力进行分配

目的:若是有多个消费者,默认是按照消息的数量进行分配,而我们要实现循环调度与公平派遣!!!

需要加入消息确认的机制。


采用合理分配效果


使用了合理分配之后,其实流程为如下:


接收到任务,执行完成任务后进行确认。

确认完成后再向rabbitmq申请任务,这样优先执行完的服务能够继续得到新的任务,此时就达到了分担压力的效果!

对一、二中代码进行更改:


①basicConsume(前设置basicQos(1)
②basicConsume()方法第二个参数设置false,将自动接收关闭掉,需要手动确认
③在回调任务结尾调用channel.basicAck(envelope.getDeliveryTag(),false);,第二个表示是不是同时的把多个一起进行确认不需要。



3.2、实战


其实生产者不用动,还是原来的代码,我们只需要对消费者进行改动,其核心就是说让每个消费者消费完消息后进行手动确认再获取消息,这样就能够达到按需分配,减轻服务器压力,更高效的解决问题,性能提升!


//就是在以前第6步上做文章,分为三个部分更改操作
//1、设置公平
channel.basicQos(1);
//2、设置第二个参数为false,表示要进行手动消息签收
channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body,"UTF-8");
        execTask(message);
        System.out.println("消息:"+message+"处理完毕!");
        //3、执行签收确认操作,此时再向rabbitmq请求消息。(第二个参数表示是否帮多个进行同时确认)
        channel.basicAck(envelope.getDeliveryTag(),false);
    }
});


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 Java 调度
消息队列 MQ使用问题之消费者自动掉线是什么导致的
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 存储 负载均衡
我服了,RocketMQ消费者负载均衡内核是这样设计的
文章为理解RocketMQ的负载均衡机制提供了深入的技术洞察,并对如何在实际应用中扩展和定制负载均衡策略提供了有价值的见解。
我服了,RocketMQ消费者负载均衡内核是这样设计的
|
5月前
|
消息中间件 存储 负载均衡
RocketMQ消费者消费消息核心原理(含长轮询机制)
这篇文章深入探讨了Apache RocketMQ消息队列中消费者消费消息的核心原理,特别是长轮询机制。文章从消费者和Broker的交互流程出发,详细分析了Push和Pull两种消费模式的内部实现,以及它们是如何通过长轮询机制来优化消息消费的效率。文章还对RocketMQ的消费者启动流程、消息拉取请求的发起、Broker端处理消息拉取请求的流程进行了深入的源码分析,并总结了RocketMQ在设计上的优点,如单一职责化和线程池的使用等。
RocketMQ消费者消费消息核心原理(含长轮询机制)
|
5月前
|
消息中间件 缓存 Java
RocketMQ - 消费者消费方式
RocketMQ - 消费者消费方式
137 0
|
5月前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
87 0
|
5月前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
75 0
|
5月前
|
消息中间件 存储 缓存
RocketMQ - 消费者启动机制
RocketMQ - 消费者启动机制
67 0
|
5月前
|
消息中间件 存储 缓存
RocketMQ - 消费者概述
RocketMQ - 消费者概述
91 0
|
6月前
|
消息中间件 负载均衡 算法
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
175 2
|
6月前
|
消息中间件 API RocketMQ
消息队列 MQ使用问题之消息在没有消费者的情况下丢失,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。