activemq高并发下点对点应答模式,消息不一致问题? 400 报错
mq版本:5.9
参考相关博客以及官方文档,做了mq的P2P回复模式。思路如下
用户请求,放入队列A,然后等待回复消息(队列B中的内容),A处理完成后,发送回复消息到队列B。
根据官方文档也设置了
msg.setCorrelationId(correlationId);
msg.setJMSReplyTo(replyTo);
但是高并发情况下,用户X,用户Y同时请求,但是得到的回复消息就可能是用户X获得了用户Y的请求回复消息,用户Y获得了本应该是用户X的回复消息。部分代码如下:
用户请求入队了代码:
Connection connection= jmsTemplate.getConnectionFactory().createConnection();
Session session=connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
Destination destination=session.createQueue(queueName); // 创建消息队列
jmsTemplate.convertAndSend(destination, message);
Destination recall_destination=session.createQueue(reciveQueueName); // 创建消息队列
TextMessage textMsg=(TextMessage) jmsTemplate.receive(recall_destination);
replyMsg=messageText;
消费者代码:
@Override
public void onMessage(Message message) {
IPolycostDAO polycostDAO=(IPolycostDAO) DqdpAppContext.getSpringContext().getBean("polycostDAO");
TbJhsOrderPO jhsOrderPO=null;
if(message instanceof ObjectMessage){
final ObjectMessage objectMessage=(ObjectMessage) message;
try {
BaseProducer producer=new BaseProducer("jhs.orderReply.queue", jmsTemplate);
jhsOrderPO=(TbJhsOrderPO) objectMessage.getObject();
TbJhsGoodsHotstockPO hotstockPO=polycostDAO.getStock(jhsOrderPO.getGoodsDetail(),jhsOrderPO.getGoodsColor());
if(!AssertUtil.isEmpty(hotstockPO)&&Integer.parseInt(hotstockPO.getStock())>0){
polycostDAO.insert(jhsOrderPO);
polycostDAO.UpdateStock(jhsOrderPO.getGoodsDetail(),jhsOrderPO.getGoodsColor());
producer.sendRepMsg("ok", objectMessage);
}else{
producer.sendRepMsg("fail", objectMessage);
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (DataConfictException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
由于使用了jmsTemplate.convertAndSend()方法,所以,使用了转换器,设置关联iD也在转换器设置,代码如下:
@Override
public Message toMessage(Object object, Session session)
throws JMSException, MessageConversionException {
ActiveMQObjectMessage msg = (ActiveMQObjectMessage) session.createObjectMessage();
Destination replyTo=session.createQueue("jhs.orderReply.queue");
String correlationId = RandomStringUtils.randomNumeric(5);
msg.setCorrelationId(correlationId);
msg.setJMSReplyTo(replyTo);
msg.setObject((Serializable) object);
return msg;
}
发送回复消息的sendRepMsg()代码如下:
//发送应答消息
public void sendRepMsg(final String repMsg, final ObjectMessage message){
try {
jmsTemplate.send(message.getJMSReplyTo(), new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
// TODO Auto-generated method stub
TextMessage msg =session.createTextMessage(repMsg);
msg.setJMSReplyTo(message.getJMSReplyTo());
msg.setJMSCorrelationID(message.getJMSCorrelationID());
return msg;
}
});
} catch (JmsException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
基于以上代码,X,Y用户同时请求,假设正常情况下,X获取的回复消息为“ok”,Y获取的回复消息为“fail”,但是如果再高并发的情况下,就有可能发送X获取到“fail”,Y获取到“ok”。
这个问题困扰几天了,还希望这方面的大神给予指点。由于代码写的很烂,希望大家能包含。谢谢了!
回复队列设置为exclusive,独享的。x只能读x的,y只能读y的。兄弟很面善,是不是姓吕 ######加上排他性设置后,同一时间只有一个消费者去回复队列消费,它会根据设置的id去匹配对应的回复消息吗?假设队列里面都有了X,Y的回复消息,它会去里面拿第一个还是会根据自己的id去匹配?######我不姓吕,姓罗。######
这种破玩意
两个凡是不用
###### 失败,肯定是有原因的,看别人代码找原因,否则,自己造轮子更可靠,如果觉得水平造不出来,那还是用别人轮子吧######虽然这个问题没能解决,就你的回答靠谱,最佳给你了
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。