Spring Cloud【Finchley】实战-07异步下单

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Spring Cloud【Finchley】实战-07异步下单

概述


学习了RabbitMQ、Spring Boot整合RabbitMQ以及使用Spring Cloud Stream操作RabbitMQ之后,我们着手改造下吧


Product微服务改造


接入配置中心


既然引入了配置中心,那么我们把artisan product微服务也接入到配置中心吧 。


Step1 引入依赖

作为客户端引入spring-cloud-config-client即可

 <dependency>
      <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-config-client</artifactId>
  </dependency>


Step2 远端Git存储中心,新增artisan product模块的配置文件

将原来配置在application.yml中的配置信息拆分到bootstrap.yml及远端的git文件中

规划一下:

  • application.yml —端口信息
  • 远端git:artisan-product-dev.yml --数据库等
  • bootstrap.yml: Config Server,Eureka等信息

application.yml

server:
  port: 8080


远端git:artisan-product-dev.yml

spring:
# datasource
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/o2o?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8&useSSL=false
    username: root
    password: root
  # jpa 输出sql
  jpa:
    show-sql: true


bootstrap.yml

spring:
  application:
    name: artisan-order
  cloud:
    config:
#      profile: dev
      # 可配置多个,不推荐使用,因为需要设置具体的ip.服务端修改或者新增IP后,要同步修改
      # uri: http://localhost:9898/,http://localhost:9999/
      discovery:
        # 指定Config Server在服务发现中的service Id ,默认为configserver
        service-id: ARTISAN-CONFIG
        # 表示使用服务发现组件中的Config Server,而不自己指定Config Server的uri,默认为false
        enabled: true
      profile: home
    # 修复github webhook 只能刷新config server 无法刷新config client的问题
    bus:
      #Workaround for defect in https://github.com/spring-cloud/spring-cloud-bus/issues/124
      id: ${vcap.application.name:${spring.application.name:application}}:${vcap.application.instance_index:${spring.cloud.config.profile:${local.server.port:${server.port:0}}}}:${vcap.application.instance_id:${random.value}}
#Eureka
eureka:
  client:
    service-url:
      defaultZone: http://localhost:8762/eureka/



通过config server访问下远端的Git配置文件

http://localhost:9898/artisan-product-dev.yml


20190413200339191.png


OK,访问正常,说明配置读取没问题。


启动artisan product微服务,查看下Eureka 上的注册情况


2019041320132052.png


成功注册上来了,8080端口


接入消息队列

Step1 引入依赖

这里我们选择使用 spring-boot-starter-amqp


    <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>


Step2 配置RabbitMQ的信息

我们放到远端的Git上

spring:
 rabbitmq:
    host: 192.168.31.34
    password: guest
    port: 5672
    username: guest


通过config server访问下 http://localhost:9898/artisan-product-dev.yml


2019041320194941.png

OK ,说明配置没有问题 .


商品扣减完成后通知订单模块创建订单


Step1 分析

流程如下:

商品----------->消息队列<---------------订单

商品在扣减完成之后,通知消息队列,订单模块订阅消息队列处理请求。

那我们先看下商品扣减的方法原来的逻辑吧,不求一步到位,步步分析,逐步完善

Controller入口

 /**
     * 扣减库存
     * 提供给Order微服务用
     *
     * @param decreaseStockInputList
     * @return
     */
    @PostMapping("/decreseProduct")
    private void  decreseProduct(@RequestBody List<DecreaseStockInput> decreaseStockInputList) {
        productService.decreaseProduct(decreaseStockInputList);
    }


调用ProductServiceImpl#decreaseProduct方法,按照设计,商品扣减完成以后就要发送消息到消息队列 。


为解决Spring Cloud实战-04将订单微服务与商品微服务分别拆分为多模块 中的问题一,我们把数据模型对应的实体类又封装了一层,因此发送给消息队列的对象,建议也是封装后的对象,所以使用ProductOutput 。


Step2 扣减库存方法中增加发送消息队列的代码

增加如下代码:

 // 发送消息队列
 ProductOutput productOutput = new ProductOutput();
 BeanUtils.copyProperties(product,productOutput);
 amqpTemplate.convertAndSend("productOutput",JsonUtil.toJson(productOutput));
 log.info("发送消息到MQ,内容为:{}",JsonUtil.toJson(productOutput));


