【消息队列】消息中间件RocketMQ4.X急速入门3

简介: 【消息队列】消息中间件RocketMQ4.X急速入门

6.消息队列Offset和CommitLog

6.1.RocketMQ消息偏移量Offset

(1)什么是offset

  • message queue是无限长的数组,一条消息进来下标就会增长1,小标就是offset,消息在某个MessageQueue里的位置,通过offset的值可以定位这条消息,或者指示Consumer从这条消息开始消费。
  • message queue中maxOffset表示消息的最大offset,maxOffset并不是最新的那条Offset而是新的消息的offset+1,minOffset则是现存的最小的offset
  • fileReserveTime=48默认消息存储48消息后,消息就会被从磁盘中删除,message queue的min offset也就对应的增长,锁以比minOffset还要小的那些消息已经不在broker上了,就无法消费

(2)类型(父类是OffsetStore)

本地文件类型

DefaultMQPushConsumer的BROADCASTING模式,各个Conusmer没有相互干扰,使用LocalFileOffsetStore,把Offset存储在本地

Broker代存储类型

DefalutMQPushConsumer的CLUSTERING模式,由Broker端存储和控制Offset的值,使用RemoteBrokerOffsetStore

主要是记录消息消费的偏移量,由多个消费者进行消费

集群模式下采用RemoteBrokerOffsetStore,Broker控制offset的值

广播模式下采用LocalFileOffsetStore,消费端存储,消费者控制

建议采用pushConsumer,RocketMQ自动维护OffsetStore,如果是PullConsumer则需要自己维护OffsetStore

6.2.RocketMQ消息存储CommitLog

(1)消息存储是由ConsumeQueue和CommitLog

ConsumeQueue是逻辑队列,CommitLog是真正存储消息文件的,存储的是指向物理存储地址

Topic下的每个message queue都有对应的ConsumeQueue文件,内容也会被持续化到磁盘

默认地址:store/consumequeue/{topicname}/{queueid}/fileName

CommitLog是消息文件的存储地址。CommitLog的生成规则是每个文件默认达到1G开始切割,CommitLog文件名fileName,名字长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1 073 741 824Byte,当着文件满了,第二个文件名字为00000000001073741824,起始便宜量为1073741825=上一个最大的偏移量+1,消息存储的时候会顺序写入,当文件满了则写入下一个文件。

Broker里面多个Topic,一个Topic中有多个MessageQueue,每个MessageQueue对应一个ConusmeQueue,ConsumeQueue里面记录的是消息在CommitLog里面的物理存储地址

6.3.ZeroCopy零拷贝技术

(1)高效原因

  • CommitLog顺序写,存储了MessageBody、MessageKey、Tag等消息
  • ConsumeQueue随机读+操作系统PageCache+零拷贝技术ZeroCopy

(2)Linux将一个File文件发送出去(Linux有两个上下文,内核态,用户态)

File经历了4次copy

调用read,将文件从磁盘拷贝到了kernel内核态

CPU控制kernel态的数据copy到用户态

调用write时,user态下的内容会copy到内核态的socket的buffer中

最后将内核态socket buffer的数据copy到网卡设备中传送

(3)ZeroCopy

请求kernel直接把disk的data传输给socket,而不是通过应用程序传输。ZeroCopy大大提高了应用程序的性能,较少不必要的内核缓冲区跟用户缓冲区的拷贝,从而减少CPU的开销和小勺kernel和user模式的上下文切换

(4)对应的零拷贝技术有mmap和sendfile

mmap:小文件传输快

sendfile:大文件传输快

7.RocketMQ实现分布式事务

7.1.分布式事务介绍

(1)什么是分布式事务

  • 来源:单体应用->拆分成分布式应用
  • 一个接口需要调用多个服务,且操作不同的数据库,数据一致性难保证
  • (2)常见的解决方案
  • 2PC:两阶段提交,基于XA协议
  • TCC:Try(尝试)、Confirm(证实)、Cancel(取消)

