1.RocketMQ4.X简介
阿里开源消息队列RocketMQ介绍
- Apache RocketMQ作为阿里开源的一款高性能、高吞吐量的分布式消息中间件。
- 特点:
- 支持Broker和Consumer端消息过滤,既可以在Broker端过滤消息,也可以在Consumer端过虑消息。
- 支持发布订阅,点对点模型。
- 支持消费端pull消息和Broker来push消息。
- 单一队列百万消息、亿级消息堆积。
- 支持单master节点,多master节点,多master多slave节点
- 任意一点都是高可用,水平扩展,Producer、Consumer、队列都可以分布式
- 消息失败重试机制、支持特定level的定时消息
- 新版本底层采用Netty
- 4.3x支持分布式事务
- 适合金融类业务,高可用性跟踪和审计功能
- 概念:
- Producer:消息生产者
- Producer Group:消息生产者组,发送同类消息的一个消息生产者组
- Consumer:消费者
- Consumer Group:消费同类消息的多个实例
- Tag:标签,子主题(二级分类)对topic的进一步细化,用于区分同一主题下的不同业务的消息。
- Topic:主题,如订单类消息,queue式消息的物理管理单位,而topic是逻辑管理单位,一个topic下可以有多个queue。默认自动创建4个,手动创建是8个。
- Message:消息,每个message必须指定一个topic。
- Broker:MQ程序,接收生产的消息,提供给消费者消费的程序,一个Broker就是一个MQ节点。
- Name Server:给生产者和消费者提供路由信息,提供轻量级的服务发现、路哟与、元数据信息,可以多个部署,相互独立(比Zookeeper更轻量)。
- Offset:偏移量,每个消费者都会维护一个当前最大的消费偏移量。
- commit log:消费存储会写在Commit log文件里面,消息持久化存储的地方。
2.Springboot整合RocketMQ4.X
(1)创建SpringBoot项目,加入相关依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.0</version> </dependency>
(2)Message对象
- topic:主题名称
- tag:标签,用于过滤
- key:消息唯一标识,可以是业务字段组合
- body:消息体,字节数组
- 注意:发送消息到Broker,需要判断是否有此topic,启动Broker的时候,本地建议开启自动创建topic,生产环境加以关闭自动化创建topic。建议手工创建topic,如果靠程序自动创建,然后在投递消息,会出现延迟的情况。
- 概念模型:一个topic下面对应多个queue,可以创建Topic时指定,如订单类topic
(3)生产者发送消息编码实战
@Component public class PayProducer{ private String producerGroup = "pay_producer_group"; public DefaultMQProducer producer; public PayProducer(){ //创建Producer对象,设置生产者组 producer = new DefaultMQProducer(producerGroup); //指定NameServer地址,多个地址逗号隔开 producer.setNamesrvAdder(JmsConfig.NAME_SERVER); //开启生产者 start(); } //得到Producer对象 public DefaultMQProducer getProducer(){ return this.producer; } /** * 对象在使用之前必须调用一次,只能初始化一次 */ public void start(){ try{ this.producer.start(); }catch(MQClientException e){ e.printStackTrace(); } } /** * 一般在应用上下文,使用上下文监听器,进行关闭 */ public void shutdown(){ this.producer.shutdown(); } }
public class JmsConfig { public static final String NAME_SERVER = "8.140.116.67:9876"; public static final String TOPIC = "pay_topic"; }
@RestController @RequestMapping("api/v1/pay") public class PayController { @Autowired private PayProducer payProducer; @RequestMapping("/pay_cb") public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { Message message = new Message(JmsConfig.TOPIC,"taga",text.getBytes()); SendResult sendResult = payProducer.getProducer().send(message); System,out,println(sendResult); return sendResult; } }
(4)常见错误一
MQClientException: No route info of this topic, TopicTest1 原因:Broker禁止自动创建Topic,且用户没有通过手工取创建此Topic,或者broker和nameServer网络不通 解决:sh bin/mqbroker -m 查看配置信息 autoCreateTopicEnable=true 则自动创建topic centos7关闭防火墙 systemctl stop fitewalld 手动创建topic
(5)常见错误二
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout 原因:阿里云存在多个网卡,rocketmq都会根据当前网卡选择一个ip使用,当你的机器有多个网卡时,很有可能会有问题,阿里云机器有两个网卡,因此需要broker.conf来配置当前公网ip,然后重启broker broker.conf新增:brokerIP1=公网IP 在以配置文件的方式重启broker
(6)常见错误三
控制台查询不了数据,连接提示10909错误 原因:Rocket默认开启了VIP通道,VIP通道端口为10911-2=10909 解决:阿里云安全组加上10909端口
(7)其他问题
https://blog.csdn.net/qq_14853889/article/details/81053145 https://blog.csdn.net/wangmx1993328/article/details/81588217#%E5%BC%82%E5%B8 %B8%E8%AF%B4%E6%98%8E https://www.jianshu.com/p/bfd6d849f156 https://blog.csdn.net/wangmx1993328/article/details/81588217
(8)消费者消费消息编码实战
@Component public class PayConsumer { private DefaultMQPushConsumer consumer; private String consumerGroup = "pay_consumer_group"; public PayConsumer(){ //new一个Consumer对象,这是ConsumerGroup consumer = new DefaultMQPushConsumer(consumerGroup); //设置NameServer地址 consumer.setNamesrvAddr(JmsConfig.NAME_SERVER); //设置消费的策略,从最后一个开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); try { //订阅主题 consumer.subscribe(JmsConfig.TOPIC,"*"); //注册消息监听 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) { //业务逻辑处理 MessageExt message = msgs.get(0); String body = new String(msgs.get(0).getBody()); String tags = msgs.get(0).getTags(); String keys = msgs.get(0).getKeys(); System.out.println("message:"+message+"--body:"+body+"--tags:"+tags+"--keys:"+keys); //消费完成返回给broker消费的状态,成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); } catch (MQClientException e) { e.printStackTrace(); } try { //开启消费者 consumer.start(); System.out.println("consumer.start()..."); } catch (MQClientException e) { e.printStackTrace(); } } }
(9)常见问题
1、Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.42.1:10911> failed 2、com.alibaba.rocketmq.client.exception.MQClientException: Send [1] times, still failed, cost [1647]ms, Topic: TopicTest1, BrokersSent: [broker-a, null, null] 3、org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [497]ms, Topic: TopicTest, BrokersSent: [Book-Air.local, MacBook-Air.local, MacBook-Air.local] 解决:多网卡问题处理 1、设置producer: producer.setVipChannelEnabled(false); 2、编辑ROCKETMQ 配置文件:broker.conf(下列ip为自己的ip) namesrvAddr = 192.168.0.101:9876 brokerIP1 = 192.168.0.101 4、DESC: service not available now, maybe disk full, CL: 解决:修改启动脚本runbroker.sh,在里面增加一句话即可: JAVA_OPT="${JAVA_OPT} -Drocketmq.broker.diskSpaceWarningLevelRatio=0.98" (磁盘保护的百分比设置成98%,只有磁盘空间使用率达到98%时才拒绝接收producer消息) 常见问题处理 https://blog.csdn.net/sqzhao/article/details/54834761 https://blog.csdn.net/mayifan0/article/details/67633729 https://blog.csdn.net/a906423355/article/details/78192828
3.RocketMQ4.X集群架构
3.1.RocketMQ4.X集群模式架构
(1)单机模式
优点:本地开发测试用,配置简单,同步刷盘消息不会丢失
缺点:不可靠,服务宕机,导致服务不可用,数据丢失
(2)主从模式(异步复制、同步双写)
优点:同步双写不丢失数据,异步复制存在少量数据丢失,主节点宕机,从节点可以对外提供消费服务,但是不提供写服务。
缺点:主备有短暂的消息延迟,毫秒级,主节点宕机后,目前不支持自动切换,需要手动设置从节点成为主节点。
(3)双主、多主模式
优点:配置简单,可以根据配置RAID10磁盘阵列保证消息的可靠性,异步刷盘丢失少量消息
缺点:master宕机期间,未被消费的消息不能被消费,只有当节点恢复才会恢复消费
(4)双主双从,多主多从模式(异步复制)
优点:磁盘损坏,消息丢失的量少,消息消费的实时性不受影响,Master宕机后,会从Slave消费消息
缺点:主备消息同步由延迟,Master宕机会存在少量信息丢失
(5)双主双从,多主多从模式(同步双写)
优点:同步双写方式,主备都写成功,才返回成功,服务可用性与数据的可用性都非常高
缺点:新跟那个相比异步复制要低,主节点宕机后不支持自动切换主机
(6)消息可靠性之同步、异步刷盘
异步刷盘:每个节点,不论是从节点还是主节点都会有一个服务内存数据和磁盘同步的过程,异步刷盘就是当数据到达节点内存后,就返回成功,采用异步复制的方式给磁盘同步数据。
同步刷盘:同步刷盘就是当节点中的数据全部同步到磁盘中时,才会返回确认成功。
(7)消息可靠性之同步、异步复制
同步复制:数据安全性搞,性能相对差一点
异步复制:数据安全性低,性能会高一点
最终建议:同步双写,异步刷盘
3.2.RocketMQ主从模式搭建
机器列表
192.168.10.200 192.168.10.201
(1)修改RocketMQ(runserver.sh,runbroker.sh)
vim runserver.sh JAVA_OPT="${JAVA_OPT} -server -Xms528m -Xmx528m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" vim runbroker.sh JAVA_OPT="${JAVA_OPT} -server -Xms528m -Xmx528m -Xmn256m"
(2)启动两个节点的nameServer
nohup sh runserver.sh
(3)编辑broker配置文件并启动两个broker
主节点: nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties & 配置: namesrvAddr=192.168.159.129:9876;192.168.159.130:9876 brokerClusterName=XdclassCluster brokerName=broker-a brokerId=0 deleteWhen=04 fileReservedTime=48 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH
从节点: nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties & 配置: namesrvAddr=192.168.159.129:9876;192.168.159.130:9876 brokerClusterName=XdclassCluster brokerName=broker-a brokerId=1 deleteWhen=04 fileReservedTime=48 brokerRole=SLAVE flushDiskType=ASYNC_FLUSH
(4)安装控制台
修改事项 pom.xml 里面的rocketmq版本号 application.properties里面的nameserver 增加 rocketmq.config.namesrvAddr=192.168.159.129:9876;192.168.159.130:9876 mvn install -Dmaven.test.skip=true java -jar rocketmq-console-ng-1.0.0.jar
3.3.RocketMQ4.X主从同步总结
- Broker分为master和slave,一个master可以对应多个slave,一个slave只能对应一个master,master与slave通过相同的Broker Name来匹配,不同的broker id来定义时master还是slave。
- Broker向所有的NameServer节点建立长连接,定时注册Topic和发送元数据信息。
- NameServer定时扫描(默认2分钟)所有存货broker的连接,如果超时未响应则断开连接(心跳检测),但是consumer客户端感知不到,consumer定时30s从NameServer获取topic最新信息,所以broker不可用时,consumer最多也就30s就能发现broker宕机。
producer和consumer一样,在未发现broker宕机前发送的消息会失败
只有master才会有写消息的操作,slave只能提供消费,同步的策略取决于master的配置
客户端消费可以从master和slave消费,默认消费者都从master消费,如果master宕机后,客户端从NameServer中感知到Broker宕机,就会从slave消费,感知非实时,存在一定的之后性,slave不能保证master的消息100%同步过来,会有少量的消息丢失,但是一旦master恢复,未同步过去的消息会最终被消费掉。
如果consumer实例的数量比message queue的总数量还要多,多出来的consumer将无法分到queue,也就无法消费消息,无法起到负载的作用,所以需要控制让queue的总数量大于等于consumer的数量。