实战:基于CQRS微服务通信
Axon Framework是一个适用于Java的、基于事件驱动的轻量级CQRS框架,既支持直接持久化Aggregate状态,也支持采用EventSourcing。
Axon Framework的应用架构如图9-6所示。
图9-6 Axon Framework应用架构
本节,我们将基于Axon Framework来实现一个CQRS应用“axon-cqrs”。该应用展示了:“开通银行账户,取钱”这样一个逻辑业务。
配置
配置依赖如下。
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <spring.version>5.2.3.RELEASE</spring.version> <axon.version>3.4.3</axon.version> <log4j.version>2.13.0</log4j.version> </properties> <dependencies> <dependency> <groupId>org.springframework</groupId><artifactId>spring-context</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.axonframework</groupId> <artifactId>axon-core</artifactId> <version>${axon.version}</version> </dependency> <dependency> <groupId>org.axonframework</groupId> <artifactId>axon-spring</artifactId> <version>${axon.version}</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>${log4j.version}</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-jcl</artifactId> <version>${log4j.version}</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> <version>${log4j.version}</version> </dependency> </dependencies>
其中,采用了Axon Framework以及日志框架。
Aggregate
BankAccount是DDD中的Aggregate,代表了银行账户。
package com.waylau.axon.cqrs.command.aggregates; import static org.axonframework.commandhandling.model.AggregateLifecycle.apply; import static org.slf4j.LoggerFactory.getLogger; import java.math.BigDecimal; import org.axonframework.commandhandling.CommandHandler; import org.axonframework.commandhandling.model.AggregateIdentifier; import org.axonframework.eventhandling.EventHandler; import org.slf4j.Logger; import com.waylau.axon.cqrs.command.commands.CreateAccountCommand; import com.waylau.axon.cqrs.command.commands.WithdrawMoneyCommand; import com.waylau.axon.cqrs.common.domain.AccountId; import com.waylau.axon.cqrs.common.events.CreateAccountEvent; import com.waylau.axon.cqrs.common.events.WithdrawMoneyEvent; public class BankAccount { private static final Logger LOGGER = getLogger(BankAccount.class);@AggregateIdentifier private AccountId accountId; private String accountName; private BigDecimal balance; public BankAccount() { } @CommandHandler public BankAccount(CreateAccountCommand command){ LOGGER.debug("Construct a new BankAccount"); apply(new CreateAccountEvent(command.getAccountId(), command.getAccountName(), command.getAmount())); } @CommandHandler public void handle(WithdrawMoneyCommand command){ apply(new WithdrawMoneyEvent(command.getAccountId(), command.getAmount())); } @EventHandler public void on(CreateAccountEvent event){ this.accountId = event.getAccountId(); this.accountName = event.getAccountName(); this.balance = new BigDecimal(event.getAmount()); LOGGER.info("Account {} is created with balance {}", accountId, this.balance); } @EventHandler public void on(WithdrawMoneyEvent event){ BigDecimal result = this.balance.subtract( new BigDecimal(event.getAmount())); if(result.compareTo(BigDecimal.ZERO)<0) LOGGER.error("Cannot withdraw more money than the balance!"); else { this.balance = result; LOGGER.info("Withdraw {} from account {}, balance result: {}", event.getAmount(), accountId, balance); } } }
其中,注解@CommandHandler和@EventHandler代表了对Command和Event的处理。@AggregateIdentifier代表的AccountId,是一个Aggregate的全局唯一的标识符。
AccountId声明如下。
package com.waylau.axon.cqrs.common.domain; import org.axonframework.common.Assert; import org.axonframework.common.IdentifierFactory; import java.io.Serializable; public class AccountId implements Serializable {private static final long serialVersionUID = 7119961474083133148L; private final String identifier; private final int hashCode; public AccountId() { this.identifier = IdentifierFactory.getInstance().generateIdentifier(); this.hashCode = identifier.hashCode(); } public AccountId(String identifier) { Assert.notNull(identifier, ()->"Identifier may not be null"); this.identifier = identifier; this.hashCode = identifier.hashCode(); } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; AccountId accountId = (AccountId) o; return identifier.equals(accountId.identifier); } @Override public int hashCode() { return hashCode; } @Override public String toString() { return identifier; } }
其中:
·实现equal和hashCode方法,因为它会被拿来与其他标识对比。
·实现toString方法,其结果也应该是全局唯一的。
·实现Serializable接口以表明可序列化。
Command
该应用共有两个Command:创建账户和取钱。
CreateAccountCommand代表了创建账户。
package com.waylau.axon.cqrs.command.commands; import com.waylau.axon.cqrs.common.domain.AccountId; public class CreateAccountCommand { private AccountId accountId; private String accountName; private long amount;public CreateAccountCommand(AccountId accountId, String accountName, long amount) { this.accountId = accountId; this.accountName = accountName; this.amount = amount; } public AccountId getAccountId() { return accountId; } public String getAccountName() { return accountName; } public long getAmount() { return amount; } }
WithdrawMoneyCommand代表了取钱。
package com.waylau.axon.cqrs.command.commands; import org.axonframework.commandhandling.TargetAggregateIdentifier; import com.waylau.axon.cqrs.common.domain.AccountId; public class WithdrawMoneyCommand { @TargetAggregateIdentifier private AccountId accountId; private long amount; public WithdrawMoneyCommand(AccountId accountId, long amount) { this.accountId = accountId; this.amount = amount; } public AccountId getAccountId() { return accountId; } public long getAmount() { return amount; } }
Event
Event是系统中发生任何改变时产生的事件类,典型的Event就是对Aggregate状态的修改。与Command相对应,会有两个事件。
CreateAccountEvent代表了创建账户的Event。
package com.waylau.axon.cqrs.common.events;import com.waylau.axon.cqrs.common.domain.AccountId; public class CreateAccountEvent { private AccountId accountId; private String accountName; private long amount; public CreateAccountEvent(AccountId accountId, String accountName, long amount) { this.accountId = accountId; this.accountName = accountName; this.amount = amount; } public AccountId getAccountId() { return accountId; } public String getAccountName() { return accountName; } public long getAmount() { return amount; } } WithdrawMoneyEvent代表了取钱的Event。 package com.waylau.axon.cqrs.common.events; import com.waylau.axon.cqrs.common.domain.AccountId; public class WithdrawMoneyEvent { private AccountId accountId; private long amount; public WithdrawMoneyEvent(AccountId accountId, long amount) { this.accountId = accountId; this.amount = amount; } public AccountId getAccountId() { return accountId; } public long getAmount() { return amount; } }
测试
为了方便测试,我们创建一个Application类。
package com.waylau.axon.cqrs; import static org.slf4j.LoggerFactory.getLogger; import org.axonframework.config.Configuration; import org.axonframework.config.DefaultConfigurer;import org.axonframework.eventsourcing.eventstore.inmemory. InMemoryEventStorageEngine; import org.slf4j.Logger; import com.waylau.axon.cqrs.command.aggregates.BankAccount; import com.waylau.axon.cqrs.command.commands.CreateAccountCommand; import com.waylau.axon.cqrs.command.commands.WithdrawMoneyCommand; import com.waylau.axon.cqrs.common.domain.AccountId; public class Application { private static final Logger LOGGER = getLogger(Application.class); public static void main(String args[]) throws InterruptedException{ LOGGER.info("Application is start."); Configuration config = DefaultConfigurer.defaultConfiguration() .configureAggregate(BankAccount.class) .configureEmbeddedEventStore(c -> new InMemoryEventStorageEngine()) .buildConfiguration(); config.start(); AccountId id = new AccountId(); config.commandGateway().send(new CreateAccountCommand(id, "MyAccount",1000)); config.commandGateway().send(new WithdrawMoneyCommand(id, 500)); config.commandGateway().send(new WithdrawMoneyCommand(id, 500)); config.commandGateway().send(new WithdrawMoneyCommand(id, 500)); // 线程先睡5s,等事件处理完 Thread.sleep(5000L); LOGGER.info("Application is shutdown."); } }
在该类中,我们创建了一个账户,并执行了3次取钱动作。由于账户总额为1000,所以,当执行第3次取钱动作时,预期是会因为余额不足而失败。
运行该Application类,能看到以下输出。
22:11:52.432 [main] INFO com.waylau.axon.cqrs.Application - Application is start. 22:11:52.676 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount – Account cc0653ff-afbd-49f5-b868-38aff296611c is created with balance 1000 22:11:52.719 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount – Account cc0653ff-afbd-49f5-b868-38aff296611c is created with balance 1000 22:11:52.725 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount – Withdraw 500 from account cc0653ff-afbd-49f5-b868-38aff296611c, balance result: 500 22:11:52.725 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount – Account cc0653ff-afbd-49f5-b868-38aff296611c is created with balance 1000 22:11:52.725 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount – Withdraw 500 from account cc0653ff-afbd-49f5-b868-38aff296611c, balance result: 500 22:11:52.727 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount – Withdraw 500 from account cc0653ff-afbd-49f5-b868-38aff296611c, balance result: 0 22:11:52.727 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount – Account cc0653ff-afbd-49f5-b868-38aff296611c is created with balance 1000 22:11:52.728 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount – Withdraw 500 from account cc0653ff-afbd-49f5-b868-38aff296611c, balance result: 500 22:11:52.728 [main] INFO com.waylau.axon.cqrs.command.aggregates.BankAccount – Withdraw 500 from account cc0653ff-afbd-49f5-b868-38aff296611c, balance result: 022:11:52.728 [main] ERROR com.waylau.axon.cqrs.command.aggregates.BankAccount - Cannot withdraw more money than the balance! 22:11:57.729 [main] INFO com.waylau.axon.cqrs.Application - Application is shutdown.
本节示例,可以在axon-cqrs项目下找到。
本章小结
本章介绍了微服务架构的概念及构建微服务常用的技术。同时,也介绍了在微服务中常用的3种通信方式:HTTP、消息、事件驱动。在微服务中,我们可以使用CQRS来降低构建微服务通信的复杂度。
本文给大家讲解的内容是分布式系统开发实战:微服务架构,实战:基于CQRS微服务通信
本文就是愿天堂没有BUG给大家分享的内容,大家有收获的话可以分享下,想学习更多的话可以到微信公众号里找我,我等你哦。