开发者学堂课程【全面讲解 Spring Cloud Alibaba 技术栈(知识精讲+项目实战)第四阶段:下单消息的发送和接收案例】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/686/detail/11908
下单消息的发送和接收案例
1、案例
接下来我们模拟一种场景:下单成功之后,向下单用户发送短信。设计图如下:
下单操作,当下单成功以后需要向下单的用户发送一条短信通知,下单操作是发生在订单微服务,发送短信的操作是发生在用户微服务,涉及到了两个微服务的调用问题,下单之后发送短信不需要立即拿到响应结果,非常适用于 MQ 的使用场景,所以加入了一个 MQ。当用户下单成功以后,会将下单成功的信息推送到 MQ 中,然后用户微服务会订阅 MQ 中的消息,一旦发现有下单成功的消息推过来,立即进行消息的消费。
实现一下场景,首先处理左部分订单微服务中下单成功以后向 MQ 投递消息的过程。
2、订单微服务发送消息
(1)在 shop-order 中添加 rocketmq 的依赖
<!--rocketmq-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactid>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmqr/groupid>
<artifactId>rocketmq-client</artifactid>
<version>4.4.0</version>
</dependency>
(2)添加配置
rocketmq:
name-server: 192.168.109.131:9876 #rocketmQ服务的地址
producer:
group:shop-order # 生产者组
(3)编写测试代码
@RestController
@s1f4j
public class OrderContro1ler2 {
@Autowired
private OrderService orderService
;
@
Autowired
private ProductService productService;
@Autowired
private RocketMQTemplate rocketMQTemplate
;
//准备买1件商品
@GetMapping("/order/prod/{pid}")
public Order order(@PathVariable("pid") Integer pid){
1og.info(">>客户下单,这时候要调用商品微服务查询商品信息");
//通过
fegin
调用商品微服务
Product product=productServicefindByPid(pid);
if (product == nu11){
Order order=newOrder();
order.setPname("下单失败");
return order;
}
1og.info(">>商品信息,
查询结果:"+JSON.toJSONString(product));
Order order =new Order();
order.setuid(1):
order.setUsername("测试用户");
ordet.setpid(product.getpid());
order.setPname(product.getPname)):
order.setPprice(productgetPprice); order.setNumber(1);
orderService.save(order);
//下单成功之后,将消息放到
mq
中
rocketMQTemplate.convertAndSend("order-topic",order);
return order;
}
}
①处理一下:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifact
Id>
</dependency>
<!--fegin-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactid
>
</dependency>
<!--sentine1-->
<dependency>
<qroupId>com.alibaba.cloud</qroup
I
d>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactid></dependency>
<
!
--rockatm
q
-->
//首先复制一下找到订单,导入
<
!
--rockatm
q
-->
<dependenay>
<groupId>org.apache.rocketmg</groupId>
<artifactId>rocketmg-spring-boot-starter</artifactid>
<version>2.0.2</version>
</dependenay>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmg-client</artifactid>
<version>4.4.0</version>
</dependency>
</dependencies>
②修改配置文件直接粘贴,第一个选项是指定了 NameServer 的地址,第二个指定的生产者的组名
hbm2dd1
auto: update
dialect:org.hibernatedialect.MysQL5InnoDBDialect
cloud:
nacos:
discovery:
server-addr:localhost:8848
sentinel:
transport:
port:9999 #跟控制台交流的端口,随意指定一个未使用的端口即可 dashboard:localhost:8080#指定控制台服务的地址
# filter:
# enabled:false
service-product:#调用的提供者的名称
ribbon:
NFLoadBalancerRuleClassName:com.netflixloadbalancer.RandomRule
ribbon:
ReadTimeout:5000
ConnectTimeout:5000
#开启 feign 对 sentinel 的支持
feign:
sentinel:
enabled:true
#roaketmq
name-server: 192.168.109.131:9876 #rocketMQ 服务的地址
producer:
group: shop-order#生产者组
③第三步去编写代码,没找到下单成功的地方有一个模拟的延时就去掉,下单成功以后可以向 MQ 中投递一个下单成功的消息,利用的API 就是 RocketMQ 的 private 注入一个 Autowired private
RockerMQTemplate rockerMQTemplate。
@Autowired
private RestTemplate restTemplate;
@Autowired
private orderservice orderService;
@Autowired
private Productservice productservice;
@Autowired
private Discoveryclient discoveryClient;
@Autowired
private RockerMQTemplate rockerMQTemplate;
//下单--fegin
@RequestMapping("/order/prod/{pid}")
public order order(@PathVariable("pid") Integer pid) {
Iog.info("接收到{}号商品的下单请求,接下来调用商品微服务查询此商品信息",pid);
//调用商品微服务,查询商品信息
Product product =productServicefindByPid(pid);
if(product.getpid()==-100){
Order order=neworder;
order.setoid(-100L);
order.setPname("下单失败");
return order;
}
log.info("查询到()号商品的信息,内容是:()}",pid,
JSON.tojsonstring(product));
//下单(创建订单)
Order order=neworder();
order.setuid(1);
order.setUsername("测试用户");
order.setpid(pid);
order.setPname(product.getPname(());
order.setPprice(productgetPprice()));
order.setNumber(1);
orderService.createorder(order);
log.info("创建订单成功,订单信息为{)",
JSON.tojsonstring(order));
//向 mq 中投递一个下单成功的消息
//参数一:指定topic
//参数二:指定消息体
rocketMQTemplate.convertAndSend( destination: "order-topic ", order);
return order;
}
//下单--ribbon 负载均衡
@RequestMapping("/order/prod/{pid}")
public order order(@PathVariable("pid") Integer pid){
log.info ("接收到()号商品的下单请求,接下来调用商品微服务查询此商品信息",pid);
//调用商品微服务,查询商品信息
//问题:
//代码可读性不好
运行一下:
c:\Java\idk1.8.0\bin\java.exe
Connected to the target VM, address: '127.0.0.1:55940',
transport: 'socket'
2019-12-30 16:53:24.183 INFO[-] 36268 --- [ main]
trationDelegate$BeanPostProcessorChecker :Be
启动完毕之后,下一个订单
刷新一下,服务没有报错
{
“oid”: 56
“uid”:1
“usernane”:“测试用户”,
“pid": 2.
“pname” "华为",
“pprice”: 2000.
“nunber”:1
}
通过 RocketMQ 提供的控制台来看一下是否报错,刷新一下 Message 界面,选定一下主题,点击搜索
已经出现了一条消息
点击消息详情,可以看到消息已经过来,2号商品的订单
代表投递消息已经完成
2、用户微服务订阅消息
消息已经到 MQ 中只需要在用户微服务填写代码去监听 MQ,一旦发现相同的主题下有消息过来,进行消费就可以。第一步先依赖,第二步在主类上开启它,因为现在的 user 里面还没有加 naps 依赖,第三步加配置文件主要的是两部分,第一个是加 nacos,第二个是加 rocketmq,指定一个 name server。最重要的一个步骤是如何来消费消息,要求自己实现一个类,然后这个类要去实现一个 rocket mq listener 接听器,有一个泛型指具体的消息内容,然后去重新写一个方法叫 onmessage,当消息过来以后如何消费消息的方法有一个注解,注解里面要求指定消息消费者的组以及消费的是哪一个 topic
(1)修改 shop-user 模块配置
<?xm1 version="1.0" encoding="UTF-8"?>
<project xm1ns="http://maven.apache.org/POM/4.0.0"
xm1ns:xsi="http://www.w3.org/2001/XMLSchema-instance xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springcloud-alibaba</artifactid>
<groupId>com.itheima</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<mode1version>4.0.0</modelversion>
<artifactId>shop-user</artifactid>
<dependencies>
<dependency>
<groupId>com.itheima</groupId>
<artifactId>shop-common</artifactid>
<version>1.0-SNAPSHOT</version>
<
dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-
discovery</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<
artifactId>rocketmq-c1ient</artifactid>
<version>4.4.0</version>
</dependency>
</dependencies>
</project>
(2)修改主类
@SpringBootApplication
@EnableDiscoveryclient
public class UserApplication {
public static void main(String[] args) {
SpringApplication.run(UserApplication.class,args);
}
}
(3)修改配置文件
server:
port:8071
spring:
application:
name: service-user
datasource:
driver-c1ass-name:com.mysq1.jdbc.Driver
ur1:jdbc:mysq1:///shop?
serverTimezone=UTC&useUnicode=true&characterEncoding
=utf-8&useSSL=true
username:root
password:root
jpa:
properties:
hibernate:
hbm2dd1:
auto: update
dialect:org.hibernate.dialect.MySQL5InnoDBDialect
cloud: nacos:
discovery:
server-addr: 127.0.0.1:8848
rocketmq:
name-server: 192.168.109.131:9876
(4)编写消息接收服务
package com.itheima.service;
//发送短信的服务
as1f4j
@Service
@RocketMQMessaqeListener(consumerGroup = "shop-user", topic = "order-topic")
public class SmsService implements
RocketMQListener<Order> {
@Override
public void onMessage(Order order){
1og.info("收到一个订单信息{},接下来发送短信",
JSON.toJSONString(order));
}
}
(5)启动服务,执行下单操作,观看后台输出
①处理一下第一步:
把一部分代码直接拷走拿过来找到 user 文件 pom.xml,叫web 和 com
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupid>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactid
>
</dependency>
<dependency>
<qroupId>orq.apache.rocketmg</qroupid>
<artifactId>rocketmg-spring-boot-starter</artifactid>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketm
q
</groupid>
<artifactId>rocketmg-client</artifactid
>
<version>4.4.0</version>
</dependency>
</dependencies>
</progect>
②第二步需要添加 EnableDiscoveryClient
package com.itheima;
Import
...
espringBootApplication
@EnableDiscoveryClient
public class Userapplication {
public static void main(String[] args) {
SpringApplication.run(UserApplication.class);
}
}
③第三步添加配置文件
server:
port:8071
spring:
application:
name:service-user
datasource:
driver-class-name:com.mysgljdbc.Driver
url:jdbc:mysql:///shop?
serverTimezone=UTC&useUnicode=true&characterEncoding
=utf-8&usessL=true
username:root
password:root
jpa:
properties:
hibernate:
hbm2dd1:
auto:update
dialect:org.hibernatedialect.MySQL5InnoDBDialect
cloud:
nacos:
discovery:
server-addr:127.0.0.1:8848
//点一下能进去就没有问题
#rocketmq
roaketmq:
name-server: 192.168.109.131:9876
④编写代码消费消息
自己去写一个实现类
去实现一个接口叫 rocketmq listener,需要传入一个泛型消息的具体实现比,消息传过来的时候是一个 order,Order 注意选自己的实体号,接下来去实现方法,这个方法就是进行消费的逻辑。
package com.itheima.service;
import com.itheima.domain.order;
import org.apache.rocketmg.spring.coreRocketMlistener
;
@S1f4j
@Service
//consumerGroup消费者组名topic-要消费的主题
@RocketMOMessageListener(consumerGroup = "shop-user", topic = "order-topic"
public class SmsService implements
RocketMQListener<Order> {
//消费逻辑
Goverride
public void onMessage(Order message){
log.info("接收到了一个订单信息{},接下来就可以发送短信通知了",message);
}
}
将短信发送出去,改一下加上一个注解,需要在上面加两个东西,第一个是放入容器的,第二个是 rocket mq 提供的 Rocket mq message listener,指定两个东西一个是消费者的组名,另外一个是topic,为了保持一致 consumer group 直接抄一下,把名字Shop-user拿过来,第二个叫 topic 指定消费主题,主题必须跟短信发送的主题保持一致,如果不一致消费不了,要指定一个注释,需要写 Consumer group 消费者组名、Topic 要消费的主题。
找到 user applications 在里面跑起来:
服务正在启动
重新发一个消息看是否可以进行消费:
再下一个3号订单
{
“
oid
”
: 57,
“
uid”: 1
“
usernane
”:
“测试用户”
“
pid
”
:3
“
pname
:
"苹果",
“
pprice"
:
3000.
“
nunber”:
1
}
[service-user,,,] 25964 --- [MessageThread_1] com.itheima.se rvice.SmsService :接收到了一个订单信息 order(oid=57,uid=1,username=测试用户,pid=3,pname=苹果,pprice=3000.0,number=1),接下来可以发送短信通知了
多来几次发现每次都会过来
订单微服务投递消息,用户微服务消费消息,大家主要理解第一个是 rocketmq 是用 rocketmq template 的方法,第二个是如何来消费消息主要是写一个监听的类,然后去实现鉴定方法。