【消息队列】消息中间件RocketMQ4.X急速入门1

简介: 【消息队列】消息中间件RocketMQ4.X急速入门

1.RocketMQ4.X简介

阿里开源消息队列RocketMQ介绍

  • Apache RocketMQ作为阿里开源的一款高性能、高吞吐量的分布式消息中间件。
  • 特点:
  • 支持Broker和Consumer端消息过滤,既可以在Broker端过滤消息,也可以在Consumer端过虑消息。
  • 支持发布订阅,点对点模型。
  • 支持消费端pull消息和Broker来push消息。
  • 单一队列百万消息、亿级消息堆积。
  • 支持单master节点,多master节点,多master多slave节点
  • 任意一点都是高可用,水平扩展,Producer、Consumer、队列都可以分布式
  • 消息失败重试机制、支持特定level的定时消息
  • 新版本底层采用Netty
  • 4.3x支持分布式事务
  • 适合金融类业务,高可用性跟踪和审计功能
  • 概念:
  • Producer:消息生产者
  • Producer Group:消息生产者组,发送同类消息的一个消息生产者组
  • Consumer:消费者
  • Consumer Group:消费同类消息的多个实例
  • Tag:标签,子主题(二级分类)对topic的进一步细化,用于区分同一主题下的不同业务的消息。
  • Topic:主题,如订单类消息,queue式消息的物理管理单位,而topic是逻辑管理单位,一个topic下可以有多个queue。默认自动创建4个,手动创建是8个。
  • Message:消息,每个message必须指定一个topic。
  • Broker:MQ程序,接收生产的消息,提供给消费者消费的程序,一个Broker就是一个MQ节点。
  • Name Server:给生产者和消费者提供路由信息,提供轻量级的服务发现、路哟与、元数据信息,可以多个部署,相互独立(比Zookeeper更轻量)。
  • Offset:偏移量,每个消费者都会维护一个当前最大的消费偏移量。
  • commit log:消费存储会写在Commit log文件里面,消息持久化存储的地方。

2.Springboot整合RocketMQ4.X

(1)创建SpringBoot项目,加入相关依赖

<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq-client</artifactId>
   <version>4.3.0</version>
</dependency>

(2)Message对象

  • topic:主题名称
  • tag:标签,用于过滤
  • key:消息唯一标识,可以是业务字段组合
  • body:消息体,字节数组
  • 注意:发送消息到Broker,需要判断是否有此topic,启动Broker的时候,本地建议开启自动创建topic,生产环境加以关闭自动化创建topic。建议手工创建topic,如果靠程序自动创建,然后在投递消息,会出现延迟的情况。
  • 概念模型:一个topic下面对应多个queue,可以创建Topic时指定,如订单类topic

(3)生产者发送消息编码实战

@Component
public class PayProducer{
    private String producerGroup = "pay_producer_group";
    public DefaultMQProducer producer;
    public PayProducer(){
        //创建Producer对象,设置生产者组
        producer = new DefaultMQProducer(producerGroup);
        //指定NameServer地址,多个地址逗号隔开
        producer.setNamesrvAdder(JmsConfig.NAME_SERVER);
        //开启生产者
        start();
    }
    //得到Producer对象
    public DefaultMQProducer getProducer(){
        return this.producer;
    }
    /**
     * 对象在使用之前必须调用一次,只能初始化一次
     */
    public void start(){
        try{
             this.producer.start(); 
        }catch(MQClientException e){
             e.printStackTrace();    
        }
    }
     /**
     * 一般在应用上下文,使用上下文监听器,进行关闭
     */
    public void shutdown(){
        this.producer.shutdown();
    }
}
public class JmsConfig {
    public static final String NAME_SERVER = "8.140.116.67:9876";
    public static final String TOPIC  = "pay_topic";
}
@RestController
@RequestMapping("api/v1/pay")
public class PayController {
    @Autowired
    private PayProducer payProducer;
    @RequestMapping("/pay_cb")
    public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
    Message message = new Message(JmsConfig.TOPIC,"taga",text.getBytes());
        SendResult sendResult = payProducer.getProducer().send(message);
        System,out,println(sendResult);
        return sendResult;
    }
}

(4)常见错误一

MQClientException: No route info of this topic, TopicTest1
原因:Broker禁止自动创建Topic,且用户没有通过手工取创建此Topic,或者broker和nameServer网络不通
解决:sh bin/mqbroker -m 查看配置信息
autoCreateTopicEnable=true 则自动创建topic
centos7关闭防火墙 systemctl stop fitewalld
手动创建topic

