一个基于RabbitMQ的可复用的事务消息方案

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 分布式事务是微服务实践中一个比较棘手的问题,在笔者所实施的微服务实践方案中,都采用了折中或者规避强一致性的方案。参考Ebay多年前提出的本地消息表方案,基于RabbitMQ和MySQL(JDBC)做了轻量级的封装,实现了低入侵性的事务消息模块。本文的内容就是详细分析整个方案的设计思路和实施。

微信截图_20220512201532.png


前提



分布式事务是微服务实践中一个比较棘手的问题,在笔者所实施的微服务实践方案中,都采用了折中或者规避强一致性的方案。参考Ebay多年前提出的本地消息表方案,基于RabbitMQMySQLJDBC)做了轻量级的封装,实现了低入侵性的事务消息模块。本文的内容就是详细分析整个方案的设计思路和实施。环境依赖如下:


  • JDK1.8+
  • spring-boot-start-web:2.x.xspring-boot-start-jdbc:2.x.xspring-boot-start-amqp:2.x.x
  • HikariCP:3.x.xspring-boot-start-jdbc自带)、mysql-connector-java:5.1.48
  • redisson:3.12.1


方案设计思路



事务消息原则上只适合弱一致性(或者说最终一致性)的场景,常见的弱一致性场景如:


  • 用户服务完成了注册动作,向短信服务推送一条营销相关的消息。
  • 信贷体系中,订单服务保存订单完毕,向审批服务推送一条待审批的订单记录信息。
  • ......

强一致性的场景一般不应该选用事务消息


微信截图_20220512201542.png


一般情况下,要求强一致性说明要严格同步,也就是所有操作必须同时成功或者同时失败,这样就会引入同步带来的额外消耗。如果一个事务消息模块设计合理,补偿、查询、监控等等功能都完毕,由于系统交互是异步的,整体吞吐要比严格同步高。在笔者负责的业务系统中基于事务消息使用还定制了一条基本原则:消息内容正确的前提下,消费方出现异常需要自理


简单来说就是:上游保证了自身的业务正确性,成功推送了正确的消息到RabbitMQ就认为上游义务已经结束。


为了降低代码的入侵性,事务消息需要借助Spring编程式事务或者声明式事务。编程式事务一般依赖于TransactionTemplate,而声明式事务依托于AOP模块,依赖于注解@Transactional


接着需要自定义一个事务消息功能模块,新增一个事务消息记录表(其实就是本地消息表),用于保存每一条需要发送的消息记录。事务消息功能模块的主要功能是:


  • 保存消息记录。
  • 推送消息到RabbitMQ服务端。
  • 消息记录的查询、补偿推送等等。


事务执行的逻辑单元


在事务执行的逻辑单元里面,需要进行待推送的事务消息记录的保存,也就是:本地(业务)逻辑和事务消息记录保存操作绑定在同一个事务


微信截图_20220512201553.png


发送消息到RabbitMQ服务端这一步需要延后到事务提交之后,这样才能保证事务提交成功和消息成功发送到RabbitMQ服务端这两个操作是一致的。为了把保存待发送的事务消息发送消息到RabbitMQ两个动作从使用者感知角度合并为一个动作,这里需要用到Spring特有的事务同步器TransactionSynchronization,这里分析一下事务同步器的主要方法的回调位置,主要参考AbstractPlatformTransactionManager#commit()或者AbstractPlatformTransactionManager#processCommit()方法:


微信截图_20220512201603.png


上图仅仅演示了事务正确提交的场景(不包含异常的场景)。这里可以明确知道,事务同步器TransactionSynchronizationafterCommit()afterCompletion(int status)方法都在真正的事务提交点AbstractPlatformTransactionManager#doCommit()之后回调,因此可以选用这两个方法其中之一用于执行推送消息到RabbitMQ服务端,整体的伪代码如下:


