[关闭]
@iamfox 2015-05-08T17:15:46.000000Z 字数 25649 阅读 8661

Axon Framework 学习

Axon DDD 学习



本文档改写自Axon框架官方手册,掺了不少私货,还改动了他们的代码例子。如果想看原版,用力戳这里

写在前面的话

我们做java web开发的人,高并发,大数据,高可用,高性能,一系列高大上的追求是我们的终极目标,但有幸步入这个领域的项目开发的人,恐怕还不到10%。

90%的人,都是些增删改查的搬砖工和跟数据库表、各种繁杂的技术组件斗争到底的苦力。

有没有那么些东西,是那10%,和那90%的人,都要同仇敌忾的东西?

有!业务,复杂的业务,多变的需求。

为了这些,多少开发人员和产品经理、市场部门、运营部门因此结下不共戴天之仇?需求的多变和软件质量的控制真是那么不可调和的矛盾吗?

那,假如有那么一个架构思想,专为了解决复杂多变的业务而生,将多种设计模式不动声色地融会贯通,那是不是一件很了不得的事?

DDD -- 领域驱动设计,它提出了一整套如何建立业务模型,划定模块边界及如何对其进行维护的理论,以避免所有的业务代码搅成一团,牵一发动全身的情况。DDD面世10多年来风靡了世界,却远没有在国内JAVA界普及开来,太多的人仍然坚信简单的MVC分层框架是万能钥匙,成了一种默守成规的固执。

CQRS -- 权责命令分离模式,简单说就是读写分离,对于写操作,有一连串的流程处理和追踪机制。对于查询,尽可能直接简单。读和写两条路线形成了完全不一样的处理过程,而不是像MVC三层一样都是一个service调一下dao完成。

Axon框架就是为了实现这两个东西而生。

很多人把网站的高质量完全寄托于新技术的运用,数据库的极致优化,乃至算法和JVM调优上,极端的甚至用存储过程全包。可是一切JAVA WEB项目都是以实现业务为目的的。当你千辛万苦将程序写到坚不可摧还有很高的性能,一个需求变更过来,你却瞠目结舌地发现程序很难再进行扩展和改版,前面大量工作都白干了要推翻重构,于是你就会把责任怪罪到业务人员头上,说他们违背当初的设计,无法实现。

业务,复杂的业务,多变的需求,不解决这个问题,再多的先进技术,再极致的优化,终究是无用功。

把对的程序调快容易,把快的程序调对很难

DDD的思想提出了一些概念,一些模式,让你的代码看起来更贴合自然语义,更贴近业务人员的思维,极力去消除开发人员和业务人员之间的代沟,另外非常重要的就是它界定了业务领域中的各种操作边界如何划分的原则,划定上下文范围,防止出现盘根错节的复杂调用关系。

Axon框架实现了这些思想,建立了一整套让你可以在专注于业务实现的同时,又能很简单地利用高级技术进行灵活处理的框架体系。

如果你不了解DDD,你也不用刻意去学习它,我会在介绍这个框架的过程中,同步去介绍DDD的思想和实践。

在此之前,我在用传统SSH主流框架渐进式地实践了近三年的DDD+CQRS,再一次去认识这个框架的心得。Axon框架没有试图去重新发明一个一栈式框架去推广什么极速开发。它是反其道而行的,它仍然可以和SSH框架共同运转,互相支持,不但没有简化,还在业务逻辑层扩展了大量的组件类,增加了代码量。

听起来是不是挺脑残的?你没给我简化代码还给我增加复杂性。但是转念想想,简单就是好吗?相对简单的摩托车能跑得过同级别的汽车吗?一台机的可扩展性和性能干得过台式机吗?那些总在说“先简单点实现,以后我们再逐步完善”,最后害我们掉进坑里花了不知几倍的力气也没把以前简单实现的后遗症治好的项目,难道不都还历历在目?

无论你使用多么简单的技术去开发软件,复杂性也是仍然存在的,只是被包装在了简单技术的内部,比如jquery之于javascript,你用起来简单了,但复杂的代码并没有消失,只是包起来了。DDD也是想把业务逻辑的复杂性包裹在内部,增加了业务逻辑代码量,却让业务的调用方和运营方受益,后续维护、调优、扩展都变成了简单的事。

Axon忠实地贯彻了DDD的思想,虽然第一次开发复杂度有所提升,但它所增加的工作量都是为了避免我们少走弯路,都是在处理一些我们以往一开始忽略,后来往往要费九牛二虎之力去填坑的东西。一旦做成,项目长远的可维护性可扩展性以及性能的优化点,都能极大地提升。

那现在,我们开始吧。

1 快速入门

任何框架都免不了要有个快速入门的小例子。Axon是为了复杂业务而生的,如果快速入门还是个Hello World,那用起来也太可笑了。至少,应该看得出业务。

设想一个待办事项工具,你可以创建一个待办事项,还可以将它标记为完成,这就是我们快速入门的需求。