(5)常见错误二

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException:
sendDefaultImpl call timeout
原因:阿里云存在多个网卡,rocketmq都会根据当前网卡选择一个ip使用,当你的机器有多个网卡时,很有可能会有问题,阿里云机器有两个网卡,因此需要broker.conf来配置当前公网ip,然后重启broker
broker.conf新增:brokerIP1=公网IP
在以配置文件的方式重启broker

(6)常见错误三

控制台查询不了数据,连接提示10909错误
原因:Rocket默认开启了VIP通道,VIP通道端口为10911-2=10909
解决:阿里云安全组加上10909端口

(7)其他问题

https://blog.csdn.net/qq_14853889/article/details/81053145
https://blog.csdn.net/wangmx1993328/article/details/81588217#%E5%BC%82%E5%B8
%B8%E8%AF%B4%E6%98%8E
https://www.jianshu.com/p/bfd6d849f156
https://blog.csdn.net/wangmx1993328/article/details/81588217

(8)消费者消费消息编码实战

@Component
public class PayConsumer {
    private DefaultMQPushConsumer consumer;
    private String consumerGroup = "pay_consumer_group";
    public PayConsumer(){
        //new一个Consumer对象,这是ConsumerGroup
        consumer = new DefaultMQPushConsumer(consumerGroup);
        //设置NameServer地址
        consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        //设置消费的策略,从最后一个开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        try {
            //订阅主题
            consumer.subscribe(JmsConfig.TOPIC,"*");
            //注册消息监听
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    //业务逻辑处理
                    MessageExt message = msgs.get(0);
                    String body = new String(msgs.get(0).getBody());
                    String tags = msgs.get(0).getTags();
                    String keys = msgs.get(0).getKeys();
                    System.out.println("message:"+message+"--body:"+body+"--tags:"+tags+"--keys:"+keys);
                    //消费完成返回给broker消费的状态,成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        try {
            //开启消费者
            consumer.start();
            System.out.println("consumer.start()...");
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
}

(9)常见问题

1、Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.42.1:10911> failed 
2、com.alibaba.rocketmq.client.exception.MQClientException: Send [1] times, still failed, cost [1647]ms, Topic: TopicTest1, BrokersSent: [broker-a, null, null]
3、org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [497]ms, Topic: TopicTest, BrokersSent: [Book-Air.local,   MacBook-Air.local, MacBook-Air.local]
解决:多网卡问题处理
  1、设置producer:  producer.setVipChannelEnabled(false);
  2、编辑ROCKETMQ 配置文件:broker.conf(下列ip为自己的ip)
    namesrvAddr = 192.168.0.101:9876
    brokerIP1 = 192.168.0.101
4、DESC: service not available now, maybe disk full, CL:
  解决:修改启动脚本runbroker.sh,在里面增加一句话即可:   
  JAVA_OPT="${JAVA_OPT} -Drocketmq.broker.diskSpaceWarningLevelRatio=0.98"
  (磁盘保护的百分比设置成98%,只有磁盘空间使用率达到98%时才拒绝接收producer消息)
常见问题处理
  https://blog.csdn.net/sqzhao/article/details/54834761
  https://blog.csdn.net/mayifan0/article/details/67633729
  https://blog.csdn.net/a906423355/article/details/78192828

3.RocketMQ4.X集群架构

3.1.RocketMQ4.X集群模式架构

(1)单机模式


ff117dd9e9ca4f3b90573d0ce2b4249a.jpg

优点:本地开发测试用,配置简单,同步刷盘消息不会丢失

缺点:不可靠,服务宕机,导致服务不可用,数据丢失

(2)主从模式(异步复制、同步双写)



79022f2f745b40aeb74e0b8ce0c580ae.jpg

优点:同步双写不丢失数据,异步复制存在少量数据丢失,主节点宕机,从节点可以对外提供消费服务,但是不提供写服务。

缺点:主备有短暂的消息延迟,毫秒级,主节点宕机后,目前不支持自动切换,需要手动设置从节点成为主节点。

(3)双主、多主模式


eaf620f280ab42feb6500052b17c64a4.jpg

优点:配置简单,可以根据配置RAID10磁盘阵列保证消息的可靠性,异步刷盘丢失少量消息

缺点:master宕机期间,未被消费的消息不能被消费,只有当节点恢复才会恢复消费

(4)双主双从,多主多从模式(异步复制)


2df608a16e4f48d38b832b4c4ac0d640.jpg

优点:磁盘损坏,消息丢失的量少,消息消费的实时性不受影响,Master宕机后,会从Slave消费消息

缺点:主备消息同步由延迟,Master宕机会存在少量信息丢失

(5)双主双从,多主多从模式(同步双写)

优点:同步双写方式,主备都写成功,才返回成功,服务可用性与数据的可用性都非常高

缺点:新跟那个相比异步复制要低,主节点宕机后不支持自动切换主机

(6)消息可靠性之同步、异步刷盘

异步刷盘:每个节点,不论是从节点还是主节点都会有一个服务内存数据和磁盘同步的过程,异步刷盘就是当数据到达节点内存后,就返回成功,采用异步复制的方式给磁盘同步数据。

同步刷盘:同步刷盘就是当节点中的数据全部同步到磁盘中时,才会返回确认成功。

(7)消息可靠性之同步、异步复制

同步复制:数据安全性搞,性能相对差一点

异步复制:数据安全性低,性能会高一点

最终建议:同步双写,异步刷盘


fd1f41644d1a46b6b828c70425377f90.jpg

3.2.RocketMQ主从模式搭建

机器列表

192.168.10.200
192.168.10.201

(1)修改RocketMQ(runserver.sh,runbroker.sh)

vim runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms528m -Xmx528m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
vim runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms528m -Xmx528m -Xmn256m"

(2)启动两个节点的nameServer

nohup sh runserver.sh

(3)编辑broker配置文件并启动两个broker


bdaf9d39f29f4d59aff0ac7e45d2e556.jpg


d2542aacb5fc4ed39571280003d9c6f3.jpg


830d6e7f45de43e8a2fc93fb01a8b9ad.jpg

主节点:
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties &
配置:
namesrvAddr=192.168.159.129:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
从节点:
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties &
配置:
namesrvAddr=192.168.159.129:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH

26f49703c02c40d293ceb410b9c33bec.jpg

(4)安装控制台

修改事项
pom.xml  里面的rocketmq版本号
application.properties里面的nameserver
增加 rocketmq.config.namesrvAddr=192.168.159.129:9876;192.168.159.130:9876
mvn install -Dmaven.test.skip=true
java -jar rocketmq-console-ng-1.0.0.jar

3.3.RocketMQ4.X主从同步总结

