RocketMQ-事务消息(分布式事务)

本文涉及的产品
可观测可视化 Grafana 版,10个用户账号 1个月
函数计算FC,每月15万CU 3个月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 本文将阐述分布式事务的实现方式。

1、业务场景


在电商场景里面,成功付款后,会发放优惠券。


1.png


上面的场景:在电商系统中,会出现,付款成功后,准备发优惠券的时候,服务器宕机了。这个时候会造成用户成功付款,却没收到优惠券的情况。这种情况下,我们很容易想到用事务来保证付款和发优惠券的原子性即可:要么付款和发优惠券同时成功,要么同时失败,是不允许其他一个成功,另一个失败的。


但上面,存在一种情况:付款和发优惠券高度耦合,这样子容易出现:发优惠券一直失败,会导致付款一直失败的场景。


对于这种场景的解决方案:引入消息中间件MQ来解耦。image.gif


2.png


但是上述这种情景,存在MQ不可用,宕机的情况。会产生付款成功,发优惠券失败的情况。


针对这种情况,需要引入分布式事务。


2、事务消息


分布式事务是一种抽象的概念。


那具体的实现呢?


是有很多种实现的。


在这里,主要介绍:RocketMQ的事务消息。


事务消息的流程图

image.gif

3.png


流程步骤:


  • 1、生产者发送half消息
  • 2、MQ回复ACK确认消息
  • 3、执行本地事务:订单付款。如果订单付款成功,那么就给MQ发送commit消息。如果订单付款失败,就发送rollback消息
  • 4、如果步骤3发送消息失败,这个时候MQ的定时器会检查half消息。MQ回调方法,去检查本地事务的执行情况。如果执行成功,就返回commit消息。如果执行失败,就返回rollback消息。
  • 5、如果MQ收到的是commit消息,此时会把half消息复制到真正的topic中
  • 6、消费者对消息进行消费,下发优惠券


3、如何使用


上面,大概知道了事务消息的流程。


接下来,要知道如何使用。


还是以付款下发优惠券为例。


3.1 发送half消息-MQ回复ACK确认消息


 @Override
    public void finishedOrder(String orderNo, String phoneNumber) {
        try {
          // 退房事务消息,topic:完成订单
        Message msg = new Message(orderFinishedTopic, JSON.toJSONString(orderInfo).getBytes(StandardCharsets.UTF_8));
            // 发送half消息
            TransactionSendResult transactionSendResult = orderFinishedTransactionMqProducer.sendMessageInTransaction(msg, null);
        } catch (MQClientException e) {
        }
    }


3.2 执行本地事务:付款


@Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // 修改订单的状态
            orderService.payOrder();
            // 成功 提交prepare消息
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            // 执行本地事务失败 回滚prepare消息
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }


3.3 MQ定时器回调查询half消息状态


@Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        try {
            //查询订单状态
            Integer orderStatus = orderService.getOrderStatus();
            if (Objects.equals(orderStatus, OrderStatusEnum.FINISHED.getStatus())) {          //返回commit消息
                return LocalTransactionState.COMMIT_MESSAGE;
            } else {
                //返回rollback消息
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        } catch (Exception e) {
            // 查询订单状态失败
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }


3.4 消费者进行消费,下发优惠券


@Bean(value = "orderFinishedConsumer")
    public DefaultMQPushConsumer finishedConsumer(@Qualifier(value = "orderFinishedMessageListener") OrderFinishedMessageListener orderFinishedMessageListener) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(orderFinishedConsumerGroup);
        consumer.setNamesrvAddr(namesrvAddress);
        //topic:完成订单
        consumer.subscribe(orderFinishedTopic, "*");
        consumer.setMessageListener(orderFinishedMessageListener);
        consumer.start();
        return consumer;
    }


监听器:OrderFinishedMessageListener


@Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {            
                //下发优惠券
                couponService.distributeCoupon();
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }


4、知其然知其所以然


你看完上面,已经知道如何使用事务消息。


接下来,你需要了解其底层原理:看看源码(面试常问)


step1:首先看发送half消息的代码:

image.gif

4.png


