【消息队列】消息中间件RocketMQ4.X急速入门3

简介: 【消息队列】消息中间件RocketMQ4.X急速入门

6.消息队列Offset和CommitLog

6.1.RocketMQ消息偏移量Offset

(1)什么是offset

  • message queue是无限长的数组,一条消息进来下标就会增长1,小标就是offset,消息在某个MessageQueue里的位置,通过offset的值可以定位这条消息,或者指示Consumer从这条消息开始消费。
  • message queue中maxOffset表示消息的最大offset,maxOffset并不是最新的那条Offset而是新的消息的offset+1,minOffset则是现存的最小的offset
  • fileReserveTime=48默认消息存储48消息后,消息就会被从磁盘中删除,message queue的min offset也就对应的增长,锁以比minOffset还要小的那些消息已经不在broker上了,就无法消费

(2)类型(父类是OffsetStore)

本地文件类型

DefaultMQPushConsumer的BROADCASTING模式,各个Conusmer没有相互干扰,使用LocalFileOffsetStore,把Offset存储在本地

Broker代存储类型

DefalutMQPushConsumer的CLUSTERING模式,由Broker端存储和控制Offset的值,使用RemoteBrokerOffsetStore

主要是记录消息消费的偏移量,由多个消费者进行消费

集群模式下采用RemoteBrokerOffsetStore,Broker控制offset的值

广播模式下采用LocalFileOffsetStore,消费端存储,消费者控制

建议采用pushConsumer,RocketMQ自动维护OffsetStore,如果是PullConsumer则需要自己维护OffsetStore

6.2.RocketMQ消息存储CommitLog

(1)消息存储是由ConsumeQueue和CommitLog

ConsumeQueue是逻辑队列,CommitLog是真正存储消息文件的,存储的是指向物理存储地址

Topic下的每个message queue都有对应的ConsumeQueue文件,内容也会被持续化到磁盘

默认地址:store/consumequeue/{topicname}/{queueid}/fileName

CommitLog是消息文件的存储地址。CommitLog的生成规则是每个文件默认达到1G开始切割,CommitLog文件名fileName,名字长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1 073 741 824Byte,当着文件满了,第二个文件名字为00000000001073741824,起始便宜量为1073741825=上一个最大的偏移量+1,消息存储的时候会顺序写入,当文件满了则写入下一个文件。

Broker里面多个Topic,一个Topic中有多个MessageQueue,每个MessageQueue对应一个ConusmeQueue,ConsumeQueue里面记录的是消息在CommitLog里面的物理存储地址

6.3.ZeroCopy零拷贝技术

(1)高效原因

  • CommitLog顺序写,存储了MessageBody、MessageKey、Tag等消息
  • ConsumeQueue随机读+操作系统PageCache+零拷贝技术ZeroCopy

(2)Linux将一个File文件发送出去(Linux有两个上下文,内核态,用户态)

File经历了4次copy

调用read,将文件从磁盘拷贝到了kernel内核态

CPU控制kernel态的数据copy到用户态

调用write时,user态下的内容会copy到内核态的socket的buffer中

最后将内核态socket buffer的数据copy到网卡设备中传送

(3)ZeroCopy

请求kernel直接把disk的data传输给socket,而不是通过应用程序传输。ZeroCopy大大提高了应用程序的性能,较少不必要的内核缓冲区跟用户缓冲区的拷贝,从而减少CPU的开销和小勺kernel和user模式的上下文切换

(4)对应的零拷贝技术有mmap和sendfile

mmap:小文件传输快

sendfile:大文件传输快

7.RocketMQ实现分布式事务

7.1.分布式事务介绍

(1)什么是分布式事务

  • 来源:单体应用->拆分成分布式应用
  • 一个接口需要调用多个服务,且操作不同的数据库,数据一致性难保证
  • (2)常见的解决方案
  • 2PC:两阶段提交,基于XA协议
  • TCC:Try(尝试)、Confirm(证实)、Cancel(取消)

