分布式系统开发实战:微服务架构,实战:基于CQRS微服务通信

简介: Axon Framework是一个适用于Java的、基于事件驱动的轻量级CQRS框架,既支持直接持久化Aggregate状态,也支持采用EventSourcing。

实战:基于CQRS微服务通信

Axon Framework是一个适用于Java的、基于事件驱动的轻量级CQRS框架,既支持直接持久化Aggregate状态,也支持采用EventSourcing。

Axon Framework的应用架构如图9-6所示。

网络异常,图片无法展示
|

图9-6 Axon Framework应用架构

本节,我们将基于Axon Framework来实现一个CQRS应用“axon-cqrs”。该应用展示了:“开通银行账户,取钱”这样一个逻辑业务。

配置

配置依赖如下。

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring.version>5.2.3.RELEASE</spring.version>
<axon.version>3.4.3</axon.version>
<log4j.version>2.13.0</log4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework</groupId><artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-core</artifactId>
<version>${axon.version}</version>
</dependency>
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-spring</artifactId>
<version>${axon.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-jcl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
</dependencies>

其中,采用了Axon Framework以及日志框架。

Aggregate

BankAccount是DDD中的Aggregate,代表了银行账户。

package com.waylau.axon.cqrs.command.aggregates;
import static org.axonframework.commandhandling.model.AggregateLifecycle.apply;
import static org.slf4j.LoggerFactory.getLogger;
import java.math.BigDecimal;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.commandhandling.model.AggregateIdentifier;
import org.axonframework.eventhandling.EventHandler;
import org.slf4j.Logger;
import com.waylau.axon.cqrs.command.commands.CreateAccountCommand;
import com.waylau.axon.cqrs.command.commands.WithdrawMoneyCommand;
import com.waylau.axon.cqrs.common.domain.AccountId;
import com.waylau.axon.cqrs.common.events.CreateAccountEvent;
import com.waylau.axon.cqrs.common.events.WithdrawMoneyEvent;
public class BankAccount {
private static final Logger LOGGER = getLogger(BankAccount.class);@AggregateIdentifier
private AccountId accountId;
private String accountName;
private BigDecimal balance;
public BankAccount() {
}
@CommandHandler
public BankAccount(CreateAccountCommand command){
LOGGER.debug("Construct a new BankAccount");
apply(new CreateAccountEvent(command.getAccountId(),
command.getAccountName(),
command.getAmount()));
}
@CommandHandler
public void handle(WithdrawMoneyCommand command){
apply(new WithdrawMoneyEvent(command.getAccountId(),
command.getAmount()));
}
@EventHandler
public void on(CreateAccountEvent event){
this.accountId = event.getAccountId();
this.accountName = event.getAccountName();
this.balance = new BigDecimal(event.getAmount());
LOGGER.info("Account {} is created with balance {}",
accountId,
this.balance);
}
@EventHandler
public void on(WithdrawMoneyEvent event){
BigDecimal result = this.balance.subtract(
new BigDecimal(event.getAmount()));
if(result.compareTo(BigDecimal.ZERO)<0)
LOGGER.error("Cannot withdraw more money than the balance!");
else {
this.balance = result;
LOGGER.info("Withdraw {} from account {}, balance result: {}",
event.getAmount(), accountId, balance);
}
}
}

其中,注解@CommandHandler和@EventHandler代表了对Command和Event的处理。@AggregateIdentifier代表的AccountId,是一个Aggregate的全局唯一的标识符。

AccountId声明如下。

package com.waylau.axon.cqrs.common.domain;
import org.axonframework.common.Assert;
import org.axonframework.common.IdentifierFactory;
import java.io.Serializable;
public class AccountId implements Serializable {private static final long serialVersionUID = 7119961474083133148L;
private final String identifier;
private final int hashCode;
public AccountId() {
this.identifier =
IdentifierFactory.getInstance().generateIdentifier();
this.hashCode = identifier.hashCode();
}
public AccountId(String identifier) {
Assert.notNull(identifier, ()->"Identifier may not be null");
this.identifier = identifier;
this.hashCode = identifier.hashCode();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AccountId accountId = (AccountId) o;
return identifier.equals(accountId.identifier);
}
@Override
public int hashCode() {
return hashCode;
}
@Override
public String toString() {
return identifier;
}
}

其中:

·实现equal和hashCode方法,因为它会被拿来与其他标识对比。

·实现toString方法,其结果也应该是全局唯一的。

·实现Serializable接口以表明可序列化。

Command

该应用共有两个Command:创建账户和取钱。

CreateAccountCommand代表了创建账户。

package com.waylau.axon.cqrs.command.commands;
import com.waylau.axon.cqrs.common.domain.AccountId;
public class CreateAccountCommand {
private AccountId accountId;
private String accountName;
private long amount;public CreateAccountCommand(AccountId accountId,
String accountName,
long amount) {
this.accountId = accountId;
this.accountName = accountName;
this.amount = amount;
}
public AccountId getAccountId() {
return accountId;
}
public String getAccountName() {
return accountName;
}
public long getAmount() {
return amount;
}
}

WithdrawMoneyCommand代表了取钱。

package com.waylau.axon.cqrs.command.commands;
import org.axonframework.commandhandling.TargetAggregateIdentifier;
import com.waylau.axon.cqrs.common.domain.AccountId;
public class WithdrawMoneyCommand {
@TargetAggregateIdentifier
private AccountId accountId;
private long amount;
public WithdrawMoneyCommand(AccountId accountId, long amount) {
this.accountId = accountId;
this.amount = amount;
}
public AccountId getAccountId() {
return accountId;
}
public long getAmount() {
return amount;
}
}

Event

Event是系统中发生任何改变时产生的事件类,典型的Event就是对Aggregate状态的修改。与Command相对应,会有两个事件。

CreateAccountEvent代表了创建账户的Event。

package com.waylau.axon.cqrs.common.events;import com.waylau.axon.cqrs.common.domain.AccountId;
public class CreateAccountEvent {
private AccountId accountId;
private String accountName;
private long amount;
public CreateAccountEvent(AccountId accountId,
String accountName, long amount) {
this.accountId = accountId;
this.accountName = accountName;
this.amount = amount;
}
public AccountId getAccountId() {
return accountId;
}
public String getAccountName() {
return accountName;
}
public long getAmount() {
return amount;
}
}
WithdrawMoneyEvent代表了取钱的Event。
package com.waylau.axon.cqrs.common.events;
import com.waylau.axon.cqrs.common.domain.AccountId;
public class WithdrawMoneyEvent {
private AccountId accountId;
private long amount;
public WithdrawMoneyEvent(AccountId accountId, long amount) {
this.accountId = accountId;
this.amount = amount;
}
public AccountId getAccountId() {
return accountId;
}
public long getAmount() {
return amount;
}
}

测试

为了方便测试,我们创建一个Application类。

package com.waylau.axon.cqrs;
import static org.slf4j.LoggerFactory.getLogger;
import org.axonframework.config.Configuration;
import org.axonframework.config.DefaultConfigurer;import org.axonframework.eventsourcing.eventstore.inmemory.
InMemoryEventStorageEngine;
import org.slf4j.Logger;
import com.waylau.axon.cqrs.command.aggregates.BankAccount;
import com.waylau.axon.cqrs.command.commands.CreateAccountCommand;
import com.waylau.axon.cqrs.command.commands.WithdrawMoneyCommand;
import com.waylau.axon.cqrs.common.domain.AccountId;
public class Application {
private static final Logger LOGGER = getLogger(Application.class);
public static void main(String args[]) throws InterruptedException{
LOGGER.info("Application is start.");
Configuration config = DefaultConfigurer.defaultConfiguration()
.configureAggregate(BankAccount.class)
.configureEmbeddedEventStore(c -> new InMemoryEventStorageEngine())
.buildConfiguration();
config.start();
AccountId id = new AccountId();
config.commandGateway().send(new CreateAccountCommand(id, "MyAccount",1000));
config.commandGateway().send(new WithdrawMoneyCommand(id, 500));
config.commandGateway().send(new WithdrawMoneyCommand(id, 500));
config.commandGateway().send(new WithdrawMoneyCommand(id, 500));
// 线程先睡5s,等事件处理完
Thread.sleep(5000L);
LOGGER.info("Application is shutdown.");
}
}

在该类中,我们创建了一个账户,并执行了3次取钱动作。由于账户总额为1000,所以,当执行第3次取钱动作时,预期是会因为余额不足而失败。

运行该Application类,能看到以下输出。

22:11:52.432 [main] INFO com.waylau.axon.cqrs.Application - Application is start.
22:11:52.676 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount –
Account cc0653ff-afbd-49f5-b868-38aff296611c is created with balance 1000
22:11:52.719 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount –
Account cc0653ff-afbd-49f5-b868-38aff296611c is created with balance 1000
22:11:52.725 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount –
Withdraw 500 from account cc0653ff-afbd-49f5-b868-38aff296611c, balance result: 500
22:11:52.725 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount –
Account cc0653ff-afbd-49f5-b868-38aff296611c is created with balance 1000
22:11:52.725 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount –
Withdraw 500 from account cc0653ff-afbd-49f5-b868-38aff296611c, balance result: 500
22:11:52.727 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount –
Withdraw 500 from account cc0653ff-afbd-49f5-b868-38aff296611c, balance result: 0
22:11:52.727 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount –
Account cc0653ff-afbd-49f5-b868-38aff296611c is created with balance 1000
22:11:52.728 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount –
Withdraw 500 from account cc0653ff-afbd-49f5-b868-38aff296611c, balance result: 500
22:11:52.728 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount –
Withdraw 500 from account cc0653ff-afbd-49f5-b868-38aff296611c, balance result: 022:11:52.728 [main] ERROR com.waylau.axon.cqrs.command.aggregates.BankAccount - Cannot
withdraw more money than the balance!
22:11:57.729 [main] INFO com.waylau.axon.cqrs.Application - Application is shutdown.

本节示例,可以在axon-cqrs项目下找到。

本章小结

本章介绍了微服务架构的概念及构建微服务常用的技术。同时,也介绍了在微服务中常用的3种通信方式:HTTP、消息、事件驱动。在微服务中,我们可以使用CQRS来降低构建微服务通信的复杂度。

本文给大家讲解的内容是分布式系统开发实战:微服务架构,实战:基于CQRS微服务通信

本文就是愿天堂没有BUG给大家分享的内容,大家有收获的话可以分享下,想学习更多的话可以到微信公众号里找我,我等你哦。

相关文章
|
7月前
|
人工智能 监控 前端开发
支付宝 AI 出行助手高效研发指南:4 人团队的架构迁移与提效实战
支付宝「AI 出行助手」是一款集成公交、地铁、火车票、机票、打车等多项功能的智能出行产品。
1169 21
支付宝 AI 出行助手高效研发指南:4 人团队的架构迁移与提效实战
|
5月前
|
Cloud Native Serverless API
微服务架构实战指南:从单体应用到云原生的蜕变之路
🌟蒋星熠Jaxonic,代码为舟的星际旅人。深耕微服务架构,擅以DDD拆分服务、构建高可用通信与治理体系。分享从单体到云原生的实战经验,探索技术演进的无限可能。
微服务架构实战指南:从单体应用到云原生的蜕变之路
|
5月前
|
监控 Cloud Native Java
Spring Boot 3.x 微服务架构实战指南
🌟蒋星熠Jaxonic,技术宇宙中的星际旅人。深耕Spring Boot 3.x与微服务架构,探索云原生、性能优化与高可用系统设计。以代码为笔,在二进制星河中谱写极客诗篇。关注我,共赴技术星辰大海!(238字)
1038 2
Spring Boot 3.x 微服务架构实战指南
|
6月前
|
消息中间件 数据采集 NoSQL
秒级行情推送系统实战:从触发、采集到入库的端到端架构
本文设计了一套秒级实时行情推送系统,涵盖触发、采集、缓冲、入库与推送五层架构,结合动态代理IP、Kafka/Redis缓冲及WebSocket推送,实现金融数据低延迟、高并发处理,适用于股票、数字货币等实时行情场景。
875 3
秒级行情推送系统实战:从触发、采集到入库的端到端架构
|
6月前
|
设计模式 人工智能 API
AI智能体开发实战:17种核心架构模式详解与Python代码实现
本文系统解析17种智能体架构设计模式,涵盖多智能体协作、思维树、反思优化与工具调用等核心范式,结合LangChain与LangGraph实现代码工作流,并通过真实案例验证效果,助力构建高效AI系统。
785 7
|
7月前
|
负载均衡 监控 Java
微服务稳定性三板斧:熔断、限流与负载均衡全面解析(附 Hystrix-Go 实战代码)
在微服务架构中,高可用与稳定性至关重要。本文详解熔断、限流与负载均衡三大关键技术,结合API网关与Hystrix-Go实战,帮助构建健壮、弹性的微服务系统。
754 1
微服务稳定性三板斧:熔断、限流与负载均衡全面解析(附 Hystrix-Go 实战代码)
|
6月前
|
JSON 供应链 监控
1688商品详情API技术深度解析:从接口架构到数据融合实战
1688商品详情API(item_get接口)可通过商品ID获取标题、价格、库存、SKU等核心数据,适用于价格监控、供应链管理等场景。支持JSON格式返回,需企业认证。Python示例展示如何调用接口获取商品信息。
|
6月前
|
消息中间件 缓存 监控
中间件架构设计与实践:构建高性能分布式系统的核心基石
摘要 本文系统探讨了中间件技术及其在分布式系统中的核心价值。作者首先定义了中间件作为连接系统组件的&quot;神经网络&quot;,强调其在数据传输、系统稳定性和扩展性中的关键作用。随后详细分类了中间件体系,包括通信中间件(如RabbitMQ/Kafka)、数据中间件(如Redis/MyCAT)等类型。文章重点剖析了消息中间件的实现机制,通过Spring Boot代码示例展示了消息生产者的完整实现,涵盖消息ID生成、持久化、批量发送及重试机制等关键技术点。最后,作者指出中间件架构设计对系统性能的决定性影响,