step2:进入代码里面:

image.gif

5.png


step3:其实就是默认调用了

DefaultMQProducer#sendMessageInTransaction。


public TransactionSendResult sendMessageInTransaction(final Message msg,
        ...省略一堆代码
        SendResult sendResult = null;
        // 给待发送消息添加属性,表名是一个事务消息,即半消息,这里设置为true。(这个属性后面会用到)
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
            //发送消息--重点0
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }
        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
            //消息发送成功
            case SEND_OK: {
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null != transactionId && !"".equals(transactionId)) {
                        msg.setTransactionId(transactionId);
                    }
                    if (null != localTransactionExecuter) {
                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                    } else if (transactionListener != null) {
                        log.debug("Used new transaction API");
                        //执行本地事务,executeLocalTransaction需要子类去具体实现
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                    }
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }
                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }
        try {
            // 最后,给broker发送提交或者回滚事务的RPC请求
            this.endTransaction(sendResult, localTransactionState, localException);
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }
        // 组装结果返回
        TransactionSendResult transactionSendResult = new TransactionSendResult();
        transactionSendResult.setSendStatus(sendResult.getSendStatus());
        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
        transactionSendResult.setMsgId(sendResult.getMsgId());
        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
        transactionSendResult.setTransactionId(sendResult.getTransactionId());
        transactionSendResult.setLocalTransactionState(localTransactionState);
        return transactionSendResult;
    }


上面的DefaultMQProducerImpl#sendMessageInTransaction方法主要流程:


  • 简单的数据校验
  • 给消息添加属性,表明这个事务消息
  • 发送消息,且返回消息的结果--重点0
  • 根据消息不同结果,进行不同的处理
  • 如果消息发送成功,那么就执行本地事务(付款),返回本地事务的结果--重点1
  • 最后,根据本地事务的结果,给broker发送Commit或rollback的消息--重点2


上面我们简述了一个大概的流程。未涉及到太多细节,是对一个整体流程的了解。

接下来,我们深入了解一些细节:


我们先研究一下重点0:sendResult = this.send(msg); 我们点进去会发现,send的底层其实就是调用了DefaultMQProducerImpl#sendKernelImpl方法。


step4:接着到SendMessageProcessor#sendMessage

image.gif

6.png


step5:事务消息,继续进入TransactionalMessageServiceImpl#prepareMessage-->TransactionalMessageBridge#putHalfMessage-->TransactionalMessageBridge#parseHalfMessageInner

image.gif

6.png


step6:接着,我们坐着研究一下重点1,即transactionListener.executeLocalTransaction(msg, arg);


public interface TransactionListener {
    /**
     * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
     *
     * @param msg Half(prepare) message
     * @param arg Custom business parameter
     * @return Transaction state
     */
    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
    /**
     * When no response to prepare(half) message. broker will send check message to check the transaction status, and this
     * method will be invoked to get local transaction status.
     *
     * @param msg Check message
     * @return Transaction state
     */
    LocalTransactionState checkLocalTransaction(final MessageExt msg);
}


你会发现,这是一个接口,有2个方法,一个是执行本地事务executeLocalTransaction。另一个是检查本地事务checkLocalTransaction。这两个方法需要实现类去实现。


比如:执行本地事务:付款


step7:接着我们来看重点2:this.endTransaction(sendResult, localTransactionState, localException);


public void endTransaction(
        // 省略一堆代码
        //事务id
        String transactionId = sendResult.getTransactionId();
        // broker地址
        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());
        // 根据事务消息和本地事务的执行结果,发送不同的结果给broker
        switch (localTransactionState) {
            case COMMIT_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
            case UNKNOW:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        }
        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
        //发送给broker
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
            this.defaultMQProducer.getSendMsgTimeout());
    }


到这个时候,我们已经把消息从生产者发送到了broker里面。


那接下来,我们就需要了解broker是如何处理事务消息的。


step8: 事务消息如何回查


直接看代码注解即可

TransactionalMessageCheckService#onWaitEnd


