Topic可以理解为在rocketMq体系当中作为一个逻辑消息组织形式,一般情况下一类业务消息会申请一个topic来实现业务之间隔离。
Topic是一个逻辑上的概念,实际上在每个broker上以queue的形式保存,也就是说每个topic在broker上会划分成几个逻辑队列,每个逻辑队列保存一部分消息数据,但是保存的消息数据实际上不是真正的消息数据,而是指向commit log的消息索引。
Topic创建的时候可以用集群模式去创建(这样集群里面每个broker的queue的数量相同),也可以用单个broker模式去创建(这样每个broker的queue数量可以不一致)。
Queue是Topic在一个Broker上的分片等分为指定份数后的其中一份,是负载均衡过程中资源分配的基本单元。
概念:
Producer:消息生产者,负责产生消息,一般由业务系统负责产生消息
Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费
Topic:消息主题,负责标记一类消息,生产者将消息发送到Topic,消费者从该Topic消费消息
Broker:消息中转角色,负责存储消息,转发消息,一般也称为 Server,在 JMS 规范中称为 Provider
NameServer:服务发现Server,用于生产者和消费者获取Broker的服务;
Rocketmq模块划分:
特性:
Producer端:
发送方式:
Sync:同步的发送方式,会等待发送结果后才返回
Async:异步的发送方式,发送完后,立刻返回。Client 在拿到 Broker 的响应结果后,会回调指定的 callback. 这个 API 也可以指定 Timeout,不指定也是默认的 3000ms.
Oneway:比较简单,发出去后,什么都不管直接返回。Ps:日志
普通消息的发送:
- 准备工作 mesasge、网络相关、线程相关
- 从namesrv获取topic路由(缓存机制)
- 组装数据,broker需要的序列化数据(json)
- Netty发送(源码)
定时消息
定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。
顺序消息
事务消息
MQ应用场景
异步处理,应用解耦,流量削锋和日志处理,消息通讯5个场景
启动 name server(namesrv启动之后的默认端口号是9876):
nohup sh bin/mqnamesrv &
看日志:
tail -f ~/logs/rocketmqlogs/namesrv.log
关闭 name server:
sh bin/mqshutdown namesrv
调整启动内存等
vim /data/backup/rocketmq-all-4.4.0-bin-release/bin/runserver.sh
这里把Broker跟Namesrv装在一个服务器上面,使用的Broker配置文件是自带的2m-noslave/broker-a.properties。
注意这个配置文件里面没有属性brokerIP1,他默认取本机IP,如果你服务器的网卡设置过于复杂,他会取的IP报错,后续就连不上这个Broker,建议大家手动修改这个IP地址。
brokerClusterName=DefaultCluster brokerName=broker-a brokerId=0 deleteWhen=04 fileReservedTime=48 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH brokerIP1=172.27.0.8
启动 broker:
sh mqbroker -n 172.27.0.8:9876 -c ../conf/2m-noslave/broker-a.properties &
-n后面代表的是namesrv的地址和端口
-c后面代表的是broker的配置文件地址
看日志:
tail -f ~/logs/rocketmqlogs/broker.log
关闭 broker:
sh bin/mqshutdown broker
调整启动内存等
vim /data/backup/rocketmq-all-4.4.0-bin-release/bin/runbroker.sh
export NAMESRV_ADDR=localhost:9876 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
rocketmq 默认会开启10909,10911
1. 目前这种写法Rocket默认开启了VIP通道,VIP通道端口为10911-2=10909。若Rocket服务器未启动端口10909,则报connect to <> failed。
2. 解决方式:增加一行代码producer.setVipChannelEnabled(false);
package com.rokcetmq; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { //声明并初始化一个producer //需要一个producer group名字作为构造方法的参数,这里为producer1 DefaultMQProducer producer = new DefaultMQProducer("producer1"); //设置NameServer地址,此处应改为实际NameServer地址,多个地址之间用;分隔 //NameServer的地址必须有,但是也可以通过环境变量的方式设置,不一定非得写死在代码里 producer.setNamesrvAddr("132.232.85.11:9876"); // producer.setVipChannelEnabled(false); //调用start()方法启动一个producer实例 producer.start(); //发送10条消息到Topic为TopicTest,tag为TagA,消息内容为“Hello RocketMQ”拼接上i的值 for (int i = 0; i < 10; i++) { try { Message msg = new Message("TopicTest",// topic "TagA",// tag ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body ); //调用producer的send()方法发送消息 //这里调用的是同步的方式,所以会有返回结果 SendResult sendResult = producer.send(msg); //打印返回结果,可以看到消息发送的状态以及一些相关信息 System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } //发送完消息之后,调用shutdown()方法关闭producer producer.shutdown(); } }
Consumer
package com.rokcetmq; import java.util.List; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { //声明并初始化一个consumer //需要一个consumer group名字作为构造方法的参数,这里为consumer1 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1"); //同样也要设置NameServer地址 consumer.setNamesrvAddr("132.232.85.11:9876"); //这里设置的是一个consumer的消费策略 //CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息 //CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍 //CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //设置consumer所订阅的Topic和Tag,*代表全部的Tag consumer.subscribe("TopicTest", "TagA"); //设置一个Listener,主要进行消息的逻辑处理 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for(MessageExt e:msgs){ System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs +":"+new String(e.getBody())); } //返回消费状态 //CONSUME_SUCCESS 消费成功 //RECONSUME_LATER 消费失败,需要稍后重新消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //调用start()方法启动consumer consumer.start(); System.out.println("Consumer Started."); } }