RabbitMQ工作模式(一)

简介: 简单工作模式

一、简单模式介绍

RabbitMQ中最普通的工作模式。


  • P:生产者,发送消息的程序
  • C:消费者,消息的接收者,会一直等待着消息到来
  • queue:消息队列,图中的红色部分,可以缓存消息。生产者投递消息到队列,消费者从队列中取出消息。

二、示例

1、创建项目

idea创建maven项目,新建三个模块,公共、生产者和消费者。在公共模块的pom文件中添加如下依赖,剩余两个模块直接依赖公共模块。

<dependencies><!--https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.14.2</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.6.1</version><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build>

2、创建连接工具类

packagecom.cui.common;
importcom.rabbitmq.client.Connection;
importcom.rabbitmq.client.ConnectionFactory;
publicclassRabbitMqUtils {
publicstaticConnectiongetConnection(){
// 创建连接工厂ConnectionFactoryfactory=newConnectionFactory();
// 设置参数factory.setHost("127.0.0.1");//IP地址,默认值为localhostfactory.setPort(5672);//端口号,默认值为5672factory.setVirtualHost("/");//虚拟主机,默认值为/factory.setUsername("root");//用户名,默认值为guestfactory.setPassword("root");//密码,默认值为guest// 创建连接Connectionconn=null;
try {
conn=factory.newConnection();
        } catch (Exceptione) {
e.printStackTrace();
        }
returnconn;
    }
publicstaticvoidclose(Channelchannel, Connectionconn){
if (channel!=null){
try {
channel.close();
            } catch (Exceptione) {
e.printStackTrace();
            }
        }
if (conn!=null){
try {
conn.close();
            } catch (Exceptione) {
e.printStackTrace();
            }
        }
    }
}

3、生产者

packagecom.cui.producer;
importcom.cui.common.RabbitMqUtils;
importcom.rabbitmq.client.Channel;
importcom.rabbitmq.client.Connection;
importjava.io.IOException;
importjava.util.concurrent.TimeoutException;
/*** 发送消息* @author: CUI* @date: 2022-04-18 10:01*/publicclassProducer_HelloWord {
publicstaticvoidmain(String[] args) throwsIOException, TimeoutException {
//获得连接Connectionconnection=RabbitMqUtils.getConnection();
//创建channelChannelchannel=connection.createChannel();
//创建队列/*** 参数1:队列名称*  参数2:是否持久化 true:持久化  false:不持久化*  参数3:是否独占队列 exclusive 只能有一个消费者监听这个队列,当消费者断开连接后,队列自动删除*  参数4:是否自动删除 true:自动删除  false:不自动删除*  参数5:队列的其他参数*  queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)*///如果没有这个队列,则创建channel.queueDeclare("hello_Word", true, false, false, null);
//发送消息/**** 参数1: 交换机名称 简单模式下会创建默认的交换机,设置为””* 参数2:消息的路由键 routingKey* 参数3:配置信息 props* 参数4:消息的内容 body*  basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)*/channel.basicPublish("", "hello_Word", null, "hello word".getBytes());
//释放资源RabbitMqUtils.close(channel, connection);
    }
}

运行生产者的代码,访问http://localhost:15672,可查看到发送的一条消息

4、消费者

packagecom.cui.consumer;
importcom.cui.common.RabbitMqUtils;
importcom.rabbitmq.client.*;
importjava.io.IOException;
publicclassConsumer_HelloWord {
publicstaticvoidmain(String[] args) throwsIOException {
Connectionconnection=RabbitMqUtils.getConnection();
Channelchannel=connection.createChannel();
/*** 参数1:队列名称*  参数2:是否持久化 true:持久化  false:不持久化*  参数3:是否独占队列 exclusive 只能有一个消费者监听这个队列,当消费者断开连接后,队列自动删除*  参数4:是否自动删除 true:自动删除  false:不自动删除*  参数5:队列的其他参数*  queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)*///如果没有这个队列,则创建channel.queueDeclare("hello_Word", true, false, false, null);
//接收消息Consumerconsumer=newDefaultConsumer(channel){
@Override/*** 当收到消息后,会自动执行该方法* @param consumerTag 消费者标签* @param envelope 消息包裹(可以获取交换机、路由的信息等)* @param properties 消息属性* @param body 消息体* @return: void*/publicvoidhandleDelivery(StringconsumerTag, Envelopeenvelope, AMQP.BasicPropertiesproperties, byte[] body) throwsIOException {
Stringmessage=newString(body, "UTF-8");
System.out.println("接收到消息:"+message);
            }
        };
/****  参数1:队列名称*  参数2:是否自动确认 true:自动确认  false:手动确认*  参数3:回调函数*  basicConsume(String queue, boolean autoAck, Consumer callback)*/channel.basicConsume("hello_Word", true, consumer);
    }
}

运行消费者代码,消费之前生产者发送的消息,rabbitmq的网页管理端可以发现待消费的消息为0了

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4月前
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
|
7天前
|
消息中间件 网络协议 RocketMQ
RocketMQ Controller 模式 始终更新成本机ip
ontrollerAddr=192.168.24.241:8878 但是日志输出Update controller leader address to 127.0.0.1:8878。导致访问失败
39 3
|
5月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
86 2
|
5月前
|
消息中间件
RabbitMQ广播模式
RabbitMQ广播模式
87 1
|
5月前
|
消息中间件 应用服务中间件 网络安全
rabbitMQ镜像模式搭建
rabbitMQ镜像模式搭建
|
6月前
|
消息中间件 传感器 负载均衡
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
|
6月前
|
消息中间件 存储 Kafka
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
|
5月前
|
消息中间件 Java Maven
RabbitMQ通配符模式
RabbitMQ通配符模式
76 0
|
6月前
|
消息中间件 Java Apache
消息队列 MQ使用问题之如何在内外网环境下使用单组节点单副本模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 负载均衡 RocketMQ
MetaQ/RocketMQ 原理问题之在广播模式下,RebalanceService工作的问题如何解决
MetaQ/RocketMQ 原理问题之在广播模式下,RebalanceService工作的问题如何解决