6.消息队列Offset和CommitLog
6.1.RocketMQ消息偏移量Offset
(1)什么是offset
- message queue是无限长的数组,一条消息进来下标就会增长1,小标就是offset,消息在某个MessageQueue里的位置,通过offset的值可以定位这条消息,或者指示Consumer从这条消息开始消费。
- message queue中maxOffset表示消息的最大offset,maxOffset并不是最新的那条Offset而是新的消息的offset+1,minOffset则是现存的最小的offset
- fileReserveTime=48默认消息存储48消息后,消息就会被从磁盘中删除,message queue的min offset也就对应的增长,锁以比minOffset还要小的那些消息已经不在broker上了,就无法消费
(2)类型(父类是OffsetStore)
本地文件类型
DefaultMQPushConsumer的BROADCASTING模式,各个Conusmer没有相互干扰,使用LocalFileOffsetStore,把Offset存储在本地
Broker代存储类型
DefalutMQPushConsumer的CLUSTERING模式,由Broker端存储和控制Offset的值,使用RemoteBrokerOffsetStore
主要是记录消息消费的偏移量,由多个消费者进行消费
集群模式下采用RemoteBrokerOffsetStore,Broker控制offset的值
广播模式下采用LocalFileOffsetStore,消费端存储,消费者控制
建议采用pushConsumer,RocketMQ自动维护OffsetStore,如果是PullConsumer则需要自己维护OffsetStore
6.2.RocketMQ消息存储CommitLog
(1)消息存储是由ConsumeQueue和CommitLog
ConsumeQueue是逻辑队列,CommitLog是真正存储消息文件的,存储的是指向物理存储地址
Topic下的每个message queue都有对应的ConsumeQueue文件,内容也会被持续化到磁盘
默认地址:store/consumequeue/{topicname}/{queueid}/fileName
CommitLog是消息文件的存储地址。CommitLog的生成规则是每个文件默认达到1G开始切割,CommitLog文件名fileName,名字长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1 073 741 824Byte,当着文件满了,第二个文件名字为00000000001073741824,起始便宜量为1073741825=上一个最大的偏移量+1,消息存储的时候会顺序写入,当文件满了则写入下一个文件。
Broker里面多个Topic,一个Topic中有多个MessageQueue,每个MessageQueue对应一个ConusmeQueue,ConsumeQueue里面记录的是消息在CommitLog里面的物理存储地址
6.3.ZeroCopy零拷贝技术
(1)高效原因
- CommitLog顺序写,存储了MessageBody、MessageKey、Tag等消息
- ConsumeQueue随机读+操作系统PageCache+零拷贝技术ZeroCopy
(2)Linux将一个File文件发送出去(Linux有两个上下文,内核态,用户态)
File经历了4次copy
调用read,将文件从磁盘拷贝到了kernel内核态
CPU控制kernel态的数据copy到用户态
调用write时,user态下的内容会copy到内核态的socket的buffer中
最后将内核态socket buffer的数据copy到网卡设备中传送
(3)ZeroCopy
请求kernel直接把disk的data传输给socket,而不是通过应用程序传输。ZeroCopy大大提高了应用程序的性能,较少不必要的内核缓冲区跟用户缓冲区的拷贝,从而减少CPU的开销和小勺kernel和user模式的上下文切换
(4)对应的零拷贝技术有mmap和sendfile
mmap:小文件传输快
sendfile:大文件传输快
7.RocketMQ实现分布式事务
7.1.分布式事务介绍
(1)什么是分布式事务
- 来源:单体应用->拆分成分布式应用
- 一个接口需要调用多个服务,且操作不同的数据库,数据一致性难保证
- (2)常见的解决方案
- 2PC:两阶段提交,基于XA协议
- TCC:Try(尝试)、Confirm(证实)、Cancel(取消)
(3)框架
- GTS->开源Fescar:地址:https://github.com/alibaba/fescar
- LCN:地址:https://github.com/codingapi/tx-lcn
(4)RocketMQ分布式事务消息
- RocketMQ提供分布式事务功能,通过RocketMQ事务消息道道分布式事务的一致性。
(5)半消息Half Message
- 暂时不能消费的消息,是Producer投递到Broker的消息,但是服务端还未收到生产者的二次确认,此时该消息被标记为“暂不能投递状态”,也就是不能投递给消费者,处于该状态的消息为半消息。
(6)整体交互流程
Producer向Broker发送消息
服务端将消息持久化成功后,向发送方ACK确认消息已经发送成功,此时消息为半消息
发送方执行本地事务逻辑
发送方(SpringBoot应用)根据本地事务的结果向服务器发送二次确认(Commit或者Rollback),服务端收到Commit状态则将版消息标记为可投递,订阅放最终消费该消息,如果收到Rollback状态,则订阅方不会消费该消息,三天后Broker进行删除该消息
在断网或者应用重启的情况下,也就是发送方没有给服务器提交二次确认的结果,这回服务端会经过固定时间对发送方的这条消息进行回查
发送方收到消息回查后,需要检查对应用消息的本地事务执行的最终结果
发送方检查得到本地事务的最终状态在进行二次确认,服务端仍按照Commit或者rollback结果进行相应的处理
(7)RocketMQ事务消息的状态
COMMIT_MESSAGE:提交事务的消息,消费者可以消费此消息
ROLLBACK_MESSAGE:回滚事务消息,消息会在broker中删除,消费者不能在消费
UNKNOW:broker需要回查确认消息的状态
(8)关于事务消息的消费
事务消息consumer端的消费方式和普通消息是一样的,RocketMQ能保证消息被consumer收到
7.2.RocketMQ分布式事务消息实战
(1)编写TransactionListenerImpl类实现TransactionListener接口
public class TransactionListenerImpl implements TransactionListener { /** * 本地事务方法 * @param msg 消息体 * @param arg 附加参数 * @return */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { System.out.println("executeLocalTransaction....."); System.out.println("tag:"+msg.getTags()); System.out.println("transactionId:"+msg.getTransactionId()); System.out.println("body:"+msg.getBody().toString()); if("1".equalsIgnoreCase(arg.toString())){ return LocalTransactionState.COMMIT_MESSAGE; }else if("2".equalsIgnoreCase(arg.toString())){ return LocalTransactionState.ROLLBACK_MESSAGE; }else if("3".equalsIgnoreCase(arg.toString())){ return LocalTransactionState.UNKNOW; } return null; } /** * 回查方法 * @param msg * @return */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { System.out.println("checkLocalTransaction....."); System.out.println("tag:"+msg.getTags()); System.out.println("transactionId:"+msg.getTransactionId()); System.out.println("body:"+msg.getBody()); return LocalTransactionState.COMMIT_MESSAGE; } }
(2)编写TransactionProdcuer类
@Component public class TransactionProducer { private String producerGroup = "transaction_producer_group"; private TransactionListener transactionListener = new TransactionListenerImpl(); private TransactionMQProducer producer; /** * 设置自定义线程池 * corePoolSize:池中锁保存的核心线程数 * maximumPoolSize:池中允许的最大线程数 * keepActiveTime:非核心线程空闲等待新任务的最长时间 * timeUnit:keepActiveTime参数的时间单位 * blockingQueue:任务队列 */ ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); public TransactionProducer(){ producer = new TransactionMQProducer(producerGroup); //设置生产者发送broker失败重复发送的次数 producer.setRetryTimesWhenSendFailed(5); //指定NameServer地址,多个地址;隔开 producer.setNamesrvAddr(JmsConfig.NAME_SERVER); //设置事务监听器 producer.setTransactionListener(transactionListener); //设置线程池 producer.setExecutorService(executorService); start(); } public TransactionMQProducer getProducer(){ return this.producer; } /** * 对象在使用之前必须调用一次,只能初始化一次 */ public void start(){ try { this.producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } /** * 一般在应用上下文,使用上下文监听器,进行关闭 */ public void shutdown(){ this.producer.shutdown(); } }
(3)TransactionController编写
@RestController @RequestMapping("api/v1/tran") public class TransactionController { @Autowired private TransactionProducer transactionProducer; @RequestMapping("/pay_cb1") public Object callback1(String tag,String otherArgs) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { Message message = new Message(JmsConfig.TOPIC,tag,"",tag.getBytes()); SendResult sendResult = transactionProducer.getProducer().sendMessageInTransaction(message,otherArgs); System.out.println(sendResult); return new HashMap<>(); } }
8.RocketMQ搭建双主双从架构
- 4台机器,2态部署NameServer,4台部署Broker,双主双从同步双鞋,异步刷盘
- 环境准备:jdk、maven、rocketmq
(1)机器列表
server1 ssh root@192.168.10.200 部署nameServer Broker-a server2 ssh root@192.168.10.201 部署nameServer Broker-a-s server3 ssh root@192.168.10.202 Broker-b server4 ssh root@192.168.10.203 Broker-b-s
(2)修改RocketMQ配置文件(启动内存配置,runbroker.sh和runserver.sh)
vim runserver.sh JAVA_OPT="${JAVA_OPT} -server -Xms528m -Xmx528m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" vim runbroker.sh JAVA_OPT="${JAVA_OPT} -server -Xms528m -Xmx528m -Xmn256m"
(3)修改RocketMQ的配置文件并启动
broker-a主节点 nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-a.properties & namesrvAddr=192.168.159.133:9876;192.168.159.130:9876 brokerClusterName=XdclassCluster brokerName=broker-a brokerId=0 deleteWhen=04 fileReservedTime=48 brokerRole=SYNC_MASTER flushDiskType=ASYNC_FLUSH defaultTopicQueueNums=4 #是否允许自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=false #存储路径,根据需求进行配置绝对路径,默认是家目录下面 #storePathRootDir= #storePathCommitLog
broker-a从节点 nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-a-s.properties & namesrvAddr=192.168.159.133:9876;192.168.159.130:9876 brokerClusterName=XdclassCluster brokerName=broker-a brokerId=1 deleteWhen=04 fileReservedTime=48 brokerRole=SLAVE flushDiskType=ASYNC_FLUSH defaultTopicQueueNums=4 #是否允许自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=false #存储路径,根据需求进行配置绝对路径,默认是家目录下面 #storePathRootDir= #storePathCommitLog
broker-b主节点 nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-b.properties & namesrvAddr=192.168.159.133:9876;192.168.159.130:9876 brokerClusterName=XdclassCluster brokerName=broker-b brokerId=0 deleteWhen=04 fileReservedTime=48 brokerRole=SYNC_MASTER flushDiskType=ASYNC_FLUSH defaultTopicQueueNums=4 #是否允许自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=false #存储路径,根据需求进行配置绝对路径,默认是家目录下面 #storePathRootDir= #storePathCommitLog
broker-b从节点 nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-b-s.properties & namesrvAddr=192.168.159.133:9876;192.168.159.130:9876 brokerClusterName=XdclassCluster brokerName=broker-b brokerId=1 deleteWhen=04 fileReservedTime=48 brokerRole=SLAVE flushDiskType=ASYNC_FLUSH defaultTopicQueueNums=4 #是否允许自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=false #存储路径,根据需求进行配置绝对路径,默认是家目录下面 #storePathRootDir= #storePathCommitLog
(4)双主双从控制台搭建
修改事项 pom.xml 里面的rocketmq版本号 路径:/usr/local/software/rocketmq-externals-master/rocketmq-console/src/main/resources application.properties 里面的nameServer 增加 rocket.config.namesrvAddr=192.168.159.133:9876;192.168.159.130:9876 mvn install -Dmaven.test.skip=true java -jar rocketmq-console-ng-1.0.0.jar
(5)RocketMQ生产环境配置
- Topic创建上线禁止自动创建
- 一般是由专门的后台管理队列的CRUD,应用上线需要申请队列名称
- 生产环境推荐配置
- NameServer配置多个不同机器多个节点
- 多Master每个Master带有Slave
- 主从设置SYNC_MASTER同步双写
- Producer用同部方式投递Broker
- 刷盘策略为ASYNC_FLUSH
- 性能思路分析
- CPU:top
- 网卡:sar -n DEV 2 10 、netstat -t
- 磁盘:iostat -xdm 1
- JVM:jstack、MAT、jinfo
闭
autoCreateTopicEnable=true
#是否允许自动创建订阅组,建议线下开启,线上关闭
- autoCreateSubscriptionGroup=false
#存储路径,根据需求进行配置绝对路径,默认是家目录下面
#storePathRootDir=
#storePathCommitLog
```bash broker-b从节点 nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-b-s.properties & namesrvAddr=192.168.159.133:9876;192.168.159.130:9876 brokerClusterName=XdclassCluster brokerName=broker-b brokerId=1 deleteWhen=04 fileReservedTime=48 brokerRole=SLAVE flushDiskType=ASYNC_FLUSH defaultTopicQueueNums=4 #是否允许自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=false #存储路径,根据需求进行配置绝对路径,默认是家目录下面 #storePathRootDir= #storePathCommitLog
(4)双主双从控制台搭建
修改事项 pom.xml 里面的rocketmq版本号 路径:/usr/local/software/rocketmq-externals-master/rocketmq-console/src/main/resources application.properties 里面的nameServer 增加 rocket.config.namesrvAddr=192.168.159.133:9876;192.168.159.130:9876 mvn install -Dmaven.test.skip=true java -jar rocketmq-console-ng-1.0.0.jar
(5)RocketMQ生产环境配置
- Topic创建上线禁止自动创建
- 一般是由专门的后台管理队列的CRUD,应用上线需要申请队列名称
- 生产环境推荐配置
- NameServer配置多个不同机器多个节点
- 多Master每个Master带有Slave
- 主从设置SYNC_MASTER同步双写
- Producer用同部方式投递Broker
- 刷盘策略为ASYNC_FLUSH
- 性能思路分析
- CPU:top
- 网卡:sar -n DEV 2 10 、netstat -t
- 磁盘:iostat -xdm 1
- JVM:jstack、MAT、jinfo