你有没有注意到 RxGroceries.purchase()需要两个参数: productName和quantity,而且quantity参数的值总是1?如果我们的购物清单上有一个产品出现了多次,表明更大的需求,这该怎么办?第一个简单的实现只发送相同的请求—例如,对于egg,买多次,每次请求一个。幸运的是,我们可以使用groupBy()来声明批处理这些请求——这仍然需要与声明式并发一起工作:
import org.apache.commons.lang3.tuple.Pair;
Observable<BigDecimal> totalPrice = Observable
.just("bread", "butter", "egg", "milk", "tomato",
"cheese", "tomato", "egg", "egg")
.groupBy(prod -> prod)
.flatMap(grouped -> grouped
.count()
.map(quantity -> {
String productName = grouped.getKey();
return Pair.of(productName, quantity);
}))
.flatMap(order -> store
.purchase(order.getKey(), order.getValue())
.subscribeOn(schedulerA))
.reduce(BigDecimal::add)
.single();
这段代码相当复杂,所以在显示输出之前,让我们快速浏览一下。首先,我们将产品简单地按其名称进行分组,因此我们使用了一个恒等函数 prod -> prod。作为返回, 我们得到了一个尴尬的Observable<GroupedObservable<String, String>> 。这并没有什么错。接下来,flatMap()接收每个代表同一名称的所有产品的GroupedObservable<String, String>。例如,会有一个 ["egg", "egg", "egg"]的Observable,它的key也是“egg”。如果groupBy()使用了不同的键函数,如 prod.length(),同样是["egg", "egg", "egg"]的Observablekey,但是key将是3。
在这一点上,在flatMap()中,我们需要构造一个Pair<String, Integer>事件类型的Observable,,代表每一个独特的产品及其数量。count()和map()都返回一个Observable,所以所有的东西都是完美的。第二个flatMap()接收Pair<String, Integer> 类型的订单,并执行购买,这一次购买数量可以更大,输出看起来完美;请注意,较大的订单稍微慢一些,但仍然比多次请求要快得多:
164 | Sched-A-0 | Purchasing 1 bread
165 | Sched-A-1 | Purchasing 1 butter
166 | Sched-A-2 | Purchasing 3 egg
166 | Sched-A-3 | Purchasing 1 milk
166 | Sched-A-4 | Purchasing 2 tomato
166 | Sched-A-5 | Purchasing 1 cheese
1151 | Sched-A-0 | Done 1 bread
1178 | Sched-A-1 | Done 1 butter
1180 | Sched-A-5 | Done 1 cheese
1183 | Sched-A-3 | Done 1 milk
1253 | Sched-A-4 | Done 2 tomato
1354 | Sched-A-2 | Done 3 egg
如果您认为您的系统可以从这种方式或其他方式中获益,请参阅第297页的“批处理和崩溃命令”