Rocket MQ的概念
一图胜千言,都在图里了。
从上面的图,我们可以看出主题是一类消息的集合,每条消息必须属于一个主题。 RocketMQ中的每个消息拥有唯一的Message ID,且可以携带具备业务标识的key,当然你也可以选择不给。 标签(tag)是更细一级的划分,用于同一主题下区分不同类型的消息。
同一类生产者构成生产者小组,同一类消费者构成消费者小组。 消费者消费的方式也有两种:
- 推动式消费
该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。
- 拉取式消费
Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。
我们讲过RocketMQ是可以集群的,所以当多台Broker的时候,我们就不能按IP和端口去找了,我们用broker的名字去找,让nameServer去问,类似于注册中心。
架构图:
有了上面的基础概念,我们来开始写个Hello world吧。
安装与配置
RocketMQ对内存要求比较高,默认是8G来着,就算你改了默认的配置,改成2G, 基本上也跑不起来,知道真相的我眼泪掉下来,我的阿里云服务器学生版只有2G,当时改了配置,我还有疑问为啥跑不起来。 Rocket MQ 是阿里巴巴开源的,文档也比较丰富,我们按照文档走就行。
我们本次下载4.40.release版的然后上传到Linux上
-- 上传 rz rocketmq-all-4.4.0-bin-release.zip -- 解压 unzip rocketmq-all-4.4.0-bin-release.zip -- 名字太长改下名 mv rocketmq-all-4.4.0-bin-release.zip rocketmq 复制代码
主要配置的
配置的
- nameServer 哪台主机是nameServer
- master: 谁是主节点
- /etc/hosts 域名映射 在hosts文件中加入以下配置
-- 未来会搭建集群,mqmater是主节点,mqnameserver1是一个,未来会有多个 47.101.136.147 mqmaster1 47.101.136.147 mqnameserver1 复制代码
-- 持久化到哪里 mkdir mqstore cd mqstore -- 日志 mkdir commitlog -- 消费队列 mkdir consumequeue -- 索引 mkdir index -- 进入配置的文件夹 cd conf 两主两从异步 cd 2m-2s-async vi broker-a.properties namesrvAddr = mqnameserver1:9876 分号隔开 默认主题数量 autoCreateTopicEnable =true defaultTopicQueueNums = 4 -- 对外暴露的接口 listenPort = 10911 -- 无用文件保存时间 deleteWhen = 04 -- 最多保存多长时间 fileReservedTime = 48 -- 存储路径 storePathRootDir=/usr/rocketmq/mqstore -- 提交日志 storePathCommitLog=/usr/rocketmq/mqstore/commitlog -- 消费路径 storePathConsuQueue=/usr/rocketmq/mqstore/commitlog/consumequeue -- 索引路径 storePathIndex=/usr/rocketmq/mqstore/commitlog/consumequeue -- 消息的最大数量 maxMessageSize=65536 -- 主从同步,这是指明当前消息队列是异步,而且是主节点 brokerRole=ASYCN_MASTER -- 同步到硬盘,异步刷新 flushDiskType=ASYNC_FLUSH -- 0 表示mastter 大于0是从 brokerId = 0 -- 批量替换 这是配置RocketMq的日志路径,这个请在conf文件夹下配置 sed -i 's#${user.home}#/usr/rocketmq/#g' *.xml 复制代码
设置启动参数:
-- 把8G变成1G vi runbroker.sh -- 把8G变成1G vi runserver.sh 复制代码
启动:
--可能不少视频在教这类入门的时候,会建议你把防火墙关掉,我不建议你这么做,你搞一台云服务器就懂了,很容易受到攻击。 -- 所以这里我们将RocketMQ需要的端口在端口中放行婴喜爱。 firewall-cmd --zone=public --add-port=9876/tcp --permanent firewall-cmd --zone=public --add-port=10911/tcp --permanent firewall-cmd --zone=public --add-port=10909/tcp --permanent firewall-cmd --reload -- 后台启动 nohup sh mqnamesrv & -- 后台启动 nohup sh mqbroker -c /usr/rocketmq/conf/2m-2s-async/broker-a.properties& -- jps 在linux上可以查出有启动了多少java应用,看到下面这些,说明启动正常。 31972 BrokerStartup 31930 NamesrvStartup 3485 Application 32031 Jps 复制代码
控制台监控MQ
通常对于MQ这类比较重要的产品,我们是需要监控一些的,RocketMQ提供了监控服务:
下载之后,我们引入到IDEA中,下载的是源码。 server.contextPath= server.port=8080 改下配置文件
### SSL setting #server.ssl.key-store=classpath:rmqcngkeystore.jks #server.ssl.key-store-password=rocketmq #server.ssl.keyStoreType=PKCS12 #server.ssl.keyAlias=rmqcngkey #spring.application.index=true spring.application.name=rocketmq-console spring.http.encoding.charset=UTF-8 spring.http.encoding.enabled=true spring.http.encoding.force=true logging.config=classpath:logback.xml #if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876 rocketmq.config.namesrvAddr= 这里是你RocketMQ的地址 #if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true rocketmq.config.isVIPChannel= #rocketmq-console's data path:dashboard/monitor rocketmq.config.dataPath=/tmp/rocketmq-console/data #set it false if you don't want use dashboard.default true rocketmq.config.enableDashBoardCollect=true #set the message track trace topic if you don't want use the default one rocketmq.config.msgTrackTopicName= rocketmq.config.ticketKey=ticket #Must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required rocketmq.config.loginRequired=false 复制代码
这个控制台是用Spring Boot写的,直接启动即可。 正常启动,浏览器中的页面是这样的:
Hello World
接着我们来大致的写一个生产消息,消费消息的demo吧。学到一定地步,有的时候可以不用看接口文档的,直接根据思想推断即可。
上面我们讲到生产者和消费者之间的桥梁是nameServer,在java中肯定是要先new的,先new消费者,指明消费者在哪个组。 然后消费消息应该怎么消费,一个一个消费是跳着消费,消费的主题是哪一个,主题的哪一个标签。 基于这种思想,我们的DEMO如下:
public class MyProducer { private static final String NAMESRV_ADDR = "47.101.136.147:9876"; public static void main(String[] args) throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer("myProducer"); producer.setNamesrvAddr(NAMESRV_ADDR); producer.start(); for (int i = 0; i < 10 ; i++) { Message message = new Message("MyTopic4","MyTags1","key"+i,("mq"+i).getBytes()); try { SendResult result = producer.send(message,1000000); System.out.println("发送成功"+result); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } producer.shutdown(); } } 复制代码
public class MyConsumer { // 指明nameServer的地址 private static final String NAMESRV_ADDR = "47.101.136.147:9876"; public static void main(String[] args) throws MQClientException { // 消息队列向消费者推,你也可以拉,构造中,指明消费者所属的组 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myConsumer"); consumer.setNamesrvAddr(NAMESRV_ADDR); // 设置为顺序消息 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 指明消费主题 consumer.subscribe("MyTopic4","*"); // 监听, consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> { MessageExt message = list.get(0); System.out.println(message.getTopic()); System.out.println(message.getKeys()); System.out.println(message.getTags()); try { System.out.println(new String(message.getBody(),RemotingHelper.DEFAULT_CHARSET)); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); } } 复制代码
所需要的依赖:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>commons-collections</groupId> <artifactId>commons-collections</artifactId> <version>3.2.2</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-tools</artifactId> <version>${rocketmq.version}</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-namesrv</artifactId> <version>${rocketmq.version}</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-broker</artifactId> <version>${rocketmq.version}</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>${guava.version}</version> </dependency> <dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjrt</artifactId> <version>${aspectj.version}</version> </dependency> <dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjweaver</artifactId> <version>${aspectj.version}</version> </dependency> <!-- Spring AOP + AspectJ --> <dependency> <groupId>cglib</groupId> <artifactId>cglib</artifactId> <version>2.2.2</version> </dependency> <dependency> <groupId>org.jooq</groupId> <artifactId>joor</artifactId> <version>0.9.6</version> </dependency> </dependencies>