实战_Spring_Cloud(2)

简介: 实战_Spring_Cloud

实战_Spring_Cloud(1)https://developer.aliyun.com/article/1543981

服务提供端改造

  • shopping-product项目中,把原先的application.yml文件重命名为bootstrap.yml,并配置Eureka Server地址、应用名称、Config的实例名称。服务启动后,会链接Eureka Server服务器,根据Config的实例名称找到对应的Config服务器,并根据实例名称(可以增加profile属性)来匹配配置文件。
eureka:
  client:
    serviceUrl:
      defaultZone: http://eureka1:8761/eureka/,http://eureka2:8762/eureka/ #指定服务注册地址
 
spring:
  application:
    name: shopping-product  #应用名称
  cloud:
    config:
      discovery:
        enabled: true
        service-id: config-server
  • 之前服务端其余的配置,填写在github配置项目shopping-product.yml文件中
spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: root
    password: 123456
    url: jdbc:mysql://localhost:3306/spring_cloud_app?characterEncoding=utf-8&useSSL=false&serverTimezone=UTC
  jpa:
    show-sql: true
    database-platform: org.hibernate.dialect.MySQLDialect
 
server:
  port: 11100
  • 同样,shopping-order项目也如此改造,最后我们启动所有的服务,看是否都能正常启动。

配置动态刷新

  • 首先,在shopping-product.yml增加一个配置属性来进行测试
env: dev
  • 新建一个测试controller来绑定这个配置属性,并提供api来返回属性的值
@RestController
@RefreshScope
@RequestMapping("api/env")
public class EnvController {
 
    @Value("${env}")
    private String env;
 
    @RequestMapping
    public String printEnv() {
        return env;
    }
}
  • 访问http://localhost:11100/api/env,返回当前的值dev。
    Spring Cloud Config 在项目启动时加载配置内容这一机制,但是如果我们修改配置文件内容后,不会自动刷新。例如我们上面的项目,当服务已经启动的时候,去修改 github 上的配置文件内容,这时候,再次刷新页面,对不起,还是旧的配置内容,新内容不会主动刷新过来。那应该怎么去触发配置信息的动态刷新呢?
  • 它提供了一个刷新机制,但是需要我们主动触发。那就是 @RefreshScope 注解并结合 actuator ,注意要引入 spring-boot-starter-actuator 包。
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
  • EnvController上增加@RefreshScope注解
  • 发送 POST 请求到 http://localhost:11100/actuator/refresh 这个接口,默认没有开放endpoint的权限,所以这块我们首先配置开放权限
management:
  endpoints:
    web:
      exposure:
        include: "*"
  • 这时调用接口结束后,我们看到接口返回消息,表明env这个属性值已经刷新
[
    "config.client.version",
    "env"
]
  • 再次访问http://localhost:11100/api/env,返回当前的值就是修改后的值test,证明配置属性的值已经动态刷新,我们的程序也不用再次启动。

配置 Webhook

每次改了配置后,就用 postman 访问一下 refresh 接口,还是不够方便。 github 提供了一种 webhook 的方式,当有代码变更的时候,会调用我们设置的地址,来实现我们想达到的目的。

  • 进入 github 仓库配置页面,选择 Webhooks ,并点击 add webhook;
  • 填上回调的地址
    也就是上面提到的 actuator/refresh 这个地址,但是必须保证这个地址是可以被 github 访问到的。这样每当github上修改了配置文件,就自动通知对应的hook地址自动刷新。

小结

整体项目结构如下:

spring-cloud-app

--config-server(统一配置中心)

--eureka-server(服务注册中心)

--shopping-common(购物公共模块)

--shopping-product(商品服务模块)

--shopping-order(订单服务模块)

更新系统架构,新建config-server节点,也向eureka-server注册,相关服务注册节点根据配置实例名称,路由到config-server节点,动态的加载配置。

