消息中间件mq的比较:含RocketMQ、RabbitMQ、Kafka; rocketmq底层封装

本文涉及的产品
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
性能测试 PTS,5000VUM额度
云原生网关 MSE Higress,422元/月
简介: 消息中间件mq的比较:含RocketMQ、RabbitMQ、Kafka共同点都是消息队列,有mq的特性队列(先进先出原则)

消息中间件mq的比较:含RocketMQ、RabbitMQ、Kafka

共同点都是消息队列,有mq的特性

队列(先进先出原则)

RocketMQ

吞吐量经过了双十一的检验,比RabbitMQ好。

阿里开发的,阿里系用的比较多些。

RabbitMQ

RabbitMQ采用Erlang语言开发,是实现高级消息队列协议的开源消息中间件。

它的官网有个兔子。Rabbit意味兔子。

特点

性能很好,延时低

吞吐量到万级,相对低,功能完备

有良好管理界面用来管理mq

社区相对比较活跃

稳定

Kafka

特点:

吞吐量十万级,比RabbitMQ更好,是除了RocketMQ之外的一个选择。有些公司也在用。

rocketmq底层封装


1、pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>lego-common</artifactId>
        <groupId>com.lego</groupId>
        <version>1.0.7</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <version>1.1.2</version>
    <artifactId>lego-common-rocketmq</artifactId>
    <dependencies>
        <dependency>
            <groupId>com.lego</groupId>
            <artifactId>lego-common-core</artifactId>
            <version>${lego-common-core.version}</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
        </dependency>
    </dependencies>
</project>

2、ConsumerClient

@Configuration
@AllArgsConstructor
public class ConsumerClient {
    private final MqConfig mqConfig;
    @PostConstruct
    public void init() {
        List<MqConfig.ConsumerGroup> consumerGroups = mqConfig.getConsumerGroups();
        if (!CollectionUtils.isEmpty(consumerGroups)) {
            Properties properties = mqConfig.getMqProperties();
            for (MqConfig.ConsumerGroup consumerGroup : consumerGroups) {
                properties.setProperty(PropertyKeyConst.GROUP_ID, consumerGroup.getGroupId());
                Consumer consumerBean = ONSFactory.createConsumer(properties);
                //订阅关系
                List<MqConfig.Consumer> consumers = consumerGroup.getConsumers();
                for (MqConfig.Consumer consumer : consumers) {
                    consumerBean.subscribe(consumer.getTopic(), consumer.getTag(),
                            SpringUtil.getBean(consumer.getBeanName()));
                }
                consumerBean.start();
            }
        }
    }
}

3、MqConfig

/**
 * MQ配置加载
 *
 * @author jaffee
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "rocketmq")
public class MqConfig {
    private String accessKey;
    private String secretKey;
    private String nameSrvAddr;
    private List<ConsumerGroup> consumerGroups;
    public Properties getMqProperties() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
        properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
        //设置发送超时时间,单位毫秒
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "4000");
        return properties;
    }
    @Data
    public static class ConsumerGroup {
        private String groupId;
        private List<Consumer> consumers;
    }
    @Data
    public static class Consumer {
        private String topic;
        private String tag;
        private String beanName;
    }
}

4、ProducerClient

/**
 * MQ配置注入生成消息实例
 *
 * @author jaffee
 */
@Configuration
@AllArgsConstructor
public class ProducerClient {
    private final MqConfig mqConfig;
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ProducerBean buildProducer() {
        //ProducerBean用于将Producer集成至Spring Bean中
        ProducerBean producer = new ProducerBean();
        producer.setProperties(mqConfig.getMqProperties());
        return producer;
    }
}

5、RocketMqExecutorConfig

/**
 * RocketMq 线程池配置
 * @author jaffee
 */
