rabbitmq是spring一个公司的,所以很多公司 企业选择用rabbitmq。
所有的中间件技术都是基于TCP、IP的协议,只不过rabbitmq遵循的是amqp,在tcp、ip基础之上定义的。
五个用的比较多的模式。现在来实现一个简单的步骤。
这就是最简单的模式。一个生产者一个消费者,一个MQ。
因为是基于tcp和ip的。所以一定会有ip和端口。
具体步骤:
1、创建连接工程。
2、创建连接Connection
3、通过连接获取通道Channel。
4、通过通道创建交换机、声明队列、绑定关系、路由Key、发送消息、接收消息。
5、准备消息内容。
6、发送消息给消息队列queue。
7、关闭连接。
8、关闭通道。
1、构建一个maven工程
2、导入rabbitmq的maven依赖
java的原声依赖 <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.10.0</version> </dependency>
spring的依赖
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-amqp</artifactId> <version>2.2.5.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.2.5.RELEASE</version> </dependency>
springboot依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
rabbitmq和spring同属一个公司开放的产品,所以他们的支持也是非常完善,这也是为什么推荐使用rabbitmq的一个原因
3、启动rabbitmq-server服务
systemctl start rabbitmq-server 或者 docker start myrabbit
4、定义生产者
package com.xuexiangban.rabbitmq.simple; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @author: 学相伴-飞哥 * @description: Producer 简单队列生产者 * @Date : 2021/3/2 */ public class Producer { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("47.104.141.27"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("生产者"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 5: 申明队列queue存储消息 /* * 如果队列不存在,则会创建 * Rabbitmq不允许创建两个相同的队列名称,否则会报错。 * * @params1: queue 队列的名称 * @params2: durable 队列是否持久化 * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭 * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。 * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。 * */ channel.queueDeclare("queue1", false, false, false, null); // 6: 准备发送消息的内容 String message = "你好,学相伴!!!"; // 7: 发送消息给中间件rabbitmq-server // @params1: 交换机exchange // @params2: 队列名称/routing // @params3: 属性配置 // @params4: 发送消息的内容 channel.basicPublish("", "queue1", null, message.getBytes()); System.out.println("消息发送成功!"); } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
5、web端观察运行结果
先有了链接、然后有了通道、然后慢慢就有了。
(配合java打断点一步一步 进行查看)
在我们设置代码的时候,设置的是非持久化,所以随着最后一个消息完毕后,是否把消息队列删除。
6、消费者代码
package com.xuexiangban.rabbitmq.simple; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @author: 学相伴-飞哥 * @description: Producer 简单队列生产者 * @Date : 2021/3/2 */ public class Producer { public static void main(String[] args) { // 1: 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2: 设置连接属性 connectionFactory.setHost("47.104.141.27"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3: 从连接工厂中获取连接 connection = connectionFactory.newConnection("生产者"); // 4: 从连接中获取通道channel channel = connection.createChannel(); // 5: 申明队列queue存储消息 /* * 如果队列不存在,则会创建 * Rabbitmq不允许创建两个相同的队列名称,否则会报错。 * * @params1: queue 队列的名称 * @params2: durable 队列是否持久化 * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭 * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。 * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。 * */ channel.queueDeclare("queue1", false, false, false, null); // 6: 准备发送消息的内容 String message = "你好,学相伴!!!"; // 7: 发送消息给中间件rabbitmq-server // @params1: 交换机exchange // @params2: 队列名称/routing // @params3: 属性配置 // @params4: 发送消息的内容 channel.basicPublish("", "queue1", null, message.getBytes()); System.out.println("消息发送成功!"); } catch (Exception ex) { ex.printStackTrace(); System.out.println("发送消息出现异常..."); } finally { // 7: 释放连接关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception ex) { ex.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (Exception ex) { ex.printStackTrace(); } } } } }
持久化和非持久化就是:服务器重启队列是否还在,持久化肯定在,同时需要注意持久化肯定会存盘。但是非持久化会 存盘,但是随着服务器的重启会丢失的。