SELECT * FROM PEOPLE实际上并不是最先进的SQL查询。首先,您不应该盲目地获取所有列,但是获取所有行则更具有破坏性。我们的旧API没有分页的能力,即只查看表的一个子集。在传统的企业应用中,它可能是这样的:

List<Person> listPeople(int page) {
    return query(
        "SELECT * FROM PEOPLE ORDER BY id LIMIT ? OFFSET ?",
        PAGE_SIZE,
        page * PAGE_SIZE
    );
}

本书不是SQL方面的书,所以我们将把实现的细节放在一边。这个API的作者是无情的:我们没有选择记录范围的自由选择,我们只能在基于0的页码上操作。然而,在RxJava中,由于惰性,我们实际上可以模拟从某个给定的页码开始,读取出整个的数据库:

import static rx.Observable.defer;
import static rx.Observable.from;
Observable<Person> allPeople(int initialPage) {
    return defer(() -> from(listPeople(initialPage)))
        .concatWith(defer(() ->
            allPeople(initialPage + 1)));
}

这个代码片段延迟加载数据库记录的初始页面,例如10个条目。如果没有人订阅,即使是第一个查询也没有被调用。如果有一个订阅者只使用了一些最初的元素(例如,allPeople(0). take(3)),RxJava将自动从我们的流中取消订阅,不再执行查询。那么比如说,当我们请求11个条目时候,但是第一个listPeople()调用只返回10,会发生什么呢?好吧,RxJava计算出最初的Observable值已经耗尽,但消费者仍处于饥饿状态。幸运的是,它看到concatWith()操作符,基本上是这样说的:当在左边的可Observable完成,而不是向订阅者传播完成通知,而是继续订阅右边的Observable并继续,就像什么都没有发生一样,正如下图所示:

换句话说,concatWith()可以将两个Observables连接在一起,这样当第一个完成时,第二个就接管了。在a . concatwith(b). subscribe(…)中,订阅者将首先从a中接收所有事件,然后从b接收所有事件。在这种情况下,订阅者首先接收最初的10个条目,随后又是10个条目。然而,仔细看,我们的代码中有一个所谓的无限递归! allPeople(initialPage)在没有任何停止条件的情况下调用allPeople(initialPage + 1)。这在绝大多数语言中都是一个StackOverflowError的方法,但在这里它并不是。同样,调用allPeople()总是惰性的,因此当您停止侦听(取消订阅)时,这个递归就结束了。从技术上讲,concatWith()仍然可以产生堆栈溢出错误。等到“Honoring the Requested Amount of Data”时,您将学习如何处理关于输入数据的不同需求。

以块的形式惰性加载数据的技术非常有用,因为它允许您将精力集中在业务逻辑上,而不是在低级管道上。我们已经看到了在很小的范围内使用RxJava的一些好处。设计一个带有Rx的API不会影响整个体系结构,因为我们总是可以退回到BlockingObservable和Java集合。但它还有更广的潜力,如果可能的话,我们可以进一步精简它。

results matching ""

    No results matching ""