在Observable所支持的操作符的方面,Single是相当类似的,所以我们在这里不会花太多的时间。相反,我们将简单地将它们与Observable的副本进行比较,并将重点放在Single 的用例上。很少有方法可以创建Single,让我们从常量的just()和error()操作符开始:
import rx.Single;
Single<String> single = Single.just("Hello, world!");
single.subscribe(System.out::println);
Single<Instant> error =
Single.error(new RuntimeException("Opps!"));
error
.observeOn(Schedulers.io())
.subscribe(
System.out::println,
Throwable::printStackTrace
);
这里没有多参数的重载版本的just()——毕竟,Single只能按定义持有一个值。另外,subscribe()方法采用两个参数,而不是三个参数。有一个onComplete()回调是没有意义的;Single要么以一个值(第一个回调),或者是一个异常(第二个回调)完成。监听完成就相当于订阅单个值。此外,我们还包括了observeOn()操作符,它的工作原理与它对等的Observable完全相同。同样适用于subscribeOn()(参见125页中的“Imperative Concurrency”)。最后,您可以使用error()操作符创建一个始终与给定异常一起完成的Single。
让我们实现一个更真实的HTTP请求的场景。在184页的“非阻塞HTTP客户机和RxNetty”中,我们学习了如何使用RxNetty构建异步HTTP客户端。这次我们将使用在下面使用Netty的async -http- client。在发出HTTP请求之后,我们可以提供一个回调实现,当响应或错误返回时,它将被异步调用。这很好地契合了Single是如何创建的:
import com.ning.http.client.AsyncCompletionHandler;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.Response;
import rx.Single;
import rx.SingleSubscriber;
import java.io.IOException;
public class SingleHTTP {
AsyncHttpClient asyncHttpClient = new AsyncHttpClient();
Single<Response> fetch(String address) {
return Single.create(subscriber ->
asyncHttpClient
.prepareGet(address)
.execute(handler(subscriber)));
}
AsyncCompletionHandler handler(SingleSubscriber<? super Response> subscriber) {
return new AsyncCompletionHandler() {
public Response onCompleted(Response response) {
subscriber.onSuccess(response);
return response;
}
public void onThrowable(Throwable t) {
subscriber.onError(t);
}
};
}
}
create()看起来类似于Observable . create()(参见第35页“Mastering Observable.create()”),但它有一些重要的约束;您必须调用onSuccess()一次或onError()一次。从技术上讲,它也可以有一个从未完成的Single ,但是不允许有多个onSuccess()调用。说到Single. create(),你也可以尝试 Single.fromCallable(),它接收一个Callable<T>,返回一个Single<T>。它很简单。
回到我们的HTTP客户端示例,当响应返回时,我们通过调用onSuccess()来通知订阅者,或者在异步失败回调时通过onError()将异常传播。你也可以以Observable的风格来使用Single :
Single<String> example =
fetch("http://www.example.com")
.flatMap(this::body);
String b = example.toBlocking().value();
//...
Single<String> body(Response response) {
return Single.create(subscriber -> {
subscriber.onSuccess(response.getResponseBody());
});
}
//Same functionality as body():
Single<String> body2(Response response) {
return Single.fromCallable(() ->
response.getResponseBody());
}
不幸的是,Response.getResponseBody()抛出IOException,因此我们不能简单地说:map(Response::getResponseBody)。但至少我们看到了 Single.flatgMap() 是如何工作的。通过将具有潜在危险的getResponseBody()方法包装为Single<String> ,我们确保潜在的失败被封装并在类型系统中清楚的表达出来。fl Single.flatMap() 可以像您所期望的那样工作,正如你所知道的Observable.flatMap() :如果计算的第二阶段(在我们的例子中是 this::body)失败了,那么整个Single也会失败。有趣的是,Single有map()和flatMap(),但没有filter()。你能猜出为什么?如果不符合某些 Predicate<T>,则filter()可能会过滤掉Single< T >的内容。但是,Single< T >必须在其中只有一个项,而filter()可能会导致Single没有值。
就像BlockingObservable (见118页的BlockingObservable: Exiting the Reactive World”)一样,Single也有自己的BlockingSingle,通过Single.toBlocking()创建。类似地,创建BlockingSingle<T>还没有阻塞。但是,调用它的value()则会阻塞,直到类型T的值(在我们的示例中是包含响应主体的字符串)可用为止。在异常情况下,它将从value()方法中重新抛出。