与领域驱动设计一起使用的技术之一(关于领域驱动设计的更多信息,阅读实现领域驱动的设计,由Vaughn Vernon[addison - wesley Professional])是事件源。在这种架构风格中,数据并不是存储为当前状态的快照,以期在适当的地方修改它;换言之,即使用SQL UPDATE查询的方式。相反,事件源是将已经一些列已经发生了的事件作为领域事件保存在一个只追加(append-only)的数据存储中。使用这个设计,我们从不会覆盖任何数据,并且有效地拥有一个延伸而来的免费的审计日志。而且,查看实时数据的唯一方法就是将事件按照顺序应用于一个空的视图上即可。
在初始空状态之上应用事件的过程称为事件源中的投影(projection)。一个单一的事实来源可以驱动多个不同的投影。例如,我们可能会有一系列与预订系统相关的事件事实,如TicketReserved , ReservationConfirmed ,以及TicketBought——重要的是它们都是过去式的,因为事件事实(fact)总是反映已经发生的行为和事件。从一个单一的事实来源,可以驱动出多个投影,如下:
所有确认的预定名单
今天取消的预定名单
每周总收入
当系统发展时,我们可以利用收集到的数据,抛弃旧的投影(projection),建立新的投影(projection)。假设您希望构建包含所有预订和它们的状态的投影。为此,您必须消耗所有ReservationEvent并将它们应用到适当的预订中。每个ReservationEvent都有一个子类用于标识不同类型的事件,如TicketBought。此外,每个事件都有其所应用到的预定对象的UUID:
FactStore factStore = new CassandraFactStore();
Observable<ReservationEvent> facts = factStore.observe();
facts.subscribe(this::updateProjection);
//...
void updateProjection(ReservationEvent event) {
UUID uuid = event.getReservationUuid();
Reservation res = loadBy(uuid)
.orElseGet(() -> new Reservation(uuid));
res.consume(event);
store(event.getUuid(), res);
}
private void store(UUID id, Reservation modified) {
//...
}
Optional<Reservation> loadBy(UUID uuid) {
//...
}
class Reservation {
Reservation consume(ReservationEvent event) {
//mutate myself
return this;
}
}
显然,facts的流是可以以一个Observable表现。系统的其他部分接收API调用或web请求,响应(例如,向客户的信用卡收费)并存储发生了的事实(即领域事件)。系统的其他部分(甚至其他系统!)可以通过订阅这些facts流,以任意角度构建当前系统状态的快照。我们的代码非常简单:每个ReservationEvent 都从我们的投影的数据存储中加载一个Reservation(预订)。如果没有找到Reservation(预订),这意味着它是与这个UUID关联的第一个事件(一般情况是创建时间),所以我们从一个空的Reservation (预订)开始。然后,我们将ReservationEvent 传递给该Reservation (预订)对象。它可以自我更新以反映任何类型的事实。然后,我们Reservation保存回去。
请记住,投影(projection)是独立于事实(fact)的,它们可以使用任何其他持久性机制,甚至可以在内存中保存状态。此外,您可以有多个投影,它们使用相同的fact流,但构建不同的快照。例如,您可以拥有一个Accounting对象,它也是用相同的fact流,但是它只关心部分属性:即进进出出的钱。而另一种投影可能只对FraudDetected 的事实感兴趣,以总结欺诈情况。
这个简单的事件源介绍将帮助我们理解为什么groupBy()操作符是有用的。不久,我们发现对Reservation(预定)投影的更新落后了,我们无法跟上fact生成的速度。这个数据存储可以轻松处理并发读取和更新,因此我们可以尝试并行处理事实(fact):
Observable<ReservationEvent> facts = factStore.observe();
facts
.flatMap(this::updateProjectionAsync)
.subscribe();
//...
Observable<ReservationEvent> updateProjectionAsync(ReservationEvent event) {
//possibly asynchronous
}
在这个例子里,我们可以并行地使用事实(fact),或者更精确地说:接收是串行化的,但是处理(在updateProjectionAsync()中)可能是异步的。updateProjectionAsync()改变了在投影中提供的Reservation(预定)对象的状态。但是,看看updateProjection()是如何实现的,我们很快就看到了一个可能的竞争条件:两个线程可能使用不同的事件,修改相同的Reservation(预定)对象,并尝试存储它——但是第一个更新被覆盖并且有效地丢失了。从技术上讲,你可以尝试乐观锁,但另一个问题依然存在:事实(fact)的顺序已经无法保证。当两个无关的Reservation(预定)实例(使用不同的UUID)被触及时,这不是一个问题。但是,将事实(fact)以不同的顺序应用到相同的Reservation(预定)对象上,这实际上可能是灾难性的。
这就是groupBy()派上用场的地方。它将某个流,基于某个键的,将其分割成多个并行流,每个流都使用给定的键持有事件。在这种情况下,我们希望将包含所有事实(fact)的巨大的流,分割成大量的较小的流,每个流只释放与特定UUID相关的事件。
Observable<ReservationEvent> facts = factStore.observe();
Observable<GroupedObservable<UUID, ReservationEvent>> grouped =
facts.groupBy(ReservationEvent::getReservationUuid);
grouped.subscribe(byUuid -> {
byUuid.subscribe(this::updateProjection);
});
这个例子包含了相当多的新构念。首先,我们接收一个Observable<ReservationEvent>流,并将其按照UUID分组(ReservationEvent::getReservationUuid)。您可能希望groupBy()应该返回一个List<Observable<ReservationEvent>>
----毕竟我们将单个流转换为多个流。当您意识到groupBy()不可能知道上游会产生多少个不同的键(UUID)时,这个假设就不成立了。因此,它必须动态的生成它们:每当发现新的UUID时,新的GroupedObservable<UUID, ReservationEvent>就会发射,并推送与这个UUID相关的事件。因此,很明显,外部的数据结构必定是一个Observable的。
但是,这个GroupedObservable <UUID,ReservationEvent>到底是什么呢?GroupedObservable是一个简单的Observable 子类,除了标准Observable的契约,它返回一个key,该key是该流中的所有事件所属于的(在我们的例子中是UUID)。发射出的GroupedObservable的数量可以是一个(如果所有事件具有相同的键),也能是事件总数(如果每个上游事件都有一个惟一键)。这是一个嵌套Observable的例子,并没有那么糟糕。当我们订阅外部观察时,每个发出的值实际上是另一个Observable(GroupedObservable),您可以订阅它.例如,每个内部流可以提供相互关联的事件(比如相同的关联ID),但是,内部流与另一个流无关,可以分别单独处理。