【消息队列】消息中间件RocketMQ4.X急速入门3

简介: 【消息队列】消息中间件RocketMQ4.X急速入门

6.消息队列Offset和CommitLog

6.1.RocketMQ消息偏移量Offset

(1)什么是offset

  • message queue是无限长的数组,一条消息进来下标就会增长1,小标就是offset,消息在某个MessageQueue里的位置,通过offset的值可以定位这条消息,或者指示Consumer从这条消息开始消费。
  • message queue中maxOffset表示消息的最大offset,maxOffset并不是最新的那条Offset而是新的消息的offset+1,minOffset则是现存的最小的offset
  • fileReserveTime=48默认消息存储48消息后,消息就会被从磁盘中删除,message queue的min offset也就对应的增长,锁以比minOffset还要小的那些消息已经不在broker上了,就无法消费

(2)类型(父类是OffsetStore)

本地文件类型

DefaultMQPushConsumer的BROADCASTING模式,各个Conusmer没有相互干扰,使用LocalFileOffsetStore,把Offset存储在本地

Broker代存储类型

DefalutMQPushConsumer的CLUSTERING模式,由Broker端存储和控制Offset的值,使用RemoteBrokerOffsetStore

主要是记录消息消费的偏移量,由多个消费者进行消费

集群模式下采用RemoteBrokerOffsetStore,Broker控制offset的值

广播模式下采用LocalFileOffsetStore,消费端存储,消费者控制

建议采用pushConsumer,RocketMQ自动维护OffsetStore,如果是PullConsumer则需要自己维护OffsetStore

6.2.RocketMQ消息存储CommitLog

(1)消息存储是由ConsumeQueue和CommitLog

ConsumeQueue是逻辑队列,CommitLog是真正存储消息文件的,存储的是指向物理存储地址

Topic下的每个message queue都有对应的ConsumeQueue文件,内容也会被持续化到磁盘

默认地址:store/consumequeue/{topicname}/{queueid}/fileName

CommitLog是消息文件的存储地址。CommitLog的生成规则是每个文件默认达到1G开始切割,CommitLog文件名fileName,名字长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1 073 741 824Byte,当着文件满了,第二个文件名字为00000000001073741824,起始便宜量为1073741825=上一个最大的偏移量+1,消息存储的时候会顺序写入,当文件满了则写入下一个文件。

Broker里面多个Topic,一个Topic中有多个MessageQueue,每个MessageQueue对应一个ConusmeQueue,ConsumeQueue里面记录的是消息在CommitLog里面的物理存储地址

6.3.ZeroCopy零拷贝技术

(1)高效原因

  • CommitLog顺序写,存储了MessageBody、MessageKey、Tag等消息
  • ConsumeQueue随机读+操作系统PageCache+零拷贝技术ZeroCopy

(2)Linux将一个File文件发送出去(Linux有两个上下文,内核态,用户态)

File经历了4次copy

调用read,将文件从磁盘拷贝到了kernel内核态

CPU控制kernel态的数据copy到用户态

调用write时,user态下的内容会copy到内核态的socket的buffer中

最后将内核态socket buffer的数据copy到网卡设备中传送

(3)ZeroCopy

请求kernel直接把disk的data传输给socket,而不是通过应用程序传输。ZeroCopy大大提高了应用程序的性能,较少不必要的内核缓冲区跟用户缓冲区的拷贝,从而减少CPU的开销和小勺kernel和user模式的上下文切换

(4)对应的零拷贝技术有mmap和sendfile

mmap:小文件传输快

sendfile:大文件传输快

7.RocketMQ实现分布式事务

7.1.分布式事务介绍

(1)什么是分布式事务

  • 来源:单体应用->拆分成分布式应用
  • 一个接口需要调用多个服务,且操作不同的数据库,数据一致性难保证
  • (2)常见的解决方案
  • 2PC:两阶段提交,基于XA协议
  • TCC:Try(尝试)、Confirm(证实)、Cancel(取消)

(3)框架

(4)RocketMQ分布式事务消息

  • RocketMQ提供分布式事务功能,通过RocketMQ事务消息道道分布式事务的一致性。

(5)半消息Half Message

  • 暂时不能消费的消息,是Producer投递到Broker的消息,但是服务端还未收到生产者的二次确认,此时该消息被标记为“暂不能投递状态”,也就是不能投递给消费者,处于该状态的消息为半消息。

(6)整体交互流程


1b670ff57fb04af98e9918f67cf15ac1.jpg

