rx.Single如果不提供可组合的操作符,将是无用的。您将遇到的最重要的操作符是 Single.zip(),它与Observable.zip(参见在第79页上的“Pairwise Composing Using zip() and zipWith()”)一样,但是语义更简单。Single总是发出精确的一个值(或异常),因此Single. zip()(或Single. zipwith()实例版本)的结果总是恰好是一对/元组。zip()基本上是在两个基础的Singles完成时创建第三个Single的方法。
假设您正在渲染一篇文章以呈现在您的网站上。需要做三个独立的操作来满足这个请求:从数据库中读取文章内容,向一个社交媒体网站询问到目前为止的收藏的数量,并更新阅读计数指标。天真的实现不仅串行的执行这三种操作,而且如果任何步骤都很慢,也会导致一个不可接受的延迟。而通过Single,每一步都是分别建模的:
import org.springframework.jdbc.core.JdbcTemplate;
//...
Single<String> content(int id) {
return Single.fromCallable(() -> jdbcTemplate
.queryForObject(
"SELECT content FROM articles WHERE id = ?",
String.class, id))
.subscribeOn(Schedulers.io());
}
Single<Integer> likes(int id) {
//asynchronous HTTP request to social media website
}
Single<Void> updateReadCount() {
//only side effect, no return value in Single
}
作为一个例子,我们展示了如何通过传递lambda表达式来使用fromCallable来创建Single 。这个实用程序非常有用,因为它为我们管理错误处理(参见244页的“Where Are My Exceptions?”)。content()方法在使用Spring框架中的一个方便的JdbcTemplate,不显眼地从数据库加载文章内容。JDBC本质上是阻塞API,所以我们显式地调用subscribeOn()来实现Single 的异步。省略了like()和updateReadCount()的实现。您可以想像一下,like()使用RxNetty(请参阅第184页的“非阻塞HTTP客户端与RxNetty”)对某些API进行异步HTTP请求。updateReadCount()很有趣,因为它有一个 Single<Void>的类型。这表明它执行了一些没有返回值但显著延迟的副作用。然而,我们仍然可能希望被告知异步可能发生的故障。对于这种情况,RxJava也有特殊类型:Completable。这就指定了要么完成没有结果,要么异步产生异常。
将这三个操作与zip结合起来非常简单:
Single<Document> doc = Single.zip(
content(123),
likes(123),
updateReadCount(),
(con, lks, vod) -> buildHtml(con, lks)
);
//...
Document buildHtml(String content, int likes) {
//...
}
Single.zip() 接收了三个Single (它有2个或9个Single实例的重载版本),当所有三个都完成时调用我们的自定义函数。然后将这个自定义函数的结果放在Single<Document>实例中,我们可以进一步转换。您应该知道,转换不会使用Void结果。这意味着我们要等待updateReadCount()完成,但是我们不需要它的(Void的)结果。这可能是一个需求,或者可以提出可能的优化:如果updateReadCount()是异步执行的,在不需要等待它的完成或失败的情况下构建HTML文档也很有效。
现在想象一下,如果调用like()失败或需要很长很长的时间才能完成(实际上更糟糕)的时候,会发生什么。没有响应式的扩展的渲染,HTML完全失败或花费大量时间。然而,在这方面,我们的执行情况并不好。Single 支持多个操作符,例如timeout()、onErrorReturn()和onErrorResumeNext(),以增强弹性和错误处理。所有这些操作符的行为方式都与其对等的Observable相同。