  • Broker分为master和slave,一个master可以对应多个slave,一个slave只能对应一个master,master与slave通过相同的Broker Name来匹配,不同的broker id来定义时master还是slave。
  • Broker向所有的NameServer节点建立长连接,定时注册Topic和发送元数据信息。
  • NameServer定时扫描(默认2分钟)所有存货broker的连接,如果超时未响应则断开连接(心跳检测),但是consumer客户端感知不到,consumer定时30s从NameServer获取topic最新信息,所以broker不可用时,consumer最多也就30s就能发现broker宕机。

producer和consumer一样,在未发现broker宕机前发送的消息会失败


只有master才会有写消息的操作,slave只能提供消费,同步的策略取决于master的配置


客户端消费可以从master和slave消费,默认消费者都从master消费,如果master宕机后,客户端从NameServer中感知到Broker宕机,就会从slave消费,感知非实时,存在一定的之后性,slave不能保证master的消息100%同步过来,会有少量的消息丢失,但是一旦master恢复,未同步过去的消息会最终被消费掉。


如果consumer实例的数量比message queue的总数量还要多,多出来的consumer将无法分到queue,也就无法消费消息,无法起到负载的作用,所以需要控制让queue的总数量大于等于consumer的数量。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 存储 Java
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。
|
1月前
|
消息中间件 存储 Apache
探索 RocketMQ:企业级消息中间件的选择与应用
RocketMQ 是一个高性能、高可靠、可扩展的分布式消息中间件,它是由阿里巴巴开发并贡献给 Apache 软件基金会的一个开源项目。RocketMQ 主要用于处理大规模、高吞吐量、低延迟的消息传递,它是一个轻量级的、功能强大的消息队列系统,广泛应用于金融、电商、日志系统、数据分析等领域。
74 0
探索 RocketMQ:企业级消息中间件的选择与应用
|
2月前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
106 6
|
2月前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
126 1
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
2月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
97 8
|
1月前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
1月前
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!
|
2月前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
2月前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
2月前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
75 4