引言
代码已上传至Github,有兴趣的同学可以下载看看:https://github.com/ylw-github/RabbitMQ-Demo
前面博客讲解了RabbitMQ的五种队列形式《消息中间件系列教程(06) -RabbitMQ -五种队列形式》,主要讲解一下五种队列的代码实现。
主要分为:
- 点对点队列模式(简单)
- 工作队列模式(公平性)
- 发布订阅模式
- 路由模式Routing
- 通配符模式Topics
本文主要讲解工作队列模式。
工作队列模式
1.新建Maven项目RabbitMQ-Demo
2.添加Maven依赖:
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency> </dependencies>
3.连接工具类
package com.ylw.rabbitmq; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RabbitMQConnecUtils { public static Connection newConnection() throws IOException, TimeoutException { // 1.定义连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2.设置服务器地址 factory.setHost("127.0.0.1"); // 3.设置协议端口号 factory.setPort(5672); // 4.设置vhost factory.setVirtualHost("OrderHost"); // 5.设置用户名称 factory.setUsername("OrderAdmin"); // 6.设置用户密码 factory.setPassword("123456"); // 7.创建新的连接 Connection newConnection = factory.newConnection(); return newConnection; } }
1. 生产者
public class Producer { private static final String QUEUE_NAME = "add_order_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { // 1.获取连接 Connection newConnection = RabbitMQConnecUtils.newConnection(); // 2.创建通道 Channel channel = newConnection.createChannel(); // 3.创建队列声明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1);// 保证一次只分发一次,限制发送给同一个消费者,不得超过一条消息 for (int i = 1; i <= 10; i++) { String msg = "index=" + i; System.out.println("生产者发送消息 -> " + msg); // 4.发送消息 channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); } channel.close(); newConnection.close(); } }
2. 消费者
消费者1:
public class Consumer1 { private static final String QUEUE_NAME = "add_order_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { // 1.获取连接 Connection newConnection = RabbitMQConnecUtils.newConnection(); // 2.获取通道 final Channel channel = newConnection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1);// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msgString = new String(body, "UTF-8"); System.out.println("消费者1获取消息:" + msgString); // 手动回执消息 channel.basicAck(envelope.getDeliveryTag(), false); } }; // 3.监听队列 channel.basicConsume(QUEUE_NAME, false, defaultConsumer); } }
消费者2:
public class Consumer2 { private static final String QUEUE_NAME = "add_order_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { // 1.获取连接 Connection newConnection = RabbitMQConnecUtils.newConnection(); // 2.获取通道 final Channel channel = newConnection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1);// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msgString = new String(body, "UTF-8"); System.out.println("消费者获取消息:" + msgString); try { Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } finally { // 手动回执消息 channel.basicAck(envelope.getDeliveryTag(), false); } } }; // 3.监听队列 channel.basicConsume(QUEUE_NAME, false, defaultConsumer); } }
3. 测试
启动消费者1和消费者2,在RabbitMQ控制台可以看到,消息已经注册到了队列了:
然后开始生产,启动生产者:
可以看到Consumer1比Comsumer2 消费多9个,因为Consumer延时了2:
这就是典型的工作队列模式,多劳多得。