分布式系统开发实战:微服务架构,实战:基于CQRS微服务通信

简介: Axon Framework是一个适用于Java的、基于事件驱动的轻量级CQRS框架,既支持直接持久化Aggregate状态,也支持采用EventSourcing。

实战:基于CQRS微服务通信

Axon Framework是一个适用于Java的、基于事件驱动的轻量级CQRS框架,既支持直接持久化Aggregate状态,也支持采用EventSourcing。

Axon Framework的应用架构如图9-6所示。

网络异常,图片无法展示
|

图9-6 Axon Framework应用架构

本节,我们将基于Axon Framework来实现一个CQRS应用“axon-cqrs”。该应用展示了:“开通银行账户,取钱”这样一个逻辑业务。

配置

配置依赖如下。

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring.version>5.2.3.RELEASE</spring.version>
<axon.version>3.4.3</axon.version>
<log4j.version>2.13.0</log4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework</groupId><artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-core</artifactId>
<version>${axon.version}</version>
</dependency>
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-spring</artifactId>
<version>${axon.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-jcl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
</dependencies>

其中,采用了Axon Framework以及日志框架。

Aggregate

BankAccount是DDD中的Aggregate,代表了银行账户。

package com.waylau.axon.cqrs.command.aggregates;
import static org.axonframework.commandhandling.model.AggregateLifecycle.apply;
import static org.slf4j.LoggerFactory.getLogger;
import java.math.BigDecimal;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.commandhandling.model.AggregateIdentifier;
import org.axonframework.eventhandling.EventHandler;
import org.slf4j.Logger;
import com.waylau.axon.cqrs.command.commands.CreateAccountCommand;
import com.waylau.axon.cqrs.command.commands.WithdrawMoneyCommand;
import com.waylau.axon.cqrs.common.domain.AccountId;
import com.waylau.axon.cqrs.common.events.CreateAccountEvent;
import com.waylau.axon.cqrs.common.events.WithdrawMoneyEvent;
public class BankAccount {
private static final Logger LOGGER = getLogger(BankAccount.class);@AggregateIdentifier
private AccountId accountId;
private String accountName;
private BigDecimal balance;
public BankAccount() {
}
@CommandHandler
public BankAccount(CreateAccountCommand command){
LOGGER.debug("Construct a new BankAccount");
apply(new CreateAccountEvent(command.getAccountId(),
command.getAccountName(),
command.getAmount()));
}
@CommandHandler
public void handle(WithdrawMoneyCommand command){
apply(new WithdrawMoneyEvent(command.getAccountId(),
command.getAmount()));
}
@EventHandler
public void on(CreateAccountEvent event){
this.accountId = event.getAccountId();
this.accountName = event.getAccountName();
this.balance = new BigDecimal(event.getAmount());
LOGGER.info("Account {} is created with balance {}",
accountId,
this.balance);
}
@EventHandler
public void on(WithdrawMoneyEvent event){
BigDecimal result = this.balance.subtract(
new BigDecimal(event.getAmount()));
if(result.compareTo(BigDecimal.ZERO)<0)
LOGGER.error("Cannot withdraw more money than the balance!");
else {
this.balance = result;
LOGGER.info("Withdraw {} from account {}, balance result: {}",
event.getAmount(), accountId, balance);
}
}
}

其中,注解@CommandHandler和@EventHandler代表了对Command和Event的处理。@AggregateIdentifier代表的AccountId,是一个Aggregate的全局唯一的标识符。

AccountId声明如下。

package com.waylau.axon.cqrs.common.domain;
import org.axonframework.common.Assert;
import org.axonframework.common.IdentifierFactory;
import java.io.Serializable;
public class AccountId implements Serializable {private static final long serialVersionUID = 7119961474083133148L;
private final String identifier;
private final int hashCode;
public AccountId() {
this.identifier =
IdentifierFactory.getInstance().generateIdentifier();
this.hashCode = identifier.hashCode();
}
public AccountId(String identifier) {
Assert.notNull(identifier, ()->"Identifier may not be null");
this.identifier = identifier;
this.hashCode = identifier.hashCode();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AccountId accountId = (AccountId) o;
return identifier.equals(accountId.identifier);
}
@Override
public int hashCode() {
return hashCode;
}
@Override
public String toString() {
return identifier;
}
}

其中:

·实现equal和hashCode方法,因为它会被拿来与其他标识对比。

·实现toString方法,其结果也应该是全局唯一的。

·实现Serializable接口以表明可序列化。

Command

该应用共有两个Command:创建账户和取钱。

CreateAccountCommand代表了创建账户。

package com.waylau.axon.cqrs.command.commands;
import com.waylau.axon.cqrs.common.domain.AccountId;
public class CreateAccountCommand {
private AccountId accountId;
private String accountName;
private long amount;public CreateAccountCommand(AccountId accountId,
String accountName,
long amount) {
this.accountId = accountId;
this.accountName = accountName;
this.amount = amount;
}
public AccountId getAccountId() {
return accountId;
}
public String getAccountName() {
return accountName;
}
public long getAmount() {
return amount;
}
}

WithdrawMoneyCommand代表了取钱。

package com.waylau.axon.cqrs.command.commands;
import org.axonframework.commandhandling.TargetAggregateIdentifier;
import com.waylau.axon.cqrs.common.domain.AccountId;
public class WithdrawMoneyCommand {
@TargetAggregateIdentifier
private AccountId accountId;
private long amount;
public WithdrawMoneyCommand(AccountId accountId, long amount) {
this.accountId = accountId;
this.amount = amount;
}
public AccountId getAccountId() {
return accountId;
}
public long getAmount() {
return amount;
}
}

Event

Event是系统中发生任何改变时产生的事件类,典型的Event就是对Aggregate状态的修改。与Command相对应,会有两个事件。

CreateAccountEvent代表了创建账户的Event。

package com.waylau.axon.cqrs.common.events;import com.waylau.axon.cqrs.common.domain.AccountId;
public class CreateAccountEvent {
private AccountId accountId;
private String accountName;
private long amount;
public CreateAccountEvent(AccountId accountId,
String accountName, long amount) {
this.accountId = accountId;
this.accountName = accountName;
this.amount = amount;
}
public AccountId getAccountId() {
return accountId;
}
public String getAccountName() {
return accountName;
}
public long getAmount() {
return amount;
}
}
WithdrawMoneyEvent代表了取钱的Event。
package com.waylau.axon.cqrs.common.events;
import com.waylau.axon.cqrs.common.domain.AccountId;
public class WithdrawMoneyEvent {
private AccountId accountId;
private long amount;
public WithdrawMoneyEvent(AccountId accountId, long amount) {
this.accountId = accountId;
this.amount = amount;
}
public AccountId getAccountId() {
return accountId;
}
public long getAmount() {
return amount;
}
}

测试

为了方便测试,我们创建一个Application类。

package com.waylau.axon.cqrs;
import static org.slf4j.LoggerFactory.getLogger;
import org.axonframework.config.Configuration;
import org.axonframework.config.DefaultConfigurer;import org.axonframework.eventsourcing.eventstore.inmemory.
InMemoryEventStorageEngine;
import org.slf4j.Logger;
import com.waylau.axon.cqrs.command.aggregates.BankAccount;
import com.waylau.axon.cqrs.command.commands.CreateAccountCommand;
import com.waylau.axon.cqrs.command.commands.WithdrawMoneyCommand;
import com.waylau.axon.cqrs.common.domain.AccountId;
public class Application {
private static final Logger LOGGER = getLogger(Application.class);
public static void main(String args[]) throws InterruptedException{
LOGGER.info("Application is start.");
Configuration config = DefaultConfigurer.defaultConfiguration()
.configureAggregate(BankAccount.class)
.configureEmbeddedEventStore(c -> new InMemoryEventStorageEngine())
.buildConfiguration();
config.start();
AccountId id = new AccountId();
config.commandGateway().send(new CreateAccountCommand(id, "MyAccount",1000));
config.commandGateway().send(new WithdrawMoneyCommand(id, 500));
config.commandGateway().send(new WithdrawMoneyCommand(id, 500));
config.commandGateway().send(new WithdrawMoneyCommand(id, 500));
// 线程先睡5s,等事件处理完
Thread.sleep(5000L);
LOGGER.info("Application is shutdown.");
}
}

在该类中,我们创建了一个账户,并执行了3次取钱动作。由于账户总额为1000,所以,当执行第3次取钱动作时,预期是会因为余额不足而失败。

运行该Application类,能看到以下输出。

22:11:52.432 [main] INFO com.waylau.axon.cqrs.Application - Application is start.
22:11:52.676 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount –
Account cc0653ff-afbd-49f5-b868-38aff296611c is created with balance 1000
22:11:52.719 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount –
Account cc0653ff-afbd-49f5-b868-38aff296611c is created with balance 1000
22:11:52.725 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount –
Withdraw 500 from account cc0653ff-afbd-49f5-b868-38aff296611c, balance result: 500
22:11:52.725 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount –
Account cc0653ff-afbd-49f5-b868-38aff296611c is created with balance 1000
22:11:52.725 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount –
Withdraw 500 from account cc0653ff-afbd-49f5-b868-38aff296611c, balance result: 500
22:11:52.727 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount –
Withdraw 500 from account cc0653ff-afbd-49f5-b868-38aff296611c, balance result: 0
22:11:52.727 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount –
Account cc0653ff-afbd-49f5-b868-38aff296611c is created with balance 1000
22:11:52.728 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount –
Withdraw 500 from account cc0653ff-afbd-49f5-b868-38aff296611c, balance result: 500
22:11:52.728 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount –
Withdraw 500 from account cc0653ff-afbd-49f5-b868-38aff296611c, balance result: 022:11:52.728 [main] ERROR com.waylau.axon.cqrs.command.aggregates.BankAccount - Cannot
withdraw more money than the balance!
22:11:57.729 [main] INFO com.waylau.axon.cqrs.Application - Application is shutdown.

本节示例,可以在axon-cqrs项目下找到。

本章小结

本章介绍了微服务架构的概念及构建微服务常用的技术。同时,也介绍了在微服务中常用的3种通信方式:HTTP、消息、事件驱动。在微服务中,我们可以使用CQRS来降低构建微服务通信的复杂度。

本文给大家讲解的内容是分布式系统开发实战:微服务架构,实战:基于CQRS微服务通信

本文就是愿天堂没有BUG给大家分享的内容,大家有收获的话可以分享下,想学习更多的话可以到微信公众号里找我,我等你哦。

相关文章
|
6月前
|
监控 Java API
Spring Boot 3.2 结合 Spring Cloud 微服务架构实操指南 现代分布式应用系统构建实战教程
Spring Boot 3.2 + Spring Cloud 2023.0 微服务架构实践摘要 本文基于Spring Boot 3.2.5和Spring Cloud 2023.0.1最新稳定版本,演示现代微服务架构的构建过程。主要内容包括: 技术栈选择:采用Spring Cloud Netflix Eureka 4.1.0作为服务注册中心,Resilience4j 2.1.0替代Hystrix实现熔断机制,配合OpenFeign和Gateway等组件。 核心实操步骤: 搭建Eureka注册中心服务 构建商品
1067 3
|
4月前
|
Cloud Native Serverless API
微服务架构实战指南:从单体应用到云原生的蜕变之路
🌟蒋星熠Jaxonic,代码为舟的星际旅人。深耕微服务架构,擅以DDD拆分服务、构建高可用通信与治理体系。分享从单体到云原生的实战经验,探索技术演进的无限可能。
微服务架构实战指南:从单体应用到云原生的蜕变之路
|
4月前
|
监控 Cloud Native Java
Spring Boot 3.x 微服务架构实战指南
🌟蒋星熠Jaxonic,技术宇宙中的星际旅人。深耕Spring Boot 3.x与微服务架构,探索云原生、性能优化与高可用系统设计。以代码为笔,在二进制星河中谱写极客诗篇。关注我,共赴技术星辰大海!(238字)
Spring Boot 3.x 微服务架构实战指南
|
4月前
|
负载均衡 Java API
《深入理解Spring》Spring Cloud 构建分布式系统的微服务全家桶
Spring Cloud为微服务架构提供一站式解决方案,涵盖服务注册、配置管理、负载均衡、熔断限流等核心功能,助力开发者构建高可用、易扩展的分布式系统,并持续向云原生演进。
|
8月前
|
缓存 负载均衡 监控
微服务架构下的电商API接口设计:策略、方法与实战案例
本文探讨了微服务架构下的电商API接口设计,旨在打造高效、灵活与可扩展的电商系统。通过服务拆分(如商品、订单、支付等模块)和标准化设计(RESTful或GraphQL风格),确保接口一致性与易用性。同时,采用缓存策略、负载均衡及限流技术优化性能,并借助Prometheus等工具实现监控与日志管理。微服务架构的优势在于支持敏捷开发、高并发处理和独立部署,满足电商业务快速迭代需求。未来,电商API设计将向智能化与安全化方向发展。
505 102
|
6月前
|
负载均衡 监控 Java
微服务稳定性三板斧:熔断、限流与负载均衡全面解析(附 Hystrix-Go 实战代码)
在微服务架构中,高可用与稳定性至关重要。本文详解熔断、限流与负载均衡三大关键技术,结合API网关与Hystrix-Go实战,帮助构建健壮、弹性的微服务系统。
677 1
微服务稳定性三板斧:熔断、限流与负载均衡全面解析(附 Hystrix-Go 实战代码)
|
7月前
|
IDE Java API
Java 17 新特性与微服务开发的实操指南
本内容涵盖Java 11至Java 17最新特性实战,包括var关键字、字符串增强、模块化系统、Stream API、异步编程、密封类等,并提供图书管理系统实战项目,帮助开发者掌握现代Java开发技巧与工具。
353 1
|
8月前
|
NoSQL Java 微服务
2025 年最新 Java 面试从基础到微服务实战指南全解析
《Java面试实战指南:高并发与微服务架构解析》 本文针对Java开发者提供2025版面试技术要点,涵盖高并发电商系统设计、微服务架构实现及性能优化方案。核心内容包括:1)基于Spring Cloud和云原生技术的系统架构设计;2)JWT认证、Seata分布式事务等核心模块代码实现;3)数据库查询优化与高并发处理方案,响应时间从500ms优化至80ms;4)微服务调用可靠性保障方案。文章通过实战案例展现Java最新技术栈(Java 17/Spring Boot 3.2)的应用.
699 9
|
8月前
|
Java API 微服务
Java 21 与 Spring Boot 3.2 微服务开发从入门到精通实操指南
《Java 21与Spring Boot 3.2微服务开发实践》摘要: 本文基于Java 21和Spring Boot 3.2最新特性,通过完整代码示例展示了微服务开发全流程。主要内容包括:1) 使用Spring Initializr初始化项目,集成Web、JPA、H2等组件;2) 配置虚拟线程支持高并发;3) 采用记录类优化DTO设计;4) 实现JPA Repository与Stream API数据访问;5) 服务层整合虚拟线程异步处理和结构化并发;6) 构建RESTful API并使用Springdoc生成文档。文中特别演示了虚拟线程配置(@Async)和StructuredTaskSco
968 0
|
6月前
|
存储 负载均衡 NoSQL
【赵渝强老师】Redis Cluster分布式集群
Redis Cluster是Redis的分布式存储解决方案,通过哈希槽(slot)实现数据分片,支持水平扩展,具备高可用性和负载均衡能力,适用于大规模数据场景。
461 2