为了方便观察,将productOutput转成了Json格式,建议这样做,万一有消息挤压的话,方便在RabbitMQ的管理页面查看挤压的消息。 这里我们使用了Jackson。 你选择其他的库也可以,比如我们前面用到的Gson 。


Step3 验证发送消息队列的数据


因为还没有写接收方,我们先在RabbitMQ中手工创建一个名为productOutput的消息队列,先看下是否正确。



20190413233008192.png


启动product 微服务, 在postman测试如下

[
    {
        "productId": "1",
        "productQuantity": 2
    }
]


先看下数据库中的数据 productId=1的商品,目前还剩下81件,我们这次扣除两件,应该剩余79件,待会验证下 。


20190413233545970.png

20190413233313419.png



发送请求,观察RabbitMQ 和 数据

DB 数据OK

20190413234003195.png


RabbitMQ 接收正常,只是没有被消费


20190413233944334.png


查看挤压的消息:

2019041323432025.png

到目前为止,起码消息发送到了消息队列,数据扣减正常。 下一步就是该在订单服务中去写消息接收方的业务逻辑了。


订单模块接收消息队列中的消息

Step1 开发消息接收类

package com.artisan.order.message;
import com.artisan.order.utils.JsonUtil;
import com.artisan.product.common.ProductOutput;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class ProductOutputReceive {
    // 自动创建productOutput队列
    @RabbitListener(queuesToDeclare = @Queue("productOutput"))
    public void process(String message) {
        // message --> ProductOutput
        ProductOutput productOutput = JsonUtil.JsonToBean(message, ProductOutput.class);
        log.info("接收到的消息为:{}",productOutput);
    }
}


Step2 验证


我们在上面的消息接收方中使用了@RabbitListener(queuesToDeclare = @Queue("productOutput"))会自动创建消息队列,因此我们测试之前,把之前手工新建的productOutput删掉吧,验证下消息队列通过代码自动创建


启动后,在postman中发送消息,观察RabbitMQ和日志


20190414000559541.png


RabbitMQ:

20190414000256443.png


日志

2019-04-14 00:01:35.621  INFO 15660 --- [cTaskExecutor-1] c.a.order.message.ProductOutputReceive   : 接收到的消息为:ProductOutput(productId=1, productName=拿铁咖啡, productPrice=20.99, productStock=73, productDescription=咖啡,提神醒脑, productIcon=null, productStatus=0, categoryType=99)


纠正错误


上述虽然实现了发送消息到消息队列,并接收消息处理业务。但业务上有个比较大的逻辑错误,那就是前台传递的是个List啊,Product中扣减库存方法中 有异常的判断,如果前台买了3个商品, 但库存只有2个,数据如何保持一致?


建议: 将商品扣减的DB操作的部分作为一个整体,如果都成功了,将List发送到消息队列。 同样的Order微服务也要做下相应的调整


Product

@Override
    public void decreaseProduct(List<DecreaseStockInput> decreaseStockInputList) {
        List<Product> productList = operateProducts(decreaseStockInputList);
        List<ProductOutput> productOutputList = productList.stream().map(e -> {
            ProductOutput productOutput = new ProductOutput();
            BeanUtils.copyProperties(e, productOutput);
            return productOutput;
        }).collect(Collectors.toList());
        // 发送消息队列
        amqpTemplate.convertAndSend("productOutput", JsonUtil.toJson(productOutputList));
        log.info("发送消息到MQ,内容为:{}", JsonUtil.toJson(productOutputList));
    }
    // 因为是对List操作,所以加个事务控制
    @Transactional
    public List<Product> operateProducts(List<DecreaseStockInput> decreaseStockInputList) {
        List<Product> productList = new ArrayList<>();
        // 遍历DecreaseStockInput
        for (DecreaseStockInput decreaseStockInput : decreaseStockInputList) {
            // 根据productId查询Product
            Optional<Product> productOptional = productRepository.findById(decreaseStockInput.getProductId());
            // 商品是否存在
            if (!productOptional.isPresent()) {
                throw new ProductException(ResultEnum.PRODUCT_NOT_EXIST);
            }
            // 是否库存充足
            Product product = productOptional.get();
            int leftStock = product.getProductStock() - decreaseStockInput.getProductQuantity();
            if (leftStock < 0) {
                throw new ProductException(ResultEnum.PRODUCT_STOCK_ERROR);
            }
            // 将剩余库存设置到product,并更新数据库
            product.setProductStock(leftStock);
            productRepository.save(product);
            productList.add(product);
        }
        return productList;
    }