(3)框架

(4)RocketMQ分布式事务消息

  • RocketMQ提供分布式事务功能,通过RocketMQ事务消息道道分布式事务的一致性。

(5)半消息Half Message

  • 暂时不能消费的消息,是Producer投递到Broker的消息,但是服务端还未收到生产者的二次确认,此时该消息被标记为“暂不能投递状态”,也就是不能投递给消费者,处于该状态的消息为半消息。

(6)整体交互流程


1b670ff57fb04af98e9918f67cf15ac1.jpg

Producer向Broker发送消息

服务端将消息持久化成功后,向发送方ACK确认消息已经发送成功,此时消息为半消息

发送方执行本地事务逻辑

发送方(SpringBoot应用)根据本地事务的结果向服务器发送二次确认(Commit或者Rollback),服务端收到Commit状态则将版消息标记为可投递,订阅放最终消费该消息,如果收到Rollback状态,则订阅方不会消费该消息,三天后Broker进行删除该消息

在断网或者应用重启的情况下,也就是发送方没有给服务器提交二次确认的结果,这回服务端会经过固定时间对发送方的这条消息进行回查

发送方收到消息回查后,需要检查对应用消息的本地事务执行的最终结果

发送方检查得到本地事务的最终状态在进行二次确认,服务端仍按照Commit或者rollback结果进行相应的处理

(7)RocketMQ事务消息的状态

COMMIT_MESSAGE:提交事务的消息,消费者可以消费此消息

ROLLBACK_MESSAGE:回滚事务消息,消息会在broker中删除,消费者不能在消费

UNKNOW:broker需要回查确认消息的状态

(8)关于事务消息的消费

事务消息consumer端的消费方式和普通消息是一样的,RocketMQ能保证消息被consumer收到

7.2.RocketMQ分布式事务消息实战

(1)编写TransactionListenerImpl类实现TransactionListener接口

public class TransactionListenerImpl implements TransactionListener {
    /**
     * 本地事务方法
     * @param msg 消息体
     * @param arg 附加参数
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println("executeLocalTransaction.....");
        System.out.println("tag:"+msg.getTags());
        System.out.println("transactionId:"+msg.getTransactionId());
        System.out.println("body:"+msg.getBody().toString());
        if("1".equalsIgnoreCase(arg.toString())){
            return LocalTransactionState.COMMIT_MESSAGE;
        }else if("2".equalsIgnoreCase(arg.toString())){
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }else if("3".equalsIgnoreCase(arg.toString())){
            return LocalTransactionState.UNKNOW;
        }
        return null;
    }
    /**
     * 回查方法
     * @param msg
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println("checkLocalTransaction.....");
        System.out.println("tag:"+msg.getTags());
        System.out.println("transactionId:"+msg.getTransactionId());
        System.out.println("body:"+msg.getBody());
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

(2)编写TransactionProdcuer类

@Component
public class TransactionProducer {
    private String producerGroup = "transaction_producer_group";
    private TransactionListener transactionListener = new TransactionListenerImpl();
    private TransactionMQProducer producer;
    /**
     * 设置自定义线程池
     * corePoolSize:池中锁保存的核心线程数
     * maximumPoolSize:池中允许的最大线程数
     * keepActiveTime:非核心线程空闲等待新任务的最长时间
     * timeUnit:keepActiveTime参数的时间单位
     * blockingQueue:任务队列
     */
    ExecutorService executorService = new ThreadPoolExecutor(2, 5,
            100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000),
            new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("client-transaction-msg-check-thread");
            return thread;
        }
    });
    public TransactionProducer(){
        producer = new TransactionMQProducer(producerGroup);
        //设置生产者发送broker失败重复发送的次数
        producer.setRetryTimesWhenSendFailed(5);
        //指定NameServer地址,多个地址;隔开
        producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        //设置事务监听器
        producer.setTransactionListener(transactionListener);
        //设置线程池
        producer.setExecutorService(executorService);
        start();
    }
    public TransactionMQProducer getProducer(){
        return this.producer;
    }
    /**
     * 对象在使用之前必须调用一次,只能初始化一次
     */
    public void start(){
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
    /**
     * 一般在应用上下文,使用上下文监听器,进行关闭
     */
    public void shutdown(){
        this.producer.shutdown();
    }
}