@Transactional
public Dto businessMethod(){
    business transaction code block ...
    // 保存事务消息
    [saveTransactionMessageRecord()]
    // 注册事务同步器 - 在afterCommit()方法中推送消息到RabbitMQ
    [register TransactionSynchronization,send message in method afterCommit()]
    business transaction code block ...
}
复制代码


上面伪代码中,保存事务消息注册事务同步器两个步骤可以安插在事务方法中的任意位置,也就是说与执行顺序无关。


事务消息的补偿


虽然之前提到笔者建议下游服务自理自身服务消费异常的场景,但是有些时候迫于无奈还是需要上游把对应的消息重新推送,这个算是特殊的场景。另外还有一个场景需要考虑:事务提交之后触发事务同步器TransactionSynchronizationafterCommit()方法失败。这是一个低概率的场景,但是在生产中一定会出现,一个比较典型的原因就是:事务提交完成后尚未来得及触发TransactionSynchronization#afterCommit()方法进行推送服务实例就被重启。如下图所示:


微信截图_20220512201614.png


为了统一处理补偿推送的问题,使用了有限状态判断消息是否已经推送成功:


  • 在事务方法内,保存事务消息的时候,标记消息记录推送状态为处理中
  • 事务同步器接口TransactionSynchronizationafterCommit()方法的实现中,推送对应的消息到RabbitMQ,然后更变事务消息记录的状态为推送成功


还有一种极为特殊的情况是RabbitMQ服务端本身出现故障导致消息推送异常,这种情况下需要进行重试(补偿推送),经验证明短时间内的反复重试是没有意义的,故障的服务一般不会瞬时恢复,所以可以考虑使用指数退避算法进行重试,同时需要限制最大重试次数。


微信截图_20220512201623.png


指数值、间隔值和最大重试次数上限需要根据实际情况设定,否则容易出现消息延时过大或者重试过于频繁等问题。


方案实施



引入核心依赖:


<properties>
    <spring.boot.version>2.2.4.RELEASE</spring.boot.version>
    <redisson.version>3.12.1</redisson.version>
    <mysql.connector.version>5.1.48</mysql.connector.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring.boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.connector.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
        <version>${redisson.version}</version>
    </dependency>
</dependencies>
复制代码


spring-boot-starter-jdbcmysql-connector-javaspring-boot-starter-aopMySQL事务相关,而spring-boot-starter-amqpRabbitMQ客户端的封装,redisson主要使用其分布式锁,用于补偿定时任务的加锁执行(以防止服务多个节点并发执行补偿推送)。


表设计


事务消息模块主要涉及两张表,以MySQL为例,建表DDL如下:


CREATE TABLE `t_transactional_message`
(
    id                  BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    create_time         DATETIME        NOT NULL DEFAULT CURRENT_TIMESTAMP,
    edit_time           DATETIME        NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    creator             VARCHAR(20)     NOT NULL DEFAULT 'admin',
    editor              VARCHAR(20)     NOT NULL DEFAULT 'admin',
    deleted             TINYINT         NOT NULL DEFAULT 0,
    current_retry_times TINYINT         NOT NULL DEFAULT 0 COMMENT '当前重试次数',
    max_retry_times     TINYINT         NOT NULL DEFAULT 5 COMMENT '最大重试次数',
    queue_name          VARCHAR(255)    NOT NULL COMMENT '队列名',
    exchange_name       VARCHAR(255)    NOT NULL COMMENT '交换器名',
    exchange_type       VARCHAR(8)      NOT NULL COMMENT '交换类型',
    routing_key         VARCHAR(255) COMMENT '路由键',
    business_module     VARCHAR(32)     NOT NULL COMMENT '业务模块',
    business_key        VARCHAR(255)    NOT NULL COMMENT '业务键',
    next_schedule_time  DATETIME        NOT NULL COMMENT '下一次调度时间',
    message_status      TINYINT         NOT NULL DEFAULT 0 COMMENT '消息状态',
    init_backoff        BIGINT UNSIGNED NOT NULL DEFAULT 10 COMMENT '退避初始化值,单位为秒',
    backoff_factor      TINYINT         NOT NULL DEFAULT 2 COMMENT '退避因子(也就是指数)',
    INDEX idx_queue_name (queue_name),
    INDEX idx_create_time (create_time),
    INDEX idx_next_schedule_time (next_schedule_time),
    INDEX idx_business_key (business_key)
) COMMENT '事务消息表';
CREATE TABLE `t_transactional_message_content`
(
    id         BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    message_id BIGINT UNSIGNED NOT NULL COMMENT '事务消息记录ID',
    content    TEXT COMMENT '消息内容'
) COMMENT '事务消息内容表';
复制代码