异步消息(Stream

应用场景

1、异步处理

比如用户在电商网站下单,下单完成后会给用户推送短信或邮件,发短信和邮件的过程就可以异步完成。因为下单付款是核心业务,发邮件和短信并不属于核心功能,并且可能耗时较长,所以针对这种业务场景可以选择先放到消息队列中,有其他服务来异步处理。

2、应用解耦:

假设公司有几个不同的系统,各系统在某些业务有联动关系,比如 A 系统完成了某些操作,需要触发 B 系统及 C 系统。如果 A 系统完成操作,主动调用 B 系统的接口或 C 系统的接口,可以完成功能,但是各个系统之间就产生了耦合。用消息中间件就可以完成解耦,当 A 系统完成操作将数据放进消息队列,B 和 C 系统去订阅消息就可以了。这样各系统只要约定好消息的格式就好了。

3、流量削峰

比如秒杀活动,一下子进来好多请求,有的服务可能承受不住瞬时高并发而崩溃,所以针对这种瞬时高并发的场景,在中间加一层消息队列,把请求先入队列,然后再把队列中的请求平滑的推送给服务,或者让服务去队列拉取。

4、日志处理

kafka 最开始就是专门为了处理日志产生的。

当碰到上面的几种情况的时候,就要考虑用消息队列了。如果你碰巧使用的是 RabbitMQ 或者 kafka ,而且同样也是在使用 Spring Cloud ,那可以考虑下用 Spring Cloud Stream。Spring Cloud Stream 是消息中间件组件,它集成了 kafka 和 rabbitmq ,本文以rabbitmq 为例。

当前项目场景

分析目前shopping-order项目中,创建订单的代码如下:

/**
     * 创建订单
     *
     */
@Transactional
public String Create(OrderInput orderInput) throws Exception {
 
    //扣库存
    ResultVo result1=productClient.decreaseStock(orderInput.getOrderItemInputs());
    if (result1.getCode() != 0)
        throw new Exception("调用订单扣减库存接口出错:" + result1.getMsg());
 
    //构建订单主表
    OrderMaster orderMaster = new OrderMaster();
    BeanUtils.copyProperties(orderInput, orderMaster);
    //指定默认值
    orderMaster.setOrderId(KeyUtil.genUniqueKey("OM"));
    orderMaster.setOrderStatus(OrderStatus.NEW);
    orderMaster.setPayStatus(PayStatus.WAIT);
 
    //构建订单明细
    List<String> productIds = orderInput.getOrderItemInputs().stream().map(OrderItemInput::getProductId).collect(Collectors.toList());
    ResultVo<List<ProductInfoOutput>> result2 = productClient.findProductInfosByIds(String.join(",", productIds));
    if (result2.getCode() != 0)
        throw new Exception("调用订单查询接口出错:" + result2.getMsg());
    List<ProductInfoOutput> productInfoOutputs = result2.getData();
 
    //订单金额总计
    BigDecimal total = new BigDecimal(BigInteger.ZERO);
    for (OrderItemInput orderItemInput : orderInput.getOrderItemInputs()) {
        OrderDetail orderDetail = new OrderDetail();
        BeanUtils.copyProperties(orderItemInput, orderDetail);
 
        Optional<ProductInfoOutput> productInfoOutputOptional = productInfoOutputs.stream()
            .filter(s -> s.getProductId().equals(orderItemInput.getProductId())).findFirst();
 
        if (!productInfoOutputOptional.isPresent())
            throw new Exception(String.format("商品【%s】不存在", orderItemInput.getProductId()));
 
        ProductInfoOutput productInfoOutput = productInfoOutputOptional.get();
        orderDetail.setDetailId(KeyUtil.genUniqueKey("OD"));
        orderDetail.setOrderId(orderMaster.getOrderId());
        orderDetail.setProductName(productInfoOutput.getProductName());
        orderDetail.setProductPrice(productInfoOutput.getProductPrice().multiply(new BigDecimal(orderDetail.getProductQuantity())));
        orderDetail.setProductIcon(productInfoOutput.getProductIcon());
        total = total.add(orderDetail.getProductPrice());
 
        orderDetailRepository.save(orderDetail);
    }
 
    orderMaster.setOrderAmount(total);
    orderMasterRepository.save(orderMaster);
    return orderMaster.getOrderId();
}

创建订单的同时,先调用商品接口扣减库存,如果占用库存成功,再生成订单。这样的话,生成订单的操作和占用商品库存的操作其实是耦合在一起的。在实际电商高并发、高流量的情况下,我们很少这么做。所以,我们要将业务解耦,实现订单和扣减库存的异步处理。

大体思路如下:生成订单==》通知商品调用库存==》商品占用库存==》通知订单占用成功==》更新订单占用库存状态

stream-rabbit集成

shopping-order、shopping-product项目中

  • 首先引入stream-rabbit依赖:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
  • application.yml中作相应的配置:
spring:
  rabbitmq:
    host: aliyun.host
    port: 5672
    username: guest
    password: guest
  • 消息接口定义
public interface StreamClient {
 
    String INPUT = "myMessage";
 
    @Input(StreamClient.INPUT)
    SubscribableChannel input();
 
    @Output(StreamClient.INPUT)
    MessageChannel output();
}
  • 接收端处理逻辑
@Component
@EnableBinding(StreamClient.class)
@Slf4j
public class StreamReceiver {
 
    @StreamListener(value = StreamClient.INPUT)
    public void process(OrderInput orderInput) {
        log.info("StreamReceiver: {}", orderInput);
    }
}
  • 发送端处理逻辑
@RestController
@RequestMapping("api/v1/stream")
@Slf4j
public class StreamController {
 
    private final StreamClient streamClient;
 
    @Autowired
    public StreamController(StreamClient streamClient) {
        this.streamClient = streamClient;
    }
 
    @GetMapping("/sendMessage")
    public void sendMessage() {
        OrderInput orderInput=new OrderInput();
        orderInput.setBuyerName("小王");
        orderInput.setBuyerPhone("15011111111");
        orderInput.setBuyerAddress("姥姥家");
        orderInput.setBuyerOpenid("11111");
        streamClient.output().send(MessageBuilder.withPayload(orderInput).build());
    }
}

启动应用程序,测试发送接口,发现spring-cloud-stream帮我们自动创建了一个队列,消息发送到这个队列,然后被接收端消费。

此时,如果我们启动多个shopping-product服务实例,会有个问题,如果发送端发送一条消息,会被2个实例同时消费,在正常的业务中,这种情况是应该避免的。所以我们需要对消息进行分组,在application.yml中增加如下配置,保证只有一个服务实例来消费。

spring:
  rabbitmq:
    host: aliyun.host
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        myMessage:
          group: shopping-order
          content-type: application/json

改造Order和Product项目

shopping-order作为库存占用命令的消息发送者,首先向shopping-product发送消息stock_apply(占用库存申请),shopping-product接收此消息进行库存处理,然后将库存占用处理的结果作为消息stock_result(占用库存结果)发送,shopping-order端再收到结果消息对订单状态进行更新。

  • shopping-order配置:
spring:
  cloud:
    stream:
      bindings:
        stock_apply_output:           #占用库存申请
          destination: stock.apply
        stock_result_input:           #占用库存结果
          destination: stock.result
          group: shopping-order
  • shopping-product配置:
spring:
  cloud:
    stream:
      bindings:
        stock_apply_input:            #占用库存申请
          destination: stock.apply
          group: shopping-product
        stock_result_output:          #占用库存结果
          destination: stock.result
  • shopping-order定义channel
public interface OrderStream {
 
    String STOCK_APPLY_OUTPUT = "stock_apply_output";
    @Output(OrderStream.STOCK_APPLY_OUTPUT)
    MessageChannel stockApplyOutput();
 
    String STOCK_RESULT_INPUT = "stock_result_input";
    @Input(OrderStream.STOCK_RESULT_INPUT)
    SubscribableChannel stockResultInput();
}
  • shopping-product定义channel
public interface ProductStream {
 
    String STOCK_APPLY_INPUT = "stock_apply_input";
    @Input(ProductStream.STOCK_APPLY_INPUT)
    SubscribableChannel stockApplyInput();
 
    String STOCK_RESULT_OUTPUT = "stock_result_output";
    @Output(ProductStream.STOCK_RESULT_OUTPUT)
    MessageChannel stockResultOutput();
}
  • shopping-order发送库存申请消息
/**
     * 创建订单
     */
    @Transactional
    public String Create(OrderInput orderInput) throws Exception {
 
        //构建订单主表
        OrderMaster orderMaster = new OrderMaster();
        BeanUtils.copyProperties(orderInput, orderMaster);
        //指定默认值
        orderMaster.setOrderId(KeyUtil.genUniqueKey("OM"));
        orderMaster.setOrderStatus(OrderStatus.NEW);
        orderMaster.setPayStatus(PayStatus.WAIT);
 
        //构建订单明细
        List<String> productIds = orderInput.getOrderItemInputs().stream().map(OrderItemInput::getProductId).collect(Collectors.toList());
        ResultVo<List<ProductInfoOutput>> result2 = productClient.findProductInfosByIds(String.join(",", productIds));
        if (result2.getCode() != 0)
            throw new Exception("调用订单查询接口出错:" + result2.getMsg());
        List<ProductInfoOutput> productInfoOutputs = result2.getData();
 
        //订单金额总计
        BigDecimal total = new BigDecimal(BigInteger.ZERO);
        for (OrderItemInput orderItemInput : orderInput.getOrderItemInputs()) {
            OrderDetail orderDetail = new OrderDetail();
            BeanUtils.copyProperties(orderItemInput, orderDetail);
 
            Optional<ProductInfoOutput> productInfoOutputOptional = productInfoOutputs.stream()
                    .filter(s -> s.getProductId().equals(orderItemInput.getProductId())).findFirst();
 
            if (!productInfoOutputOptional.isPresent())
                throw new Exception(String.format("商品【%s】不存在", orderItemInput.getProductId()));
 
            ProductInfoOutput productInfoOutput = productInfoOutputOptional.get();
            orderDetail.setDetailId(KeyUtil.genUniqueKey("OD"));
            orderDetail.setOrderId(orderMaster.getOrderId());
            orderDetail.setProductName(productInfoOutput.getProductName());
            orderDetail.setProductPrice(productInfoOutput.getProductPrice().multiply(new BigDecimal(orderDetail.getProductQuantity())));
            orderDetail.setProductIcon(productInfoOutput.getProductIcon());
            total = total.add(orderDetail.getProductPrice());
 
            orderDetailRepository.save(orderDetail);
        }
 
        orderMaster.setOrderAmount(total);
        orderMasterRepository.save(orderMaster);
 
        //扣库存
        StockApplyInput stockApplyInput = new StockApplyInput();
        stockApplyInput.setOrderId(orderMaster.getOrderId());
        stockApplyInput.setOrderItemInputs(orderInput.getOrderItemInputs());
        orderStream.stockApplyOutput().send(MessageBuilder.withPayload(stockApplyInput).build());
 
        return orderMaster.getOrderId();
    }
  • shopping-product处理库存申请消息,并发送库存处理结果
@Service
@Slf4j
@EnableBinding(ProductStream.class)
public class ProductService {
 
    private final ProductInfoRepository productInfoRepository;
    private final ProductCategoryRepository productCategoryRepository;
 
    @Autowired
    public ProductService(ProductInfoRepository productInfoRepository,
                          ProductCategoryRepository productCategoryRepository) {
        this.productInfoRepository = productInfoRepository;
        this.productCategoryRepository = productCategoryRepository;
    }
 
    /**
     * 扣减库存
     *
     */
    @Transactional
    @StreamListener(ProductStream.STOCK_APPLY_INPUT)
    @SendTo(ProductStream.STOCK_RESULT_OUTPUT)
    public StockResultOutput processStockApply(StockApplyInput stockApplyInput) throws Exception {
 
        log.info("占用库存消息被消费...");
        StockResultOutput stockResultOutput = new StockResultOutput();
        stockResultOutput.setOrderId(stockApplyInput.getOrderId());
 
        try {
            for (OrderItemInput orderItemInput : stockApplyInput.getOrderItemInputs()) {
 
                Optional<ProductInfo> productInfoOptional = productInfoRepository.findById(orderItemInput.getProductId());
                if (!productInfoOptional.isPresent())
                    throw new Exception("商品不存在.");
 
                ProductInfo productInfo = productInfoOptional.get();
                int result = productInfo.getProductStock() - orderItemInput.getProductQuantity();
                if (result < 0)
                    throw new Exception("商品库存不满足.");
 
                productInfo.setProductStock(result);
                productInfoRepository.save(productInfo);
            }
 
            stockResultOutput.setIsSuccess(true);
            stockResultOutput.setMessage("OK");
            return stockResultOutput;
        } catch (Exception e) {
            stockResultOutput.setIsSuccess(false);
            stockResultOutput.setMessage(e.getMessage());
            return stockResultOutput;
        }
 
    }
 
}
  • shopping-order处理库存处理结果
@StreamListener(OrderStream.STOCK_RESULT_INPUT)
public void processStockResult(StockResultOutput stockResultOutput) {
 
    log.info("库存消息返回" + stockResultOutput);
 
    Optional<OrderMaster> optionalOrderMaster = orderMasterRepository.findById(stockResultOutput.getOrderId());
    if (optionalOrderMaster.isPresent()) {
        OrderMaster orderMaster = optionalOrderMaster.get();
        if (stockResultOutput.getIsSuccess()) {
            orderMaster.setOrderStatus(OrderStatus.OCCUPY_SUCCESS);
        } else {
            orderMaster.setOrderStatus(OrderStatus.OCCUPY_FAILURE);
        }
        orderMasterRepository.save(orderMaster);
    }
}

执行调试结果,跟踪执行结果:生成订单同时发送库存申请命令,商品模块处理库存申请成功后,返回库存占用结果告知订单模块,从而实现订单生成和商品库存占用的逻辑的解耦。

小结

在原有的架构基础上,我们对商品和订单服务进行了应用解耦,库存占用逻辑异步化,通过消息队列传递消息,并结合spring cloud stream对消息input和output绑定,使得在程序中很方便的进行消息发送和接收处理。

实战_Spring_Cloud(3)https://developer.aliyun.com/article/1543985

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7天前
|
存储 Java Maven
Spring Boot WebFlux 增删改查完整实战 demo
Spring Boot WebFlux 增删改查完整实战 demo
|
1月前
|
Java 应用服务中间件 测试技术
深入探索Spring Boot Web应用源码及实战应用
【5月更文挑战第11天】本文将详细解析Spring Boot Web应用的源码架构,并通过一个实际案例,展示如何构建一个基于Spring Boot的Web应用。本文旨在帮助读者更好地理解Spring Boot的内部工作机制,以及如何利用这些机制优化自己的Web应用开发。
54 3
|
1月前
|
安全 Java 开发者
深入理解Spring Boot配置绑定及其实战应用
【4月更文挑战第10天】本文详细探讨了Spring Boot中配置绑定的核心概念,并结合实战示例,展示了如何在项目中有效地使用这些技术来管理和绑定配置属性。
29 1
|
1月前
|
Java 关系型数据库 数据库
Spring Boot多数据源及事务管理:概念与实战
【4月更文挑战第29天】在复杂的企业级应用中,经常需要访问和管理多个数据源。Spring Boot通过灵活的配置和强大的框架支持,可以轻松实现多数据源的整合及事务管理。本篇博客将探讨如何在Spring Boot中配置多数据源,并详细介绍事务管理的策略和实践。
93 3
|
1月前
|
安全 Java 测试技术
Spring Boot集成支付宝支付:概念与实战
【4月更文挑战第29天】在电子商务和在线业务应用中,集成有效且安全的支付解决方案是至关重要的。支付宝作为中国领先的支付服务提供商,其支付功能的集成可以显著提升用户体验。本篇博客将详细介绍如何在Spring Boot应用中集成支付宝支付功能,并提供一个实战示例。
113 2
|
4天前
|
JSON 安全 Java
Spring Security 6.x 微信公众平台OAuth2授权实战
上一篇介绍了OAuth2协议的基本原理,以及Spring Security框架中自带的OAuth2客户端GitHub的实现细节,本篇以微信公众号网页授权登录为目的,介绍如何在原框架基础上定制开发OAuth2客户端。
24 4
Spring Security 6.x 微信公众平台OAuth2授权实战
|
6天前
|
存储 数据可视化 关系型数据库
|
6天前
|
算法 Java API
|
6天前
|
负载均衡 Java API
|
1月前
|
开发框架 监控 Java
深入探索Spring Boot的监控、管理和测试功能及实战应用
【5月更文挑战第14天】Spring Boot是一个快速开发框架,提供了一系列的功能模块,包括监控、管理和测试等。本文将深入探讨Spring Boot中监控、管理和测试功能的原理与应用,并提供实际应用场景的示例。
35 2