Producer向Broker发送消息

服务端将消息持久化成功后,向发送方ACK确认消息已经发送成功,此时消息为半消息

发送方执行本地事务逻辑

发送方(SpringBoot应用)根据本地事务的结果向服务器发送二次确认(Commit或者Rollback),服务端收到Commit状态则将版消息标记为可投递,订阅放最终消费该消息,如果收到Rollback状态,则订阅方不会消费该消息,三天后Broker进行删除该消息

在断网或者应用重启的情况下,也就是发送方没有给服务器提交二次确认的结果,这回服务端会经过固定时间对发送方的这条消息进行回查

发送方收到消息回查后,需要检查对应用消息的本地事务执行的最终结果

发送方检查得到本地事务的最终状态在进行二次确认,服务端仍按照Commit或者rollback结果进行相应的处理

(7)RocketMQ事务消息的状态

COMMIT_MESSAGE:提交事务的消息,消费者可以消费此消息

ROLLBACK_MESSAGE:回滚事务消息,消息会在broker中删除,消费者不能在消费

UNKNOW:broker需要回查确认消息的状态

(8)关于事务消息的消费

事务消息consumer端的消费方式和普通消息是一样的,RocketMQ能保证消息被consumer收到

7.2.RocketMQ分布式事务消息实战

(1)编写TransactionListenerImpl类实现TransactionListener接口

public class TransactionListenerImpl implements TransactionListener {
    /**
     * 本地事务方法
     * @param msg 消息体
     * @param arg 附加参数
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println("executeLocalTransaction.....");
        System.out.println("tag:"+msg.getTags());
        System.out.println("transactionId:"+msg.getTransactionId());
        System.out.println("body:"+msg.getBody().toString());
        if("1".equalsIgnoreCase(arg.toString())){
            return LocalTransactionState.COMMIT_MESSAGE;
        }else if("2".equalsIgnoreCase(arg.toString())){
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }else if("3".equalsIgnoreCase(arg.toString())){
            return LocalTransactionState.UNKNOW;
        }
        return null;
    }
    /**
     * 回查方法
     * @param msg
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println("checkLocalTransaction.....");
        System.out.println("tag:"+msg.getTags());
        System.out.println("transactionId:"+msg.getTransactionId());
        System.out.println("body:"+msg.getBody());
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

(2)编写TransactionProdcuer类

@Component
public class TransactionProducer {
    private String producerGroup = "transaction_producer_group";
    private TransactionListener transactionListener = new TransactionListenerImpl();
    private TransactionMQProducer producer;
    /**
     * 设置自定义线程池
     * corePoolSize:池中锁保存的核心线程数
     * maximumPoolSize:池中允许的最大线程数
     * keepActiveTime:非核心线程空闲等待新任务的最长时间
     * timeUnit:keepActiveTime参数的时间单位
     * blockingQueue:任务队列
     */
    ExecutorService executorService = new ThreadPoolExecutor(2, 5,
            100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000),
            new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("client-transaction-msg-check-thread");
            return thread;
        }
    });
    public TransactionProducer(){
        producer = new TransactionMQProducer(producerGroup);
        //设置生产者发送broker失败重复发送的次数
        producer.setRetryTimesWhenSendFailed(5);
        //指定NameServer地址,多个地址;隔开
        producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        //设置事务监听器
        producer.setTransactionListener(transactionListener);
        //设置线程池
        producer.setExecutorService(executorService);
        start();
    }
    public TransactionMQProducer getProducer(){
        return this.producer;
    }
    /**
     * 对象在使用之前必须调用一次,只能初始化一次
     */
    public void start(){
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
    /**
     * 一般在应用上下文,使用上下文监听器,进行关闭
     */
    public void shutdown(){
        this.producer.shutdown();
    }
}

(3)TransactionController编写

@RestController
@RequestMapping("api/v1/tran")
public class TransactionController {
    @Autowired
    private TransactionProducer transactionProducer;
    @RequestMapping("/pay_cb1")
    public Object callback1(String tag,String otherArgs) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        Message message = new Message(JmsConfig.TOPIC,tag,"",tag.getBytes());
        SendResult sendResult = transactionProducer.getProducer().sendMessageInTransaction(message,otherArgs);
        System.out.println(sendResult);
        return new HashMap<>();
    }
}

8.RocketMQ搭建双主双从架构

  • 4台机器,2态部署NameServer,4台部署Broker,双主双从同步双鞋,异步刷盘
  • 环境准备:jdk、maven、rocketmq

