RabbitMQ工作模式(一)

简介: 简单工作模式

一、简单模式介绍

RabbitMQ中最普通的工作模式。


  • P:生产者,发送消息的程序
  • C:消费者,消息的接收者,会一直等待着消息到来
  • queue:消息队列,图中的红色部分,可以缓存消息。生产者投递消息到队列,消费者从队列中取出消息。

二、示例

1、创建项目

idea创建maven项目,新建三个模块,公共、生产者和消费者。在公共模块的pom文件中添加如下依赖,剩余两个模块直接依赖公共模块。

<dependencies><!--https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.14.2</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.6.1</version><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build>

2、创建连接工具类

packagecom.cui.common;
importcom.rabbitmq.client.Connection;
importcom.rabbitmq.client.ConnectionFactory;
publicclassRabbitMqUtils {
publicstaticConnectiongetConnection(){
// 创建连接工厂ConnectionFactoryfactory=newConnectionFactory();
// 设置参数factory.setHost("127.0.0.1");//IP地址,默认值为localhostfactory.setPort(5672);//端口号,默认值为5672factory.setVirtualHost("/");//虚拟主机,默认值为/factory.setUsername("root");//用户名,默认值为guestfactory.setPassword("root");//密码,默认值为guest// 创建连接Connectionconn=null;
try {
conn=factory.newConnection();
        } catch (Exceptione) {
e.printStackTrace();
        }
returnconn;
    }
publicstaticvoidclose(Channelchannel, Connectionconn){
if (channel!=null){
try {
channel.close();
            } catch (Exceptione) {
e.printStackTrace();
            }
        }
if (conn!=null){
try {
conn.close();
            } catch (Exceptione) {
e.printStackTrace();
            }
        }
    }
}

3、生产者

packagecom.cui.producer;
importcom.cui.common.RabbitMqUtils;
importcom.rabbitmq.client.Channel;
importcom.rabbitmq.client.Connection;
importjava.io.IOException;
importjava.util.concurrent.TimeoutException;
/*** 发送消息* @author: CUI* @date: 2022-04-18 10:01*/publicclassProducer_HelloWord {
publicstaticvoidmain(String[] args) throwsIOException, TimeoutException {
//获得连接Connectionconnection=RabbitMqUtils.getConnection();
//创建channelChannelchannel=connection.createChannel();
//创建队列/*** 参数1:队列名称*  参数2:是否持久化 true:持久化  false:不持久化*  参数3:是否独占队列 exclusive 只能有一个消费者监听这个队列,当消费者断开连接后,队列自动删除*  参数4:是否自动删除 true:自动删除  false:不自动删除*  参数5:队列的其他参数*  queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)*///如果没有这个队列,则创建channel.queueDeclare("hello_Word", true, false, false, null);
//发送消息/**** 参数1: 交换机名称 简单模式下会创建默认的交换机,设置为””* 参数2:消息的路由键 routingKey* 参数3:配置信息 props* 参数4:消息的内容 body*  basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)*/channel.basicPublish("", "hello_Word", null, "hello word".getBytes());
//释放资源RabbitMqUtils.close(channel, connection);
    }
}

运行生产者的代码,访问http://localhost:15672,可查看到发送的一条消息

4、消费者

packagecom.cui.consumer;
importcom.cui.common.RabbitMqUtils;
importcom.rabbitmq.client.*;
importjava.io.IOException;
publicclassConsumer_HelloWord {
publicstaticvoidmain(String[] args) throwsIOException {
Connectionconnection=RabbitMqUtils.getConnection();
Channelchannel=connection.createChannel();
/*** 参数1:队列名称*  参数2:是否持久化 true:持久化  false:不持久化*  参数3:是否独占队列 exclusive 只能有一个消费者监听这个队列,当消费者断开连接后,队列自动删除*  参数4:是否自动删除 true:自动删除  false:不自动删除*  参数5:队列的其他参数*  queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)*///如果没有这个队列,则创建channel.queueDeclare("hello_Word", true, false, false, null);
//接收消息Consumerconsumer=newDefaultConsumer(channel){
@Override/*** 当收到消息后,会自动执行该方法* @param consumerTag 消费者标签* @param envelope 消息包裹(可以获取交换机、路由的信息等)* @param properties 消息属性* @param body 消息体* @return: void*/publicvoidhandleDelivery(StringconsumerTag, Envelopeenvelope, AMQP.BasicPropertiesproperties, byte[] body) throwsIOException {
Stringmessage=newString(body, "UTF-8");
System.out.println("接收到消息:"+message);
            }
        };
/****  参数1:队列名称*  参数2:是否自动确认 true:自动确认  false:手动确认*  参数3:回调函数*  basicConsume(String queue, boolean autoAck, Consumer callback)*/channel.basicConsume("hello_Word", true, consumer);
    }
}

运行消费者代码,消费之前生产者发送的消息,rabbitmq的网页管理端可以发现待消费的消息为0了

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
1月前
|
物联网 Go 网络性能优化
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式【1月更文挑战第21天】【1月更文挑战第104篇】
210 1
|
12小时前
|
消息中间件
RabbitMQ配置单活模式队列
RabbitMQ配置单活模式队列
5 0
|
4天前
|
消息中间件 缓存 数据库
rabbitmq系列(二)几种常见模式的应用场景及实现
rabbitmq系列(二)几种常见模式的应用场景及实现
|
15天前
|
消息中间件 Apache C语言
消息队列 MQ产品使用合集之在Cluster部署模式下,使用dashboard无法查询到消费组信息,一般是什么导致的
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
16天前
|
消息中间件 负载均衡 Apache
消息队列 MQ产品使用合集之是否支持Master/Slave模式进行部署?
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
16天前
|
消息中间件 存储 负载均衡
消息队列 MQ产品使用合集之POP消费模式是否可以保证消息顺序性
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
1月前
|
消息中间件 Java
Java操作RabbitMQ单一生产-消费者模式
Java操作RabbitMQ单一生产-消费者模式
37 0
|
1月前
|
消息中间件 存储 数据安全/隐私保护
深入学习RabbitMQ五种模式(一)
深入学习RabbitMQ五种模式(一)
48 0
|
1月前
|
消息中间件 Java API
MQ产品使用合集之RocketMQ dledger集群模式的dledgerpeers端口是集群之间通讯吗
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
1月前
|
传感器 监控 网络协议
MQTT 发布、订阅模式介绍
【2月更文挑战第17天】
171 6
MQTT 发布、订阅模式介绍