实战_Spring_Cloud(1)https://developer.aliyun.com/article/1543981
服务提供端改造
- shopping-product项目中,把原先的application.yml文件重命名为bootstrap.yml,并配置Eureka Server地址、应用名称、Config的实例名称。服务启动后,会链接Eureka Server服务器,根据Config的实例名称找到对应的Config服务器,并根据实例名称(可以增加profile属性)来匹配配置文件。
eureka: client: serviceUrl: defaultZone: http://eureka1:8761/eureka/,http://eureka2:8762/eureka/ #指定服务注册地址 spring: application: name: shopping-product #应用名称 cloud: config: discovery: enabled: true service-id: config-server
- 之前服务端其余的配置,填写在github配置项目shopping-product.yml文件中
spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver username: root password: 123456 url: jdbc:mysql://localhost:3306/spring_cloud_app?characterEncoding=utf-8&useSSL=false&serverTimezone=UTC jpa: show-sql: true database-platform: org.hibernate.dialect.MySQLDialect server: port: 11100
- 同样,shopping-order项目也如此改造,最后我们启动所有的服务,看是否都能正常启动。
配置动态刷新
- 首先,在
shopping-product.yml
增加一个配置属性来进行测试
env: dev
- 新建一个测试controller来绑定这个配置属性,并提供api来返回属性的值
@RestController @RefreshScope @RequestMapping("api/env") public class EnvController { @Value("${env}") private String env; @RequestMapping public String printEnv() { return env; } }
- 访问http://localhost:11100/api/env,返回当前的值dev。
Spring Cloud Config 在项目启动时加载配置内容这一机制,但是如果我们修改配置文件内容后,不会自动刷新。例如我们上面的项目,当服务已经启动的时候,去修改 github 上的配置文件内容,这时候,再次刷新页面,对不起,还是旧的配置内容,新内容不会主动刷新过来。那应该怎么去触发配置信息的动态刷新呢? - 它提供了一个刷新机制,但是需要我们主动触发。那就是 @RefreshScope 注解并结合 actuator ,注意要引入 spring-boot-starter-actuator 包。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>
- EnvController上增加
@RefreshScope
注解 - 发送 POST 请求到 http://localhost:11100/actuator/refresh 这个接口,默认没有开放endpoint的权限,所以这块我们首先配置开放权限
management: endpoints: web: exposure: include: "*"
- 这时调用接口结束后,我们看到接口返回消息,表明env这个属性值已经刷新
[ "config.client.version", "env" ]
- 再次访问http://localhost:11100/api/env,返回当前的值就是修改后的值test,证明配置属性的值已经动态刷新,我们的程序也不用再次启动。
配置 Webhook
每次改了配置后,就用 postman 访问一下 refresh 接口,还是不够方便。 github 提供了一种 webhook 的方式,当有代码变更的时候,会调用我们设置的地址,来实现我们想达到的目的。
- 进入 github 仓库配置页面,选择 Webhooks ,并点击 add webhook;
- 填上回调的地址
也就是上面提到的 actuator/refresh 这个地址,但是必须保证这个地址是可以被 github 访问到的。这样每当github上修改了配置文件,就自动通知对应的hook地址自动刷新。
小结
整体项目结构如下:
spring-cloud-app
--config-server(统一配置中心)
--eureka-server(服务注册中心)
--shopping-common(购物公共模块)
--shopping-product(商品服务模块)
--shopping-order(订单服务模块)
更新系统架构,新建config-server节点,也向eureka-server注册,相关服务注册节点根据配置实例名称,路由到config-server节点,动态的加载配置。
异步消息(Stream)
应用场景
1、异步处理
比如用户在电商网站下单,下单完成后会给用户推送短信或邮件,发短信和邮件的过程就可以异步完成。因为下单付款是核心业务,发邮件和短信并不属于核心功能,并且可能耗时较长,所以针对这种业务场景可以选择先放到消息队列中,有其他服务来异步处理。
2、应用解耦:
假设公司有几个不同的系统,各系统在某些业务有联动关系,比如 A 系统完成了某些操作,需要触发 B 系统及 C 系统。如果 A 系统完成操作,主动调用 B 系统的接口或 C 系统的接口,可以完成功能,但是各个系统之间就产生了耦合。用消息中间件就可以完成解耦,当 A 系统完成操作将数据放进消息队列,B 和 C 系统去订阅消息就可以了。这样各系统只要约定好消息的格式就好了。
3、流量削峰
比如秒杀活动,一下子进来好多请求,有的服务可能承受不住瞬时高并发而崩溃,所以针对这种瞬时高并发的场景,在中间加一层消息队列,把请求先入队列,然后再把队列中的请求平滑的推送给服务,或者让服务去队列拉取。
4、日志处理
kafka 最开始就是专门为了处理日志产生的。
当碰到上面的几种情况的时候,就要考虑用消息队列了。如果你碰巧使用的是 RabbitMQ 或者 kafka ,而且同样也是在使用 Spring Cloud ,那可以考虑下用 Spring Cloud Stream。Spring Cloud Stream 是消息中间件组件,它集成了 kafka 和 rabbitmq ,本文以rabbitmq 为例。
当前项目场景
分析目前shopping-order项目中,创建订单的代码如下:
/** * 创建订单 * */ @Transactional public String Create(OrderInput orderInput) throws Exception { //扣库存 ResultVo result1=productClient.decreaseStock(orderInput.getOrderItemInputs()); if (result1.getCode() != 0) throw new Exception("调用订单扣减库存接口出错:" + result1.getMsg()); //构建订单主表 OrderMaster orderMaster = new OrderMaster(); BeanUtils.copyProperties(orderInput, orderMaster); //指定默认值 orderMaster.setOrderId(KeyUtil.genUniqueKey("OM")); orderMaster.setOrderStatus(OrderStatus.NEW); orderMaster.setPayStatus(PayStatus.WAIT); //构建订单明细 List<String> productIds = orderInput.getOrderItemInputs().stream().map(OrderItemInput::getProductId).collect(Collectors.toList()); ResultVo<List<ProductInfoOutput>> result2 = productClient.findProductInfosByIds(String.join(",", productIds)); if (result2.getCode() != 0) throw new Exception("调用订单查询接口出错:" + result2.getMsg()); List<ProductInfoOutput> productInfoOutputs = result2.getData(); //订单金额总计 BigDecimal total = new BigDecimal(BigInteger.ZERO); for (OrderItemInput orderItemInput : orderInput.getOrderItemInputs()) { OrderDetail orderDetail = new OrderDetail(); BeanUtils.copyProperties(orderItemInput, orderDetail); Optional<ProductInfoOutput> productInfoOutputOptional = productInfoOutputs.stream() .filter(s -> s.getProductId().equals(orderItemInput.getProductId())).findFirst(); if (!productInfoOutputOptional.isPresent()) throw new Exception(String.format("商品【%s】不存在", orderItemInput.getProductId())); ProductInfoOutput productInfoOutput = productInfoOutputOptional.get(); orderDetail.setDetailId(KeyUtil.genUniqueKey("OD")); orderDetail.setOrderId(orderMaster.getOrderId()); orderDetail.setProductName(productInfoOutput.getProductName()); orderDetail.setProductPrice(productInfoOutput.getProductPrice().multiply(new BigDecimal(orderDetail.getProductQuantity()))); orderDetail.setProductIcon(productInfoOutput.getProductIcon()); total = total.add(orderDetail.getProductPrice()); orderDetailRepository.save(orderDetail); } orderMaster.setOrderAmount(total); orderMasterRepository.save(orderMaster); return orderMaster.getOrderId(); }
创建订单的同时,先调用商品接口扣减库存,如果占用库存成功,再生成订单。这样的话,生成订单的操作和占用商品库存的操作其实是耦合在一起的。在实际电商高并发、高流量的情况下,我们很少这么做。所以,我们要将业务解耦,实现订单和扣减库存的异步处理。
大体思路如下:生成订单==》通知商品调用库存==》商品占用库存==》通知订单占用成功==》更新订单占用库存状态
stream-rabbit集成
shopping-order、shopping-product项目中
- 首先引入stream-rabbit依赖:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
- application.yml中作相应的配置:
spring: rabbitmq: host: aliyun.host port: 5672 username: guest password: guest
- 消息接口定义
public interface StreamClient { String INPUT = "myMessage"; @Input(StreamClient.INPUT) SubscribableChannel input(); @Output(StreamClient.INPUT) MessageChannel output(); }
- 接收端处理逻辑
@Component @EnableBinding(StreamClient.class) @Slf4j public class StreamReceiver { @StreamListener(value = StreamClient.INPUT) public void process(OrderInput orderInput) { log.info("StreamReceiver: {}", orderInput); } }
- 发送端处理逻辑
@RestController @RequestMapping("api/v1/stream") @Slf4j public class StreamController { private final StreamClient streamClient; @Autowired public StreamController(StreamClient streamClient) { this.streamClient = streamClient; } @GetMapping("/sendMessage") public void sendMessage() { OrderInput orderInput=new OrderInput(); orderInput.setBuyerName("小王"); orderInput.setBuyerPhone("15011111111"); orderInput.setBuyerAddress("姥姥家"); orderInput.setBuyerOpenid("11111"); streamClient.output().send(MessageBuilder.withPayload(orderInput).build()); } }
启动应用程序,测试发送接口,发现spring-cloud-stream帮我们自动创建了一个队列,消息发送到这个队列,然后被接收端消费。
此时,如果我们启动多个shopping-product服务实例,会有个问题,如果发送端发送一条消息,会被2个实例同时消费,在正常的业务中,这种情况是应该避免的。所以我们需要对消息进行分组,在application.yml中增加如下配置,保证只有一个服务实例来消费。
spring: rabbitmq: host: aliyun.host port: 5672 username: guest password: guest cloud: stream: bindings: myMessage: group: shopping-order content-type: application/json
改造Order和Product项目
shopping-order作为库存占用命令的消息发送者,首先向shopping-product发送消息stock_apply(占用库存申请),shopping-product接收此消息进行库存处理,然后将库存占用处理的结果作为消息stock_result(占用库存结果)发送,shopping-order端再收到结果消息对订单状态进行更新。
- shopping-order配置:
spring: cloud: stream: bindings: stock_apply_output: #占用库存申请 destination: stock.apply stock_result_input: #占用库存结果 destination: stock.result group: shopping-order
- shopping-product配置:
spring: cloud: stream: bindings: stock_apply_input: #占用库存申请 destination: stock.apply group: shopping-product stock_result_output: #占用库存结果 destination: stock.result
- shopping-order定义channel
public interface OrderStream { String STOCK_APPLY_OUTPUT = "stock_apply_output"; @Output(OrderStream.STOCK_APPLY_OUTPUT) MessageChannel stockApplyOutput(); String STOCK_RESULT_INPUT = "stock_result_input"; @Input(OrderStream.STOCK_RESULT_INPUT) SubscribableChannel stockResultInput(); }
- shopping-product定义channel
public interface ProductStream { String STOCK_APPLY_INPUT = "stock_apply_input"; @Input(ProductStream.STOCK_APPLY_INPUT) SubscribableChannel stockApplyInput(); String STOCK_RESULT_OUTPUT = "stock_result_output"; @Output(ProductStream.STOCK_RESULT_OUTPUT) MessageChannel stockResultOutput(); }
- shopping-order发送库存申请消息
/** * 创建订单 */ @Transactional public String Create(OrderInput orderInput) throws Exception { //构建订单主表 OrderMaster orderMaster = new OrderMaster(); BeanUtils.copyProperties(orderInput, orderMaster); //指定默认值 orderMaster.setOrderId(KeyUtil.genUniqueKey("OM")); orderMaster.setOrderStatus(OrderStatus.NEW); orderMaster.setPayStatus(PayStatus.WAIT); //构建订单明细 List<String> productIds = orderInput.getOrderItemInputs().stream().map(OrderItemInput::getProductId).collect(Collectors.toList()); ResultVo<List<ProductInfoOutput>> result2 = productClient.findProductInfosByIds(String.join(",", productIds)); if (result2.getCode() != 0) throw new Exception("调用订单查询接口出错:" + result2.getMsg()); List<ProductInfoOutput> productInfoOutputs = result2.getData(); //订单金额总计 BigDecimal total = new BigDecimal(BigInteger.ZERO); for (OrderItemInput orderItemInput : orderInput.getOrderItemInputs()) { OrderDetail orderDetail = new OrderDetail(); BeanUtils.copyProperties(orderItemInput, orderDetail); Optional<ProductInfoOutput> productInfoOutputOptional = productInfoOutputs.stream() .filter(s -> s.getProductId().equals(orderItemInput.getProductId())).findFirst(); if (!productInfoOutputOptional.isPresent()) throw new Exception(String.format("商品【%s】不存在", orderItemInput.getProductId())); ProductInfoOutput productInfoOutput = productInfoOutputOptional.get(); orderDetail.setDetailId(KeyUtil.genUniqueKey("OD")); orderDetail.setOrderId(orderMaster.getOrderId()); orderDetail.setProductName(productInfoOutput.getProductName()); orderDetail.setProductPrice(productInfoOutput.getProductPrice().multiply(new BigDecimal(orderDetail.getProductQuantity()))); orderDetail.setProductIcon(productInfoOutput.getProductIcon()); total = total.add(orderDetail.getProductPrice()); orderDetailRepository.save(orderDetail); } orderMaster.setOrderAmount(total); orderMasterRepository.save(orderMaster); //扣库存 StockApplyInput stockApplyInput = new StockApplyInput(); stockApplyInput.setOrderId(orderMaster.getOrderId()); stockApplyInput.setOrderItemInputs(orderInput.getOrderItemInputs()); orderStream.stockApplyOutput().send(MessageBuilder.withPayload(stockApplyInput).build()); return orderMaster.getOrderId(); }
- shopping-product处理库存申请消息,并发送库存处理结果
@Service @Slf4j @EnableBinding(ProductStream.class) public class ProductService { private final ProductInfoRepository productInfoRepository; private final ProductCategoryRepository productCategoryRepository; @Autowired public ProductService(ProductInfoRepository productInfoRepository, ProductCategoryRepository productCategoryRepository) { this.productInfoRepository = productInfoRepository; this.productCategoryRepository = productCategoryRepository; } /** * 扣减库存 * */ @Transactional @StreamListener(ProductStream.STOCK_APPLY_INPUT) @SendTo(ProductStream.STOCK_RESULT_OUTPUT) public StockResultOutput processStockApply(StockApplyInput stockApplyInput) throws Exception { log.info("占用库存消息被消费..."); StockResultOutput stockResultOutput = new StockResultOutput(); stockResultOutput.setOrderId(stockApplyInput.getOrderId()); try { for (OrderItemInput orderItemInput : stockApplyInput.getOrderItemInputs()) { Optional<ProductInfo> productInfoOptional = productInfoRepository.findById(orderItemInput.getProductId()); if (!productInfoOptional.isPresent()) throw new Exception("商品不存在."); ProductInfo productInfo = productInfoOptional.get(); int result = productInfo.getProductStock() - orderItemInput.getProductQuantity(); if (result < 0) throw new Exception("商品库存不满足."); productInfo.setProductStock(result); productInfoRepository.save(productInfo); } stockResultOutput.setIsSuccess(true); stockResultOutput.setMessage("OK"); return stockResultOutput; } catch (Exception e) { stockResultOutput.setIsSuccess(false); stockResultOutput.setMessage(e.getMessage()); return stockResultOutput; } } }
- shopping-order处理库存处理结果
@StreamListener(OrderStream.STOCK_RESULT_INPUT) public void processStockResult(StockResultOutput stockResultOutput) { log.info("库存消息返回" + stockResultOutput); Optional<OrderMaster> optionalOrderMaster = orderMasterRepository.findById(stockResultOutput.getOrderId()); if (optionalOrderMaster.isPresent()) { OrderMaster orderMaster = optionalOrderMaster.get(); if (stockResultOutput.getIsSuccess()) { orderMaster.setOrderStatus(OrderStatus.OCCUPY_SUCCESS); } else { orderMaster.setOrderStatus(OrderStatus.OCCUPY_FAILURE); } orderMasterRepository.save(orderMaster); } }
执行调试结果,跟踪执行结果:生成订单同时发送库存申请命令,商品模块处理库存申请成功后,返回库存占用结果告知订单模块,从而实现订单生成和商品库存占用的逻辑的解耦。
小结
在原有的架构基础上,我们对商品和订单服务进行了应用解耦,库存占用逻辑异步化,通过消息队列传递消息,并结合spring cloud stream对消息input和output绑定,使得在程序中很方便的进行消息发送和接收处理。
实战_Spring_Cloud(3)https://developer.aliyun.com/article/1543985