因为此模块有可能扩展出一个后台管理模块,所以要把消息的管理和状态相关字段和大体积的消息内容分别存放在两个表,从而避免大批量查询消息记录的时候MySQL服务IO使用率过高的问题(这是和上一个公司的DBA团队商讨后得到的一个比较合理的方案)。预留了两个业务字段business_modulebusiness_key用于标识业务模块和业务键(一般是唯一识别号,例如订单号)。


微信截图_20220512201633.png


一般情况下,如果服务通过配置自行提前声明队列和交换器的绑定关系,那么发送RabbitMQ消息的时候其实只依赖于exchangeNameroutingKey两个字段(header类型的交换器是特殊的,也比较少用,这里暂时不用考虑),考虑到服务可能会遗漏声明操作,发送消息的时候会基于队列进行首次绑定声明并且缓存相关的信息(RabbitMQ中的队列-交换器绑定声明只要每次声明绑定关系的参数一致,则不会抛出异常)。


方案代码设计


下面的方案设计描述中,暂时忽略了消息事务管理后台的API设计,这些可以在后期补充。


定义贫血模型实体类TransactionalMessageTransactionalMessageContent


@Data
public class TransactionalMessage {
    private Long id;
    private LocalDateTime createTime;
    private LocalDateTime editTime;
    private String creator;
    private String editor;
    private Integer deleted;
    private Integer currentRetryTimes;
    private Integer maxRetryTimes;
    private String queueName;
    private String exchangeName;
    private String exchangeType;
    private String routingKey;
    private String businessModule;
    private String businessKey;
    private LocalDateTime nextScheduleTime;
    private Integer messageStatus;
    private Long initBackoff;
    private Integer backoffFactor;
}
@Data
public class TransactionalMessageContent {
    private Long id;
    private Long messageId;
    private String content;
}
复制代码


然后定义dao接口(这里暂时不展开实现的细节代码,存储使用MySQL,如果要替换为其他类型的数据库,只需要使用不同的实现即可):


public interface TransactionalMessageDao {
    void insertSelective(TransactionalMessage record);
    void updateStatusSelective(TransactionalMessage record);
    List<TransactionalMessage> queryPendingCompensationRecords(LocalDateTime minScheduleTime,
                                                               LocalDateTime maxScheduleTime,
                                                               int limit);
}
public interface TransactionalMessageContentDao {
    void insert(TransactionalMessageContent record);
    List<TransactionalMessageContent> queryByMessageIds(String messageIds);
}
复制代码


接着定义事务消息服务接口TransactionalMessageService