@Override
    protected void onWaitEnd() {
        //timeout是从broker配置文件中获取transactionTimeOut值,代表事务的过期时间,(一个消息的存储时间 + timeout) > 系统当前时间,才会对该消息执行事务状态会查
        long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
        //checkMax是从broker配置文件中获取transactionCheckMax值,代表事务的最大检测次数,如果超过检测次数,消息会默认为丢弃,即rollback消息
        int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
        long begin = System.currentTimeMillis();
        log.info("Begin to check prepare message, begin time:{}", begin);
        //回查:核心点org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl.check
        this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
        log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
    }


step9:进入check方法:TransactionalMessageServiceImpl#check。


直接看注解即可


@Override
    public void check(long transactionTimeout, int transactionCheckMax,
        AbstractTransactionalMessageCheckListener listener) {
        try {
            //RMQ_SYS_TRANS_HALF_TOPIC主题
            String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
            //获取RMQ_SYS_TRANS_HALF_TOPIC主题下的所有队列
            Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
            //数据校验
            if (msgQueues == null || msgQueues.size() == 0) {
                log.warn("The queue of topic is empty :" + topic);
                return;
            }
            log.debug("Check topic={}, queues={}", topic, msgQueues);
            //遍历队列
            for (MessageQueue messageQueue : msgQueues) {
                long startTime = System.currentTimeMillis();
                //根据队列获取对应topic:RMQ_SYS_TRANS_OP_HALF_TOPIC下的opQueue
                //RMQ_SYS_TRANS_HALF_TOPIC:prepare消息的主题,事务消息首先先进入到该主题。
                //RMQ_SYS_TRANS_OP_HALF_TOPIC:当消息服务器收到事务消息的提交或回滚请求后,会将消息存储在该主题下
                MessageQueue opQueue = getOpQueue(messageQueue);
                //messageQueue队列的偏移量
                long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
                //opQueue队列的偏移量
                long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
                log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
                //如果其中一个队列的偏移量小于0,就跳过
                if (halfOffset < 0 || opOffset < 0) {
                    log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                        halfOffset, opOffset);
                    continue;
                }
                //doneOpOffset和removeMap主要的目的是避免重复调用事务回查接口
                List<Long> doneOpOffset = new ArrayList<>();
                HashMap<Long, Long> removeMap = new HashMap<>();
                PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
                if (null == pullResult) {
                    log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
                        messageQueue, halfOffset, opOffset);
                    continue;
                }
                // single thread
                //空消息的次数
                int getMessageNullCount = 1;
                //RMQ_SYS_TRANS_HALF_TOPIC#queueId的最新偏移量
                long newOffset = halfOffset;
                //RMQ_SYS_TRANS_HALF_TOPIC的偏移量
                long i = halfOffset;
                while (true) {
                    //限制每次最多处理的时间是60s
                    if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                        log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                        break;
                    }
                    //removeMap包含当前信息,则跳过,处理下一条信息
                    //removeMap的信息填充是在上面的fillOpRemoveMap
                    //fillOpRemoveMap具体逻辑是:具体实现逻辑是从RMQ_SYS_TRANS_OP_HALF_TOPIC主题中拉取32条,
                    //如果拉取的消息队列偏移量大于等于RMQ_SYS_TRANS_HALF_TOPIC#queueId当前的处理进度时
                    //会添加到removeMap中,表示已处理过
                    if (removeMap.containsKey(i)) {
                        log.info("Half offset {} has been committed/rolled back", i);
                        Long removedOpOffset = removeMap.remove(i);
                        doneOpOffset.add(removedOpOffset);
                    } else {
                        //根据消息队列偏移量i从RMQ_SYS_TRANS_HALF_TOPIC队列中获取消息
                        GetResult getResult = getHalfMsg(messageQueue, i);
                        MessageExt msgExt = getResult.getMsg();
                        //如果消息为空
                        if (msgExt == null) {
                            //则根据允许重复次数进行操作,默认重试一次  MAX_RETRY_COUNT_WHEN_HALF_NULL=1
                            //如果超过重试次数,直接跳出while循环,结束该消息队列的事务状态回查
                            if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                                break;
                            }
                            //如果是由于没有新的消息而返回为空(拉取状态为:PullStatus.NO_NEW_MSG),则结束该消息队列的事务状态回查。
                            if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
                                log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
                                    messageQueue, getMessageNullCount, getResult.getPullResult());
                                break;
                            } else {
                                log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
                                    i, messageQueue, getMessageNullCount, getResult.getPullResult());
                                //其他原因,则将偏移量i设置为:getResult.getPullResult().getNextBeginOffset(),重新拉取
                                i = getResult.getPullResult().getNextBeginOffset();
                                newOffset = i;
                                continue;
                            }
                        }
                        //判断该消息是否需要discard(吞没,丢弃,不处理)、或skip(跳过)
                        //needDiscard 依据:如果该消息回查的次数超过允许的最大回查次数,
                        // 则该消息将被丢弃,即事务消息提交失败,不能被消费者消费,其做法,
                        // 主要是每回查一次,在消息属性TRANSACTION_CHECK_TIMES中增1,默认最大回查次数为5次。
                        //needSkip依据:如果事务消息超过文件的过期时间,
                        // 默认72小时(具体请查看RocketMQ过期文件相关内容),则跳过该消息。
                        if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
                            listener.resolveDiscardMsg(msgExt);
                            newOffset = i + 1;
                            i++;
                            continue;
                        }
                        //消息的存储时间大于开始时间,中断while循环
                        if (msgExt.getStoreTimestamp() >= startTime) {
                            log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
                                new Date(msgExt.getStoreTimestamp()));
                            break;
                        }
                        //该消息已存储的时间=系统当前时间-消息存储的时间戳
                        long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
                        //checkImmunityTime:检测事务的时间
                        //transactionTimeout:事务消息的超时时间
                        long checkImmunityTime = transactionTimeout;
                        //用户设定的checkImmunityTimeStr
                        String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
                        if (null != checkImmunityTimeStr) {
                            //checkImmunityTime=Long.valueOf(checkImmunityTimeStr)
                            checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
                            if (valueOfCurrentMinusBorn < checkImmunityTime) {
                                if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                                    //最近进度=当前消息进度+1
                                    newOffset = i + 1;
                                    i++;
                                    continue;
                                }
                            }
                        } else {//如果当前时间小于事务超时时间,则结束while循环
                            if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
                                log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
                                    checkImmunityTime, new Date(msgExt.getBornTimestamp()));
                                break;
                            }
                        }
                        List<MessageExt> opMsg = pullResult.getMsgFoundList();
                        //是否需要回查,判断依据如下:
                        //消息已存储的时间大于事务超时时间
                        boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
                            || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
                            || (valueOfCurrentMinusBorn <= -1);
                        if (isNeedCheck) {
                            if (!putBackHalfMsgQueue(msgExt, i)) {//11
                                continue;
                            }
                            //重点:进行事务回查(异步)
                            listener.resolveHalfMsg(msgExt);
                        } else {
                            //加载已处理的消息进行筛选
                            pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                            log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                                messageQueue, pullResult);
                            continue;
                        }
                    }
                    newOffset = i + 1;
                    i++;
                }
                //保存half消息队列的回查进度
                if (newOffset != halfOffset) {
                    transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
                }
                long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
                //保存处理队列opQueue的处理今夕
                if (newOpOffset != opOffset) {
                    transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
                }
            }
        } catch (Throwable e) {
            log.error("Check error", e);
        }
    }


