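Command Query Responsibility Segregation (CQRS) seems to have been discussed a lot on cnblogs (博客园) recently, and several in-depth articles by fellow bloggers there are well worth reading.
Interestingly, whenever architecture or OO design came up in the past, it was usually the specialty of Java veterans, yet on the topic of CQRS the .NET side seems to have the upper hand. ENODE, the open-source project by fellow blogger Tang Xuehua (汤雪华), is also very popular on GitHub.
So I searched in the opposite direction for a similar Java project and, sure enough, found AxonFramework, which even has its own dedicated website. Following the documentation, I put together a hello world and am recording it here:
CQRS here is event-driven, and its main architecture is not complicated, as the diagram below shows: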
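Put simply, for operations that modify the database, the UI layer only sends commands (Command), which trigger events (Event); an EventHandler then processes them asynchronously and eventually writes to the master DB. Queries are served from the slave DB. (Note: master DB and slave DB are only a logical distinction here. They can be a real master-slave pair, or both can be the same database.) This architecture makes read/write splitting easy to implement and also makes large projects easier to scale.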
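The flow described above can be sketched in plain Java. This is only an illustration under stated assumptions: `MiniCqrs`, `Bus`, `CreateItem`, and `ItemCreated` are hypothetical names, not Axon classes, and events are delivered synchronously to keep the sketch short, whereas a real setup may process them asynchronously.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Plain-Java illustration only; none of these types are Axon classes.
final class MiniCqrs {

    // Routes each command to the single handler registered for its class,
    // then hands the resulting event to every subscribed event handler.
    static final class Bus {
        private final Map<Class<?>, Function<Object, Object>> commandHandlers = new HashMap<>();
        private final List<Consumer<Object>> eventHandlers = new ArrayList<>();

        <C> void registerCommandHandler(Class<C> type, Function<C, Object> handler) {
            commandHandlers.put(type, command -> handler.apply(type.cast(command)));
        }

        void subscribe(Consumer<Object> eventHandler) {
            eventHandlers.add(eventHandler);
        }

        void send(Object command) {
            Function<Object, Object> handler = commandHandlers.get(command.getClass());
            if (handler == null) {
                throw new IllegalArgumentException("No handler for " + command.getClass().getName());
            }
            Object event = handler.apply(command);
            for (Consumer<Object> eventHandler : eventHandlers) {
                eventHandler.accept(event);
            }
        }
    }

    // Hypothetical command and event, loosely modeled on a to-do item.
    static final class CreateItem {
        final String id;
        CreateItem(String id) { this.id = id; }
    }

    static final class ItemCreated {
        final String id;
        ItemCreated(String id) { this.id = id; }
    }

    public static void main(String[] args) {
        Bus bus = new Bus();
        List<String> queryModel = new ArrayList<>(); // stands in for the read-side store

        // Write side: turn the command into an event.
        bus.registerCommandHandler(CreateItem.class, c -> new ItemCreated(c.id));
        // Read side: update the query model whenever an event arrives.
        bus.subscribe(event -> {
            if (event instanceof ItemCreated) {
                queryModel.add(((ItemCreated) event).id);
            }
        });

        bus.send(new CreateItem("todo1")); // the caller only sends a command
        System.out.println(queryModel);
    }
}
```

The caller never touches `queryModel` directly: it only sends a command, and the read side changes as a consequence of the event.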
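Project structure:
The package names give a rough idea of what each one is for:
the command package defines the commands,
the event package defines the events,
the handler package defines the event-handling logic,
the model package holds the domain model,
and the outermost ToDoItemRunner is the application entry point.
Gradle dependencies: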
group 'yjmyzz'
version '1.0'

apply plugin: 'java'
apply plugin: 'application'

sourceCompatibility = 1.8

repositories {
    mavenLocal()
    maven {
        url 'http://maven.oschina.net/content/groups/public/'
    }
    mavenCentral()
}

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile "org.axonframework:axon:2.4.3"
    compile "org.axonframework:axon-core:2.4.3"
    compile "org.axonframework:axon-test:2.4.3"
    compile 'org.springframework:spring-core:4.2.3.RELEASE'
    compile 'org.springframework:spring-beans:4.2.3.RELEASE'
    compile 'org.springframework:spring-context:4.2.3.RELEASE'
    compile 'org.springframework:spring-context-support:4.2.3.RELEASE'
    compile 'org.springframework:spring-aop:4.2.3.RELEASE'
    compile 'org.springframework:spring-test:4.2.3.RELEASE'
    compile 'org.apache.logging.log4j:log4j-slf4j-impl:2.5'
    compile 'org.apache.logging.log4j:log4j-core:2.5'
    compile 'javax.persistence:persistence-api:2.1'
}

mainClassName='demo.axon.ToDoItemRunner'
Commands:
Here we assume two commands: a create command and a complete command.
CreateToDoItemCommand
package demo.axon.command;

import org.axonframework.commandhandling.annotation.TargetAggregateIdentifier;

public class CreateToDoItemCommand {

    @TargetAggregateIdentifier
    private final String todoId;
    private final String description;

    public CreateToDoItemCommand(String todoId, String description) {
        this.todoId = todoId;
        this.description = description;
    }

    public String getTodoId() {
        return todoId;
    }

    public String getDescription() {
        return description;
    }
}
MarkCompletedCommand
package demo.axon.command;

import org.axonframework.commandhandling.annotation.TargetAggregateIdentifier;

public class MarkCompletedCommand {

    @TargetAggregateIdentifier
    private final String todoId;

    public MarkCompletedCommand(String todoId) {
        this.todoId = todoId;
    }

    public String getTodoId() {
        return todoId;
    }
}
Events:
ToDoItemCreatedEvent
package demo.axon.event;

public class ToDoItemCreatedEvent {

    private final String todoId;
    private final String description;

    public ToDoItemCreatedEvent(String todoId, String description) {
        this.todoId = todoId;
        this.description = description;
    }

    public String getTodoId() {
        return todoId;
    }

    public String getDescription() {
        return description;
    }
}
ToDoItemCompletedEvent
package demo.axon.event;

public class ToDoItemCompletedEvent {

    private final String todoId;

    public ToDoItemCompletedEvent(String todoId) {
        this.todoId = todoId;
    }

    public String getTodoId() {
        return todoId;
    }
}
Event handler:
package demo.axon.handler;

import demo.axon.event.ToDoItemCompletedEvent;
import demo.axon.event.ToDoItemCreatedEvent;
import org.axonframework.eventhandling.annotation.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ToDoEventHandler {

    Logger logger = LoggerFactory.getLogger(ToDoEventHandler.class);

    @EventHandler
    public void handle(ToDoItemCreatedEvent event) {
        logger.info("We've got something to do: " + event.getDescription() + " (" + event.getTodoId() + ")");
    }

    @EventHandler
    public void handle(ToDoItemCompletedEvent event) {
        logger.info("We've completed a task: " + event.getTodoId());
    }
}
The code above is only a demo that logs the event information. In a real application, this is where the database update would be done.
Domain model:
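As a rough idea of what such an update could look like, here is a minimal plain-Java sketch of a query-side projection. Everything in it is an assumption for illustration: `ToDoProjection`, `onCreated`, `onCompleted`, and `openItems` are hypothetical names, an in-memory map stands in for a database table, and the methods take plain values rather than the demo's event classes so that the sketch compiles without Axon.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// A query-side projection; the map stands in for a table in the read database.
final class ToDoProjection {

    private static final class Row {
        final String description;
        boolean completed;
        Row(String description) { this.description = description; }
    }

    private final Map<String, Row> rowsById = new LinkedHashMap<>();

    // What handle(ToDoItemCreatedEvent) might do: insert a row.
    void onCreated(String todoId, String description) {
        rowsById.putIfAbsent(todoId, new Row(description)); // a redelivered event is ignored
    }

    // What handle(ToDoItemCompletedEvent) might do: update the row.
    void onCompleted(String todoId) {
        Row row = rowsById.get(todoId);
        if (row != null) {
            row.completed = true;
        }
    }

    // A query answered from the read model only.
    List<String> openItems() {
        List<String> open = new ArrayList<>();
        for (Map.Entry<String, Row> entry : rowsById.entrySet()) {
            if (!entry.getValue().completed) {
                open.add(entry.getKey());
            }
        }
        return open;
    }

    public static void main(String[] args) {
        ToDoProjection projection = new ToDoProjection();
        projection.onCreated("todo1", "Need to do this");
        projection.onCreated("todo2", "And this");
        projection.onCompleted("todo1");
        System.out.println(projection.openItems());
    }
}
```

Using `putIfAbsent` is a design choice of this sketch: it keeps the insert harmless if the same created event is delivered twice.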
package demo.axon.model;

import demo.axon.command.CreateToDoItemCommand;
import demo.axon.command.MarkCompletedCommand;
import demo.axon.event.ToDoItemCompletedEvent;
import demo.axon.event.ToDoItemCreatedEvent;
import org.axonframework.commandhandling.annotation.CommandHandler;
import org.axonframework.eventhandling.annotation.EventHandler;
import org.axonframework.eventsourcing.annotation.AbstractAnnotatedAggregateRoot;
import org.axonframework.eventsourcing.annotation.AggregateIdentifier;

public class ToDoItem extends AbstractAnnotatedAggregateRoot {

    @AggregateIdentifier
    private String id;

    // No-arg constructor, used when the aggregate is rebuilt from its stored events
    public ToDoItem() {
    }

    // Handles the create command by applying a created event
    @CommandHandler
    public ToDoItem(CreateToDoItemCommand command) {
        apply(new ToDoItemCreatedEvent(command.getTodoId(), command.getDescription()));
    }

    // State is changed only here, in response to the event
    @EventHandler
    public void on(ToDoItemCreatedEvent event) {
        this.id = event.getTodoId();
    }

    // Handles the complete command by applying a completed event
    @CommandHandler
    public void markCompleted(MarkCompletedCommand command) {
        apply(new ToDoItemCompletedEvent(id));
    }
}
Then let Spring wire all of this together. The configuration file is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:axon="http://www.axonframework.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.axonframework.org/schema/core http://www.axonframework.org/schema/axon-core-2.0.xsd">

    <axon:command-bus id="commandBus"/>
    <axon:event-bus id="eventBus"/>

    <axon:event-sourcing-repository id="toDoRepository"
                                    aggregate-type="demo.axon.model.ToDoItem"/>

    <axon:aggregate-command-handler id="toDoItemHandler"
                                    aggregate-type="demo.axon.model.ToDoItem"
                                    repository="toDoRepository"
                                    command-bus="commandBus"/>

    <axon:filesystem-event-store id="eventStore" base-dir="events"/>

    <axon:annotation-config />

    <bean class="demo.axon.handler.ToDoEventHandler"/>

    <bean class="org.axonframework.commandhandling.gateway.CommandGatewayFactoryBean">
        <property name="commandBus" ref="commandBus"/>
    </bean>

</beans>
Finally, provide a stage so the whole application can run:
package demo.axon;

import demo.axon.command.CreateToDoItemCommand;
import demo.axon.command.MarkCompletedCommand;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.util.UUID;

public class ToDoItemRunner {

    private CommandGateway commandGateway;

    public ToDoItemRunner(CommandGateway commandGateway) {
        this.commandGateway = commandGateway;
    }

    public static void main(String[] args) {
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("sampleContext.xml");
        ToDoItemRunner runner = new ToDoItemRunner(applicationContext.getBean(CommandGateway.class));
        runner.run();
    }

    private void run() {
        final String itemId = UUID.randomUUID().toString();
        commandGateway.send(new CreateToDoItemCommand(itemId, "Need to do this"));
        commandGateway.send(new MarkCompletedCommand(itemId));
    }
}
Output:
12:01:36,113 <demo.axon.handler.ToDoEventHandler> INFO [main]: We've got something to do: Need to do this (3126f293-67fd-4bb7-b152-069acba775b6)
12:01:36,114 <org.axonframework.commandhandling.callbacks.LoggingCallback> INFO [main]: Command executed successfully: demo.axon.command.CreateToDoItemCommand
12:01:36,205 <demo.axon.handler.ToDoEventHandler> INFO [main]: We've completed a task: 3126f293-67fd-4bb7-b152-069acba775b6
12:01:36,206 <org.axonframework.commandhandling.callbacks.LoggingCallback> INFO [main]: Command executed successfully: demo.axon.command.MarkCompletedCommand
Testing with the Axon framework is also easy:
package test.demo.axon;

import demo.axon.command.CreateToDoItemCommand;
import demo.axon.command.MarkCompletedCommand;
import demo.axon.event.ToDoItemCompletedEvent;
import demo.axon.event.ToDoItemCreatedEvent;
import demo.axon.model.ToDoItem;
import org.axonframework.test.FixtureConfiguration;
import org.axonframework.test.Fixtures;
import org.junit.Before;
import org.junit.Test;

public class ToDoItemTest {

    private FixtureConfiguration fixture;

    @Before
    public void setUp() throws Exception {
        fixture = Fixtures.newGivenWhenThenFixture(ToDoItem.class);
    }

    @Test
    public void testCreateToDoItem() throws Exception {
        fixture.given()
                .when(new CreateToDoItemCommand("todo1", "need to implement the aggregate"))
                .expectEvents(new ToDoItemCreatedEvent("todo1", "need to implement the aggregate"));
    }

    @Test
    public void testMarkToDoItemAsCompleted() throws Exception {
        fixture.given(new ToDoItemCreatedEvent("todo1", "need to implement the aggregate"))
                .when(new MarkCompletedCommand("todo1"))
                .expectEvents(new ToDoItemCompletedEvent("todo1"));
    }
}
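given/when/expectEvents reads as follows: given some prior events (possibly none), when a certain command is invoked, expect (expectEvents) certain events to be fired.
Finally, there is a more complex sample project on GitHub: https://github.com/AxonFramework/Axon-trader, which is worth studying if you want to dig deeper.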
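To make the given/when/expect idea concrete without the framework, here is a small plain-Java sketch. It is not how Axon's fixture is implemented. `GivenWhenThenSketch`, `Item`, and `givenWhen` are hypothetical names, events and commands are plain strings, and the guard that rejects completing an item that was never created is an assumption added for illustration (the demo aggregate has no such check).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins; events and commands are plain strings to keep this short.
final class GivenWhenThenSketch {

    // A tiny event-sourced aggregate: state changes only inside on(...).
    static final class Item {
        private boolean created;
        private boolean completed;
        private final List<String> newEvents = new ArrayList<>();

        void on(String event) {
            if (event.equals("created")) {
                created = true;
            } else if (event.equals("completed")) {
                completed = true;
            }
        }

        // Applying an event updates state and records it as newly produced.
        private void apply(String event) {
            on(event);
            newEvents.add(event);
        }

        void handle(String command) {
            if (command.equals("create") && !created) {
                apply("created");
            } else if (command.equals("complete") && created && !completed) {
                apply("completed");
            } else {
                throw new IllegalStateException("Cannot " + command + " in the current state");
            }
        }
    }

    // given: replay past events; when: run the command; result: events it produced.
    static List<String> givenWhen(List<String> given, String command) {
        Item item = new Item();
        for (String past : given) {
            item.on(past); // replay only, nothing is recorded as new
        }
        item.handle(command);
        return item.newEvents;
    }

    public static void main(String[] args) {
        // The "expect" step is a plain comparison of the produced events.
        System.out.println(givenWhen(Collections.<String>emptyList(), "create")
                .equals(Arrays.asList("created")));
        System.out.println(givenWhen(Arrays.asList("created"), "complete")
                .equals(Arrays.asList("completed")));
    }
}
```

The point of the pattern is that a test needs no database or bus: past events set up the state, and the assertion looks only at the events the command produced.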