// 对外提供的服务类接口
public interface TransactionalMessageService {
    void sendTransactionalMessage(Destination destination, TxMessage message);
}
@Getter
@RequiredArgsConstructor
public enum ExchangeType {
    FANOUT("fanout"),
    DIRECT("direct"),
    TOPIC("topic"),
    DEFAULT(""),
    ;
    private final String type;
}
// 发送消息的目的地
public interface Destination {
    ExchangeType exchangeType();
    String queueName();
    String exchangeName();
    String routingKey();
}
@Builder
public class DefaultDestination implements Destination {
    private ExchangeType exchangeType;
    private String queueName;
    private String exchangeName;
    private String routingKey;
    @Override
    public ExchangeType exchangeType() {
        return exchangeType;
    }
    @Override
    public String queueName() {
        return queueName;
    }
    @Override
    public String exchangeName() {
        return exchangeName;
    }
    @Override
    public String routingKey() {
        return routingKey;
    }
}
// 事务消息
public interface TxMessage {
    String businessModule();
    String businessKey();
    String content();
}
@Builder
public class DefaultTxMessage implements TxMessage {
    private String businessModule;
    private String businessKey;
    private String content;
    @Override
    public String businessModule() {
        return businessModule;
    }
    @Override
    public String businessKey() {
        return businessKey;
    }
    @Override
    public String content() {
        return content;
    }
}
// 消息状态
@RequiredArgsConstructor
public enum TxMessageStatus {
    /**
     * 成功
     */
    SUCCESS(1),
    /**
     * 待处理
     */
    PENDING(0),
    /**
     * 处理失败
     */
    FAIL(-1),
    ;
    private final Integer status;
}
复制代码


TransactionalMessageService的实现类是事务消息的核心功能实现,代码如下:


@Slf4j
@Service
@RequiredArgsConstructor
public class RabbitTransactionalMessageService implements TransactionalMessageService {
    private final AmqpAdmin amqpAdmin;
    private final TransactionalMessageManagementService managementService;
    private static final ConcurrentMap<String, Boolean> QUEUE_ALREADY_DECLARE = new ConcurrentHashMap<>();
    @Override
    public void sendTransactionalMessage(Destination destination, TxMessage message) {
        String queueName = destination.queueName();
        String exchangeName = destination.exchangeName();
        String routingKey = destination.routingKey();
        ExchangeType exchangeType = destination.exchangeType();
        // 原子性的预声明
        QUEUE_ALREADY_DECLARE.computeIfAbsent(queueName, k -> {
            Queue queue = new Queue(queueName);
            amqpAdmin.declareQueue(queue);
            Exchange exchange = new CustomExchange(exchangeName, exchangeType.getType());
            amqpAdmin.declareExchange(exchange);
            Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
            amqpAdmin.declareBinding(binding);
            return true;
        });
        TransactionalMessage record = new TransactionalMessage();
        record.setQueueName(queueName);
        record.setExchangeName(exchangeName);
        record.setExchangeType(exchangeType.getType());
        record.setRoutingKey(routingKey);
        record.setBusinessModule(message.businessModule());
        record.setBusinessKey(message.businessKey());
        String content = message.content();
        // 保存事务消息记录
        managementService.saveTransactionalMessageRecord(record, content);
        // 注册事务同步器
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            @Override
            public void afterCommit() {
                managementService.sendMessageSync(record, content);
            }
        });
    }
}
复制代码


消息记录状态和内容持久化的管理统一放在TransactionalMessageManagementService中:


@Slf4j
@RequiredArgsConstructor
@Service
public class TransactionalMessageManagementService {
    private final TransactionalMessageDao messageDao;
    private final TransactionalMessageContentDao contentDao;
    private final RabbitTemplate rabbitTemplate;
    private static final LocalDateTime END = LocalDateTime.of(2999, 1, 1, 0, 0, 0);
    private static final long DEFAULT_INIT_BACKOFF = 10L;
    private static final int DEFAULT_BACKOFF_FACTOR = 2;
    private static final int DEFAULT_MAX_RETRY_TIMES = 5;
    private static final int LIMIT = 100;
    public void saveTransactionalMessageRecord(TransactionalMessage record, String content) {
        record.setMessageStatus(TxMessageStatus.PENDING.getStatus());
        record.setNextScheduleTime(calculateNextScheduleTime(LocalDateTime.now(), DEFAULT_INIT_BACKOFF,
                DEFAULT_BACKOFF_FACTOR, 0));
        record.setCurrentRetryTimes(0);
        record.setInitBackoff(DEFAULT_INIT_BACKOFF);
        record.setBackoffFactor(DEFAULT_BACKOFF_FACTOR);
        record.setMaxRetryTimes(DEFAULT_MAX_RETRY_TIMES);
        messageDao.insertSelective(record);
        TransactionalMessageContent messageContent = new TransactionalMessageContent();
        messageContent.setContent(content);
        messageContent.setMessageId(record.getId());
        contentDao.insert(messageContent);
    }
    public void sendMessageSync(TransactionalMessage record, String content) {
        try {
            rabbitTemplate.convertAndSend(record.getExchangeName(), record.getRoutingKey(), content);
            if (log.isDebugEnabled()) {
                log.debug("发送消息成功,目标队列:{},消息内容:{}", record.getQueueName(), content);
            }
            // 标记成功
            markSuccess(record);
        } catch (Exception e) {
            // 标记失败
            markFail(record, e);
        }
    }
    private void markSuccess(TransactionalMessage record) {
        // 标记下一次执行时间为最大值
        record.setNextScheduleTime(END);
        record.setCurrentRetryTimes(record.getCurrentRetryTimes().compareTo(record.getMaxRetryTimes()) >= 0 ?
                record.getMaxRetryTimes() : record.getCurrentRetryTimes() + 1);
        record.setMessageStatus(TxMessageStatus.SUCCESS.getStatus());
        record.setEditTime(LocalDateTime.now());
        messageDao.updateStatusSelective(record);
    }
    private void markFail(TransactionalMessage record, Exception e) {
        log.error("发送消息失败,目标队列:{}", record.getQueueName(), e);
        record.setCurrentRetryTimes(record.getCurrentRetryTimes().compareTo(record.getMaxRetryTimes()) >= 0 ?
                record.getMaxRetryTimes() : record.getCurrentRetryTimes() + 1);
        // 计算下一次的执行时间
        LocalDateTime nextScheduleTime = calculateNextScheduleTime(
                record.getNextScheduleTime(),
                record.getInitBackoff(),
                record.getBackoffFactor(),
                record.getCurrentRetryTimes()
        );
        record.setNextScheduleTime(nextScheduleTime);
        record.setMessageStatus(TxMessageStatus.FAIL.getStatus());
        record.setEditTime(LocalDateTime.now());
        messageDao.updateStatusSelective(record);
    }
    /**
     * 计算下一次执行时间
     *
     * @param base          基础时间
     * @param initBackoff   退避基准值
     * @param backoffFactor 退避指数
     * @param round         轮数
     * @return LocalDateTime
     */
    private LocalDateTime calculateNextScheduleTime(LocalDateTime base,
                                                    long initBackoff,
                                                    long backoffFactor,
                                                    long round) {
        double delta = initBackoff * Math.pow(backoffFactor, round);
        return base.plusSeconds((long) delta);
    }
    /**
     * 推送补偿 - 里面的参数应该根据实际场景定制
     */
    public void processPendingCompensationRecords() {
        // 时间的右值为当前时间减去退避初始值,这里预防把刚保存的消息也推送了
        LocalDateTime max = LocalDateTime.now().plusSeconds(-DEFAULT_INIT_BACKOFF);
        // 时间的左值为右值减去1小时
        LocalDateTime min = max.plusHours(-1);
        Map<Long, TransactionalMessage> collect = messageDao.queryPendingCompensationRecords(min, max, LIMIT)
                .stream()
                .collect(Collectors.toMap(TransactionalMessage::getId, x -> x));
        if (!collect.isEmpty()) {
            StringJoiner joiner = new StringJoiner(",", "(", ")");
            collect.keySet().forEach(x -> joiner.add(x.toString()));
            contentDao.queryByMessageIds(joiner.toString())
                    .forEach(item -> {
                        TransactionalMessage message = collect.get(item.getMessageId());
                        sendMessageSync(message, item.getContent());
                    });
        }
    }
}
复制代码