(3)TransactionController编写

@RestController
@RequestMapping("api/v1/tran")
public class TransactionController {
    @Autowired
    private TransactionProducer transactionProducer;
    @RequestMapping("/pay_cb1")
    public Object callback1(String tag,String otherArgs) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        Message message = new Message(JmsConfig.TOPIC,tag,"",tag.getBytes());
        SendResult sendResult = transactionProducer.getProducer().sendMessageInTransaction(message,otherArgs);
        System.out.println(sendResult);
        return new HashMap<>();
    }
}

8.RocketMQ搭建双主双从架构

  • 4台机器,2态部署NameServer,4台部署Broker,双主双从同步双鞋,异步刷盘
  • 环境准备:jdk、maven、rocketmq

(1)机器列表

server1 ssh root@192.168.10.200   部署nameServer  Broker-a
server2 ssh root@192.168.10.201   部署nameServer   Broker-a-s
server3 ssh root@192.168.10.202         Broker-b
server4 ssh root@192.168.10.203         Broker-b-s

(2)修改RocketMQ配置文件(启动内存配置,runbroker.sh和runserver.sh)

vim runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms528m -Xmx528m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
vim runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms528m -Xmx528m -Xmn256m"

(3)修改RocketMQ的配置文件并启动

broker-a主节点
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-a.properties &
namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
defaultTopicQueueNums=4
#是否允许自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#存储路径,根据需求进行配置绝对路径,默认是家目录下面
#storePathRootDir= 
#storePathCommitLog
broker-a从节点
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-a-s.properties &
namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
defaultTopicQueueNums=4
#是否允许自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#存储路径,根据需求进行配置绝对路径,默认是家目录下面
#storePathRootDir= 
#storePathCommitLog
broker-b主节点
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-b.properties &
namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
defaultTopicQueueNums=4
#是否允许自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#存储路径,根据需求进行配置绝对路径,默认是家目录下面
#storePathRootDir= 
#storePathCommitLog
broker-b从节点
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-b-s.properties &
namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
defaultTopicQueueNums=4
#是否允许自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#存储路径,根据需求进行配置绝对路径,默认是家目录下面
#storePathRootDir= 
#storePathCommitLog

(4)双主双从控制台搭建

修改事项
pom.xml  里面的rocketmq版本号
路径:/usr/local/software/rocketmq-externals-master/rocketmq-console/src/main/resources
application.properties  里面的nameServer
增加
rocket.config.namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
mvn install -Dmaven.test.skip=true
java -jar rocketmq-console-ng-1.0.0.jar

(5)RocketMQ生产环境配置

  • Topic创建上线禁止自动创建
  • 一般是由专门的后台管理队列的CRUD,应用上线需要申请队列名称
  • 生产环境推荐配置
  • NameServer配置多个不同机器多个节点
  • 多Master每个Master带有Slave
  • 主从设置SYNC_MASTER同步双写
  • Producer用同部方式投递Broker
  • 刷盘策略为ASYNC_FLUSH
  • 性能思路分析
  • CPU:top
  • 网卡:sar -n DEV 2 10 、netstat -t
  • 磁盘:iostat -xdm 1
  • JVM:jstack、MAT、jinfo

    autoCreateTopicEnable=true
    #是否允许自动创建订阅组,建议线下开启,线上关闭
  • autoCreateSubscriptionGroup=false

#存储路径,根据需求进行配置绝对路径,默认是家目录下面

#storePathRootDir=

#storePathCommitLog