@Configuration
public class RocketMqExecutorConfig {
    @Bean("rocketMqExecutor")
    public ExecutorService getThreadPool() {
        int corePoolSize = Runtime.getRuntime().availableProcessors() * 2 + 1;
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNamePrefix("rocket-pool-").build();
        return new ThreadPoolExecutor(
                corePoolSize, corePoolSize * 2, 10,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }
}

6、ProducerUtil

/**
 * MQ发送消息
 *
 * @author jaffee
 */
@AllArgsConstructor
public class ProducerUtil {
    private static final LegoLogger LOGGER = LegoLogger.getLogger(ProducerUtil.class);
    private final ProducerBean producer;
    @Autowired
    @Qualifier("rocketMqExecutor")
    private final ExecutorService executorConfig;
    /**
     * 同步发送消息
     *
     * @param topic       消息topic
     * @param messageBody 消息body内容,生产者自定义内容
     * @return success:SendResult or error:null
     */
    public SendResult sendMsg(String topic, String messageBody) {
        return this.sendMsg(topic, null, messageBody, null);
    }
    /**
     * 同步发送消息
     *
     * @param topic       消息topic
     * @param msgTag      标签,可用于消息小分类标注
     * @param messageBody 消息body内容,生产者自定义内容
     * @param msgKey      消息key值,建议设置全局唯一,可不传,不影响消息投递
     * @return success:SendResult or error:null
     */
    public SendResult sendMsg(String topic, String msgTag, String messageBody, String msgKey) {
        Message msg = new Message(topic, msgTag, msgKey, messageBody.getBytes(StandardCharsets.UTF_8));
        return this.send(msg, Boolean.FALSE);
    }
    /**
     * 同步发送定时/延时消息
     *
     * @param topic       消息topic
     * @param messageBody 消息body内容,生产者自定义内容
     * @param delayTime   服务端发送消息时间,立即发送输入0或比更早的时间
     * @return success:SendResult or error:null
     */
    public SendResult sendTimeMsg(String topic, String messageBody, long delayTime) {
        return this.sendTimeMsg(topic, null, messageBody, null, delayTime);
    }
    /**
     * 同步发送定时/延时消息
     *
     * @param topic       消息topic
     * @param msgTag      标签,可用于消息小分类标注,对消息进行再归类
     * @param messageBody 消息body内容,生产者自定义内容
     * @param msgKey      消息key值,建议设置全局唯一值,可不设置,不影响消息收发
     * @param delayTime   服务端发送消息时间,立即发送输入0或比更早的时间
     * @return success:SendResult or error:null
     */
    public SendResult sendTimeMsg(String topic, String msgTag, String messageBody, String msgKey, long delayTime) {
        Message msg = new Message(topic, msgTag, msgKey, messageBody.getBytes(StandardCharsets.UTF_8));
        msg.setStartDeliverTime(delayTime);
        return this.send(msg, Boolean.FALSE);
    }
    /**
     * 发送单向消息
     */
    public void sendOneWayMsg(String topic, String messageBody) {
        this.sendOneWayMsg(topic, null, messageBody, null);
    }
    /**
     * 发送单向消息
     */
    public void sendOneWayMsg(String topic, String msgTag, String messageBody, String msgKey) {
        Message msg = new Message(topic, msgTag, msgKey, messageBody.getBytes(StandardCharsets.UTF_8));
        this.send(msg, Boolean.TRUE);
    }
    /**
     * 普通消息发送发放
     *
     * @param msg      消息
     * @param isOneWay 是否单向发送
     */
    private SendResult send(Message msg, Boolean isOneWay) {
        try {
            if (isOneWay) {
                 //由于在 oneway 方式发送消息时没有请求应答处理,一旦出现消息发送失败,则会因为没有重试而导致数据丢失。
                //若数据不可丢,建议选用同步或异步发送方式。
                producer.sendOneway(msg);
                success(msg, "单向消息MsgId不返回");
                return null;
            } else {
                //可靠同步发送
                SendResult sendResult = producer.send(msg);
                //获取发送结果,不抛异常即发送成功
                if (sendResult != null) {
                    success(msg, sendResult.getMessageId());
                    return sendResult;
                } else {
                    error(msg, null);
                    return null;
                }
            }
        } catch (Exception e) {
            error(msg, e);
            return null;
        }
    }
    /**
     * 异步发送普通消息
     *
     * @param topic       消息topic
     * @param messageBody 消息body内容,生产者自定义内容,二进制形式的数据
     */
    public void sendAsyncMsg(String topic, String messageBody) {
        this.sendAsyncMsg(topic, null, messageBody, null);
    }
    /**
     * 异步发送普通消息
     *
     * @param topic       消息topic
     * @param msgTag      标签,可用于消息小分类标注,对消息进行再归类
     * @param messageBody 消息body内容,生产者自定义内容,二进制形式的数据
     * @param msgKey      消息key值,建议设置全局唯一值,可不设置,不影响消息收发
     */
    public void sendAsyncMsg(String topic, String msgTag, String messageBody, String msgKey) {
        producer.setCallbackExecutor(executorConfig);
        Message msg = new Message(topic, msgTag, msgKey, messageBody.getBytes(StandardCharsets.UTF_8));
        try {
            producer.sendAsync(msg, new SendCallback() {
                @Override
                public void onSuccess(final SendResult sendResult) {
                    assert sendResult != null;
                    success(msg, sendResult.getMessageId());
                }
                @Override
                public void onException(final OnExceptionContext context) {
                    //出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。
                    error(msg, context.getException());
                }
            });
        } catch (ONSClientException e) {
            error(msg, e);
        }
    }
    private void error(Message msg, Exception e) {
        LOGGER.error("发送MQ消息失败-- Topic:{}, Key:{}, tag:{}, body:{}",
                msg.getTopic(), msg.getKey(), msg.getTag(),
                new String(msg.getBody(), StandardCharsets.UTF_8));
        LOGGER.error("errorMsg --- {}", e.getMessage());
    }
    private void success(Message msg, String messageId) {
        LOGGER.info("发送MQ消息成功 -- Topic:{} ,msgId:{} , Key:{}, tag:{}, body:{}",
                msg.getTopic(), messageId, msg.getKey(), msg.getTag(),
                new String(msg.getBody(), StandardCharsets.UTF_8));
    }
}```
相关文章
|
17天前
|
消息中间件 存储 Java
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。
|
2月前
|
消息中间件 存储 RocketMQ
消息中间件-RocketMQ技术(二)
消息中间件-RocketMQ技术(二)
|
2月前
|
消息中间件 存储 中间件
消息中间件-RocketMQ技术(一)
消息中间件-RocketMQ技术(一)
|
3月前
|
消息中间件 Java Kafka
消息传递新纪元:探索RabbitMQ、RocketMQ和Kafka的魅力所在
【8月更文挑战第29天】这段内容介绍了在分布式系统中起到异步通信与解耦作用的消息队列,并详细探讨了三种流行的消息队列产品:RabbitMQ、RocketMQ 和 Kafka。其中,RabbitMQ 是一个基于 AMQP 协议的开源消息队列系统,支持多种消息模型;RocketMQ 则是由阿里巴巴开源的具备高性能、高可用性和高可靠性的分布式消息队列,支持事务消息等多种特性;而 Kafka 作为一个由 LinkedIn 开源的分布式流处理平台,以高吞吐量和良好的可扩展性著称。此外,还提供了使用这三种消息队列发送和接收消息的代码示例。总之,这三种消息队列各有优势,适用于不同的业务场景。
65 3
|
27天前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
69 1
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
10天前
|
消息中间件 存储 监控
ActiveMQ、RocketMQ、RabbitMQ、Kafka 的区别
【10月更文挑战第24天】ActiveMQ、RocketMQ、RabbitMQ 和 Kafka 都有各自的特点和优势,在不同的应用场景中发挥着重要作用。在选择消息队列时,需要根据具体的需求、性能要求、扩展性要求等因素进行综合考虑,选择最适合的消息队列技术。同时,随着技术的不断发展和演进,这些消息队列也在不断地更新和完善,以适应不断变化的应用需求。
55 1
|
10天前
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!
|
27天前
|
消息中间件 存储 监控
说说如何解决RocketMq消息积压?为什么Kafka性能比RocketMq高?它们区别是什么?
【10月更文挑战第8天】在分布式系统中,消息队列扮演着至关重要的角色,它不仅能够解耦系统组件,还能提供异步处理、流量削峰和消息持久化等功能。在众多的消息队列产品中,RocketMQ和Kafka无疑是其中的佼佼者。本文将围绕如何解决RocketMQ消息积压、为什么Kafka性能比RocketMQ高以及它们之间的区别进行深入探讨。
63 1
|
15天前
|
消息中间件 中间件 Kafka
解锁Kafka等消息队列中间件的测试之道
在这个数字化时代,分布式系统和消息队列中间件(如Kafka、RabbitMQ)已成为日常工作的核心组件。本次公开课由前字节跳动资深专家KK老师主讲,深入解析消息队列的基本原理、架构及测试要点,涵盖功能、性能、可靠性、安全性和兼容性测试,并探讨其主要应用场景,如应用解耦、异步处理和限流削峰。课程最后设有互动答疑环节,助你全面掌握消息队列的测试方法。
17 0
|
5月前
|
消息中间件 存储 中间件
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
1276 0

相关产品

  • 云消息队列 Kafka 版
  • 云消息队列 MQ