4. RabbitMQ入门案例 -Work模式
- 消息产生者将消息放入队列消费者可以有多个,消费者 1, 消费者 2。同时监听同一个队列,消息被消费?C1 C2 共同争抢当前的消息队列内容,谁先拿到谁负责消费消息 (隐患,高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关 (syncronize, 与同步锁的性能不一样) 保证一条消息只能被一个消费者使用)
- 应用场景:红包;大项目中的资源调度 (任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢)
一个消息只能被一个消费者获取
工作队列模式的特点有三:
- 一个生产者,一个队列,多个消费者同时竞争消息
- 任务量过高时可以提高工作效率
- 消费者获得的消息是无序的
(1).轮询分发模式 (Polling)
一个消息提供者发送十条消息
package com.jsxs.rabbitmq.work.fair; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; /** * 生产者 */ public class Producer { public static void main(String[] args) { // 1.消息头 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setPort(5672); connectionFactory.setHost("8.130.48.9"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("xxx"); connectionFactory.setVirtualHost("/"); // 虚拟机 Connection connection =null; Channel channel=null; try { connection= connectionFactory.newConnection(); // 连接 channel=connection.createChannel(); //管道 String queName="queue1"; channel.queueDeclare(queName,false,false,false,null); //声明 String message="工作模式"; for (int i = 0; i < 10; i++) { String a=i+""; channel.basicPublish("",queName,null, a.getBytes(StandardCharsets.UTF_8)); } System.out.println("信息发送通过"); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 关闭通道 if (channel!=null && channel.isOpen()){ try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if (connection!=null&&connection.isOpen()){ try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
创建第一个消费者 性能比较差->(Thread.Sleep(2000))
// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 channel.basicQos(1);
package com.jsxs.rabbitmq.work.fair; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("8.130.48.9"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("xxx"); // 连接 Connection connection=connectionFactory.newConnection(); Channel channel=connection.createChannel(); // 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 channel.basicQos(1); // 4、监听队列,接收消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { // handleDelivery(消费者标识, 消息包的内容, 属性信息(生产者的发送时指定), 读取到的消息) @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者获取消息:" + new String(body)); // 模拟消息处理延时,加个线程睡眠时间 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } // 手动回执消息 channel.basicAck(envelope.getDeliveryTag(), false); } }; // basicConsume(队列名称, 是否自动确认, 回调对象) channel.basicConsume("queue1", false, defaultConsumer); } }
创建第二个消费者-》(性能比较好 Thread.Sleep(1000))
// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 channel.basicQos(1);
package com.jsxs.rabbitmq.work.polling; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer2 { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("8.130.48.9"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("xxx"); connectionFactory.setVirtualHost("/"); // 连接和管道的创建 Connection connection=connectionFactory.newConnection(); Channel channel=connection.createChannel(); // 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 channel.basicQos(1); // 4、监听队列,接收消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { // handleDelivery(消费者标识, 消息包的内容, 属性信息(生产者的发送时指定), 读取到的消息) @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者获取消息:" + new String(body)); // 模拟消息处理延时,加个线程睡眠时间 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } // 手动回执消息 channel.basicAck(envelope.getDeliveryTag(), false); } }; // basicConsume(队列名称, 是否自动确认, 回调对象) channel.basicConsume("queue1", false, defaultConsumer); } }
首先我们先启动生产者生产消息,通过交互机向消息队列中存储十条消息,然后我们分别开启两个消费者,进行对消息的消费。
生产者->消息
两个消费者->消费
上面的代码实现就是轮询分发的方式。现象:消费者1 处理完消息之后,消费者2 才能处理,它两这样轮着来处理消息,直到消息处理完成,这种方式叫轮询分发(round-robin),结果就是不管两个消费者谁忙,「数据总是你一个我一个」,不管消费者处理数据的性能。
假如说我们生产者在设置队列的时候进行配置的是持久化,那么我们消费者就应该在接受的时候进行设置删除消息的配置
(也就是布尔值相同)
注意:autoAck属性设置为true,表示消息自动确认。消费者在消费时消息的确认模式可以分为『自动确认和手动确认』。
// basicConsume(队列名称, 是否自动确认autoAck, 回调对象) channel.basicConsume("queue1", false, defaultConsumer);
自动确认:在队列中的消息被消费者读取之后会自动从队列中删除。不管消息是否被消费者消费成功,消息都会删除。
手动确认:当消费者读取消息后,消费端需要手动发送ACK用于确认消息已经消费成功了(也就是需要自己编写代码发送ACK确认),如果设为手动确认而没有发送ACK确认,那么消息就会一直存在队列中(前提是进行了持久化操作),后续就可能会造成消息重复消费,如果过多的消息堆积在队列中,还可能造成内存溢出,『手动确认消费者在处理完消息之后要及时发送ACK确认给队列』。
// 手动回执消息 channel.basicAck(envelope.getDeliveryTag(), false);
使用轮询分发的方式会有一个明显的缺点,例如消费者1 处理数据的效率很慢,消费者2 处理数据的效率很高,正常情况下消费者2处理的数据应该多一点才对,而轮询分发则不管你的性能如何,反正就是每次处理一个消息,对于这种情况可以使用公平分发的方式来解决。
(2).公平分发模式 (Fair)
前提:
- 生产者设置一次只分发一个消息
// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 channel.basicQos(1);
- 如果生产者设置持久化,我们要设置自动提交或者手动提交
创建消费者
// 第二个参数是否持久化,加入持久化我们要进行提交 channel.queueDeclare(queName,false,false,false,null); //声明
生产者
// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 channel.basicQos(1);
package com.jsxs.rabbitmq.work.polling; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; /** * 生产者 */ public class Producer { public static void main(String[] args) { // 1.消息头 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setPort(5672); connectionFactory.setHost("8.130.48.9"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("xxx"); connectionFactory.setVirtualHost("/"); // 虚拟机 Connection connection =null; Channel channel=null; try { connection= connectionFactory.newConnection(); // 连接 channel=connection.createChannel(); //管道 String queName="queue1"; channel.queueDeclare(queName,false,false,false,null); //声明 String message="工作模式"; for (int i = 0; i < 10; i++) { String a=i+""; channel.basicPublish("",queName,null, a.getBytes(StandardCharsets.UTF_8)); } System.out.println("信息发送通过"); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 关闭通道 if (channel!=null && channel.isOpen()){ try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if (connection!=null&&connection.isOpen()){ try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
消费者->性能不好的Thrad.Sleep(2000)
// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 channel.basicQos(1);
package com.jsxs.rabbitmq.work.polling; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer2 { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("8.130.48.9"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("xxx"); connectionFactory.setVirtualHost("/"); // 连接和管道的创建 Connection connection=connectionFactory.newConnection(); Channel channel=connection.createChannel(); // 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 channel.basicQos(1); // 4、监听队列,接收消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { // handleDelivery(消费者标识, 消息包的内容, 属性信息(生产者的发送时指定), 读取到的消息) @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者获取消息:" + new String(body)); // 模拟消息处理延时,加个线程睡眠时间 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } // 手动回执消息 channel.basicAck(envelope.getDeliveryTag(), false); } }; // basicConsume(队列名称, 是否自动确认, 回调对象) channel.basicConsume("queue1", false, defaultConsumer); } }
性能好的->Threadd.Sleep(1000)
// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 channel.basicQos(1);
package com.jsxs.rabbitmq.work.polling; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer2 { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("8.130.48.9"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("xxx"); connectionFactory.setVirtualHost("/"); // 连接和管道的创建 Connection connection=connectionFactory.newConnection(); Channel channel=connection.createChannel(); // 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 channel.basicQos(1); // 4、监听队列,接收消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { // handleDelivery(消费者标识, 消息包的内容, 属性信息(生产者的发送时指定), 读取到的消息) @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者获取消息:" + new String(body)); // 模拟消息处理延时,加个线程睡眠时间 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } // 手动回执消息 channel.basicAck(envelope.getDeliveryTag(), false); } }; // basicConsume(队列名称, 是否自动确认, 回调对象) channel.basicConsume("queue1", false, defaultConsumer); } }
运行结果: 我们发现会受到消费者的性能的影响