公众号merlinsea
工作队列介绍
在消息生产能力大于消费能力的情况下,增加多几个消费节点可以平衡消息堆积问题,这就是工作队列的由来。
和简单队列类似,增加多个几个消费节点,让消费者消费消息处于竞争关系 。
工作队列默认采用轮l询的方式分配消息,即不管消费者消费能力如何,都是将消息均分给所有消费者。 这导致消费能力弱的消费者需要和消费能力强的消费者消费相等数量的消息。
也可以不采用轮询的方式,消费者添加如下代码即可!!
channel.basicQos(1);
工作队列模型:
代码实战
生产者端
public class Send { private final static String QUEUE_NAME = "work_mq_rr"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("39.107.221.166"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); //JDK7语法,自动关闭,创建连接 try (Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel()) { /** * 队列名称 * 持久化配置:mq重启后还在 * 是否独占:只能有一个消费者监听队列;当connection关闭是否删除队列,一般是false,发布订阅是独占 * 自动删除: 当没有消费者的时候,自动删除掉,一般是false * 其他参数 * * 队列不存在则会自动创建,如果存在则不会覆盖,所以此时的时候需要注意属性 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); for(int i=0; i<10 ; i++){ String message = "hello world!!! i="+i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); } } } }
消费者1【消费能力弱】
public class Recv1 { private final static String QUEUE_NAME = "work_mq_rr"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("39.107.221.166"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //模拟消费者消费慢 try { TimeUnit.SECONDS.sleep(6); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("body="+new String(body,"utf-8")); //手工确认消息消费,不是多条确认 channel.basicAck(envelope.getDeliveryTag(),false); } }; //消费,关闭消息消息自动确认,重要 channel.basicConsume(QUEUE_NAME,false,consumer); } }
消费者2【消费能力强】
public class Recv1 { private final static String QUEUE_NAME = "work_mq_rr"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("39.107.221.166"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //模拟消费者消费慢 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("body="+new String(body,"utf-8")); //手工确认消息消费,不是多条确认 channel.basicAck(envelope.getDeliveryTag(),false); } }; //消费,关闭消息消息自动确认,重要 channel.basicConsume(QUEUE_NAME,false,consumer); } }
消费者消费结果:
消费者1消费的时间要远大于消费者2完成的时间,由于默认采用轮询机制,导致消费能力强的消费者2每次消费完一个消息都必须等待消费能力弱的消费者1完成消费才能进行下一次消费。