在最开始的时候我们学习到RabbitMQ分发消息采用的轮训分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者1处理任务的速度非常快,而另外一个消费者2
处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是
RabbitMQ并不知道这种情况它依然很公平的进行分发。
为了避免这种情况,我们可以设置参数channel.basicQos(1);
public class UnfairWork { public final static String QUEUE_NAME="hello3"; public static void main(String[] args) throws Exception { System.out.println("c1应答短...."); Channel channel = untils.getChannel(); /** * 不公平分发 * 设置参数 */ int prefetchCount=1; channel.basicQos(1); /** * 消费者信息 * 1.消费哪个队列 * 2.消费成功以后是否要自动应答,true自动应答,false手动挡 * 3.消费者未成功消费的回调内容1 * 4.消费者取消的回调 * */ //声明 接收消息 DeliverCallback deliverCallback=(consumerTag, delivery)->{ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //1.消息标记 // 2.false 代表只应答接收到的哪个传递的信息,true为应答所有的消息包括传递过来的消息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); System.out.println("接收到的消息"+new String(delivery.getBody())); }; //取消 消息的回调 CancelCallback cancelCallback= consumerTag -> { System.out.println(consumerTag+"消息消费者中断"); }; channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback); } }
/** * 这个和轮循的代码是一样的 */ public class UnfairProduce { public static final String QUEUE_NAME="hello3"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = untils.getChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //从控制太中接受消息 Scanner scanner = new Scanner(System.in); while(scanner.hasNext()) { String message=scanner.next(); channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));; System.out.println("发送消息完成"+message); } } }