108.【RabbitsMQ】(四)

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: 108.【RabbitsMQ】

4. RabbitMQ入门案例 -Work模式

  1. 消息产生者将消息放入队列消费者可以有多个,消费者 1, 消费者 2。同时监听同一个队列,消息被消费?C1 C2 共同争抢当前的消息队列内容,谁先拿到谁负责消费消息 (隐患,高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关 (syncronize, 与同步锁的性能不一样) 保证一条消息只能被一个消费者使用)
  2. 应用场景:红包;大项目中的资源调度 (任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢)

一个消息只能被一个消费者获取

工作队列模式的特点有三:

  1. 一个生产者,一个队列,多个消费者同时竞争消息
  2. 任务量过高时可以提高工作效率
  3. 消费者获得的消息是无序的
(1).轮询分发模式 (Polling)

一个消息提供者发送十条消息

package com.jsxs.rabbitmq.work.fair;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 *  生产者
 */
public class Producer {
    public static void main(String[] args) {
        //  1.消息头
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setPort(5672);
        connectionFactory.setHost("8.130.48.9");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("xxx");
        connectionFactory.setVirtualHost("/");  // 虚拟机
        Connection connection =null;
        Channel channel=null;
        try {
            connection= connectionFactory.newConnection();  // 连接
            channel=connection.createChannel();  //管道
            String queName="queue1";
            channel.queueDeclare(queName,false,false,false,null);  //声明
            String message="工作模式";
            for (int i = 0; i < 10; i++) {
                String a=i+"";
                channel.basicPublish("",queName,null, a.getBytes(StandardCharsets.UTF_8));
            }
            System.out.println("信息发送通过");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            //  关闭通道
            if (channel!=null && channel.isOpen()){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection!=null&&connection.isOpen()){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

创建第一个消费者 性能比较差->(Thread.Sleep(2000))

// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息
        channel.basicQos(1);
package com.jsxs.rabbitmq.work.fair;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("8.130.48.9");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("xxx");
        // 连接
        Connection connection=connectionFactory.newConnection();
        Channel channel=connection.createChannel();
        // 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息
        channel.basicQos(1);
        // 4、监听队列,接收消息
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            // handleDelivery(消费者标识, 消息包的内容, 属性信息(生产者的发送时指定), 读取到的消息)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者获取消息:" + new String(body));
                // 模拟消息处理延时,加个线程睡眠时间
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 手动回执消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // basicConsume(队列名称, 是否自动确认, 回调对象)
        channel.basicConsume("queue1", false, defaultConsumer);
    }
}

创建第二个消费者-》(性能比较好 Thread.Sleep(1000))

// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息
        channel.basicQos(1);
package com.jsxs.rabbitmq.work.polling;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("8.130.48.9");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("xxx");
        connectionFactory.setVirtualHost("/");
        // 连接和管道的创建
        Connection connection=connectionFactory.newConnection();
        Channel channel=connection.createChannel();
        // 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息
        channel.basicQos(1);
        // 4、监听队列,接收消息
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            // handleDelivery(消费者标识, 消息包的内容, 属性信息(生产者的发送时指定), 读取到的消息)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者获取消息:" + new String(body));
                // 模拟消息处理延时,加个线程睡眠时间
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 手动回执消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // basicConsume(队列名称, 是否自动确认, 回调对象)
        channel.basicConsume("queue1", false, defaultConsumer);
    }
}

首先我们先启动生产者生产消息,通过交互机向消息队列中存储十条消息,然后我们分别开启两个消费者,进行对消息的消费。

生产者->消息

两个消费者->消费


上面的代码实现就是轮询分发的方式。现象:消费者1 处理完消息之后,消费者2 才能处理,它两这样轮着来处理消息,直到消息处理完成,这种方式叫轮询分发(round-robin),结果就是不管两个消费者谁忙,「数据总是你一个我一个」,不管消费者处理数据的性能。

假如说我们生产者在设置队列的时候进行配置的是持久化,那么我们消费者就应该在接受的时候进行设置删除消息的配置(也就是布尔值相同)

注意autoAck属性设置为true,表示消息自动确认。消费者在消费时消息的确认模式可以分为『自动确认和手动确认』。

// basicConsume(队列名称, 是否自动确认autoAck, 回调对象)
        channel.basicConsume("queue1", false, defaultConsumer);

自动确认:在队列中的消息被消费者读取之后会自动从队列中删除。不管消息是否被消费者消费成功,消息都会删除。

手动确认:当消费者读取消息后,消费端需要手动发送ACK用于确认消息已经消费成功了(也就是需要自己编写代码发送ACK确认),如果设为手动确认而没有发送ACK确认,那么消息就会一直存在队列中(前提是进行了持久化操作),后续就可能会造成消息重复消费,如果过多的消息堆积在队列中,还可能造成内存溢出,『手动确认消费者在处理完消息之后要及时发送ACK确认给队列』。

// 手动回执消息
   channel.basicAck(envelope.getDeliveryTag(), false);

使用轮询分发的方式会有一个明显的缺点,例如消费者1 处理数据的效率很慢,消费者2 处理数据的效率很高,正常情况下消费者2处理的数据应该多一点才对,而轮询分发则不管你的性能如何,反正就是每次处理一个消息,对于这种情况可以使用公平分发的方式来解决。

(2).公平分发模式 (Fair)

