开发者学堂课程【全面讲解 Spring Cloud Alibaba 技术栈(知识精讲+项目实战)第四阶段:Java 实现消息消费】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/686/detail/11907
Java 实现消息消费
内容介绍:
一、Java 实现消息消费
二、案例
一、Java 实现消息消费
用 Java 代码进行消费,给予的是订阅机制也可以认为是接听机制。
1、消息接收步骤:
(1)创建消息消费者,指定消费者所属的组名
(2)指定 Nameserver 地址
(3)指定消费者订阅的主题和标签
(4)设置回调函数,编写处理消息的方法(消费者会一直监听 MQ,一旦发现消息用回调方式进行处理)
(5)启动消息消费者
//接收消息
public class RocketMQReceiveTest {
public static void main(string[] args) throws
mqclientException {
//1.创建消息消费者,指定消费者所属的组名
DefaultMQPushConsumer consumer
=newDefau1tMQPushCon
sumer("myconsumer-group")
;
//2.指定
Nameserver
地址
consumer.setNamesrvAddr("192.168.109.131:9876)
;
//3
.
指定消费者订阅的主题和标签
consumer.subscribe("myTopic",""");
//4.设置回调函数,编写处理消息的方法
consumer.registerMessageListener(new
MessageListenerConc
urrently{
@O
verride
public ConsumeConcurrentlystatus
consumeMessaqe(List<Me
ssaqeExt> msqs,
ConsumeConcurrentlyContextcontext){
System.out.print1n("Receive New Messages:"+ msgs);
//返回消费状态
return ConsumeConcurrentlyStatusCONSUMESUCCESS;
}
});
//5.启动消息消费者 consumer.start();
system.out.print1n("Consumer Started.");
}
}
2、处理
打开工程新加一个类 New-Java Class
接收消息的测试
package com.itheima.test;
public class RocketMQReceiveMessageTest{
//接收消息
publi
c
static void main(string[] args){
//1创建消费者,并且为其指定消费者组名
new DefaultMQPushConsumer consumer = new DefaultMQ
PushConsumer(consumerGroup:“myconsumer-group“);
//2为消费者设置
NameServer
的地址
consumer.setNamesrvAddr("192.168.109.131:9876");
//3指定消费者订阅的主题和标签
consumer.subscribe(topic:"myTopic",subExpression:"*");
//4设置一个回调函数,并在函数中编写接收到消息之后的处理方法
consumer.registerMessageListener(new
MessageListenerConc urrently() {
//处理获取到的消息
@O
verride
public ConsumeConcurrentlystatus consumeMessage(List<Me
ssaqeext>
list,Consumeconcurrentlycontext consumeconcurr
e
ntlyContext){
//消费逻辑
system.out.println("Message--->" + list);
//返回消费成功状态
return ConsumeConcurrentlyStatus.CONSUME SUCCESS
});
//5 启动消费者
consumer.start);
system.out.println("启动消费者成功了");
}
}
启动消费者运行:
C:\Java\jdk1.8.0\bin\java.exe ..
.
启动消费者成功了
Messaqe===>[MessaqeExt [queueId=1,storeSize=182,
queue
Offset=0,sysFlaq=0,
bornTimestamp=1577690976503,bornHo
st=/
192.168.109.1:65029,storeTimestamp=1577690975351,
storeHost=/192.168.109.131:10911,msgId=C
消费者启动成功消息已经被消费,进程不会退出去一直等待,因为是基于监督机制。
还可以再发送一个消息,消费者现在只消费了一条,再来到发送启动:
c:\Java\jdk1.8.0\bin\java.exe ...
Sendresult [sendstatus=SEND OK,msgId=C0A82B9058E658
644D4698B7F5270000,
offsetMsId=C0A86D8300002A9F00000
00000AFA1A,
message
15:43:12.715 [Nettyclientselector1] INFO
RocketmqRemotin
g-closechannel: close the connection to remote address
[192.168.109
15:43:12.724 [NettyclientSelector 1] INFO
RocketmgRemoting- closechannel: close the connection to remote address[192.
168.109
15:43:12.724 [NettyclientSelector 11 INFO
RocketmgRemoting - closeChannel; close the connection to remote address[192.
168.109
Process finished with exit code 0
消费者结果:
c:\Java\jdk1.8.0\bin\java.exe..启动消费者成功了
Message===>[MessageExt[queueId=1,storesize=182,
queue
Offset=0,
sysFlag=0,bornTimestamp=1577690976503, born
Host=/
192.168.109.1:65029,storeTimestamp=1577690975351,
storeHost=/192.168.109.131:10911,msgId=C
Message===> [MessageExt[queueId=3, storesize=182,
queue
offset=0, sysflag=0,
bornTimestamp=1577691792680, bornH
ost=/
192.168.109.1:49417,storeTimestamp=1577691791527,
storeHost=/192.168.109.131:10911,msqId=
C
自动消费掉,关键是了解代码的大体逻辑以及内部的定额,内部的订阅机制是利用的监听,当一旦发现有消息过来,自动内部进行消费。
二、案例
接下来我们模拟一种场景:下单成功之后,向下单用户发送短信。设计图如下:
把代码应用到案例中去,下单成功以后会发送一个消息放到 MQ 中,通过另一个微服务从 MQ 中取消息。