一、简单模式介绍
RabbitMQ中最普通的工作模式。
- P:生产者,发送消息的程序
- C:消费者,消息的接收者,会一直等待着消息到来
- queue:消息队列,图中的红色部分,可以缓存消息。生产者投递消息到队列,消费者从队列中取出消息。
二、示例
1、创建项目
idea创建maven项目,新建三个模块,公共、生产者和消费者。在公共模块的pom文件中添加如下依赖,剩余两个模块直接依赖公共模块。
<dependencies><!--https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.14.2</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.6.1</version><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build>
2、创建连接工具类
packagecom.cui.common; importcom.rabbitmq.client.Connection; importcom.rabbitmq.client.ConnectionFactory; publicclassRabbitMqUtils { publicstaticConnectiongetConnection(){ // 创建连接工厂ConnectionFactoryfactory=newConnectionFactory(); // 设置参数factory.setHost("127.0.0.1");//IP地址,默认值为localhostfactory.setPort(5672);//端口号,默认值为5672factory.setVirtualHost("/");//虚拟主机,默认值为/factory.setUsername("root");//用户名,默认值为guestfactory.setPassword("root");//密码,默认值为guest// 创建连接Connectionconn=null; try { conn=factory.newConnection(); } catch (Exceptione) { e.printStackTrace(); } returnconn; } publicstaticvoidclose(Channelchannel, Connectionconn){ if (channel!=null){ try { channel.close(); } catch (Exceptione) { e.printStackTrace(); } } if (conn!=null){ try { conn.close(); } catch (Exceptione) { e.printStackTrace(); } } } }
3、生产者
packagecom.cui.producer; importcom.cui.common.RabbitMqUtils; importcom.rabbitmq.client.Channel; importcom.rabbitmq.client.Connection; importjava.io.IOException; importjava.util.concurrent.TimeoutException; /*** 发送消息* @author: CUI* @date: 2022-04-18 10:01*/publicclassProducer_HelloWord { publicstaticvoidmain(String[] args) throwsIOException, TimeoutException { //获得连接Connectionconnection=RabbitMqUtils.getConnection(); //创建channelChannelchannel=connection.createChannel(); //创建队列/*** 参数1:队列名称* 参数2:是否持久化 true:持久化 false:不持久化* 参数3:是否独占队列 exclusive 只能有一个消费者监听这个队列,当消费者断开连接后,队列自动删除* 参数4:是否自动删除 true:自动删除 false:不自动删除* 参数5:队列的其他参数* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)*///如果没有这个队列,则创建channel.queueDeclare("hello_Word", true, false, false, null); //发送消息/**** 参数1: 交换机名称 简单模式下会创建默认的交换机,设置为””* 参数2:消息的路由键 routingKey* 参数3:配置信息 props* 参数4:消息的内容 body* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)*/channel.basicPublish("", "hello_Word", null, "hello word".getBytes()); //释放资源RabbitMqUtils.close(channel, connection); } }
运行生产者的代码,访问http://localhost:15672,可查看到发送的一条消息
4、消费者
packagecom.cui.consumer; importcom.cui.common.RabbitMqUtils; importcom.rabbitmq.client.*; importjava.io.IOException; publicclassConsumer_HelloWord { publicstaticvoidmain(String[] args) throwsIOException { Connectionconnection=RabbitMqUtils.getConnection(); Channelchannel=connection.createChannel(); /*** 参数1:队列名称* 参数2:是否持久化 true:持久化 false:不持久化* 参数3:是否独占队列 exclusive 只能有一个消费者监听这个队列,当消费者断开连接后,队列自动删除* 参数4:是否自动删除 true:自动删除 false:不自动删除* 参数5:队列的其他参数* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)*///如果没有这个队列,则创建channel.queueDeclare("hello_Word", true, false, false, null); //接收消息Consumerconsumer=newDefaultConsumer(channel){ /*** 当收到消息后,会自动执行该方法* @param consumerTag 消费者标签* @param envelope 消息包裹(可以获取交换机、路由的信息等)* @param properties 消息属性* @param body 消息体* @return: void*/publicvoidhandleDelivery(StringconsumerTag, Envelopeenvelope, AMQP.BasicPropertiesproperties, byte[] body) throwsIOException { Stringmessage=newString(body, "UTF-8"); System.out.println("接收到消息:"+message); } }; /**** 参数1:队列名称* 参数2:是否自动确认 true:自动确认 false:手动确认* 参数3:回调函数* basicConsume(String queue, boolean autoAck, Consumer callback)*/channel.basicConsume("hello_Word", true, consumer); } }
运行消费者代码,消费之前生产者发送的消息,rabbitmq的网页管理端可以发现待消费的消息为0了