Observable的实例不会发出任何事件,直到有人真正对某事件感染兴趣为止。为了开始观察一个 Observable,您可以使用subscribe()方法系列:
Observable<Tweet> tweets = //...
tweets.subscribe((Tweet tweet) ->
System.out.println(tweet));
前面的代码片段通过注册回调来订阅Observable<tweets>。每当tweets流决定将事件推到下游时,都会调用这个回调。RxJava契约确保您的回调不会同时从多个线程调用,即使事件可以从多个线程中发出。有多个更加具体的重载版本的subscribe()。我们已经提到过,Observable 是不会抛出异常的。相反,异常只是Observable 的另一种通知(Error事件)。因此,不要在subscribe()调用周围使用try -catch块以捕获沿途产生的异常。相反,您应该提供一个单独的回调:
tweets.subscribe(
(Tweet tweet) -> { System.out.println(tweet); },
(Throwable t) -> { t.printStackTrace(); }
);
subscribe()的第二个参数是可选的。它对在生产项目时可能发生的异常进行通知。可以保证,在异常之后不会出现其他Tweet。您几乎总是希望订阅异常,不仅是合法的项目,即使您不期望它们。Exception异常是Observable 的一级公民。而抛出异常可以快速传播,造成许多副作用,如不一致的数据结构或失败的事务。这通常是个好主意,但通常情况下,异常并不致命。因此,弹性系统应该预期异常的发生,并有专门处理异常的系统。这就是为什么Observable 可以明确对其的建模。
subscribe()的第三个可选回调允许我们监听流完成:
tweets.subscribe(
(Tweet tweet) -> { System.out.println(tweet); },
(Throwable t) -> { t.printStackTrace(); },
() -> {this.noMore();}
);
请记住,RxJava对于产生多少条目、何时产生、何时停止并没有做强制要求。由于流可以是无限的,或者它可以在订阅之后立即完成,它将取决于订阅者是否希望接收完成通知。如果你从一开始就知道一条流是无限的,显然这是毫无意义的。另一方面,在某些情况下,流的完成可能是我们实际上在等待的事件。举例来说,可以考虑Observable<Progress>,它可以跟踪长时间运行的流程。客户机可能对跟踪进度感兴趣,也可能不感兴趣,但它肯定想知道进程何时结束。
作为附带说明,通常您可以使用Java 8方法引用而不是lambdas来提高可读性,如下所示:
tweets.subscribe(
System.out::println,
Throwable::printStackTrace,
this::noMore);