(1)机器列表

server1 ssh root@192.168.10.200   部署nameServer  Broker-a
server2 ssh root@192.168.10.201   部署nameServer   Broker-a-s
server3 ssh root@192.168.10.202         Broker-b
server4 ssh root@192.168.10.203         Broker-b-s

(2)修改RocketMQ配置文件(启动内存配置,runbroker.sh和runserver.sh)

vim runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms528m -Xmx528m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
vim runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms528m -Xmx528m -Xmn256m"

(3)修改RocketMQ的配置文件并启动

broker-a主节点
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-a.properties &
namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
defaultTopicQueueNums=4
#是否允许自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#存储路径,根据需求进行配置绝对路径,默认是家目录下面
#storePathRootDir= 
#storePathCommitLog
broker-a从节点
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-a-s.properties &
namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
defaultTopicQueueNums=4
#是否允许自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#存储路径,根据需求进行配置绝对路径,默认是家目录下面
#storePathRootDir= 
#storePathCommitLog
broker-b主节点
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-b.properties &
namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
defaultTopicQueueNums=4
#是否允许自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#存储路径,根据需求进行配置绝对路径,默认是家目录下面
#storePathRootDir= 
#storePathCommitLog
broker-b从节点
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-b-s.properties &
namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
defaultTopicQueueNums=4
#是否允许自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#存储路径,根据需求进行配置绝对路径,默认是家目录下面
#storePathRootDir= 
#storePathCommitLog

(4)双主双从控制台搭建

修改事项
pom.xml  里面的rocketmq版本号
路径:/usr/local/software/rocketmq-externals-master/rocketmq-console/src/main/resources
application.properties  里面的nameServer
增加
rocket.config.namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
mvn install -Dmaven.test.skip=true
java -jar rocketmq-console-ng-1.0.0.jar

(5)RocketMQ生产环境配置

  • Topic创建上线禁止自动创建
  • 一般是由专门的后台管理队列的CRUD,应用上线需要申请队列名称
  • 生产环境推荐配置
  • NameServer配置多个不同机器多个节点
  • 多Master每个Master带有Slave
  • 主从设置SYNC_MASTER同步双写
  • Producer用同部方式投递Broker
  • 刷盘策略为ASYNC_FLUSH
  • 性能思路分析
  • CPU:top
  • 网卡:sar -n DEV 2 10 、netstat -t
  • 磁盘:iostat -xdm 1
  • JVM:jstack、MAT、jinfo

    autoCreateTopicEnable=true
    #是否允许自动创建订阅组,建议线下开启,线上关闭
  • autoCreateSubscriptionGroup=false

#存储路径,根据需求进行配置绝对路径,默认是家目录下面

#storePathRootDir=

#storePathCommitLog

```bash
broker-b从节点
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-b-s.properties &
namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
defaultTopicQueueNums=4
#是否允许自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#存储路径,根据需求进行配置绝对路径,默认是家目录下面
#storePathRootDir= 
#storePathCommitLog

(4)双主双从控制台搭建

修改事项
pom.xml  里面的rocketmq版本号
路径:/usr/local/software/rocketmq-externals-master/rocketmq-console/src/main/resources
application.properties  里面的nameServer
增加
rocket.config.namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
mvn install -Dmaven.test.skip=true
java -jar rocketmq-console-ng-1.0.0.jar

(5)RocketMQ生产环境配置

  • Topic创建上线禁止自动创建
  • 一般是由专门的后台管理队列的CRUD,应用上线需要申请队列名称
  • 生产环境推荐配置
  • NameServer配置多个不同机器多个节点
  • 多Master每个Master带有Slave
  • 主从设置SYNC_MASTER同步双写
  • Producer用同部方式投递Broker
  • 刷盘策略为ASYNC_FLUSH
  • 性能思路分析
  • CPU:top
  • 网卡:sar -n DEV 2 10 、netstat -t
  • 磁盘:iostat -xdm 1
  • JVM:jstack、MAT、jinfo


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
24天前
|
消息中间件 存储 Java
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。
|
1月前
|
消息中间件 监控 中间件
常用的消息队列中间件都有什么?优缺点是什么?如何选择?
常用的消息队列中间件都有什么?优缺点是什么?如何选择?
87 5
|
1月前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
83 1
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
23天前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
63 5
|
18天前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
18天前
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!
|
1月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
63 7
|
21天前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
29天前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
1月前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
64 4