前提:

  1. 生产者设置一次只分发一个消息
// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息
        channel.basicQos(1);
  1. 如果生产者设置持久化,我们要设置自动提交或者手动提交
    创建消费者
// 第二个参数是否持久化,加入持久化我们要进行提交
 channel.queueDeclare(queName,false,false,false,null);  //声明

生产者

// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息
        channel.basicQos(1);
package com.jsxs.rabbitmq.work.polling;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 *  生产者
 */
public class Producer {
    public static void main(String[] args) {
        //  1.消息头
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setPort(5672);
        connectionFactory.setHost("8.130.48.9");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("xxx");
        connectionFactory.setVirtualHost("/");  // 虚拟机
        Connection connection =null;
        Channel channel=null;
        try {
            connection= connectionFactory.newConnection();  // 连接
            channel=connection.createChannel();  //管道
            String queName="queue1";
            channel.queueDeclare(queName,false,false,false,null);  //声明
            String message="工作模式";
            for (int i = 0; i < 10; i++) {
                String a=i+"";
                channel.basicPublish("",queName,null, a.getBytes(StandardCharsets.UTF_8));
            }
            System.out.println("信息发送通过");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            //  关闭通道
            if (channel!=null && channel.isOpen()){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection!=null&&connection.isOpen()){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

消费者->性能不好的Thrad.Sleep(2000)

// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息
        channel.basicQos(1);
package com.jsxs.rabbitmq.work.polling;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("8.130.48.9");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("xxx");
        connectionFactory.setVirtualHost("/");
        // 连接和管道的创建
        Connection connection=connectionFactory.newConnection();
        Channel channel=connection.createChannel();
        // 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息
        channel.basicQos(1);
        // 4、监听队列,接收消息
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            // handleDelivery(消费者标识, 消息包的内容, 属性信息(生产者的发送时指定), 读取到的消息)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者获取消息:" + new String(body));
                // 模拟消息处理延时,加个线程睡眠时间
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 手动回执消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // basicConsume(队列名称, 是否自动确认, 回调对象)
        channel.basicConsume("queue1", false, defaultConsumer);
    }
}

性能好的->Threadd.Sleep(1000)

// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息
        channel.basicQos(1);
package com.jsxs.rabbitmq.work.polling;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("8.130.48.9");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("xxx");
        connectionFactory.setVirtualHost("/");
        // 连接和管道的创建
        Connection connection=connectionFactory.newConnection();
        Channel channel=connection.createChannel();
        // 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息
        channel.basicQos(1);
        // 4、监听队列,接收消息
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            // handleDelivery(消费者标识, 消息包的内容, 属性信息(生产者的发送时指定), 读取到的消息)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者获取消息:" + new String(body));
                // 模拟消息处理延时,加个线程睡眠时间
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 手动回执消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // basicConsume(队列名称, 是否自动确认, 回调对象)
        channel.basicConsume("queue1", false, defaultConsumer);
    }
}

运行结果: 我们发现会受到消费者的性能的影响





相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
消息中间件
|
机器学习/深度学习 人工智能 自然语言处理
|
消息中间件 存储 JSON
RabbitMQ之幂等性问题处理
RabbitMQ之幂等性问题处理
|
SQL 缓存 关系型数据库
什么情况下需要考虑分库分表?
什么情况下需要考虑分库分表?
144 0
|
机器学习/深度学习 人工智能 算法
【保姆级教程】用PAI-DSW修复亚运历史老照片
本教程整合了来自开源社区的高质量图像修复、去噪、上色等算法,并使用 Stable Diffusion WebUI 进行交互式图像修复。参与者可以根据需要进行参数调整,组合不同的处理方式以获得最佳修复效果。参与者还可以在活动页面上传修复后的成果图片,参与比赛,获胜者将有机会获得丰厚的奖品。
44345 189
【保姆级教程】用PAI-DSW修复亚运历史老照片
|
存储 运维 Dubbo
SAE急速部署
Serverless应用引擎 2.0的推出,您是否用过?更快速的部署,更低的成本,SAE 2.0 极简体验、极致弹性,助力企业降本增效!
10027 14
SAE急速部署
|
消息中间件 Serverless Kafka
基于 EventBridge 轻松搭建消息集成应用
基于 EventBridge 轻松搭建消息集成应用
22173 8
|
人工智能 Serverless 开发者
阿里云 X 森马 AIGC T恤设计大赛开启! 穿什么由你定,赢Airpods,作品定制联名T恤
函数计算部署 Stable Diffusion, 内置常用插件+ControlNet,支持 SDXL1.0。阿里云 X 森马 AIGC T 恤设计大赛开启! 使用 SD 展现创意和技术,即有机会赢得 Airpods 、作品定制阿里云X森马联名T恤等丰厚奖励.
阿里云 X 森马 AIGC T恤设计大赛开启! 穿什么由你定,赢Airpods,作品定制联名T恤
|
人工智能 人机交互 语音技术
INTERSPEECH2023论文解读|BAT一种低延迟低内存消耗的RNN-T模型
INTERSPEECH2023论文解读|BAT一种低延迟低内存消耗的RNN-T模型
225 0
|
IDE 编译器 开发工具
善用 vs 中的错误列表和输出窗口,高效查找 C++ 多工程编译错误
善用 vs 中的错误列表和输出窗口,高效查找 C++ 多工程编译错误