(3)框架

(4)RocketMQ分布式事务消息

  • RocketMQ提供分布式事务功能,通过RocketMQ事务消息道道分布式事务的一致性。

(5)半消息Half Message

  • 暂时不能消费的消息,是Producer投递到Broker的消息,但是服务端还未收到生产者的二次确认,此时该消息被标记为“暂不能投递状态”,也就是不能投递给消费者,处于该状态的消息为半消息。

(6)整体交互流程


1b670ff57fb04af98e9918f67cf15ac1.jpg

Producer向Broker发送消息

服务端将消息持久化成功后,向发送方ACK确认消息已经发送成功,此时消息为半消息

发送方执行本地事务逻辑

发送方(SpringBoot应用)根据本地事务的结果向服务器发送二次确认(Commit或者Rollback),服务端收到Commit状态则将版消息标记为可投递,订阅放最终消费该消息,如果收到Rollback状态,则订阅方不会消费该消息,三天后Broker进行删除该消息

在断网或者应用重启的情况下,也就是发送方没有给服务器提交二次确认的结果,这回服务端会经过固定时间对发送方的这条消息进行回查

发送方收到消息回查后,需要检查对应用消息的本地事务执行的最终结果

发送方检查得到本地事务的最终状态在进行二次确认,服务端仍按照Commit或者rollback结果进行相应的处理

(7)RocketMQ事务消息的状态

COMMIT_MESSAGE:提交事务的消息,消费者可以消费此消息

ROLLBACK_MESSAGE:回滚事务消息,消息会在broker中删除,消费者不能在消费

UNKNOW:broker需要回查确认消息的状态

(8)关于事务消息的消费

事务消息consumer端的消费方式和普通消息是一样的,RocketMQ能保证消息被consumer收到

7.2.RocketMQ分布式事务消息实战

(1)编写TransactionListenerImpl类实现TransactionListener接口

public class TransactionListenerImpl implements TransactionListener {
    /**
     * 本地事务方法
     * @param msg 消息体
     * @param arg 附加参数
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println("executeLocalTransaction.....");
        System.out.println("tag:"+msg.getTags());
        System.out.println("transactionId:"+msg.getTransactionId());
        System.out.println("body:"+msg.getBody().toString());
        if("1".equalsIgnoreCase(arg.toString())){
            return LocalTransactionState.COMMIT_MESSAGE;
        }else if("2".equalsIgnoreCase(arg.toString())){
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }else if("3".equalsIgnoreCase(arg.toString())){
            return LocalTransactionState.UNKNOW;
        }
        return null;
    }
    /**
     * 回查方法
     * @param msg
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println("checkLocalTransaction.....");
        System.out.println("tag:"+msg.getTags());
        System.out.println("transactionId:"+msg.getTransactionId());
        System.out.println("body:"+msg.getBody());
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

(2)编写TransactionProdcuer类

@Component
public class TransactionProducer {
    private String producerGroup = "transaction_producer_group";
    private TransactionListener transactionListener = new TransactionListenerImpl();
    private TransactionMQProducer producer;
    /**
     * 设置自定义线程池
     * corePoolSize:池中锁保存的核心线程数
     * maximumPoolSize:池中允许的最大线程数
     * keepActiveTime:非核心线程空闲等待新任务的最长时间
     * timeUnit:keepActiveTime参数的时间单位
     * blockingQueue:任务队列
     */
    ExecutorService executorService = new ThreadPoolExecutor(2, 5,
            100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000),
            new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("client-transaction-msg-check-thread");
            return thread;
        }
    });
    public TransactionProducer(){
        producer = new TransactionMQProducer(producerGroup);
        //设置生产者发送broker失败重复发送的次数
        producer.setRetryTimesWhenSendFailed(5);
        //指定NameServer地址,多个地址;隔开
        producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        //设置事务监听器
        producer.setTransactionListener(transactionListener);
        //设置线程池
        producer.setExecutorService(executorService);
        start();
    }
    public TransactionMQProducer getProducer(){
        return this.producer;
    }
    /**
     * 对象在使用之前必须调用一次,只能初始化一次
     */
    public void start(){
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
    /**
     * 一般在应用上下文,使用上下文监听器,进行关闭
     */
    public void shutdown(){
        this.producer.shutdown();
    }
}

