1.消息获取
/**
 * Receives messages (publish/subscribe mode).
 *
 * <p>Reads the number of ready messages in the queue {@code fanout_queue_notify1}
 * through a passive declare, then pulls at most that many messages one at a time
 * with {@code basicGet}. Each body is parsed as a JSON object and collected into a
 * single array.
 *
 * <p>Messages are fetched with {@code autoAck = false} and are NOT acknowledged here.
 * NOTE(review): presumably acknowledgement is expected to happen in a separate
 * step — confirm, because unacknowledged messages are not removed from the queue.
 *
 * @return a success result wrapping the JSON array of received messages
 * @throws CustomException if no message could be retrieved from the queue
 * @throws Exception kept from the original signature
 */
@Override
public AjaxResult consumption() throws Exception {
    final String queueName = "fanout_queue_notify1";
    JSONArray jsonArray = new JSONArray();

    // Ask the broker how many messages are currently ready in the queue.
    AMQP.Queue.DeclareOk declareOk =
            rabbitTemplate.execute(channel -> channel.queueDeclarePassive(queueName));
    int messageCount = declareOk.getMessageCount();
    // Logged as a plain message: the text must never be used as a printf format string.
    log.info("MQ的queues中对应消息总条数为====》" + messageCount);

    rabbitTemplate.execute(channel -> {
        // Pull one message per iteration, up to the count reported above.
        for (int fetched = 0; fetched < messageCount; fetched++) {
            GetResponse response = channel.basicGet(queueName, false);
            if (response == null) {
                // The queue was drained in the meantime (e.g. by another consumer);
                // basicGet returns null when nothing is available.
                break;
            }
            log.info("mq接收到的消息为1=====》" + response);
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            String json = new String(response.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            jsonArray.add(JSON.parseObject(json));
        }
        return null;
    });

    if (jsonArray.size() < 1) {
        throw new CustomException("消息队列里面没有 已经准备好的消息!");
    }
    // The payload may contain '%' characters, so it is logged as data, not as a format.
    log.info("************************" + jsonArray);
    return AjaxResult.success(jsonArray);
}
2.消息确认
/**
 * Consumer-side message acknowledgement.
 *
 * <p>Issues {@code basicAck(1, true)} on a channel obtained from the template and
 * reports success when the call completes without throwing.
 *
 * <p>NOTE(review): {@code messageId} is not used, and the delivery tag is hard-coded
 * to {@code 1} with {@code multiple = true}. RabbitMQ delivery tags are scoped to the
 * channel that delivered the message, so verify that this ack really reaches the
 * channel used to fetch the messages; otherwise the tag to acknowledge has to be
 * tracked and passed in.
 *
 * @param messageId identifier of the message to confirm (currently unused)
 * @return a success result when the acknowledgement was sent without error
 * @throws CustomException if sending the acknowledgement fails
 */
@Override
public AjaxResult consumption1(String messageId) {
    try {
        // The ack must run inside the try block; previously it sat in the catch of an
        // empty try and therefore never executed.
        rabbitTemplate.execute(channel -> {
            channel.basicAck(1, true);
            return null;
        });
    } catch (Exception e) {
        log.error("Message acknowledgement failed", e);
        throw new CustomException("消息确认失败,请联系管理员!");
    }
    return AjaxResult.success();
}