step10:继续深入研究一下:resolveHalfMsg


public void resolveHalfMsg(final MessageExt msgExt) {
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    //针对每个待反查的half消息,进行回查本地事务结果
                    sendCheckMessage(msgExt);
                } catch (Exception e) {
                    LOGGER.error("Send check message error!", e);
                }
            }
        });
    }


step11:继续追进sendCheckMessage(msgExt)方法


/**
     * 发送回查消息
     * @param msgExt
     * @throws Exception
     */
    public void sendCheckMessage(MessageExt msgExt) throws Exception {
        CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
        checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
        checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
        checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
        checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
        checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
        //原主题
        msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
        //原队列id
        msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
        msgExt.setStoreSize(0);
        String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
        Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
        if (channel != null) {
            //回调查询本地事务状态
            brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
        } else {
            LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
        }
    }


到这里,基本上把事务消息的流程和实现细节走了一遍。


还有什么问题的话,在留言区或者私信大头菜


5、问题:分布式事务还有其他实现


上面的事务消息是分布式事务的一种实现。


事务消息被称为二段提交。


问题:分布式事务,还有哪些具体的实现方式?


欢迎留言


6、后续文章

...


欢迎各位入(guan)股(zhu),后续文章干货多多。


—本文转载自「大头菜技术」公众号

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
SQL 关系型数据库 MySQL
乐观锁在分布式数据库中如何与事务隔离级别结合使用
乐观锁在分布式数据库中如何与事务隔离级别结合使用
|
3月前
|
SQL 关系型数据库 MySQL
乐观锁在分布式数据库中如何与事务隔离级别结合使用
乐观锁在分布式数据库中如何与事务隔离级别结合使用
|
29天前
|
消息中间件 架构师 数据库
本地消息表事务:10Wqps 高并发分布式事务的 终极方案,大厂架构师的 必备方案
45岁资深架构师尼恩分享了一篇关于分布式事务的文章,详细解析了如何在10Wqps高并发场景下实现分布式事务。文章从传统单体架构到微服务架构下分布式事务的需求背景出发,介绍了Seata这一开源分布式事务解决方案及其AT和TCC两种模式。随后,文章深入探讨了经典ebay本地消息表方案,以及如何使用RocketMQ消息队列替代数据库表来提高性能和可靠性。尼恩还分享了如何结合延迟消息进行事务数据的定时对账,确保最终一致性。最后,尼恩强调了高端面试中需要准备“高大上”的答案,并提供了多个技术领域的深度学习资料,帮助读者提升技术水平,顺利通过面试。
本地消息表事务:10Wqps 高并发分布式事务的 终极方案,大厂架构师的 必备方案
|
25天前
|
消息中间件 SQL 中间件
大厂都在用的分布式事务方案,Seata+RocketMQ带你打破10万QPS瓶颈
分布式事务涉及跨多个数据库或服务的操作,确保数据一致性。本地事务通过数据库直接支持ACID特性,而分布式事务则需解决跨服务协调难、高并发压力及性能与一致性权衡等问题。常见的解决方案包括两阶段提交(2PC)、Seata提供的AT和TCC模式、以及基于消息队列的最终一致性方案。这些方法各有优劣,适用于不同业务场景,选择合适的方案需综合考虑业务需求、系统规模和技术团队能力。
177 7
|
3月前
|
消息中间件 网络协议 C#
C#使用Socket实现分布式事件总线,不依赖第三方MQ
`CodeWF.EventBus.Socket` 是一个轻量级的、基于Socket的分布式事件总线系统,旨在简化分布式架构中的事件通信。它允许进程之间通过发布/订阅模式进行通信,无需依赖外部消息队列服务。
C#使用Socket实现分布式事件总线,不依赖第三方MQ
|
3月前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
2月前
|
监控
Saga模式在分布式系统中保证事务的隔离性
Saga模式在分布式系统中保证事务的隔离性
|
3月前
|
消息中间件 监控 供应链
深度剖析 RocketMQ 事务消息!
本文深入探讨了 RocketMQ 的事务消息原理及其应用场景。通过详细的源码分析,阐述了事务消息的基本流程,包括准备阶段、提交阶段及补偿机制。文章还提供了示例代码,帮助读者更好地理解整个过程。此外,还讨论了事务消息的优缺点、适用场景及注意事项,如确保本地事务的幂等性、合理设置超时时间等。尽管事务消息增加了系统复杂性,但在需要保证消息一致性的场景中,它仍是一种高效的解决方案。
179 2
|
4月前
Saga模式在分布式系统中如何保证事务的隔离性
Saga模式在分布式系统中如何保证事务的隔离性
|
5月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
151 2

相关产品

  • 云消息队列 MQ