回忆下我们传统方式是怎么写的:

  1. public class ToDoItem {
  2. private String id;
  3. private String remark;
  4. private int status;
  5. ...
  6. }
  7. public class ToDoItemServiceImpl implements ToDoItemService {
  8. private TodoItemDao toDoItemDao;
  9. public void createToDoItem(String id, String remark) {
  10. ToDoItem toDoItem = new ToDoItem();
  11. toDoItem.setId(id);
  12. toDoItem.setRemark(remark);
  13. toDoItemDao.save(toDoItem);
  14. }
  15. public void changeToDoItemStatus(String id, int status) {
  16. ToDoItem toDoItem = toDoItemDao.get(id);
  17. toDoItemDao.setStatus(status);
  18. toDoItemDao.update(toDoItem);
  19. }
  20. }

好了,简单快速,不过坑也不知不觉就挖下了,我们看看Axon是怎么做的,这个例子来自Axon官网,不过进行了一些更符合中国人阅读习惯的小修改。

首先,你得建两个命令Command对象,一个是创建待办事项,一个是标记待办事项完成。

  1. public class CreateToDoItemCommand {
  2. @TargetAggregateIdentifier
  3. private final String todoId;
  4. private final String remark;
  5. public CreateToDoItemCommand(String todoId, String remark) {
  6. this.todoId = todoId;
  7. this.remark = remark;
  8. }
  9. public String getTodoId() {
  10. return todoId;
  11. }
  12. public String getRemark() {
  13. return remark;
  14. }
  15. }
  16. public class MarkCompletedCommand {
  17. @TargetAggregateIdentifier
  18. private final String todoId;
  19. public MarkCompletedCommand(String todoId) {
  20. this.todoId = todoId;
  21. }
  22. public String getTodoId() {
  23. return todoId;
  24. }
  25. }

这两个命令对象代表了用户可以做两种操作,来改变数据状态。
然后你需要创建这两个命令在执行时,会触发的事件。如果一切正常,那 创建待办事项 这个操作会产生 待办事项被创建 事件,标记待办事项已完成 这个操作会产生待办事项已完成事件。
那我们建立这样的两个事件:

  1. public class ToDoItemCreatedEvent {
  2. private ToDoItem toDoItem;
  3. public ToDoItemCreatedEvent(ToDoItem toDoItem) {
  4. this.toDoItem = toDoItem;
  5. }
  6. public String getToDoItem() {
  7. return toDoItem;
  8. }
  9. }
  10. public class ToDoItemCompletedEvent {
  11. private final String todoId;
  12. public ToDoItemCompletedEvent(String todoId) {
  13. this.todoId = todoId;
  14. }
  15. public String getTodoId() {
  16. return todoId;
  17. }
  18. }

事件包含了哪些属性字段,是不一定的,主要是要能满足对事件的后续处理和记录。

