前言
rabbitmq作为现在流行的消息队列,它拥有流量削峰、应用解耦、异步处理等优点,使用数量也是较多的。其中重要的特性也就是手动应答避免消息丢失的特点更是使其更上一层楼。消息队列基础的处理流程是:**生产者---》队列---》消费者。
应答机制
rabbitmq的自动应答会导致在消息的分发途中,如果一台消费者角色的服务器宕机了,其处理的消息在自动应答模式下就会丢失,而手动应答则不会,而且退回队列分发给下一个消费者进行处理,有效的规避了因服务器宕机而造成的消息丢失。接下来进行代码演示
生产者
首先我们进行创建一个生产者进行消息的输入:
publicclassTask02 { privatefinalstaticStringTASK_QUEUE_NAME="queue01"; publicstaticvoidmain(String[] args) throwsIOException, TimeoutException { Channelchannel=RabbitMQUtils.getChannel(); channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null); Scannerscanner=newScanner(System.in); while (scanner.hasNext()){ Stringmessage=scanner.next(); channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes("UTF-8")); System.out.println("生产者发出的消息是:"+message); } } }
消费者
紧接着编写消费者的代码:
消费者1号
publicclassWork01 { privatefinalstaticStringTASK_QUEUE_NAME="queue01"; publicstaticvoidmain(String[] args) throwsIOException, TimeoutException { Channelchannel=RabbitMQUtils.getChannel(); System.out.println("消费者1号等待接收消息处理时间较短!"); DeliverCallbackdeliverCallback=(StringconsumerTag, Deliverymessage)->{ //挂起一秒try { Thread.sleep(1000); } catch (InterruptedExceptione) { e.printStackTrace(); } System.out.println("接收到的消息是:"+newString(message.getBody(),"UTF-8")); //手动应答操作/*** 第一个参数:tag消息的标记* 第二个参数:是否批量处理,手动应答就为false*/channel.basicAck(message.getEnvelope().getDeliveryTag(),false); }; CancelCallbackcancelCallback=(StringconsumerTag)->{ System.out.println(consumerTag+"消费者消费信息被终止!"); }; //采用手动应答booleanautoAck=false; channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback); } }
消费者2号:
publicclassWork02 { privatefinalstaticStringTASK_QUEUE_NAME="queue01"; publicstaticvoidmain(String[] args) throwsIOException, TimeoutException { Channelchannel=RabbitMQUtils.getChannel(); System.out.println("消费者2号等待接收消息处理时间较短!"); DeliverCallbackdeliverCallback=(StringconsumerTag, Deliverymessage)->{ //挂起一秒try { Thread.sleep(30000); } catch (InterruptedExceptione) { e.printStackTrace(); } System.out.println("接收到的消息是:"+newString(message.getBody(),"UTF-8")); //手动应答操作/*** 第一个参数:tag消息的标记* 第二个参数:是否批量处理,手动应答就为false*/channel.basicAck(message.getEnvelope().getDeliveryTag(),false); }; CancelCallbackcancelCallback=(StringconsumerTag)->{ System.out.println(consumerTag+"消费者消费信息被终止!"); }; //采用手动应答booleanautoAck=false; channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback); } }
测试结果说明:
以上代码写完后我们依次点击运行,我们打开rabbitmq的web管理页面会发现刚刚创建得队列,这个队列就是一个消息队列,生产者所发送的消息首先会通过一个连接中的某个信道传输到队列中进行排列等待出队列,每个消息在出队列时又通过信道轮次传输到消费者进行消费。所以这个队列是很重要的,我们可以通过对其观察消息的输入输出的变化等相关信息
接着我们发两条信息进行验证两个消费者是否可以正常进行消费
然后检查消费者消费情况,观察是否正常消费:
1号消费者立即就收到了消息,2号因为设置了挂起30秒所以30后收到了消息,接着我们做一个测试,就是在2号消费者接收到消息处理时,关掉它造成服务器宕机,检测消息是否被1号消费者消费或是丢失。
最终结论:
经过测试在2号服务器宕机之后,消息并未丢失而且重新回到队列轮次被1号消费者消费,这样以来就不会存在消息丢失问题,所以消息队列中一般多推荐使用手动应答防止消息丢失从而造成损失。