这里有一点尚待优化:更新事务消息记录状态的方法可以优化为批量更新,在limit比较大的时候,批量更新的效率会更高。


最后是定时任务的配置类:


@Slf4j
@RequiredArgsConstructor
@Configuration
@EnableScheduling
public class ScheduleJobAutoConfiguration {
    private final TransactionalMessageManagementService managementService;
    /**
     * 这里用的是本地的Redis,实际上要做成配置
     */
    private final RedissonClient redisson = Redisson.create();
    @Scheduled(fixedDelay = 10000)
    public void transactionalMessageCompensationTask() throws Exception {
        RLock lock = redisson.getLock("transactionalMessageCompensationTask");
        // 等待时间5秒,预期300秒执行完毕,这两个值需要按照实际场景定制
        boolean tryLock = lock.tryLock(5, 300, TimeUnit.SECONDS);
        if (tryLock) {
            try {
                long start = System.currentTimeMillis();
                log.info("开始执行事务消息推送补偿定时任务...");
                managementService.processPendingCompensationRecords();
                long end = System.currentTimeMillis();
                long delta = end - start;
                // 以防锁过早释放
                if (delta < 5000) {
                    Thread.sleep(5000 - delta);
                }
                log.info("执行事务消息推送补偿定时任务完毕,耗时:{} ms...", end - start);
            } finally {
                lock.unlock();
            }
        }
    }
}
复制代码


基本代码编写完,整个项目的结构如下:


微信截图_20220512201653.png


最后添加两个测试类:


@RequiredArgsConstructor
@Component
public class MockBusinessRunner implements CommandLineRunner {
    private final MockBusinessService mockBusinessService;
    @Override
    public void run(String... args) throws Exception {
        mockBusinessService.saveOrder();
    }
}
@Slf4j
@RequiredArgsConstructor
@Service
public class MockBusinessService {
    private final JdbcTemplate jdbcTemplate;
    private final TransactionalMessageService transactionalMessageService;
    private final ObjectMapper objectMapper;
    @Transactional(rollbackFor = Exception.class)
    public void saveOrder() throws Exception {
        String orderId = UUID.randomUUID().toString();
        BigDecimal amount = BigDecimal.valueOf(100L);
        Map<String, Object> message = new HashMap<>();
        message.put("orderId", orderId);
        message.put("amount", amount);
        jdbcTemplate.update("INSERT INTO t_order(order_id,amount) VALUES (?,?)", p -> {
            p.setString(1, orderId);
            p.setBigDecimal(2, amount);
        });
        String content = objectMapper.writeValueAsString(message);
        transactionalMessageService.sendTransactionalMessage(
                DefaultDestination.builder()
                        .exchangeName("tm.test.exchange")
                        .queueName("tm.test.queue")
                        .routingKey("tm.test.key")
                        .exchangeType(ExchangeType.DIRECT)
                        .build(),
                DefaultTxMessage.builder()
                        .businessKey(orderId)
                        .businessModule("SAVE_ORDER")
                        .content(content)
                        .build()
        );
        log.info("保存订单:{}成功...", orderId);
    }
}
复制代码


某次测试结果如下:


2020-02-05 21:10:13.287  INFO 49556 --- [           main] club.throwable.cm.MockBusinessService    : 保存订单:07a75323-460b-42cb-aa63-1a0a45ce19bf成功...
复制代码


微信截图_20220512201704.png


模拟订单数据成功保存,而且RabbitMQ消息在事务成功提交后正常发送到RabbitMQ服务端中,如RabbitMQ控制台数据所示。


小结