现在我们有了实体,有了命令,有了事件,我们应该把它们串起来了:

  1. public class ToDoItem extends AbstractAnnotatedAggregateRoot {
  2. private String id;
  3. private String remark;
  4. private int status;
  5. public final static Integer STATUS_UN_COMPLETE = 1;
  6. public final static Integer STATUS_COMPLETE = 2;
  7. /**
  8. / 处理命令
  9. */
  10. @CommandHandler
  11. public ToDoItem(CreateToDoItemCommand command) {
  12. // 根据命令创建实体
  13. setId(command.getId());
  14. setRemark(command.getRemark());
  15. setStatus(ToDoItem.STATUS_UN_COMPLETE);
  16. // 事件产生
  17. apply(new ToDoItemCreatedEvent(this);
  18. }
  19. /**
  20. / 处理命令
  21. */
  22. @CommandHandler
  23. public void markCompleted(MarkCompletedCommand command) {
  24. if (getStatus().intValue() == ToDoItem.STATUS_UN_COMPLETE) {
  25. setStatus(ToDoItem.STATUS_COMPLETE);
  26. apply(new ToDoItemCompletedEvent(id));
  27. } else {
  28. // 抛出异常处理
  29. }
  30. }
  31. /**
  32. / 处理事件
  33. */
  34. @EventHandler
  35. public void onCreate(ToDoItemCreatedEvent event) {
  36. System.out.println("待办事项" + event.getToDoItem().getRemark() + "创建成功");
  37. }
  38. /**
  39. / 处理事件
  40. */
  41. @EventHandler
  42. public void onMarkComplete(ToDoItemCompletedEvent event) {
  43. System.out.println("待办事项" + event.getId() + "已完成");
  44. }
  45. }

ps.现实项目中不会把那么多@CommandHandler和@EventHandler全写在实体类里,会太臃肿,可以移到其他组件类。
好了,让我们用junit来跑一跑:

  1. public class ToDoItemTest {
  2. private FixtureConfiguration fixture;
  3. @Before
  4. public void setUp() throws Exception {
  5. fixture = Fixtures.newGivenWhenThenFixture(ToDoItem.class);
  6. }
  7. @Test
  8. public void testCreateToDoItem() throws Exception {
  9. ToDoItem testEntity = new ToDoItem();
  10. testEntity.setId("todo1");
  11. testEntity.setRemark("8点钟要上班");
  12. testEntity.setStatus(1);
  13. fixture.given()
  14. .when(new CreateToDoItemCommand("todo1", "8点钟要上班"))
  15. .expectEvents(new ToDoItemCreatedEvent(testEntity));
  16. }
  17. @Test
  18. public void testMarkToDoItemAsCompleted() throws Exception {
  19. ToDoItem testEntity = new ToDoItem();
  20. testEntity.setId("todo1");
  21. testEntity.setRemark("8点钟要上班");
  22. testEntity.setStatus(1);
  23. fixture.given(new ToDoItemCreatedEvent(testEntity))
  24. .when(new MarkCompletedCommand("todo1"))
  25. .expectEvents(new ToDoItemCompletedEvent("todo1"));
  26. }
  27. }

成功。
对于程序员来说,这样就算是运行通了,但对于业务人员来说就不是了,他们会希望能看到你在UI界面上操作成功给他看,当然建UI太费时间了,这里不会演示,但我们可以用更接近生产代码而不是测试用例的形式来表示一下。

  1. public class ToDoItemRunner {
  2. public static void main(String[] args) {
  3. // 首先要有个命令总线
  4. CommandBus commandBus = new SimpleCommandBus();
  5. // 建一个命令网关以便为发送命令方提供友好的api
  6. CommandGateway commandGateway = new DefaultCommandGateway(commandBus);
  7. // 我们准备把事件存在文件里在"events/"文件夹下
  8. EventStore eventStore = new FileSystemEventStore(new SimpleEventFileResolver(new File("./events")));
  9. // 再来个简单的事件总线
  10. EventBus eventBus = new SimpleEventBus();
  11. // 配置一下事件仓储把事件存起来
  12. EventSourcingRepository repository = new EventSourcingRepository(ToDoItem.class, eventStore);
  13. repository.setEventBus(eventBus);
  14. // 要让Axon知道哪个类能处理命令总线里的命令,订阅一下
  15. AggregateAnnotationCommandHandler.subscribe(ToDoItem.class, repository, commandBus);
  16. // 然后我们向命令总线发送一些命令
  17. final String itemId = UUID.randomUUID().toString();
  18. commandGateway.send(new CreateToDoItemCommand(itemId, "8点要上班"));
  19. // 到公司了...
  20. commandGateway.send(new MarkCompletedCommand(itemId));
  21. }
  22. }

以上流程并不是常见的从数据库取出数据、修改再保存的操作过程,当然Axon也支持那样的过程,为了展示一些更高大上的架构思想,上例中采用了一种叫事件驱动或者叫事件溯源(Event Sourcing)的机制去做数据操作,如果你不了解这种机制,我们后面再说。

最后要展示的是,如果用spring会怎么样?

  1. <beans xmlns="http://www.springframework.org/schema/beans"
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xmlns:axon="http://www.axonframework.org/schema/core"
  4. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  5. http://www.axonframework.org/schema/core http://www.axonframework.org/schema/axon-core-2.0.xsd">
  6. <axon:command-bus id="commandBus"/>
  7. <axon:event-bus id="eventBus"/>
  8. <axon:event-sourcing-repository id="toDoRepository"
  9. aggregate-type="org.axonframework.test.sample.ToDoItem"/>
  10. <axon:aggregate-command-handler id="toDoItemHandler"
  11. aggregate-type="org.axonframework.test.sample.ToDoItem"
  12. repository="toDoRepository"
  13. command-bus="commandBus"/>
  14. <axon:filesystem-event-store id="eventStore" base-dir="events"/>
  15. <bean class="org.axonframework.commandhandling.gateway.CommandGatewayFactoryBean">
  16. <property name="commandBus" ref="commandBus"/>
  17. </bean>
  18. </beans>

然后代码还剩下这些:

  1. public class ToDoItemRunner {
  2. private CommandGateway commandGateway;
  3. public ToDoItemRunner(CommandGateway commandGateway) {
  4. this.commandGateway = commandGateway;
  5. }
  6. public static void main(String[] args) {
  7. // 初始化和注入spring bean
  8. ApplicationContext applicationContext = new ClassPathXmlApplicationContext("sampleContext.xml");
  9. ToDoItemRunner runner = new ToDoItemRunner(applicationContext.getBean(CommandGateway.class));
  10. // 开跑
  11. runner.run();
  12. }
  13. private void run() {
  14. // 创建待办事项
  15. final String itemId = UUID.randomUUID().toString();
  16. commandGateway.send(new CreateToDoItemCommand(itemId, "8点要上班"));
  17. // 到公司以后
  18. commandGateway.send(new MarkCompletedCommand(itemId));
  19. }
  20. }

总结下整个过程:

  1. 客户端创建了一个命令
  2. 通过命令网关,向命令总线发送了命令
  3. 命令处理器执行了命令,操作了业务实体,持久化
  4. 命令处理器根据操作结果产生了事件
  5. 事件被发送到事件总线,并被保存起来
  6. 若干个订阅过该事件的监听器收到了事件,交给事件处理器实现了事件会造成的后续影响

以上,你觉得这么写代码,好还是不好?

老实说,代码量真的比传统做法多了不少,不过...
产品经理搭着你的肩膀说:我们要扩展一下功能,待办事项应该有指派人信息,还要有超时时间,每次完成任务要给指派人发邮件,超时完成要给员工扣1分。

然后...
传统代码变成了这样:

  1. public class ToDoItem {
  2. private String id;
  3. private String userId;
  4. private String commander;
  5. private String remark;
  6. private Date expectCompleteDate;
  7. private int status;
  8. ...
  9. }
  10. public class ToDoItemServiceImpl implements ToDoItemService {
  11. private TodoItemDao toDoItemDao;
  12. private EmailService emailService;
  13. private PointService pointService;
  14. public void createToDoItem(String id, String remark, String commander) {
  15. ToDoItem toDoItem = new ToDoItem();
  16. toDoItem.setId(id);
  17. toDoItem.setRemark(remark);
  18. toDoItem.setUserId(userId);
  19. toDoItem.setCommander(commander);
  20. // 一系列时间计算
  21. ...
  22. toDoItem.setExpectCompleteDate(expectCompleteDate);
  23. toDoItemDao.save(toDoItem);
  24. }
  25. public void changeToDoItemStatus(String id, int status) {
  26. ToDoItem toDoItem = toDoItemDao.get(id);
  27. toDoItemDao.setStatus(status);
  28. toDoItemDao.update(toDoItem);
  29. if(status == ToDoItem.STATUS_COMPLETE) {
  30. emailService.send("这是邮件内容");
  31. if(toDoItem.getExpectCompleteDate.before(new Date())) {
  32. pointService.subtract(toDoItem.getUserId(), 1);
  33. }
  34. }
  35. }
  36. }

一股恶化的味道,service和controller都要改写,加参数,if分支在变多。
无疑设计模式要登场了,Axon让设计模式有了一个落地的方式。

  1. public class ToDoItemRunner {
  2. private CommandGateway commandGateway;
  3. public ToDoItemRunner(CommandGateway commandGateway) {
  4. this.commandGateway = commandGateway;
  5. }
  6. public static void main(String[] args) {
  7. ApplicationContext applicationContext = new ClassPathXmlApplicationContext("sampleContext.xml");
  8. ToDoItemRunner runner = new ToDoItemRunner(applicationContext.getBean(CommandGateway.class));
  9. runner.run();
  10. }
  11. private void run() {
  12. // 创建待办事项
  13. final String itemId = UUID.randomUUID().toString();
  14. commandGateway.send(new CreateToDoItemCommand(itemId, "8点要上班"));
  15. // 到公司以后
  16. commandGateway.send(new MarkCompletedCommand(itemId));
  17. }
  18. }

以上没有任何变化,确实不需要变。

  1. public class ToDoItem extends AbstractAnnotatedAggregateRoot {
  2. private String id;
  3. private String userId;
  4. private String commander;
  5. private String remark;
  6. private Date expectCompleteDate;
  7. private int status;
  8. public final static Integer STATUS_UN_COMPLETE = 1;
  9. public final static Integer STATUS_COMPLETE = 2;
  10. @CommandHandler
  11. public ToDoItem(CreateToDoItemCommand command) {
  12. // 根据命令创建实体
  13. setId(command.getId());
  14. setRemark(command.getRemark());
  15. setStatus(ToDoItem.STATUS_UN_COMPLETE);
  16. // 事件产生
  17. apply(new ToDoItemCreatedEvent(this);
  18. }
  19. @CommandHandler
  20. public void markCompleted(MarkCompletedCommand command) {
  21. // 超没超时,是两个不同的结果事件
  22. if(toDoItem.getExpectCompleteDate.before(new Date())) {
  23. apply(new ToDoItemTimeOutCompletedEvent(id));
  24. } else {
  25. apply(new ToDoItemCompletedEvent(id));
  26. }
  27. }
  28. @EventHandler
  29. public void onCreate(ToDoItemCreatedEvent event) {
  30. System.out.println("待办事项" + event.getToDoItem().getRemark() + "创建成功");
  31. }
  32. @EventHandler
  33. public void onMarkComplete(ToDoItemCompletedEvent event) {
  34. System.out.println("待办事项" + event.getId() + "已完成");
  35. }
  36. /**
  37. / 一个事件处理器负责扣分
  38. */
  39. @EventHandler
  40. public void onTimeOutMarkCompleteForPoint(ToDoTimeOutItemCompletedEvent event) {
  41. commandGateway.send(new SubtractUserPointCommand(itemId, 1));
  42. System.out.println("待办事项" + event.getId() + "超时完成");
  43. }
  44. /**
  45. / 另一个事件处理器负责发邮件
  46. */
  47. @EventHandler
  48. public void onTimeOutMarkCompleteForEmail(ToDoTimeOutItemCompletedEvent event) {
  49. commandGateway.send(new SendEmailCommand(itemId, "这是邮件内容"));
  50. System.out.println("待办事项" + event.getId() + "超时完成");
  51. }
  52. }

重新审视一下这一段run的过程,没有任何变化。增加的参数在Command里,后续的处理在事件处理器中完成。

高内聚、低耦合达成。

高内聚是指业务逻辑封装到了markCompleted()方法内部,客户端如Controller类不用做任何诸如加方法参数之类的改动。

低耦合是无论后续有多少要擦屁股的操作,markCompleted()的复杂度也仍然很低,只是发出了不同的事件ToDoItemCompletedEventToDoItemTimeOutCompletedEvent。而这个事件后后续处理上,一个事件处理器只负责做一件事,千变万化任意扩展,互不影响可维护性。

那性能方面呢?可调优的地方就多了去了,你可以通过命令网关设置命令是同步执行还是异步,是否开事务,是否回调,超时时间,重试机制。可以设置事件是同步还是异步,可以决定采用哪种技术分发命令和存储事件,mq、disruptor,redis、mongodb、jpa等等,你只需要在spring配置中更换一下实现类,代码和注解写法不变。

如上,Axon是为了业务需求复杂、高性能、高可扩展的系统而生的。对那些几张小表CRUD以后也不会再大动的系统,就不适用了。后面的章节里,我们再慢慢看Axon的各个组件都提供了些什么样的特性,来完成业务的处理。

2 技术概览

2.1 关于Event Sourcing的简介

在细看Axon的各部分组件之前,我们要先了解一下之前提到的Event Sourcing事件驱动机制。

传统项目中我们在做数据操作时,都是对数据库中的表进行直接修改,或者从数据库中取出,进行一系列逻辑处理以后把新的内容写回去。

在Axon中,对于高并发操作场景,更推荐采用Event Sourcing方式做数据操作。

首先数据对象会从持久化设施中读出来,长驻内存。每当有新的操作请求过来,在内存中修改对象数据,然后将结果事件保存起来,返回请求响应。并不需要立即将对象再写回到数据库等持久化设施中。

这样,长驻内存的对象可以支持非常高密度的写操作,省掉了磁盘IO的耗时,只需要确保事件能保存成功,而事件只增不改,通过无锁的高速写入也能支持很高的操作密度。

要注意一些处理要点:

CQRS流程结构图

这个图是Axon框架在Event Sourcing机制下的流程图,咋一看挺复杂的,不过之前已经用文字描述过了,再对照一下:
1. 客户端创建了一个命令
2. 通过命令网关,向命令总线发送了命令
3. 命令处理器执行了命令,操作了业务实体,持久化 (图中command handler到repository)
4. 命令处理器根据操作结果产生了事件
5. 事件被发送到事件总线,并被保存起来
6. 若干个订阅过该事件的监听器收到了事件,交给事件处理器实现了事件会造成的后续影响

这就是我们写操作的处理过程,说的是图的上部,下部有dto和query的地方就是查询,直连了database。

2.2 技术组件简介

2.2.1 Axon Core

核心包,如果你要建一个单节点单JVM应用,这个包就够了,所有其他模块都会依赖这个包。

2.2.2 Axon Test

用来测试你写的Axon程序的模块,上线运行时是不需要的。

2.2.3 Axon Distributed CommandBus

分布式命令总线,提供了一个可以跨多节点的命令总线的实现,它通过一个JGroupsConnector来连接每一个节点上的DistributedCommandBus实现,让多个节点分头处理不同的命令。

2.2.4 Axon AMQP

AMQP是一种比JMS更高级的消息协议,而且跨语言,开源产品有rabbitmq。你可以用这个模块来建一个基于AMQP协议的事件总线(EventBus),这样就能和其他使用AMQP协议的MQ产品的系统互发消息。它还可以保证消息传递可靠性,就算是事件处理器所在节点一时半会离线了也没事。

2.2.5 Axon Integration

采用Spring Integration(一个Spring出品的开源ESB)来实现事件总线的模块,这样就能和其他使用Spring Integration的应用系统互发消息了。

要注意Spring Integration核心在Spring4中已经迁移到Spring Messaging项目去了,如果你使用了Spring 4以上版本,要采用Axon Spring Messaging模块来替代本模块。

2.2.6 Axon Spring Messaging

采用Spring Messaging来实现事件总线的模块,可以和其他采用Spring Messaging的系统互发消息。

2.2.7 Axon Mongo

MongoDB是一个文档型的NoSQL数据库。这个模块提供了基于MongoDB实现的事件仓储,用来保存系统产生的事件流。

2.2.8 Axon Monitoring JMX

各Axon组件一般是分别提供监控信息的,这个模块将这些信息推送到了JMX。不需要任何配置,只要有这个依赖存在,各模块就会将运行的监控统计信息自动推送到JMX。

2.3 孵化阶段的模块

孵化阶段的模块没有经过像主要模块那么充分的测试,也没有那么完善的文档,所以不建议用在生产环境,用不用你自己看着办。
这些模块的Maven groupId是org.axonframework.incubator.

2.3.1 Axon Cassandra

这个模块是用Cassandra列式NoSQL数据库来实现事件存储。

2.3.2 Axon Google App Engine

这个模块提供了让你使用Google App Engine的API,像使用GAE上的DatastoreService来实现事件仓储。

2.3.3 Axon Redis

Redis是一个key-value式的NoSQL,有很高的性能,这个模块想要提供一个基于Redis的事件仓储。

3 命令处理

应用系统里的所有数据变更,都是由命令触发的。命令对象里包含了这个操作所需要的所有信息,命令处理器负责根据命令里的信息去操作系统数据。然后,你要把命令和命令处理器挂上勾。

如之前的入门例子,Axon已经把这件事做了。不过那不是唯一的做法,Axon也支持你像传统的MVC应用一样,建一个service层来处理命令。

3.1 命令网关

命令网关是个很实用的命令转发机制,虽然你可以不用,但是用了会比较简单。有两种方式去使用命令网关。

一是用由Axon提供的CommandGateway接口和DefaultCommandGateway实现类。这里面有几个方法让你发送命令,你可以决定是同步执行还是异步执行,以及超时等待时间。

另一种方法更灵活,你可以通过GatewayProxyFactory类,把所有的自定义接口都变成命令网关。这样你就可以进行强类型定义和抛出自己的异常,Axon会在运行时自动为你的接口生成实现类。

3.1.1 配置命令网关

不管是你自己定义的,还是Axon提供的命令网关,你都得加上配置,比如Spring配置。至少你的配置内容要让它找得到命令总线。另外,你还可以配上一些RetryScheduler, CommandDispatchInterceptorS, CommandCallbackSCorrelationDataResolverS这样的东西。

RetryScheduler可以在命令执行失败时进行重试。IntervalRetryScheduler是另一个重试实现类,可以加上重试的时间间隔还有最大重试次数等参数。但是它们不会对明确的业务失败异常进行重试,明确的业务失败异常是指受检查的异常(checked exception),所以只有遇到RuntimeException时,才会进行重试。

CommandDispatchInterceptor这个拦截器,在使用命令网关时会有效,直接发给命令总线时无效。可以用来在发送到命令总线前,改变CommandMessage的优先级,也可以用来给命令附加元信息,或者做参数校验之类的。

CommandCallback是个发送了任何命令以后都会被执行的回调类。

3.1.2 创建一个自定义的命令网关

GatewayProxyFactory类会为你自定义的接口生成命令网关实现类,你写的每个发送命令的方法会有什么样的执行效果,是根据你的方法参数,返回的类型和定义的异常决定的。这样不只是方便了调用,也方便了测试。

先看个综合例子,然后下面会挨个分析我们定义的这几个方法的行为。

  1. // 创建一个实例:
  2. GatewayProxyFactory factory = new GatewayProxyFactory(commandBus);
  3. MyGateway myGateway = factory.createInstance(MyGateway.class);
  4. public interface MyGateway {
  5. // 调了就不管了,非堵塞
  6. void sendCommand(MyPayloadType command);
  7. // 方法附加了一个元数据userId到命令上,并且会堵塞等待方法执行,10秒钟不返回就超时,并返回Null
  8. @Timeout(value = 10, unit = TimeUnit.SECONDS)
  9. ReturnValue sendCommandAndWaitForAResult(MyPayloadType command,
  10. @MetaData("userId") String userId);
  11. // 如果超时或者线程中断,会抛出相应异常
  12. @Timeout(value = 20, unit = TimeUnit.SECONDS)
  13. ReturnValue sendCommandAndWaitForAResult(MyPayloadType command)
  14. throws TimeoutException, InterruptedException;
  15. // 这个方法也会堵塞等待结果,等多久由调用方决定
  16. void sendCommandAndWait(MyPayloadType command, long timeout, TimeUnit unit)
  17. throws TimeoutException, InterruptedException;
  18. }

如果你用的是Spring,可以配成下面这样:

  1. <bean id="myGateway" class="org.axonframework.commandhandling.gateway.CommandGatewayFactoryBean">
  2. <property name="commandBus" ref="commandBus"/>
  3. <property name="gatewayInterface" value="package.to.MyGateway"/>
  4. </bean>

下面总结下方法上的参数是怎么影响你的命令网关接口方法的行为的:

下面是返回值对接口效果的影响:

然后是异常会造成的影响:

再来个例子:

  1. public interface CommandGateway {
  2. <R> void send(Object command, CommandCallback<R> callback);
  3. <R> R sendAndWait(Object command);
  4. <R> R sendAndWait(Object command, long timeout, TimeUnit unit);
  5. void send(Object command);
  6. }
  7. ...
  8. // 同步调用
  9. ToDoItem toDoItem = commandGateway.sendAndWait(new MarkCompletedCommand(itemId1), 15, TimeUnit.SECONDS);
  10. // 并发调用
  11. FutureCallback<ToDoItem> future1 = new FutureCallback<ToDoItem>();
  12. FutureCallback<ToDoItem> future2 = new FutureCallback<ToDoItem>();
  13. commandGateway.send(new MarkCompletedCommand(itemId1), future1);
  14. commandGateway.send(new MarkCompletedCommand(itemId1), future2);
  15. ToDoItem toDoItem1 = future1.get();
  16. ToDoItem toDoItem2 = future2.get();

3.2 命令总线

命令总线是一个将命令转给处理它的命令处理器的机制。命令和命令处理器总是精确匹配的,一个命令会有一个相应的命令处理器方法。如果命令没有对应的处理器,发送命令就会抛出一个NoHandlerForCommandException。反之如果不止一个命令处理器订阅了同一个命令,则会产生覆盖效果,后订阅的有效。

3.2.1 分发命令

CommandBus接口就提供了两个方法来分发命令,dispatch(commandMessage, callback)dispatch(commandMessage)。第一个参数是实际要分发的命令对象,第二个可选的是个回调对象,里面有两个方法onSuccess()onFailure(),在方法正常执行完返回或者抛异常时会分别被调用。

前面的命令网关,就是在这个的基础上进行了包装,增加了更灵活多样化的调用选项。

如果你想要让其他线程来处理命令,你可以通过一个FutureCallback来获取处理结果,这是Axon提供的类,是一个JDK里的Future接口的实现,同时也是Axon框架提供的CommandCallback接口的实现类,Axon框架还提供了很多这种实现类。之后可以通过命令网关来使用它们。

如果你的应用调用端对命令的执行结果不用关心,命令发了可以立即返回就不用管了,那程序就有了最好的可扩展性,命令处理机制只需要保证你的命令迟早会被完成的。

3.2.2 简单命令总线

SimpleCommandBus,就像名字上的那样,是Axon给你提供的一个简单的命令总线实现。它就直接在分发命令的那个线程上处理了命令,修改聚合实体,保存实体,保存事件,推送事件,全在同一个线程里完成。这种处理方式也适合很多场景,比如web应用。

SimpleCommandBus是可以配置拦截器的,比如CommandDispatchInterceptor,会在命令被发比命令总线后,被命令处理器处理前执行,你可以在里面对命令进行修改或者堵塞。详见 3.5章 “命令拦截器”。

SimpleCommandBus为每个推送过来的命令都单独维护了一个工作单元来追踪命令的处理过程。工作单元是由实现了UnitOfWorkFactory接口的工厂类产生的。为了适应你的各种各样的应用需求,你可以自己实现这个工厂类来改变工作单元。

因为在简单命令冲线里,所有的命令分发处理过程全在一个线程完成,所以性能会受限于JVM,虽然性能也很好,但还达不到极致。如果你想超越单个JVM的性能限制,或者更充分地利用CPU,可以看看Axon提供的其它的命令总线实现。

3.2.2 Disruptor命令总线

简单命令总线的性能已经不错了,尤其是当你按照第10章“性能调优”设置过以后。但实际上,它还是得利用JDK提供的同步锁来防止多个线程同时访问修改同一个聚合实体,这就导致了锁资源争夺。

DisruptorCommandBus提供了一种不同的处理机制。它不像简单命令总线一样多个线程都在为不同的命令做着同样的处理过程,它是让每个线程只专注于做命令处理过程中的某一阶段工作,线程之间分工不同。

DisruptorCommandBus使用了Disruptor技术(http://lmax-exchange.github.io/disruptor/),这是为了获得更好的极致性能而开发的一个并发编程小框架,它就是通过上面所说的改变多线程分工方式达到了目的。

关于高大上的Disruptor技术,你可以看这里的中文介绍。
简单地描述,Disruptor利用数组提供了一个环状的集合,而不是队列,数据被不停地加入这个环里,一个指针不断地从头到尾去遍历获取,处理好了再塞回到环里等待下一次处理,无穷无尽,还充分利用了CPU的设计机制,性能非常好,至于为什么好,看上面的介绍吧。

简单命令总线是在分发命令的线程里做完了所有的命令处理过程,而Disruptor命令总线把线程分成了两组,一组负责前半段的执行命令处理器,修改聚合实体,另一组负责后半段的推送事件到事件仓储和事件总线。

同样是在内存中处理,DisruptorCommandBus的性能可以轻松达到SimpleCommandBus的4倍!不过它有一些限制要注意:

  1. <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2. xmlns:axon="http://www.axonframework.org/schema/core"
  3. xmlns="http://www.springframework.org/schema/beans"
  4. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.axonframework.org/schema/core http://www.axonframework.org/schema/axon-core.xsd">
  5. <axon:disruptor-command-bus id="commandBus" event-bus="eventBus" event-store="eventStore" buffer-size="4096" ... />
  6. <axon:disruptor-repository id="toDoRepository"
  7. aggregate-type="org.axonframework.quickstart.annotated.ToDoItem"
  8. command-bus="commandBus"/>
  9. <bean id="commandGateway" class="org.axonframework.commandhandling.gateway.CommandGatewayFactoryBean">
  10. <property name="commandBus" ref="commandBus"/>
  11. </bean>
  12. <axon:filesystem-event-store id="eventStore" base-dir="events"/>
  13. <axon:event-bus id="eventBus"/>
  14. <axon:aggregate-command-handler aggregate-type="org.axonframework.quickstart.annotated.ToDoItem"
  15. repository="toDoRepository"
  16. command-bus="commandBus"/>
  17. <axon:annotation-config/>
  18. <bean class="org.axonframework.quickstart.annotated.ToDoEventHandler"/>
  19. </beans>

解释一下上面的元素,元素名称的精确写法可以借助ide的xml提示敲出来:

ProducerType: 明确是用一个还是多个线程来处理队列里的一个条目,默认是多个。

WaitStrategy: 当几个分工处理同一命令过程的线程,需要彼此等待的时候的等待策略。最合适的等待策略取决于机器CPU的可用核心数以及其它在跑的处理过程的数量。如果追求低延迟,并且DisruptorCommandBus可以拿到它所需要的核心,那你可以采用BusySpinWaitStrategy,字面意思是繁忙旋转等待策略。The strategy to use when the processor threads (the three threads taking care of the actual processing) need to wait for each other. The best WaitStrategy depends on the number of cores available in the machine, and the number of other processes running. If low latency is crucial, and the DisruptorCommandBus may claim cores for itself, you can use the BusySpinWaitStrategy. To make the Command Bus claim less of the CPU and allow other threads to do processing, use the YieldingWaitStrategy. Finally, you can use the SleepingWaitStrategy and BlockingWaitStrategy to allow other processes a fair share of CPU. The latter is suitable if the Command Bus is not expected to be processing full-time. Defaults to the BlockingWaitStrategy.

Executor: Sets the Executor that provides the Threads for the DisruptorCommandBus. This executor must be able to provide at least 4 threads. 3 of the threads are claimed by the processing components of the DisruptorCommandBus. Extra threads are used to invoke callbacks and to schedule retries in case an Aggregate's state is detected to be corrupt. Defaults to a CachedThreadPool that provides threads from a thread group called "DisruptorCommandBus".

TransactionManager: Defines the Transaction Manager that should ensure that the storage and publication of events are executed transactionally.

InvokerInterceptors: Defines the CommandHandlerInterceptors that are to be used in the invocation process. This is the process that calls the actual Command Handler method.

PublisherInterceptors: Defines the CommandHandlerInterceptors that are to be used in the publication process. This is the process that stores and publishes the generated events.

RollbackConfiguration: Defines on which Exceptions a Unit of Work should be rolled back. Defaults to a configuration that rolls back on unchecked exceptions.

RescheduleCommandsOnCorruptState: Indicates whether Commands that have been executed against an Aggregate that has been corrupted (e.g. because a Unit of Work was rolled back) should be rescheduled. If false the callback's onFailure() method will be invoked. If true (the default), the command will be rescheduled instead.

CoolingDownPeriod: Sets the number of seconds to wait to make sure all commands are processed. During the cooling down period, no new commands are accepted, but existing commands are processed, and rescheduled when necessary. The cooling down period ensures that threads are available for rescheduling the commands and calling callbacks. Defaults to 1000 (1 second).

Cache: Sets the cache that stores aggregate instances that have been reconstructed from the Event Store. The cache is used to store aggregate instances that are not in active use by the disruptor.

InvokerThreadCount: The number of threads to assign to the invocation of command handlers. A good starting point is half the number of cores in the machine.

PublisherThreadCount: The number of threads to use to publish events. A good starting point is half the number of cores, and could be increased if a lot of time is spent on IO.

SerializerThreadCount: The number of threads to use to pre-serialize events. This defaults to 1, but is ignored if no serializer is configured.

Serializer: The serializer to perform pre-serialization with. When a serializer is configured, the DisruptorCommandBus will wrap all generated events in a SerializationAware message. The serialized form of the payload and meta data is attached before they are published to the Event Store or Event Bus. This can drastically improve performance when the same serializer is used to store and publish events to a remote destination.

3.3 命令处理器

3.4 工作单元

3.5 命令拦截器

3.6 分布式命令总线

4 领域建模

4.1 事件

4.2 聚合

5 仓储和事件存储

5.1 标准仓储

5.2 事件驱动仓储

5.3 事件存储实现

5.4 事件向上转型

5.5 快照

5.6 高级冲突检测与解决

6 事件处理

6.1 事件总线

6.2 事件监听器

6.3 异步事件处理

6.4 分布式事件总线

6.5 在集群上重放事件流

7 管理复杂业务流程

7.1 Saga流程

7.1 Saga流程基础设施

8 测试

9 整合Spring

10 性能调优

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注