开发者社区> 问答> 正文

activemq高并发下点对点应答模式,消息不一致问题? 400 报错

activemq高并发下点对点应答模式,消息不一致问题? 400 报错

mq版本:5.9

参考相关博客以及官方文档,做了mq的P2P回复模式。思路如下

用户请求,放入队列A,然后等待回复消息(队列B中的内容),A处理完成后,发送回复消息到队列B。

根据官方文档也设置了

msg.setCorrelationId(correlationId);

msg.setJMSReplyTo(replyTo);

但是高并发情况下,用户X,用户Y同时请求,但是得到的回复消息就可能是用户X获得了用户Y的请求回复消息,用户Y获得了本应该是用户X的回复消息。部分代码如下:

用户请求入队了代码:

Connection connection=    jmsTemplate.getConnectionFactory().createConnection();
             Session session=connection.createSession(Boolean.TRUE,  
                            Session.AUTO_ACKNOWLEDGE);
             Destination destination=session.createQueue(queueName); // 创建消息队列  
             jmsTemplate.convertAndSend(destination, message);
             Destination recall_destination=session.createQueue(reciveQueueName); // 创建消息队列  
             TextMessage textMsg=(TextMessage) jmsTemplate.receive(recall_destination);
             replyMsg=messageText;

消费者代码:

@Override
    public void onMessage(Message message) {
        IPolycostDAO polycostDAO=(IPolycostDAO)             DqdpAppContext.getSpringContext().getBean("polycostDAO");
            TbJhsOrderPO jhsOrderPO=null;
        if(message instanceof ObjectMessage){
            final ObjectMessage objectMessage=(ObjectMessage) message;
            try {

                BaseProducer producer=new BaseProducer("jhs.orderReply.queue", jmsTemplate);
                jhsOrderPO=(TbJhsOrderPO) objectMessage.getObject();
                TbJhsGoodsHotstockPO             hotstockPO=polycostDAO.getStock(jhsOrderPO.getGoodsDetail(),jhsOrderPO.getGoodsColor());
                if(!AssertUtil.isEmpty(hotstockPO)&&Integer.parseInt(hotstockPO.getStock())>0){
                    polycostDAO.insert(jhsOrderPO);
                    polycostDAO.UpdateStock(jhsOrderPO.getGoodsDetail(),jhsOrderPO.getGoodsColor());
                    producer.sendRepMsg("ok", objectMessage);
                }else{
                    producer.sendRepMsg("fail", objectMessage);
                }
                
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (DataConfictException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }


由于使用了jmsTemplate.convertAndSend()方法,所以,使用了转换器,设置关联iD也在转换器设置,代码如下:
   @Override
    public Message toMessage(Object object, Session session)
            throws JMSException, MessageConversionException {  
        ActiveMQObjectMessage msg = (ActiveMQObjectMessage) session.createObjectMessage();
        Destination replyTo=session.createQueue("jhs.orderReply.queue");
        String correlationId = RandomStringUtils.randomNumeric(5);
        msg.setCorrelationId(correlationId);
        msg.setJMSReplyTo(replyTo);
        msg.setObject((Serializable) object);
        return msg;
     }

发送回复消息的sendRepMsg()代码如下:

//发送应答消息
    public void sendRepMsg(final String repMsg, final ObjectMessage message){
        try {
             jmsTemplate.send(message.getJMSReplyTo(), new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    // TODO Auto-generated method stub
                             TextMessage msg =session.createTextMessage(repMsg);
                             msg.setJMSReplyTo(message.getJMSReplyTo());
                             msg.setJMSCorrelationID(message.getJMSCorrelationID());  
                            return msg;
                    
                }
            });
        } catch (JmsException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
       
    }

基于以上代码,X,Y用户同时请求,假设正常情况下,X获取的回复消息为“ok”,Y获取的回复消息为“fail”,但是如果再高并发的情况下,就有可能发送X获取到“fail”,Y获取到“ok”。

这个问题困扰几天了,还希望这方面的大神给予指点。由于代码写的很烂,希望大家能包含。谢谢了!

展开
收起
爱吃鱼的程序员 2020-06-04 15:18:59 590 0
1 条回答
写回答
取消 提交回答
  • https://developer.aliyun.com/profile/5yerqm5bn5yqg?spm=a2c6h.12873639.0.0.6eae304abcjaIB

    回复队列设置为exclusive,独享的。x只能读x的,y只能读y的。兄弟很面善,是不是姓吕 ######加上排他性设置后,同一时间只有一个消费者去回复队列消费,它会根据设置的id去匹配对应的回复消息吗?假设队列里面都有了X,Y的回复消息,它会去里面拿第一个还是会根据自己的id去匹配?######我不姓吕,姓罗。######

    这种破玩意

    两个凡是不用

    ###### 失败,肯定是有原因的,看别人代码找原因,否则,自己造轮子更可靠,如果觉得水平造不出来,那还是用别人轮子吧######

    引用来自“一只小桃子”的评论

    回复队列设置为exclusive,独享的。x只能读x的,y只能读y的。兄弟很面善,是不是姓吕

    虽然这个问题没能解决,就你的回答靠谱,最佳给你了

    2020-06-04 16:00:02
    赞同 展开评论 打赏
问答标签:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
徐雷-Java为王,互联网高并发架构设计与选型之路6.0 立即下载
Redis 的高并发实战:抢购系统 立即下载
MySQL高并发场景实战 立即下载