(3)TransactionController编写

@RestController
@RequestMapping("api/v1/tran")
public class TransactionController {
    @Autowired
    private TransactionProducer transactionProducer;
    @RequestMapping("/pay_cb1")
    public Object callback1(String tag,String otherArgs) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        Message message = new Message(JmsConfig.TOPIC,tag,"",tag.getBytes());
        SendResult sendResult = transactionProducer.getProducer().sendMessageInTransaction(message,otherArgs);
        System.out.println(sendResult);
        return new HashMap<>();
    }
}

8.RocketMQ搭建双主双从架构

  • 4台机器,2态部署NameServer,4台部署Broker,双主双从同步双鞋,异步刷盘
  • 环境准备:jdk、maven、rocketmq

(1)机器列表

server1 ssh root@192.168.10.200   部署nameServer  Broker-a
server2 ssh root@192.168.10.201   部署nameServer   Broker-a-s
server3 ssh root@192.168.10.202         Broker-b
server4 ssh root@192.168.10.203         Broker-b-s

(2)修改RocketMQ配置文件(启动内存配置,runbroker.sh和runserver.sh)

vim runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms528m -Xmx528m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
vim runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms528m -Xmx528m -Xmn256m"

(3)修改RocketMQ的配置文件并启动

broker-a主节点
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-a.properties &
namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
defaultTopicQueueNums=4
#是否允许自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#存储路径,根据需求进行配置绝对路径,默认是家目录下面
#storePathRootDir= 
#storePathCommitLog
broker-a从节点
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-a-s.properties &
namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
defaultTopicQueueNums=4
#是否允许自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#存储路径,根据需求进行配置绝对路径,默认是家目录下面
#storePathRootDir= 
#storePathCommitLog
broker-b主节点
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-b.properties &
namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
defaultTopicQueueNums=4
#是否允许自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#存储路径,根据需求进行配置绝对路径,默认是家目录下面
#storePathRootDir= 
#storePathCommitLog
broker-b从节点
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-b-s.properties &
namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
defaultTopicQueueNums=4
#是否允许自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#存储路径,根据需求进行配置绝对路径,默认是家目录下面
#storePathRootDir= 
#storePathCommitLog

(4)双主双从控制台搭建

修改事项
pom.xml  里面的rocketmq版本号
路径:/usr/local/software/rocketmq-externals-master/rocketmq-console/src/main/resources
application.properties  里面的nameServer
增加
rocket.config.namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
mvn install -Dmaven.test.skip=true
java -jar rocketmq-console-ng-1.0.0.jar

(5)RocketMQ生产环境配置

  • Topic创建上线禁止自动创建
  • 一般是由专门的后台管理队列的CRUD,应用上线需要申请队列名称
  • 生产环境推荐配置
  • NameServer配置多个不同机器多个节点
  • 多Master每个Master带有Slave
  • 主从设置SYNC_MASTER同步双写
  • Producer用同部方式投递Broker
  • 刷盘策略为ASYNC_FLUSH
  • 性能思路分析
  • CPU:top
  • 网卡:sar -n DEV 2 10 、netstat -t
  • 磁盘:iostat -xdm 1
  • JVM:jstack、MAT、jinfo

    autoCreateTopicEnable=true
    #是否允许自动创建订阅组,建议线下开启,线上关闭
  • autoCreateSubscriptionGroup=false

#存储路径,根据需求进行配置绝对路径,默认是家目录下面

#storePathRootDir=

#storePathCommitLog