Order

package com.artisan.order.message;
import com.artisan.order.utils.JsonUtil;
import com.artisan.product.common.ProductOutput;
import com.fasterxml.jackson.core.type.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@Slf4j
public class ProductOutputReceive {
    // 自动创建productOutput队列
    @RabbitListener(queuesToDeclare = @Queue("productOutput"))
    public void process(String message) {
        // message --> ProductOutput
//        ProductOutput productOutput = JsonUtil.JsonToBean(message, ProductOutput.class);
//        log.info("接收到的消息为:{}",productOutput);
        // message -> List<ProductOutput>
        List<ProductOutput> productOutputList = JsonUtil.JsonToList(message,new TypeReference<List<ProductOutput>>(){});
        log.info("接收到的消息为:{}",productOutputList);
    }
}


测试一把

[
    {
        "productId": "1",
        "productQuantity": 2
    },
    {
        "productId": "2",
        "productQuantity": 5
    }
]


20190414013426269.png


product的日志

2019-04-14 01:28:24.058  INFO 22272 --- [nio-8080-exec-1] c.a.p.service.impl.ProductServiceImpl    : 发送消息到MQ,内容为:[{"productId":"1","productName":"拿铁咖啡","productPrice":20.99,"productStock":71,"productDescription":"咖啡,提神醒脑","productIcon":null,"productStatus":0,"categoryType":99},{"productId":"2","productName":"青岛纯生","productPrice":7.50,"productStock":180,"productDescription":"啤酒","productIcon":null,"productStatus":0,"categoryType":98}]


order的日志:

2019-04-14 01:28:24.086  INFO 18036 --- [cTaskExecutor-1] c.a.order.message.ProductOutputReceive   : 接收到的消息为:[ProductOutput(productId=1, productName=拿铁咖啡, productPrice=20.99, productStock=71, productDescription=咖啡,提神醒脑, productIcon=null, productStatus=0, categoryType=99), ProductOutput(productId=2, productName=青岛纯生, productPrice=7.50, productStock=180, productDescription=啤酒, productIcon=null, productStatus=0, categoryType=98)]


消息接收方接收到消息后,比如可以把 List<ProductOutput>信息放到redis里,查询商品服务的话,就可以从redis中查询了。


JackSon 的操作


这里我们选择了JackSon 来操作JavaBean和Json之间的互转,当然了你也可以选择其他的API,比如我们上次用的Gson。。


https://blog.csdn.net/qq_37936542/article/details/79268402

