@iamfox
2015-05-08T17:15:46.000000Z
字数 25649
阅读 8661
Axon
DDD
学习
本文档改写自Axon框架官方手册,掺了不少私货,还改动了他们的代码例子。如果想看原版,用力戳这里。
我们做java web开发的人,高并发,大数据,高可用,高性能,一系列高大上的追求是我们的终极目标,但有幸步入这个领域的项目开发的人,恐怕还不到10%。
90%的人,都是些增删改查的搬砖工和跟数据库表、各种繁杂的技术组件斗争到底的苦力。
有没有那么些东西,是那10%,和那90%的人,都要同仇敌忾的东西?
有!业务,复杂的业务,多变的需求。
为了这些,多少开发人员和产品经理、市场部门、运营部门因此结下不共戴天之仇?需求的多变和软件质量的控制真是那么不可调和的矛盾吗?
那,假如有那么一个架构思想,专为了解决复杂多变的业务而生,将多种设计模式不动声色地融会贯通,那是不是一件很了不得的事?
DDD -- 领域驱动设计,它提出了一整套如何建立业务模型,划定模块边界及如何对其进行维护的理论,以避免所有的业务代码搅成一团,牵一发动全身的情况。DDD面世10多年来风靡了世界,却远没有在国内JAVA界普及开来,太多的人仍然坚信简单的MVC分层框架是万能钥匙,成了一种默守成规的固执。
CQRS -- 权责命令分离模式,简单说就是读写分离,对于写操作,有一连串的流程处理和追踪机制。对于查询,尽可能直接简单。读和写两条路线形成了完全不一样的处理过程,而不是像MVC三层一样都是一个service调一下dao完成。
Axon框架就是为了实现这两个东西而生。
很多人把网站的高质量完全寄托于新技术的运用,数据库的极致优化,乃至算法和JVM调优上,极端的甚至用存储过程全包。可是一切JAVA WEB项目都是以实现业务为目的的。当你千辛万苦将程序写到坚不可摧还有很高的性能,一个需求变更过来,你却瞠目结舌地发现程序很难再进行扩展和改版,前面大量工作都白干了要推翻重构,于是你就会把责任怪罪到业务人员头上,说他们违背当初的设计,无法实现。
业务,复杂的业务,多变的需求,不解决这个问题,再多的先进技术,再极致的优化,终究是无用功。
把对的程序调快容易,把快的程序调对很难
DDD的思想提出了一些概念,一些模式,让你的代码看起来更贴合自然语义,更贴近业务人员的思维,极力去消除开发人员和业务人员之间的代沟,另外非常重要的就是它界定了业务领域中的各种操作边界如何划分的原则,划定上下文范围,防止出现盘根错节的复杂调用关系。
Axon框架实现了这些思想,建立了一整套让你可以在专注于业务实现的同时,又能很简单地利用高级技术进行灵活处理的框架体系。
如果你不了解DDD,你也不用刻意去学习它,我会在介绍这个框架的过程中,同步去介绍DDD的思想和实践。
在此之前,我在用传统SSH主流框架渐进式地实践了近三年的DDD+CQRS,再一次去认识这个框架的心得。Axon框架没有试图去重新发明一个一栈式框架去推广什么极速开发。它是反其道而行的,它仍然可以和SSH框架共同运转,互相支持,不但没有简化,还在业务逻辑层扩展了大量的组件类,增加了代码量。
听起来是不是挺脑残的?你没给我简化代码还给我增加复杂性。但是转念想想,简单就是好吗?相对简单的摩托车能跑得过同级别的汽车吗?一台机的可扩展性和性能干得过台式机吗?那些总在说“先简单点实现,以后我们再逐步完善”,最后害我们掉进坑里花了不知几倍的力气也没把以前简单实现的后遗症治好的项目,难道不都还历历在目?
无论你使用多么简单的技术去开发软件,复杂性也是仍然存在的,只是被包装在了简单技术的内部,比如jquery之于javascript,你用起来简单了,但复杂的代码并没有消失,只是包起来了。DDD也是想把业务逻辑的复杂性包裹在内部,增加了业务逻辑代码量,却让业务的调用方和运营方受益,后续维护、调优、扩展都变成了简单的事。
Axon忠实地贯彻了DDD的思想,虽然第一次开发复杂度有所提升,但它所增加的工作量都是为了避免我们少走弯路,都是在处理一些我们以往一开始忽略,后来往往要费九牛二虎之力去填坑的东西。一旦做成,项目长远的可维护性可扩展性以及性能的优化点,都能极大地提升。
那现在,我们开始吧。
任何框架都免不了要有个快速入门的小例子。Axon是为了复杂业务而生的,如果快速入门还是个Hello World,那用起来也太可笑了。至少,应该看得出业务。
设想一个待办事项工具,你可以创建一个待办事项,还可以将它标记为完成,这就是我们快速入门的需求。
回忆下我们传统方式是怎么写的:
public class ToDoItem {
private String id;
private String remark;
private int status;
...
}
public class ToDoItemServiceImpl implements ToDoItemService {
private TodoItemDao toDoItemDao;
public void createToDoItem(String id, String remark) {
ToDoItem toDoItem = new ToDoItem();
toDoItem.setId(id);
toDoItem.setRemark(remark);
toDoItemDao.save(toDoItem);
}
public void changeToDoItemStatus(String id, int status) {
ToDoItem toDoItem = toDoItemDao.get(id);
toDoItemDao.setStatus(status);
toDoItemDao.update(toDoItem);
}
}
好了,简单快速,不过坑也不知不觉就挖下了,我们看看Axon是怎么做的,这个例子来自Axon官网,不过进行了一些更符合中国人阅读习惯的小修改。
首先,你得建两个命令Command
对象,一个是创建待办事项,一个是标记待办事项完成。
public class CreateToDoItemCommand {
@TargetAggregateIdentifier
private final String todoId;
private final String remark;
public CreateToDoItemCommand(String todoId, String remark) {
this.todoId = todoId;
this.remark = remark;
}
public String getTodoId() {
return todoId;
}
public String getRemark() {
return remark;
}
}
public class MarkCompletedCommand {
@TargetAggregateIdentifier
private final String todoId;
public MarkCompletedCommand(String todoId) {
this.todoId = todoId;
}
public String getTodoId() {
return todoId;
}
}
这两个命令对象代表了用户可以做两种操作,来改变数据状态。
然后你需要创建这两个命令在执行时,会触发的事件。如果一切正常,那 创建待办事项 这个操作会产生 待办事项被创建 事件,标记待办事项已完成 这个操作会产生待办事项已完成事件。
那我们建立这样的两个事件:
public class ToDoItemCreatedEvent {
private ToDoItem toDoItem;
public ToDoItemCreatedEvent(ToDoItem toDoItem) {
this.toDoItem = toDoItem;
}
public String getToDoItem() {
return toDoItem;
}
}
public class ToDoItemCompletedEvent {
private final String todoId;
public ToDoItemCompletedEvent(String todoId) {
this.todoId = todoId;
}
public String getTodoId() {
return todoId;
}
}
事件包含了哪些属性字段,是不一定的,主要是要能满足对事件的后续处理和记录。
现在我们有了实体,有了命令,有了事件,我们应该把它们串起来了:
public class ToDoItem extends AbstractAnnotatedAggregateRoot {
private String id;
private String remark;
private int status;
public final static Integer STATUS_UN_COMPLETE = 1;
public final static Integer STATUS_COMPLETE = 2;
/**
/ 处理命令
*/
@CommandHandler
public ToDoItem(CreateToDoItemCommand command) {
// 根据命令创建实体
setId(command.getId());
setRemark(command.getRemark());
setStatus(ToDoItem.STATUS_UN_COMPLETE);
// 事件产生
apply(new ToDoItemCreatedEvent(this);
}
/**
/ 处理命令
*/
@CommandHandler
public void markCompleted(MarkCompletedCommand command) {
if (getStatus().intValue() == ToDoItem.STATUS_UN_COMPLETE) {
setStatus(ToDoItem.STATUS_COMPLETE);
apply(new ToDoItemCompletedEvent(id));
} else {
// 抛出异常处理
}
}
/**
/ 处理事件
*/
@EventHandler
public void onCreate(ToDoItemCreatedEvent event) {
System.out.println("待办事项" + event.getToDoItem().getRemark() + "创建成功");
}
/**
/ 处理事件
*/
@EventHandler
public void onMarkComplete(ToDoItemCompletedEvent event) {
System.out.println("待办事项" + event.getId() + "已完成");
}
}
ps.现实项目中不会把那么多@CommandHandler和@EventHandler全写在实体类里,会太臃肿,可以移到其他组件类。
好了,让我们用junit来跑一跑:
public class ToDoItemTest {
private FixtureConfiguration fixture;
@Before
public void setUp() throws Exception {
fixture = Fixtures.newGivenWhenThenFixture(ToDoItem.class);
}
@Test
public void testCreateToDoItem() throws Exception {
ToDoItem testEntity = new ToDoItem();
testEntity.setId("todo1");
testEntity.setRemark("8点钟要上班");
testEntity.setStatus(1);
fixture.given()
.when(new CreateToDoItemCommand("todo1", "8点钟要上班"))
.expectEvents(new ToDoItemCreatedEvent(testEntity));
}
@Test
public void testMarkToDoItemAsCompleted() throws Exception {
ToDoItem testEntity = new ToDoItem();
testEntity.setId("todo1");
testEntity.setRemark("8点钟要上班");
testEntity.setStatus(1);
fixture.given(new ToDoItemCreatedEvent(testEntity))
.when(new MarkCompletedCommand("todo1"))
.expectEvents(new ToDoItemCompletedEvent("todo1"));
}
}
成功。
对于程序员来说,这样就算是运行通了,但对于业务人员来说就不是了,他们会希望能看到你在UI界面上操作成功给他看,当然建UI太费时间了,这里不会演示,但我们可以用更接近生产代码而不是测试用例的形式来表示一下。
public class ToDoItemRunner {
public static void main(String[] args) {
// 首先要有个命令总线
CommandBus commandBus = new SimpleCommandBus();
// 建一个命令网关以便为发送命令方提供友好的api
CommandGateway commandGateway = new DefaultCommandGateway(commandBus);
// 我们准备把事件存在文件里在"events/"文件夹下
EventStore eventStore = new FileSystemEventStore(new SimpleEventFileResolver(new File("./events")));
// 再来个简单的事件总线
EventBus eventBus = new SimpleEventBus();
// 配置一下事件仓储把事件存起来
EventSourcingRepository repository = new EventSourcingRepository(ToDoItem.class, eventStore);
repository.setEventBus(eventBus);
// 要让Axon知道哪个类能处理命令总线里的命令,订阅一下
AggregateAnnotationCommandHandler.subscribe(ToDoItem.class, repository, commandBus);
// 然后我们向命令总线发送一些命令
final String itemId = UUID.randomUUID().toString();
commandGateway.send(new CreateToDoItemCommand(itemId, "8点要上班"));
// 到公司了...
commandGateway.send(new MarkCompletedCommand(itemId));
}
}
以上流程并不是常见的从数据库取出数据、修改再保存的操作过程,当然Axon也支持那样的过程,为了展示一些更高大上的架构思想,上例中采用了一种叫事件驱动或者叫事件溯源(Event Sourcing)的机制去做数据操作,如果你不了解这种机制,我们后面再说。
最后要展示的是,如果用spring会怎么样?
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:axon="http://www.axonframework.org/schema/core"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.axonframework.org/schema/core http://www.axonframework.org/schema/axon-core-2.0.xsd">
<axon:command-bus id="commandBus"/>
<axon:event-bus id="eventBus"/>
<axon:event-sourcing-repository id="toDoRepository"
aggregate-type="org.axonframework.test.sample.ToDoItem"/>
<axon:aggregate-command-handler id="toDoItemHandler"
aggregate-type="org.axonframework.test.sample.ToDoItem"
repository="toDoRepository"
command-bus="commandBus"/>
<axon:filesystem-event-store id="eventStore" base-dir="events"/>
<bean class="org.axonframework.commandhandling.gateway.CommandGatewayFactoryBean">
<property name="commandBus" ref="commandBus"/>
</bean>
</beans>
然后代码还剩下这些:
public class ToDoItemRunner {
private CommandGateway commandGateway;
public ToDoItemRunner(CommandGateway commandGateway) {
this.commandGateway = commandGateway;
}
public static void main(String[] args) {
// 初始化和注入spring bean
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("sampleContext.xml");
ToDoItemRunner runner = new ToDoItemRunner(applicationContext.getBean(CommandGateway.class));
// 开跑
runner.run();
}
private void run() {
// 创建待办事项
final String itemId = UUID.randomUUID().toString();
commandGateway.send(new CreateToDoItemCommand(itemId, "8点要上班"));
// 到公司以后
commandGateway.send(new MarkCompletedCommand(itemId));
}
}
总结下整个过程:
以上,你觉得这么写代码,好还是不好?
老实说,代码量真的比传统做法多了不少,不过...
产品经理搭着你的肩膀说:我们要扩展一下功能,待办事项应该有指派人信息,还要有超时时间,每次完成任务要给指派人发邮件,超时完成要给员工扣1分。
然后...
传统代码变成了这样:
public class ToDoItem {
private String id;
private String userId;
private String commander;
private String remark;
private Date expectCompleteDate;
private int status;
...
}
public class ToDoItemServiceImpl implements ToDoItemService {
private TodoItemDao toDoItemDao;
private EmailService emailService;
private PointService pointService;
public void createToDoItem(String id, String remark, String commander) {
ToDoItem toDoItem = new ToDoItem();
toDoItem.setId(id);
toDoItem.setRemark(remark);
toDoItem.setUserId(userId);
toDoItem.setCommander(commander);
// 一系列时间计算
...
toDoItem.setExpectCompleteDate(expectCompleteDate);
toDoItemDao.save(toDoItem);
}
public void changeToDoItemStatus(String id, int status) {
ToDoItem toDoItem = toDoItemDao.get(id);
toDoItemDao.setStatus(status);
toDoItemDao.update(toDoItem);
if(status == ToDoItem.STATUS_COMPLETE) {
emailService.send("这是邮件内容");
if(toDoItem.getExpectCompleteDate.before(new Date())) {
pointService.subtract(toDoItem.getUserId(), 1);
}
}
}
}
一股恶化的味道,service和controller都要改写,加参数,if分支在变多。
无疑设计模式要登场了,Axon让设计模式有了一个落地的方式。
public class ToDoItemRunner {
private CommandGateway commandGateway;
public ToDoItemRunner(CommandGateway commandGateway) {
this.commandGateway = commandGateway;
}
public static void main(String[] args) {
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("sampleContext.xml");
ToDoItemRunner runner = new ToDoItemRunner(applicationContext.getBean(CommandGateway.class));
runner.run();
}
private void run() {
// 创建待办事项
final String itemId = UUID.randomUUID().toString();
commandGateway.send(new CreateToDoItemCommand(itemId, "8点要上班"));
// 到公司以后
commandGateway.send(new MarkCompletedCommand(itemId));
}
}
以上没有任何变化,确实不需要变。
public class ToDoItem extends AbstractAnnotatedAggregateRoot {
private String id;
private String userId;
private String commander;
private String remark;
private Date expectCompleteDate;
private int status;
public final static Integer STATUS_UN_COMPLETE = 1;
public final static Integer STATUS_COMPLETE = 2;
@CommandHandler
public ToDoItem(CreateToDoItemCommand command) {
// 根据命令创建实体
setId(command.getId());
setRemark(command.getRemark());
setStatus(ToDoItem.STATUS_UN_COMPLETE);
// 事件产生
apply(new ToDoItemCreatedEvent(this);
}
@CommandHandler
public void markCompleted(MarkCompletedCommand command) {
// 超没超时,是两个不同的结果事件
if(toDoItem.getExpectCompleteDate.before(new Date())) {
apply(new ToDoItemTimeOutCompletedEvent(id));
} else {
apply(new ToDoItemCompletedEvent(id));
}
}
@EventHandler
public void onCreate(ToDoItemCreatedEvent event) {
System.out.println("待办事项" + event.getToDoItem().getRemark() + "创建成功");
}
@EventHandler
public void onMarkComplete(ToDoItemCompletedEvent event) {
System.out.println("待办事项" + event.getId() + "已完成");
}
/**
/ 一个事件处理器负责扣分
*/
@EventHandler
public void onTimeOutMarkCompleteForPoint(ToDoTimeOutItemCompletedEvent event) {
commandGateway.send(new SubtractUserPointCommand(itemId, 1));
System.out.println("待办事项" + event.getId() + "超时完成");
}
/**
/ 另一个事件处理器负责发邮件
*/
@EventHandler
public void onTimeOutMarkCompleteForEmail(ToDoTimeOutItemCompletedEvent event) {
commandGateway.send(new SendEmailCommand(itemId, "这是邮件内容"));
System.out.println("待办事项" + event.getId() + "超时完成");
}
}
重新审视一下这一段run的过程,没有任何变化。增加的参数在Command里,后续的处理在事件处理器中完成。
高内聚、低耦合达成。
高内聚是指业务逻辑封装到了markCompleted()
方法内部,客户端如Controller
类不用做任何诸如加方法参数之类的改动。
低耦合是无论后续有多少要擦屁股的操作,markCompleted()
的复杂度也仍然很低,只是发出了不同的事件ToDoItemCompletedEvent
和ToDoItemTimeOutCompletedEvent
。而这个事件后后续处理上,一个事件处理器只负责做一件事,千变万化任意扩展,互不影响可维护性。
那性能方面呢?可调优的地方就多了去了,你可以通过命令网关设置命令是同步执行还是异步,是否开事务,是否回调,超时时间,重试机制。可以设置事件是同步还是异步,可以决定采用哪种技术分发命令和存储事件,mq、disruptor,redis、mongodb、jpa等等,你只需要在spring配置中更换一下实现类,代码和注解写法不变。
如上,Axon是为了业务需求复杂、高性能、高可扩展的系统而生的。对那些几张小表CRUD以后也不会再大动的系统,就不适用了。后面的章节里,我们再慢慢看Axon的各个组件都提供了些什么样的特性,来完成业务的处理。
在细看Axon的各部分组件之前,我们要先了解一下之前提到的Event Sourcing事件驱动机制。
传统项目中我们在做数据操作时,都是对数据库中的表进行直接修改,或者从数据库中取出,进行一系列逻辑处理以后把新的内容写回去。
在Axon中,对于高并发操作场景,更推荐采用Event Sourcing方式做数据操作。
首先数据对象会从持久化设施中读出来,长驻内存。每当有新的操作请求过来,在内存中修改对象数据,然后将结果事件保存起来,返回请求响应。并不需要立即将对象再写回到数据库等持久化设施中。
这样,长驻内存的对象可以支持非常高密度的写操作,省掉了磁盘IO的耗时,只需要确保事件能保存成功,而事件只增不改,通过无锁的高速写入也能支持很高的操作密度。
要注意一些处理要点:
由于对象不实时保存,所以要有不依赖于数据库ACID的新的灾备机制。从事件仓储中取出这个对象的操作事件流,一个一个进行重放。new一个新的对象,通过重放事件的方法,把所有的历史事件再解析一遍,最终对象又回到了宕机前的状态。
内存是有限的,对象过多时终究要借助硬盘,所以应该借助一种有冷热数据自动清理机制的缓存来保存内存中对象,内存满时,清理掉使用率低的对象,当对象不存在时,可以从数据源重新读取。
内存中的对象是无法进行复杂条件查询的,因此还需要有一个区别于实体持久数据源的查询数据库,专门支持查询操作。由于在高并发修改的情况下,对象内容变得实在太频繁了,所以没必要每个修改都去update查询数据库,如果一秒钟会发生几次几十次变化,实时查询还有实际意义吗?一次网络点击期间都不知发生多少变化了。更实际的做法是监听一下对象操作所产生的事件,在一定的时机去更新一次查询数据库就可以了。
查询数据库可以不遵循实体结构,怎么样便于查询优化就怎么设计
仅建议对高频率修改的实体进行Event Sourcing维护,如秒杀商品库存,金融交易频繁的实时帐户余额变动等。一般情况还是会采用传统方式,后面的框架介绍例子也会结合传统方式去说明。实时
这个图是Axon框架在Event Sourcing机制下的流程图,咋一看挺复杂的,不过之前已经用文字描述过了,再对照一下:
1. 客户端创建了一个命令
2. 通过命令网关,向命令总线发送了命令
3. 命令处理器执行了命令,操作了业务实体,持久化 (图中command handler到repository)
4. 命令处理器根据操作结果产生了事件
5. 事件被发送到事件总线,并被保存起来
6. 若干个订阅过该事件的监听器收到了事件,交给事件处理器实现了事件会造成的后续影响
这就是我们写操作的处理过程,说的是图的上部,下部有dto和query的地方就是查询,直连了database。
核心包,如果你要建一个单节点单JVM应用,这个包就够了,所有其他模块都会依赖这个包。
用来测试你写的Axon程序的模块,上线运行时是不需要的。
分布式命令总线,提供了一个可以跨多节点的命令总线的实现,它通过一个JGroupsConnector来连接每一个节点上的DistributedCommandBus实现,让多个节点分头处理不同的命令。
AMQP是一种比JMS更高级的消息协议,而且跨语言,开源产品有rabbitmq。你可以用这个模块来建一个基于AMQP协议的事件总线(EventBus),这样就能和其他使用AMQP协议的MQ产品的系统互发消息。它还可以保证消息传递可靠性,就算是事件处理器所在节点一时半会离线了也没事。
采用Spring Integration(一个Spring出品的开源ESB)来实现事件总线的模块,这样就能和其他使用Spring Integration的应用系统互发消息了。
要注意Spring Integration核心在Spring4中已经迁移到Spring Messaging项目去了,如果你使用了Spring 4以上版本,要采用Axon Spring Messaging模块来替代本模块。
采用Spring Messaging来实现事件总线的模块,可以和其他采用Spring Messaging的系统互发消息。
MongoDB是一个文档型的NoSQL数据库。这个模块提供了基于MongoDB实现的事件仓储,用来保存系统产生的事件流。
各Axon组件一般是分别提供监控信息的,这个模块将这些信息推送到了JMX。不需要任何配置,只要有这个依赖存在,各模块就会将运行的监控统计信息自动推送到JMX。
孵化阶段的模块没有经过像主要模块那么充分的测试,也没有那么完善的文档,所以不建议用在生产环境,用不用你自己看着办。
这些模块的Maven groupId是org.axonframework.incubator.
这个模块是用Cassandra列式NoSQL数据库来实现事件存储。
这个模块提供了让你使用Google App Engine的API,像使用GAE上的DatastoreService来实现事件仓储。
Redis是一个key-value式的NoSQL,有很高的性能,这个模块想要提供一个基于Redis的事件仓储。
应用系统里的所有数据变更,都是由命令触发的。命令对象里包含了这个操作所需要的所有信息,命令处理器负责根据命令里的信息去操作系统数据。然后,你要把命令和命令处理器挂上勾。
如之前的入门例子,Axon已经把这件事做了。不过那不是唯一的做法,Axon也支持你像传统的MVC应用一样,建一个service层来处理命令。
命令网关是个很实用的命令转发机制,虽然你可以不用,但是用了会比较简单。有两种方式去使用命令网关。
一是用由Axon提供的CommandGateway
接口和DefaultCommandGateway
实现类。这里面有几个方法让你发送命令,你可以决定是同步执行还是异步执行,以及超时等待时间。
另一种方法更灵活,你可以通过GatewayProxyFactory
类,把所有的自定义接口都变成命令网关。这样你就可以进行强类型定义和抛出自己的异常,Axon会在运行时自动为你的接口生成实现类。
不管是你自己定义的,还是Axon提供的命令网关,你都得加上配置,比如Spring配置。至少你的配置内容要让它找得到命令总线。另外,你还可以配上一些RetryScheduler
, CommandDispatchInterceptorS
, CommandCallbackS
和 CorrelationDataResolverS
这样的东西。
RetryScheduler
可以在命令执行失败时进行重试。IntervalRetryScheduler
是另一个重试实现类,可以加上重试的时间间隔还有最大重试次数等参数。但是它们不会对明确的业务失败异常进行重试,明确的业务失败异常是指受检查的异常(checked exception),所以只有遇到RuntimeException
时,才会进行重试。
CommandDispatchInterceptor
这个拦截器,在使用命令网关时会有效,直接发给命令总线时无效。可以用来在发送到命令总线前,改变CommandMessage
的优先级,也可以用来给命令附加元信息,或者做参数校验之类的。
CommandCallback
是个发送了任何命令以后都会被执行的回调类。
GatewayProxyFactory
类会为你自定义的接口生成命令网关实现类,你写的每个发送命令的方法会有什么样的执行效果,是根据你的方法参数,返回的类型和定义的异常决定的。这样不只是方便了调用,也方便了测试。
先看个综合例子,然后下面会挨个分析我们定义的这几个方法的行为。
// 创建一个实例:
GatewayProxyFactory factory = new GatewayProxyFactory(commandBus);
MyGateway myGateway = factory.createInstance(MyGateway.class);
public interface MyGateway {
// 调了就不管了,非堵塞
void sendCommand(MyPayloadType command);
// 方法附加了一个元数据userId到命令上,并且会堵塞等待方法执行,10秒钟不返回就超时,并返回Null
@Timeout(value = 10, unit = TimeUnit.SECONDS)
ReturnValue sendCommandAndWaitForAResult(MyPayloadType command,
@MetaData("userId") String userId);
// 如果超时或者线程中断,会抛出相应异常
@Timeout(value = 20, unit = TimeUnit.SECONDS)
ReturnValue sendCommandAndWaitForAResult(MyPayloadType command)
throws TimeoutException, InterruptedException;
// 这个方法也会堵塞等待结果,等多久由调用方决定
void sendCommandAndWait(MyPayloadType command, long timeout, TimeUnit unit)
throws TimeoutException, InterruptedException;
}
如果你用的是Spring,可以配成下面这样:
<bean id="myGateway" class="org.axonframework.commandhandling.gateway.CommandGatewayFactoryBean">
<property name="commandBus" ref="commandBus"/>
<property name="gatewayInterface" value="package.to.MyGateway"/>
</bean>
下面总结下方法上的参数是怎么影响你的命令网关接口方法的行为的:
第一个参数必须是你期望发送的实际命令对象。
用@MetaData注解的参数对象值,会被加到命令的元数据字段上,比如来源客户端时间戳之类的。
参数上的元数据会和CommandMessage
里的元数据合并,对于key相同的元数据值,后定义的会覆盖掉先定义的。就是说,如果你的命令对象里原来已经有一个userId字段,这里又定义了一个,那这个就会覆盖掉原来的。
CommandCallback
类型的参数,会在命令处理完以后,被调用它的onSuccess
或onFailure
方法。你还可以通过回调类传递更多处理结果的信息,比如你自己的类和异常。在想要并行处理多个命令时,你可以通过这个类非堵塞拿到运行结果。
最后两个参数可能是个long或int值,以及TimeUnit
时间单位枚举。如果加了这两个,方法就会堵塞到参数设定的超时时间结束为止。至于超时以后会发生什么,取决于你定义在方法上的异常,下面再解释。注意如果其它属性完全阻止了堵塞等待,这两参数就无效了。
下面是返回值对接口效果的影响:
返回类型为void的方法会立即返回,不管是不是还有其他看起来会堵塞的参数比如抛出异常、超时时间等。
Future返回类型的方法也会立即返回,可以从Future中拿到执行结果,抛出的异常和超时时间等仍然会被忽略。
任何其他类型的返回值都会造成方法调用堵塞直到返回结果,Axon会帮你把返回值转成你定义的类,不匹配的话可能会抛出ClassCastException
。
然后是异常会造成的影响:
在方法上定义的受检查的异常只要命令处理器或者拦截器一抛出,Axon就会直接在方法上抛出给你自己处理。如果被抛出的异常你没有定义在接口方法上,Axon会把它包装成一个CommandExecutionException
,这是一个RuntimeException
在超时的时候,默认情况方法会返回Null,但是如果你在方法上定义了一个TimeoutException
,那超时时就会给你抛出TimeoutException
。
如果在等待结果的时候线程中断了,默认也会返回Null,中断标识会设置在线程中。在方法上定义一个InterruptedException
可以改变方法行为。
其他的运行时异常也可以定义在方法上以便捕获,但是不会对行为有任何影响的。
最后是注解的影响:
加在参数上的@MetaData注解会把这个参数做为元数据键值对的value,而key可以写成注解的参数,如@MetaData("userId") String userId。
如果在方法上加了@Timeout
会导致方法堵塞直到超时,但如果参数里有一个timeout参数,这个注解会被忽略。
@Timeout
会导致所有方法都堵塞,除非方法上有@Timeout
或者参数里有超时参数来覆盖掉它的影响。再来个例子:
public interface CommandGateway {
<R> void send(Object command, CommandCallback<R> callback);
<R> R sendAndWait(Object command);
<R> R sendAndWait(Object command, long timeout, TimeUnit unit);
void send(Object command);
}
...
// 同步调用
ToDoItem toDoItem = commandGateway.sendAndWait(new MarkCompletedCommand(itemId1), 15, TimeUnit.SECONDS);
// 并发调用
FutureCallback<ToDoItem> future1 = new FutureCallback<ToDoItem>();
FutureCallback<ToDoItem> future2 = new FutureCallback<ToDoItem>();
commandGateway.send(new MarkCompletedCommand(itemId1), future1);
commandGateway.send(new MarkCompletedCommand(itemId1), future2);
ToDoItem toDoItem1 = future1.get();
ToDoItem toDoItem2 = future2.get();
命令总线是一个将命令转给处理它的命令处理器的机制。命令和命令处理器总是精确匹配的,一个命令会有一个相应的命令处理器方法。如果命令没有对应的处理器,发送命令就会抛出一个NoHandlerForCommandException
。反之如果不止一个命令处理器订阅了同一个命令,则会产生覆盖效果,后订阅的有效。
CommandBus
接口就提供了两个方法来分发命令,dispatch(commandMessage, callback)
和dispatch(commandMessage)
。第一个参数是实际要分发的命令对象,第二个可选的是个回调对象,里面有两个方法onSuccess()
和onFailure()
,在方法正常执行完返回或者抛异常时会分别被调用。
前面的命令网关,就是在这个的基础上进行了包装,增加了更灵活多样化的调用选项。
如果你想要让其他线程来处理命令,你可以通过一个FutureCallback
来获取处理结果,这是Axon提供的类,是一个JDK里的Future
接口的实现,同时也是Axon框架提供的CommandCallback
接口的实现类,Axon框架还提供了很多这种实现类。之后可以通过命令网关来使用它们。
如果你的应用调用端对命令的执行结果不用关心,命令发了可以立即返回就不用管了,那程序就有了最好的可扩展性,命令处理机制只需要保证你的命令迟早会被完成的。
SimpleCommandBus
,就像名字上的那样,是Axon给你提供的一个简单的命令总线实现。它就直接在分发命令的那个线程上处理了命令,修改聚合实体,保存实体,保存事件,推送事件,全在同一个线程里完成。这种处理方式也适合很多场景,比如web应用。
SimpleCommandBus
是可以配置拦截器的,比如CommandDispatchInterceptor
,会在命令被发比命令总线后,被命令处理器处理前执行,你可以在里面对命令进行修改或者堵塞。详见 3.5章 “命令拦截器”。
SimpleCommandBus
为每个推送过来的命令都单独维护了一个工作单元来追踪命令的处理过程。工作单元是由实现了UnitOfWorkFactory
接口的工厂类产生的。为了适应你的各种各样的应用需求,你可以自己实现这个工厂类来改变工作单元。
因为在简单命令冲线里,所有的命令分发处理过程全在一个线程完成,所以性能会受限于JVM,虽然性能也很好,但还达不到极致。如果你想超越单个JVM的性能限制,或者更充分地利用CPU,可以看看Axon提供的其它的命令总线实现。
简单命令总线的性能已经不错了,尤其是当你按照第10章“性能调优”设置过以后。但实际上,它还是得利用JDK提供的同步锁来防止多个线程同时访问修改同一个聚合实体,这就导致了锁资源争夺。
DisruptorCommandBus
提供了一种不同的处理机制。它不像简单命令总线一样多个线程都在为不同的命令做着同样的处理过程,它是让每个线程只专注于做命令处理过程中的某一阶段工作,线程之间分工不同。
DisruptorCommandBus
使用了Disruptor技术(http://lmax-exchange.github.io/disruptor/),这是为了获得更好的极致性能而开发的一个并发编程小框架,它就是通过上面所说的改变多线程分工方式达到了目的。
关于高大上的Disruptor技术,你可以看这里的中文介绍。
简单地描述,Disruptor利用数组提供了一个环状的集合,而不是队列,数据被不停地加入这个环里,一个指针不断地从头到尾去遍历获取,处理好了再塞回到环里等待下一次处理,无穷无尽,还充分利用了CPU的设计机制,性能非常好,至于为什么好,看上面的介绍吧。
简单命令总线是在分发命令的线程里做完了所有的命令处理过程,而Disruptor命令总线把线程分成了两组,一组负责前半段的执行命令处理器,修改聚合实体,另一组负责后半段的推送事件到事件仓储和事件总线。
同样是在内存中处理,DisruptorCommandBus
的性能可以轻松达到SimpleCommandBus
的4倍!不过它有一些限制要注意:
DisruptorCommandBus
只支持以事件驱动方式维护的聚合实体,不适合用传统的“取-改-存”方式维护的聚合实体。命令总线也只是为这些由Disruptor处理的聚合实体提供了一个仓储(Repository)的作用。想要拿到一个仓储的引用,可以用createRepository(AggregateFactory)
方法。
一个命令只能导致一个聚合实体发生状态变化,即不能用一个命令改变两个无关实体的状态值。
如果启用了缓存的话,你不能让两个不同类型的聚合实体拥有相同的id标识,比如id为1的User和id为1的Order不能同时存在。
用它来处理的命令通常不应该在工作单元里产生一个会回滚的错误,一旦发生了回滚,DisruptorCommandBus
就没法保证你的命令会按分发的顺序被处理了。另外回滚后的重试也会消耗它的计算性能。发生了错误以后应该通过后续的处理措施来弥补而不是回滚。
如果你刚创建了一个新的聚合实体的实例,那些已经发出去,排着队要修改它的命令可能没办法精确地按它们分发的顺序去修改你的新实例。如果这些修改命令要修改的是一个已经存在的实体,顺序就不会有问题。为了保证顺序,在创建新实体的那个命令上用一个回调,等待得到成功结果以后,再发送那些修改命令,也就等几毫秒的事。不要把对同一个新实体的创建和更新命令一次全给发出去。
配置DisruptorCommandBus
要构建一个DisruptorCommandBus
实例,你需要一个AggregateFactory
聚合工厂,一个事件总线EventBus
还有一个EventStore
事件仓储。这些组件的详细介绍看第5章和第6.1章。最后你还需要一个CommandTargetResolver
接口,按名字理解,命令目标解析器,这个东西是为了告诉disruptor你的命令想要操作的是哪一个聚合实体。
Axon提供了两个实现了这个接口的类,一个是AnnotationCommandTargetResolver
是通过分析注解来得知命令的目标实体的,还有一个是MetaDataCommandTargetResolver
,是通过分析命令的元数据来得知目标实体的。
可选的,你可以提供一个自己的DisruptorConfiguration
实例来改变默认配置,针对你的应用环境进行进一步的性能调优。在Spring里配<axon:disruptor-command-bus>
是最简单的配置DisruptorCommandBus
的方式。
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:axon="http://www.axonframework.org/schema/core"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.axonframework.org/schema/core http://www.axonframework.org/schema/axon-core.xsd">
<axon:disruptor-command-bus id="commandBus" event-bus="eventBus" event-store="eventStore" buffer-size="4096" ... />
<axon:disruptor-repository id="toDoRepository"
aggregate-type="org.axonframework.quickstart.annotated.ToDoItem"
command-bus="commandBus"/>
<bean id="commandGateway" class="org.axonframework.commandhandling.gateway.CommandGatewayFactoryBean">
<property name="commandBus" ref="commandBus"/>
</bean>
<axon:filesystem-event-store id="eventStore" base-dir="events"/>
<axon:event-bus id="eventBus"/>
<axon:aggregate-command-handler aggregate-type="org.axonframework.quickstart.annotated.ToDoItem"
repository="toDoRepository"
command-bus="commandBus"/>
<axon:annotation-config/>
<bean class="org.axonframework.quickstart.annotated.ToDoEventHandler"/>
</beans>
解释一下上面的元素,元素名称的精确写法可以借助ide的xml提示敲出来:
ProducerType: 明确是用一个还是多个线程来处理队列里的一个条目,默认是多个。
WaitStrategy: 当几个分工处理同一命令过程的线程,需要彼此等待的时候的等待策略。最合适的等待策略取决于机器CPU的可用核心数以及其它在跑的处理过程的数量。如果追求低延迟,并且DisruptorCommandBus
可以拿到它所需要的核心,那你可以采用BusySpinWaitStrategy
,字面意思是繁忙旋转等待策略。The strategy to use when the processor threads (the three threads taking care of the actual processing) need to wait for each other. The best WaitStrategy depends on the number of cores available in the machine, and the number of other processes running. If low latency is crucial, and the DisruptorCommandBus may claim cores for itself, you can use the BusySpinWaitStrategy. To make the Command Bus claim less of the CPU and allow other threads to do processing, use the YieldingWaitStrategy. Finally, you can use the SleepingWaitStrategy and BlockingWaitStrategy to allow other processes a fair share of CPU. The latter is suitable if the Command Bus is not expected to be processing full-time. Defaults to the BlockingWaitStrategy.
Executor: Sets the Executor that provides the Threads for the DisruptorCommandBus. This executor must be able to provide at least 4 threads. 3 of the threads are claimed by the processing components of the DisruptorCommandBus. Extra threads are used to invoke callbacks and to schedule retries in case an Aggregate's state is detected to be corrupt. Defaults to a CachedThreadPool that provides threads from a thread group called "DisruptorCommandBus".
TransactionManager: Defines the Transaction Manager that should ensure that the storage and publication of events are executed transactionally.
InvokerInterceptors: Defines the CommandHandlerInterceptors that are to be used in the invocation process. This is the process that calls the actual Command Handler method.
PublisherInterceptors: Defines the CommandHandlerInterceptors that are to be used in the publication process. This is the process that stores and publishes the generated events.
RollbackConfiguration: Defines on which Exceptions a Unit of Work should be rolled back. Defaults to a configuration that rolls back on unchecked exceptions.
RescheduleCommandsOnCorruptState: Indicates whether Commands that have been executed against an Aggregate that has been corrupted (e.g. because a Unit of Work was rolled back) should be rescheduled. If false the callback's onFailure() method will be invoked. If true (the default), the command will be rescheduled instead.
CoolingDownPeriod: Sets the number of seconds to wait to make sure all commands are processed. During the cooling down period, no new commands are accepted, but existing commands are processed, and rescheduled when necessary. The cooling down period ensures that threads are available for rescheduling the commands and calling callbacks. Defaults to 1000 (1 second).
Cache: Sets the cache that stores aggregate instances that have been reconstructed from the Event Store. The cache is used to store aggregate instances that are not in active use by the disruptor.
InvokerThreadCount: The number of threads to assign to the invocation of command handlers. A good starting point is half the number of cores in the machine.
PublisherThreadCount: The number of threads to use to publish events. A good starting point is half the number of cores, and could be increased if a lot of time is spent on IO.
SerializerThreadCount: The number of threads to use to pre-serialize events. This defaults to 1, but is ignored if no serializer is configured.
Serializer: The serializer to perform pre-serialization with. When a serializer is configured, the DisruptorCommandBus will wrap all generated events in a SerializationAware message. The serialized form of the payload and meta data is attached before they are published to the Event Store or Event Bus. This can drastically improve performance when the same serializer is used to store and publish events to a remote destination.