3. 消息发送(重点)
3.1 主要内容和相关概念
3.1.1 Topic
首先看看官方的定义:
Topic是生产者在发送消息和消费者在拉取消息的类别相当于操作类型。Topic与生产者和消费者之间的关系非常松散。具体来说,一个Topic可能有0个,一个或多个生产者向它发送消息;相反,一个生产者可以发送不同类型Topic的消息。类似的,消费者组可以订阅一个或多个主题,只要该组的实例保持其订阅一致即可。 Topic在Google翻译中解释为话题。我们可以理解为第一级消息类型,类比于书的标题。在应用系统中,一个Topic标识为一类消息类型,比如交易信息。 在Producer中使用Topic:
Message msg = new Message(“TopicTest” /* Topic */,“TagA”,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
在Consumer中订阅Topic:
consumer.subscribe(“TopicTest”, “*”);
3.1.2 Tag
同样,先看看官方怎么定义的:
标签,换句话的意思就是子主题,为用户提供了额外的灵活性。有了标签,来自同一业务模块的具有不同目的的消息可以具有相同的主题和不同的标记。标签有助于保持代码的清晰和连贯,同时标签也方便RocketMQ提供的查询功能。 Tag在Google翻译中解释为标签。我们可以理解为第二级消息类型,类比于书的目录,方便检索使用消息。在应用系统中,一个Tag标识为一类消息中的二级分类,比如交易信息下的交易创建、交易完成。 在Producer中使用Tag:
Message msg = new Message(“TopicTest”,“TagA” /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
在Consumer中订阅Tag:
consumer.subscribe(“TopicTest”, “TagA||TagB”);// * 代表订阅Topic下的所有消息
3.1.3 GroupName
和现实世界中一样,RocketMQ中也有组的概念。代表具有相同角色的生产者组合或消费者组合,称为生产者组或消费者组。
作用是在集群HA的情况下,一个生产者down之后,本地事务回滚后,可以继续联系该组下的另外一个生产者实例,不至于导致业务走不下去。在消费者组中,可以实现消息消费的负载均衡和消息容错目标。 另外,有了GroupName,在集群下,动态扩展容量很方便。只需要在新加的机器中,配置相同的GroupName。启动后,就立即能加入到所在的群组中,参与消息生产或消费。 在Producer中使用GroupName:
DefaultMQProducer producer = new DefaultMQProducer(“group_name_1”);// 使用GroupName来初始化Producer,如果不指定,就会使用默认的名字:DEFAULT_PRODUCER
在Consumer中使用GroupName:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“group_name_1”);// 使用GroupName来初始化Consumer,如果不指定,就会使用默认的名字:DEFAULT_CONSUMER
- 基于Java环境构建消息发送与消息接收基础程序
- 单生产者单消费者
- 单生产者多消费者
- 多生产者多消费者
- 发送不同类型的消息
- 同步消息
- 异步消息
- 单向消息
- 特殊的消息发送
- 延时消息
- 批量消息
- 特殊的消息接收
- 消息过滤
- 消息发送与接收顺序控制
- 事务消息
3.2 消息发送与接收开发流程
- 谁来发?
- 发给谁?
- 怎么发?
- 发什么?
- 发的结果是什么?
- 打扫战场
3.3 单生产者单消费者消息发送(OneToOne)
- 导入RocketMQ客户端坐标
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.2</version> </dependency>
3.4 单生产者单消费者消息发送(OneToOne)
- 生产者
//1.创建一个发送消息的对象Producer DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.设定发送的命名服务器地址 producer.setNamesrvAddr("192.168.184.128:9876"); //3.1启动发送的服务 producer.start(); //4.创建要发送的消息对象,指定topic,指定内容body Message msg = new Message("topic1","hello rocketmq".getBytes("UTF-8")); //3.2发送消息 SendResult result = producer.send(msg); System.out.println("返回结果:"+result); //5.关闭连接 producer.shutdown();
返回结果:SendResult [ sendStatus=SEND_OK, msgId=C0A80F291A1018B4AAC268E060E00000, offsetMsgId=C0A8410100002A9F000000000005E699, messageQueue=MessageQueue [topic=topic1, brokerName=WIN-P2JLR6AM32M, queueId=3], queueOffset=7]
- 注意:关闭服务器防火墙
systemctl stop firewalld.service
3.5 单生产者单消费者消息发送(OneToOne)
- 消费者
//1.创建一个接收消息的对象Consumer DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.设定接收的命名服务器地址 consumer.setNamesrvAddr("192.168.184.128:9876"); //3.设置接收消息对应的topic,对应的sub标签为任意* consumer.subscribe("topic1","*"); //3.开启监听,用于接收消息 consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { //接收结果是一个集合 for(MessageExt msg : list){ System.out.println("消息:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }}); //4.启动接收消息的服务 consumer.start();
接收结果:消息:MessageExt [queueId=3, storeSize=163, queueOffset=5, sysFlag=0, bornTimestamp=1608470799151, bornHost=/192.168.65.1:58055, storeTimestamp=1608470799152, storeHost=/192.168.65.1:10911, msgId=C0A8410100002A9F000000000005E103, commitLogOffset=385283, bodyCRC=1932557065, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='topic1', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=8, CONSUME_START_TIME=1608511664435, UNIQ_KEY=C0A80F29350C18B4AAC26672CB2F0006, WAIT=true}, body=[104, 101, 108, 108, 111, 32, 114, 111, 99, 107, 101, 116, 109, 113], transactionId='null'}]
3.6 单生产者多消费者消息发送(OneToMany)
- 消费者(负载均衡模式:默认模式)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); consumer.setNamesrvAddr("192.168.184.128:9876"); consumer.subscribe("topic1","*"); //设置当前消费者的消费模式(默认模式:负载均衡) consumer.setMessageModel(MessageModel.CLUSTERING); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for(MessageExt msg : list){ System.out.println("消息:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //4.启动接收消息的服务 consumer.start();
3.7 单生产者多消费者消息发送(OneToMany)
- 消费者(广播模式)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); consumer.setNamesrvAddr("192.168.184.128:9876"); consumer.subscribe("topic1","*"); //设置当前消费者的消费模式为广播模式:所有客户端接收的消息都是一样的 consumer.setMessageModel(MessageModel.BROADCASTING); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for(MessageExt msg : list){ System.out.println("消息:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //4.启动接收消息的服务 consumer.start();
3.8 多生产者多消费者消息发送(ManyToMany)
- 多生产者产生的消息可以被同一个消费者消费,也可以被多个消费者消费
3.9 小节
- 消息发送
- One-To-One(基础发送与基础接收)
- One-To-Many(负载均衡模式与广播模式)
- Many-To-Many
3.10 消息类别
- 同步消息
- 异步消息
- 单向消息
3.11 同步消息
特征:即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)
3.12 异步消息
特征:即时性较弱,但需要有回执的消息,例如订单中的某些信息
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-dLNGDHtx-1608424332896)(img/image-20201211154108518.png)]
3.13 单向消息
特征:不需要有回执的消息,例如日志类消息
3.14 单向消息
- 同步消息
SendResult result = producer.send(msg);
- 异步消息(回调处理结果必须在生产者进程结束前执行,否则回调无法正确执行)
producer.send(msg, new SendCallback() { //表示成功返回结果 public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } //表示发送消息失败 public void onException(Throwable t) { System.out.println(t); } });
- 单向消息
producer.sendOneway(msg);
3.15 延时消息
- 消息发送时并不直接发送到消息服务器,而是根据设定的等待时间到达,起到延时到达的缓冲作用
Message msg = new Message("topic3",("延时消息:hello rocketmq "+i).getBytes("UTF-8")); //设置当前消息的延时效果 msg.setDelayTimeLevel(3); SendResult result = producer.send(msg); System.out.println("返回结果:"+result);
- 目前支持的消息时间
- 秒级:1,5,10,30
- 分级:1~10,20,30
- 时级:1,2
- 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
3.16 批量消息
- 发送批量消息
List<Message> msgList = new ArrayList<Message>(); SendResult send = producer.send(msgList);
- 消息内容总长度不超过4M
- 消息内容总长度包含如下:
- topic(字符串字节数)
- body (字节数组长度)
- 消息追加的属性(key与value对应字符串字节数)
- 日志(固定20字节)
3.17 消息过滤
- 分类过滤
- 生产者
Message msg = new Message("topic6","tag2",("消息过滤按照tag:hello rocketmq 2" 标题 Tag ).getBytes("UTF-8"));
- 消费者
//接收消息的时候,除了制定topic,还可以指定接收的tag,*代表任意tag consumer.subscribe("topic6","tag1 || tag2");
- 语法过滤(属性过滤/语法过滤/SQL过滤)
- 生产者
//为消息添加属性 msg.putUserProperty("vip","1"); msg.putUserProperty("age","20");
- 消费者
//使用消息选择器来过滤对应的属性,语法格式为类SQL语法 consumer.subscribe("topic7", MessageSelector.bySql("age >= 18"));
- 注意:SQL过滤需要依赖服务器的功能支持,在broker配置文件中添加对应的功能项,并开启对应功能
enablePropertyFilter=true
- 启动服务器使启用对应配置文件
sh mqbroker -n localhost:9876 -c ../conf/broker.conf
3.18 错乱的消息顺序
3.19 顺序消息
- 发送消息
//设置消息进入到指定的消息队列中 for(final Order order : orderList){ Message msg = new Message("orderTopic",order.toString().getBytes()); //发送时要指定对应的消息队列选择器 SendResult result = producer.send(msg, new MessageQueueSelector() { //设置当前消息发送时使用哪一个消息队列 public MessageQueue select(List<MessageQueue> list, Message message, Object o) { //根据发送的信息不同,选择不同的消息队列 //根据id来选择一个消息队列的对象,并返回->id得到int值 int mqIndex = order.getId().hashCode() % list.size(); return list.get(mqIndex); } }, null); System.out.println(result); }
3.20 顺序消息
- 接收消息
//使用单线程的模式从消息队列中取数据,一个线程绑定一个消息队列 consumer.registerMessageListener(new MessageListenerOrderly() { //使用MessageListenerOrderly接口后,对消息队列的处理由一个消息队列多个线程服务 / /,转化为一个消息队列一个线程服务 public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) { for(MessageExt msg : list){ System.out.println("消息:"+new String(msg.getBody())); } return ConsumeOrderlyStatus.SUCCESS; } });
3.21 事务消息
- 正常事务过程
- 事务补偿过程
3.22 事务消息状态
- 提交状态:允许进入队列,此消息与非事务消息无区别
- 回滚状态:不允许进入队列,此消息等同于未发送过
- 中间状态:完成了half消息的发送,未对MQ进行二次状态确认
- 注意:事务消息仅与生产者有关,与消费者无关
3.23 事务消息
//事务消息使用的生产者是TransactionMQProducer TransactionMQProducer producer = new TransactionMQProducer("group1"); producer.setNamesrvAddr("192.168.184.128:9876"); //添加本地事务对应的监听 producer.setTransactionListener(new TransactionListener() { //正常事务过程 public LocalTransactionState executeLocalTransaction(Message message, Object o) { return LocalTransactionState.COMMIT_MESSAGE; } //事务补偿过程 public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { return null; } }); producer.start(); Message msg = new Message("topic8",("事务消息:hello rocketmq ").getBytes("UTF-8")); SendResult result = producer.sendMessageInTransaction(msg,null); System.out.println("返回结果:"+result); producer.shutdown();