```bash
broker-b从节点
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-b-s.properties &
namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
defaultTopicQueueNums=4
#是否允许自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#存储路径,根据需求进行配置绝对路径,默认是家目录下面
#storePathRootDir= 
#storePathCommitLog

(4)双主双从控制台搭建

修改事项
pom.xml  里面的rocketmq版本号
路径:/usr/local/software/rocketmq-externals-master/rocketmq-console/src/main/resources
application.properties  里面的nameServer
增加
rocket.config.namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
mvn install -Dmaven.test.skip=true
java -jar rocketmq-console-ng-1.0.0.jar

(5)RocketMQ生产环境配置

  • Topic创建上线禁止自动创建
  • 一般是由专门的后台管理队列的CRUD,应用上线需要申请队列名称
  • 生产环境推荐配置
  • NameServer配置多个不同机器多个节点
  • 多Master每个Master带有Slave
  • 主从设置SYNC_MASTER同步双写
  • Producer用同部方式投递Broker
  • 刷盘策略为ASYNC_FLUSH
  • 性能思路分析
  • CPU:top
  • 网卡:sar -n DEV 2 10 、netstat -t
  • 磁盘:iostat -xdm 1
  • JVM:jstack、MAT、jinfo


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6天前
|
消息中间件
手撸MQ消息队列——循环数组
队列是一种常用的数据结构,类似于栈,但采用先进先出(FIFO)的原则。生活中常见的排队场景就是队列的应用实例。在数据结构中,队列通常用数组实现,包括入队(队尾插入元素)和出队(队头移除元素)两种基本操作。本文介绍了如何用数组实现队列,包括定义数组长度、维护队头和队尾下标(front 和 tail),并通过取模运算解决下标越界问题。此外,还讨论了队列的空与满状态判断,以及并发和等待机制的实现。通过示例代码展示了队列的基本操作及优化方法,确保多线程环境下的正确性和高效性。
15 0
手撸MQ消息队列——循环数组
|
1月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
1月前
|
消息中间件 Docker 容器
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
这篇文章提供了RabbitMQ的安装和基本使用教程,包括如何使用Docker拉取RabbitMQ镜像、创建容器、通过浏览器访问管理界面,以及如何创建交换机、队列、绑定和使用direct、fanout和topic三种类型的交换器进行消息发布和接收的测试。
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
|
1月前
|
消息中间件 存储 网络协议
消息中间件RabbitMQ---概述和概念 【一】
该文章提供了对消息中间件RabbitMQ的全面概述,包括其核心概念、工作原理以及与AMQP和JMS的关系。
消息中间件RabbitMQ---概述和概念 【一】
|
1月前
|
消息中间件 存储 Java
【揭秘】RocketMQ内部运作大揭秘:一探究竟,原来消息队列是这样工作的!
【8月更文挑战第19天】RocketMQ是一款高性能、高可用的消息中间件,在分布式系统中至关重要。它采用发布/订阅模式,支持高吞吐量的消息传递。核心组件包括管理元数据的NameServer、存储消息的Broker以及Producer和Consumer。RocketMQ支持发布/订阅与点对点两种模型,并具备复杂的消息持久化和路由机制。通过Java API示例,可轻松实现消息的发送与接收。RocketMQ凭借其出色的特性和可靠性,成为大型分布式系统首选的消息解决方案。
53 5
|
1月前
|
消息中间件 存储 缓存
一个用过消息队列的人,竟不知为何要用 MQ?
一个用过消息队列的人,竟不知为何要用 MQ?
75 1
|
2月前
|
消息中间件
云消息队列RabbitMQ版入门训练营 打卡领好礼
云消息队列RabbitMQ版入门训练营 打卡领好礼
37 3
|
2月前
|
消息中间件 Java 物联网
消息队列 MQ操作报错合集之建立连接时发生了超时错误,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ操作报错合集之建立连接时发生了超时错误,该如何解决