分布式系统开发实战:微服务架构,实战:基于CQRS微服务通信

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Axon Framework是一个适用于Java的、基于事件驱动的轻量级CQRS框架,既支持直接持久化Aggregate状态,也支持采用EventSourcing。

实战:基于CQRS微服务通信

Axon Framework是一个适用于Java的、基于事件驱动的轻量级CQRS框架,既支持直接持久化Aggregate状态,也支持采用EventSourcing。

Axon Framework的应用架构如图9-6所示。

网络异常,图片无法展示
|

图9-6 Axon Framework应用架构

本节,我们将基于Axon Framework来实现一个CQRS应用“axon-cqrs”。该应用展示了:“开通银行账户,取钱”这样一个逻辑业务。

配置

配置依赖如下。

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring.version>5.2.3.RELEASE</spring.version>
<axon.version>3.4.3</axon.version>
<log4j.version>2.13.0</log4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework</groupId><artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-core</artifactId>
<version>${axon.version}</version>
</dependency>
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-spring</artifactId>
<version>${axon.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-jcl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
</dependencies>

其中,采用了Axon Framework以及日志框架。

Aggregate

BankAccount是DDD中的Aggregate,代表了银行账户。

package com.waylau.axon.cqrs.command.aggregates;
import static org.axonframework.commandhandling.model.AggregateLifecycle.apply;
import static org.slf4j.LoggerFactory.getLogger;
import java.math.BigDecimal;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.commandhandling.model.AggregateIdentifier;
import org.axonframework.eventhandling.EventHandler;
import org.slf4j.Logger;
import com.waylau.axon.cqrs.command.commands.CreateAccountCommand;
import com.waylau.axon.cqrs.command.commands.WithdrawMoneyCommand;
import com.waylau.axon.cqrs.common.domain.AccountId;
import com.waylau.axon.cqrs.common.events.CreateAccountEvent;
import com.waylau.axon.cqrs.common.events.WithdrawMoneyEvent;
public class BankAccount {
private static final Logger LOGGER = getLogger(BankAccount.class);@AggregateIdentifier
private AccountId accountId;
private String accountName;
private BigDecimal balance;
public BankAccount() {
}
@CommandHandler
public BankAccount(CreateAccountCommand command){
LOGGER.debug("Construct a new BankAccount");
apply(new CreateAccountEvent(command.getAccountId(),
command.getAccountName(),
command.getAmount()));
}
@CommandHandler
public void handle(WithdrawMoneyCommand command){
apply(new WithdrawMoneyEvent(command.getAccountId(),
command.getAmount()));
}
@EventHandler
public void on(CreateAccountEvent event){
this.accountId = event.getAccountId();
this.accountName = event.getAccountName();
this.balance = new BigDecimal(event.getAmount());
LOGGER.info("Account {} is created with balance {}",
accountId,
this.balance);
}
@EventHandler
public void on(WithdrawMoneyEvent event){
BigDecimal result = this.balance.subtract(
new BigDecimal(event.getAmount()));
if(result.compareTo(BigDecimal.ZERO)<0)
LOGGER.error("Cannot withdraw more money than the balance!");
else {
this.balance = result;
LOGGER.info("Withdraw {} from account {}, balance result: {}",
event.getAmount(), accountId, balance);
}
}
}

其中,注解@CommandHandler和@EventHandler代表了对Command和Event的处理。@AggregateIdentifier代表的AccountId,是一个Aggregate的全局唯一的标识符。

AccountId声明如下。

package com.waylau.axon.cqrs.common.domain;
import org.axonframework.common.Assert;
import org.axonframework.common.IdentifierFactory;
import java.io.Serializable;
public class AccountId implements Serializable {private static final long serialVersionUID = 7119961474083133148L;
private final String identifier;
private final int hashCode;
public AccountId() {
this.identifier =
IdentifierFactory.getInstance().generateIdentifier();
this.hashCode = identifier.hashCode();
}
public AccountId(String identifier) {
Assert.notNull(identifier, ()->"Identifier may not be null");
this.identifier = identifier;
this.hashCode = identifier.hashCode();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AccountId accountId = (AccountId) o;
return identifier.equals(accountId.identifier);
}
@Override
public int hashCode() {
return hashCode;
}
@Override
public String toString() {
return identifier;
}
}

其中:

·实现equal和hashCode方法,因为它会被拿来与其他标识对比。

·实现toString方法,其结果也应该是全局唯一的。

·实现Serializable接口以表明可序列化。

Command

该应用共有两个Command:创建账户和取钱。

CreateAccountCommand代表了创建账户。

package com.waylau.axon.cqrs.command.commands;
import com.waylau.axon.cqrs.common.domain.AccountId;
public class CreateAccountCommand {
private AccountId accountId;
private String accountName;
private long amount;public CreateAccountCommand(AccountId accountId,
String accountName,
long amount) {
this.accountId = accountId;
this.accountName = accountName;
this.amount = amount;
}
public AccountId getAccountId() {
return accountId;
}
public String getAccountName() {
return accountName;
}
public long getAmount() {
return amount;
}
}

WithdrawMoneyCommand代表了取钱。

package com.waylau.axon.cqrs.command.commands;
import org.axonframework.commandhandling.TargetAggregateIdentifier;
import com.waylau.axon.cqrs.common.domain.AccountId;
public class WithdrawMoneyCommand {
@TargetAggregateIdentifier
private AccountId accountId;
private long amount;
public WithdrawMoneyCommand(AccountId accountId, long amount) {
this.accountId = accountId;
this.amount = amount;
}
public AccountId getAccountId() {
return accountId;
}
public long getAmount() {
return amount;
}
}

Event

Event是系统中发生任何改变时产生的事件类,典型的Event就是对Aggregate状态的修改。与Command相对应,会有两个事件。

CreateAccountEvent代表了创建账户的Event。

package com.waylau.axon.cqrs.common.events;import com.waylau.axon.cqrs.common.domain.AccountId;
public class CreateAccountEvent {
private AccountId accountId;
private String accountName;
private long amount;
public CreateAccountEvent(AccountId accountId,
String accountName, long amount) {
this.accountId = accountId;
this.accountName = accountName;
this.amount = amount;
}
public AccountId getAccountId() {
return accountId;
}
public String getAccountName() {
return accountName;
}
public long getAmount() {
return amount;
}
}
WithdrawMoneyEvent代表了取钱的Event。
package com.waylau.axon.cqrs.common.events;
import com.waylau.axon.cqrs.common.domain.AccountId;
public class WithdrawMoneyEvent {
private AccountId accountId;
private long amount;
public WithdrawMoneyEvent(AccountId accountId, long amount) {
this.accountId = accountId;
this.amount = amount;
}
public AccountId getAccountId() {
return accountId;
}
public long getAmount() {
return amount;
}
}

测试

为了方便测试,我们创建一个Application类。

package com.waylau.axon.cqrs;
import static org.slf4j.LoggerFactory.getLogger;
import org.axonframework.config.Configuration;
import org.axonframework.config.DefaultConfigurer;import org.axonframework.eventsourcing.eventstore.inmemory.
InMemoryEventStorageEngine;
import org.slf4j.Logger;
import com.waylau.axon.cqrs.command.aggregates.BankAccount;
import com.waylau.axon.cqrs.command.commands.CreateAccountCommand;
import com.waylau.axon.cqrs.command.commands.WithdrawMoneyCommand;
import com.waylau.axon.cqrs.common.domain.AccountId;
public class Application {
private static final Logger LOGGER = getLogger(Application.class);
public static void main(String args[]) throws InterruptedException{
LOGGER.info("Application is start.");
Configuration config = DefaultConfigurer.defaultConfiguration()
.configureAggregate(BankAccount.class)
.configureEmbeddedEventStore(c -> new InMemoryEventStorageEngine())
.buildConfiguration();
config.start();
AccountId id = new AccountId();
config.commandGateway().send(new CreateAccountCommand(id, "MyAccount",1000));
config.commandGateway().send(new WithdrawMoneyCommand(id, 500));
config.commandGateway().send(new WithdrawMoneyCommand(id, 500));
config.commandGateway().send(new WithdrawMoneyCommand(id, 500));
// 线程先睡5s,等事件处理完
Thread.sleep(5000L);
LOGGER.info("Application is shutdown.");
}
}

在该类中,我们创建了一个账户,并执行了3次取钱动作。由于账户总额为1000,所以,当执行第3次取钱动作时,预期是会因为余额不足而失败。

运行该Application类,能看到以下输出。

22:11:52.432 [main] INFO com.waylau.axon.cqrs.Application - Application is start.
22:11:52.676 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount –
Account cc0653ff-afbd-49f5-b868-38aff296611c is created with balance 1000
22:11:52.719 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount –
Account cc0653ff-afbd-49f5-b868-38aff296611c is created with balance 1000
22:11:52.725 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount –
Withdraw 500 from account cc0653ff-afbd-49f5-b868-38aff296611c, balance result: 500
22:11:52.725 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount –
Account cc0653ff-afbd-49f5-b868-38aff296611c is created with balance 1000
22:11:52.725 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount –
Withdraw 500 from account cc0653ff-afbd-49f5-b868-38aff296611c, balance result: 500
22:11:52.727 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount –
Withdraw 500 from account cc0653ff-afbd-49f5-b868-38aff296611c, balance result: 0
22:11:52.727 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount –
Account cc0653ff-afbd-49f5-b868-38aff296611c is created with balance 1000
22:11:52.728 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount –
Withdraw 500 from account cc0653ff-afbd-49f5-b868-38aff296611c, balance result: 500
22:11:52.728 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount –
Withdraw 500 from account cc0653ff-afbd-49f5-b868-38aff296611c, balance result: 022:11:52.728 [main] ERROR com.waylau.axon.cqrs.command.aggregates.BankAccount - Cannot
withdraw more money than the balance!
22:11:57.729 [main] INFO com.waylau.axon.cqrs.Application - Application is shutdown.

本节示例,可以在axon-cqrs项目下找到。

本章小结

本章介绍了微服务架构的概念及构建微服务常用的技术。同时,也介绍了在微服务中常用的3种通信方式:HTTP、消息、事件驱动。在微服务中,我们可以使用CQRS来降低构建微服务通信的复杂度。

本文给大家讲解的内容是分布式系统开发实战:微服务架构,实战:基于CQRS微服务通信

本文就是愿天堂没有BUG给大家分享的内容,大家有收获的话可以分享下,想学习更多的话可以到微信公众号里找我,我等你哦。

相关文章
|
2月前
|
Dubbo Java 应用服务中间件
Spring Cloud Dubbo:微服务通信的高效解决方案
【10月更文挑战第15天】随着信息技术的发展,微服务架构成为企业应用开发的主流。Spring Cloud Dubbo结合了Dubbo的高性能RPC和Spring Cloud的生态系统,提供高效、稳定的微服务通信解决方案。它支持多种通信协议,具备服务注册与发现、负载均衡及容错机制,简化了服务调用的复杂性,使开发者能更专注于业务逻辑的实现。
64 2
|
3月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
3月前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
3月前
|
存储 JSON 数据库
Elasticsearch 分布式架构解析
【9月更文第2天】Elasticsearch 是一个分布式的搜索和分析引擎,以其高可扩展性和实时性著称。它基于 Lucene 开发,但提供了更高级别的抽象,使得开发者能够轻松地构建复杂的搜索应用。本文将深入探讨 Elasticsearch 的分布式存储和检索机制,解释其背后的原理及其优势。
208 5
|
3天前
|
NoSQL Java 数据处理
基于Redis海量数据场景分布式ID架构实践
【11月更文挑战第30天】在现代分布式系统中,生成全局唯一的ID是一个常见且重要的需求。在微服务架构中,各个服务可能需要生成唯一标识符,如用户ID、订单ID等。传统的自增ID已经无法满足在集群环境下保持唯一性的要求,而分布式ID解决方案能够确保即使在多个实例间也能生成全局唯一的标识符。本文将深入探讨如何利用Redis实现分布式ID生成,并通过Java语言展示多个示例,同时分析每个实践方案的优缺点。
20 8
|
4月前
|
Dubbo Java 应用服务中间件
💥Spring Cloud Dubbo火爆来袭!微服务通信的终极利器,你知道它有多强大吗?🔥
【8月更文挑战第29天】随着信息技术的发展,微服务架构成为企业应用开发的主流模式,而高效的微服务通信至关重要。Spring Cloud Dubbo通过整合Dubbo与Spring Cloud的优势,提供高性能RPC通信及丰富的生态支持,包括服务注册与发现、负载均衡和容错机制等,简化了服务调用管理并支持多种通信协议,提升了系统的可伸缩性和稳定性,成为微服务通信领域的优选方案。开发者仅需关注业务逻辑,而无需过多关心底层通信细节,使得Spring Cloud Dubbo在未来微服务开发中将更加受到青睐。
87 0
|
2月前
|
运维 供应链 安全
SD-WAN分布式组网:构建高效、灵活的企业网络架构
本文介绍了SD-WAN(软件定义广域网)在企业分布式组网中的应用,强调其智能化流量管理、简化的网络部署、弹性扩展能力和增强的安全性等核心优势,以及在跨国企业、多云环境、零售连锁和制造业中的典型应用场景。通过合理设计网络架构、选择合适的网络连接类型、优化应用流量优先级和定期评估网络性能等最佳实践,SD-WAN助力企业实现高效、稳定的业务连接,加速数字化转型。
SD-WAN分布式组网:构建高效、灵活的企业网络架构
|
2月前
|
消息中间件 运维 数据库
架构设计之解析CQRS架构模式!
架构设计之解析CQRS架构模式!
架构设计之解析CQRS架构模式!
|
2月前
|
消息中间件 关系型数据库 Java
‘分布式事务‘ 圣经:从入门到精通,架构师尼恩最新、最全详解 (50+图文4万字全面总结 )
本文 是 基于尼恩之前写的一篇 分布式事务的文章 升级而来 , 尼恩之前写的 分布式事务的文章, 在全网阅读量 100万次以上 , 被很多培训机构 作为 顶级教程。 此文修改了 老版本的 一个大bug , 大家不要再看老版本啦。
|
2月前
|
NoSQL Java Redis
开发实战:使用Redisson实现分布式延时消息,订单30分钟关闭的另外一种实现!
本文详细介绍了 Redisson 延迟队列(DelayedQueue)的实现原理,包括基本使用、内部数据结构、基本流程、发送和获取延时消息以及初始化延时队列等内容。文章通过代码示例和流程图,逐步解析了延迟消息的发送、接收及处理机制,帮助读者深入了解 Redisson 延迟队列的工作原理。