package com.artisan.order.utils;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
public class JsonUtil {
    private static ObjectMapper objectMapper = new ObjectMapper();
    /**
     * 对象转换为json字符串
     *
     * @param object
     * @return
     */
    public static String toJson(Object object) {
        try {
            return objectMapper.writeValueAsString(object);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return null;
    }
    /**
     * json 转 javabean
     * @param jsonStr
     * @param objClass
     * @param <T>
     * @return
     */
    public static <T> T JsonToBean(String jsonStr, Class<T> objClass) {
        try {
            return objectMapper.readValue(jsonStr, objClass);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
    /**
     *  json 转 List
     * @param jsonStr
     * @param typeReference
     * @param <T>
     * @return
     */
    public static <T> T JsonToList(String jsonStr, TypeReference typeReference) {
        try {
            return objectMapper.readValue(jsonStr, typeReference);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
}


代码


https://github.com/yangshangwei/springcloud-o2o/tree/master/artisan_order


https://github.com/yangshangwei/springcloud-o2o/tree/master/artisan-product

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
负载均衡 监控 Java
Spring Cloud Gateway 全解析:路由配置、断言规则与过滤器实战指南
本文详细介绍了 Spring Cloud Gateway 的核心功能与实践配置。首先讲解了网关模块的创建流程,包括依赖引入(gateway、nacos 服务发现、负载均衡)、端口与服务发现配置,以及路由规则的设置(需注意路径前缀重复与优先级 order)。接着深入解析路由断言,涵盖 After、Before、Path 等 12 种内置断言的参数、作用及配置示例,并说明了自定义断言的实现方法。随后重点阐述过滤器机制,区分路由过滤器(如 AddRequestHeader、RewritePath、RequestRateLimiter 等)与全局过滤器的作用范围与配置方式,提
Spring Cloud Gateway 全解析:路由配置、断言规则与过滤器实战指南
|
4月前
|
监控 Java API
Spring Boot 3.2 结合 Spring Cloud 微服务架构实操指南 现代分布式应用系统构建实战教程
Spring Boot 3.2 + Spring Cloud 2023.0 微服务架构实践摘要 本文基于Spring Boot 3.2.5和Spring Cloud 2023.0.1最新稳定版本,演示现代微服务架构的构建过程。主要内容包括: 技术栈选择:采用Spring Cloud Netflix Eureka 4.1.0作为服务注册中心,Resilience4j 2.1.0替代Hystrix实现熔断机制,配合OpenFeign和Gateway等组件。 核心实操步骤: 搭建Eureka注册中心服务 构建商品
739 3
|
2月前
|
监控 Cloud Native Java
Spring Boot 3.x 微服务架构实战指南
🌟蒋星熠Jaxonic,技术宇宙中的星际旅人。深耕Spring Boot 3.x与微服务架构,探索云原生、性能优化与高可用系统设计。以代码为笔,在二进制星河中谱写极客诗篇。关注我,共赴技术星辰大海!(238字)
Spring Boot 3.x 微服务架构实战指南
|
2月前
|
XML Java 测试技术
《深入理解Spring》:IoC容器核心原理与实战
Spring IoC通过控制反转与依赖注入实现对象间的解耦,由容器统一管理Bean的生命周期与依赖关系。支持XML、注解和Java配置三种方式,结合作用域、条件化配置与循环依赖处理等机制,提升应用的可维护性与可测试性,是现代Java开发的核心基石。
|
3月前
|
消息中间件 存储 Java
RabbitMQ 和 Spring Cloud Stream 实现异步通信
本文介绍了在微服务架构中,如何利用 RabbitMQ 作为消息代理,并结合 Spring Cloud Stream 实现高效的异步通信。内容涵盖异步通信的优势、RabbitMQ 的核心概念与特性、Spring Cloud Stream 的功能及其与 RabbitMQ 的集成方式。通过这种组合,开发者可以构建出具备高可用性、可扩展性和弹性的分布式系统,满足现代应用对快速响应和可靠消息传递的需求。
209 2
RabbitMQ 和 Spring Cloud Stream 实现异步通信
|
7月前
|
人工智能 Java API
Spring AI 实战|Spring AI入门之DeepSeek调用
本文介绍了Spring AI框架如何帮助Java开发者轻松集成和使用大模型API。文章从Spring AI的初探开始,探讨了其核心能力及应用场景,包括手动与自动发起请求、流式响应实现打字机效果,以及兼容不同AI服务(如DeepSeek、通义千问)的方法。同时,还详细讲解了如何在生产环境中添加监控以优化性能和成本管理。通过Spring AI,开发者可以简化大模型调用流程,降低复杂度,为企业智能应用开发提供强大支持。最后,文章展望了Spring AI在未来AI时代的重要作用,鼓励开发者积极拥抱这一技术变革。
2612 71
Spring AI 实战|Spring AI入门之DeepSeek调用
|
4月前
|
人工智能 监控 安全
如何快速上手【Spring AOP】?核心应用实战(上篇)
哈喽大家好吖~欢迎来到Spring AOP系列教程的上篇 - 应用篇。在本篇,我们将专注于Spring AOP的实际应用,通过具体的代码示例和场景分析,帮助大家掌握AOP的使用方法和技巧。而在后续的下篇中,我们将深入探讨Spring AOP的实现原理和底层机制。 AOP(Aspect-Oriented Programming,面向切面编程)是Spring框架中的核心特性之一,它能够帮助我们解决横切关注点(如日志记录、性能统计、安全控制、事务管理等)的问题,提高代码的模块化程度和复用性。
|
5月前
|
人工智能 安全 Java
Spring Boot 中使用 Function 和异步线程池处理列表拆分任务并汇总结果
在Java开发中,处理大规模数据时常常需要将列表拆分为多个子列表进行异步处理并汇总结果。本文介绍如何在Spring Boot中使用Function和异步线程池实现高效且可维护的代码,涵盖结果封装、线程池配置、列表拆分处理及结果汇总等关键步骤。
242 0