事务消息模块的设计仅仅是使异步消息推送这个功能实现趋向于完备,其实一个合理的异步消息交互系统,一定会提供同步查询接口,这一点是基于异步消息没有回调或者没有响应的特性导致的。一般而言,一个系统的吞吐量和系统的异步化处理占比成正相关(这一点可以参考Amdahl's Law),所以在系统架构设计实际中应该尽可能使用异步交互,提高系统吞吐量同时减少同步阻塞带来的无谓等待。事务消息模块可以扩展出一个后台管理,甚至可以配合MicrometerPrometheusGrafana体系做实时数据监控。


本文demo项目仓库:rabbit-transactional-message

demo必须本地安装MySQLRedisRabbitMQ才能正常启动,本地必须新建一个数据库命名local

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2天前
|
消息中间件 存储 运维
2024最全RabbitMQ集群方案汇总
本文梳理了RabbitMQ集群的几种方案,主要包括普通集群、镜像集群(高可用)、Quorum队列(仲裁队列)、Streams集群模式(高可用+负载均衡)和插件方式。重点介绍了每种方案的特点、优缺点及适用场景。搭建步骤包括安装Erlang和RabbitMQ、配置集群节点、修改hosts文件、配置Erlang Cookie、启动独立节点并创建集群,以及配置镜像队列以提高可用性和容错性。推荐使用Quorum队列与Streams模式,其中Quorum队列适合高可用集群,Streams模式则同时支持高可用和负载均衡。此外,还有Shovel和Federation插件可用于特定场景下的集群搭建。
13 2
|
2天前
|
消息中间件 RocketMQ
2024最全RocketMQ集群方案汇总
在研究RocketMQ集群方案时,发现网上存在诸多不一致之处,如组件包含NameServer、Broker、Proxy等。通过查阅官方文档,了解到v4.x和v5.x版本的差异。v4.x部署模式包括单主、多主、多主多从(异步复制、同步双写),而v5.x新增Local与Cluster模式,主要区别在于Broker和Proxy是否同进程部署。Local模式适合平滑升级,Cluster模式适合高可用需求。不同模式下,集群部署方案大致相同,涵盖单主、多主、多主多从等模式,以满足不同的高可用性和性能需求。
8 0
|
2月前
|
消息中间件 存储 弹性计算
云消息队列 RabbitMQ 版方案评测
本文评估了阿里云《高弹性,低成本,云消息队列 RabbitMQ 实践》方案,从实践原理理解、部署体验、方案优势展现及业务场景匹配四个方面进行了深入分析。文中指出,该方案在解决消息积压、提高系统稳定性、支持弹性伸缩等方面表现优异,但也提出了在组件功能解释、实战案例提供等方面的改进建议,以期帮助用户更好地理解和应用该技术解决方案。
146 0
|
4月前
|
消息中间件 固态存储 RocketMQ
RocketMQ消息堆积常见场景与处理方案
文章分析了在使用RocketMQ时消息堆积的常见场景,如消费者注册失败或消费速度慢于生产速度,并提供了相应的处理方案,包括提高消费并行度、批量消费、跳过非重要消息以及优化消费代码业务逻辑等。
|
6月前
|
消息中间件 RocketMQ
消息队列 MQ产品使用合集之在开源延时消息插件方案中和原生延时消息方案中,同时设置参数是否会出现错乱
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7月前
|
消息中间件 存储 Kafka
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
133 1
|
7月前
|
消息中间件 网络协议 Ubuntu
实现高效消息传递:使用RabbitMQ构建可复用的企业级消息系统
RabbitMQ是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。
|
消息中间件 Linux
3、RabbitMQ教程-在Linux上安装RabbitMQ报错解决方案
3、RabbitMQ教程-在Linux上安装RabbitMQ报错解决方案
232 0
3、RabbitMQ教程-在Linux上安装RabbitMQ报错解决方案
|
存储 消息中间件 数据库
RocketMQ平台的消息灰度方案(2)
RocketMQ平台的消息灰度方案
588 0
RocketMQ平台的消息灰度方案(2)
|
消息中间件 NoSQL Redis
Redis进阶-Redis 4种MQ 方案对比
Redis进阶-Redis 4种MQ 方案对比
227 0
下一篇
DataWorks