```bash
broker-b从节点
nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-b-s.properties &
namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
brokerClusterName=XdclassCluster
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
defaultTopicQueueNums=4
#是否允许自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#存储路径,根据需求进行配置绝对路径,默认是家目录下面
#storePathRootDir= 
#storePathCommitLog

(4)双主双从控制台搭建

修改事项
pom.xml  里面的rocketmq版本号
路径:/usr/local/software/rocketmq-externals-master/rocketmq-console/src/main/resources
application.properties  里面的nameServer
增加
rocket.config.namesrvAddr=192.168.159.133:9876;192.168.159.130:9876
mvn install -Dmaven.test.skip=true
java -jar rocketmq-console-ng-1.0.0.jar

(5)RocketMQ生产环境配置

  • Topic创建上线禁止自动创建
  • 一般是由专门的后台管理队列的CRUD,应用上线需要申请队列名称
  • 生产环境推荐配置
  • NameServer配置多个不同机器多个节点
  • 多Master每个Master带有Slave
  • 主从设置SYNC_MASTER同步双写
  • Producer用同部方式投递Broker
  • 刷盘策略为ASYNC_FLUSH
  • 性能思路分析
  • CPU:top
  • 网卡:sar -n DEV 2 10 、netstat -t
  • 磁盘:iostat -xdm 1
  • JVM:jstack、MAT、jinfo


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2天前
|
消息中间件 网络协议 JavaScript
MQTT常见问题之微消息队列mqtt支持ipv6失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
2天前
|
消息中间件 物联网 Java
MQTT常见问题之微消息队列配置失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
2天前
|
消息中间件 分布式计算 监控
Python面试:消息队列(RabbitMQ、Kafka)基础知识与应用
【4月更文挑战第18天】本文探讨了Python面试中RabbitMQ与Kafka的常见问题和易错点,包括两者的基础概念、特性对比、Python客户端使用、消息队列应用场景及消息可靠性保证。重点讲解了消息丢失与重复的避免策略,并提供了实战代码示例,帮助读者提升在分布式系统中使用消息队列的能力。
42 2
|
2天前
|
消息中间件 Java
springboot整合消息队列——RabbitMQ
springboot整合消息队列——RabbitMQ
82 0
|
17小时前
|
消息中间件 Java 数据安全/隐私保护
Spring Cloud 项目中实现推送消息到 RabbitMQ 消息中间件
Spring Cloud 项目中实现推送消息到 RabbitMQ 消息中间件
|
2天前
|
消息中间件 存储 运维
为什么选择云消息队列 RocketMQ 版
为什么选择云消息队列 RocketMQ 版
5 1
|
2天前
|
消息中间件 存储 运维
深入理解MQ消息队列的高可用与可靠性策略
深入理解MQ消息队列的高可用与可靠性策略
26 3
|
2天前
|
消息中间件 存储 监控
写了10000字:全面学习RocketMQ中间件
以上是 V 哥在授课时整理的全部 RocketMQ 的内容,在学习时重点要理解其中的含义,正所谓知其然知其所以然,希望这篇文章可以帮助兄弟们搞清楚RocketMQ的来龙去脉,必竟这是一个非常常用的分布式应用的中间件,好了,今天的内容就分享到这,我靠!已经 00:36分,建议收藏起来,慢慢消化,创作不易,喜欢请点赞转发。
|
2天前
|
消息中间件 大数据 Java
消息队列 MQ
消息队列 MQ
28 3
|
2天前
|
消息中间件 数据安全/隐私保护
MQTT微消息队列服务器连接报错:Error: Connection refused: Not authorized
使用MQTTX工具进行测试时,通过AccessKey创建了Client ID的用户名和密码。配置了公网接入点及端口1883,但尝试连接时出现错误。已附上工具截图:![](https://ucc.alicdn.com/pic/developer-ecology/3byii5uar64gg_36327474e991439da422f38c450ef153.png)。确认过用户名、密码和Client ID无误,问题仍未解决,期待回复!