传统的api大部分时间都在阻塞,这意味着它们迫使您同步等待结果。这种方法工作的很好,至少在你 听说过RxJava之前。但是,当需要将数据从API生产者推给消费者时,阻塞的API尤其成问题——这是RxJava真正发挥作用的领域。这样的例子不胜枚举,API设计人员采用了各种方法。通常,我们需要提供一些API调用的回调,通常称为事件监听器。最常见的场景之一是Java Message Service (JMS)。使用JMS通常需要实现应用程序服务器或容器通知每个传入消息的类。我们可以用相对容易的Observable来替换这些侦听器,它更加健壮和通用。传统的侦听器与下面这个类很类似,下面是在Spring框架中使用JMS的支持,但是我们的解决方案是技术无关的:
@Component
class JmsConsumer {
@JmsListener(destination = "orders")
public void newOrder(Message message) {
//...
}
}
当收到JMS消息时,JmsConsumer类必须决定如何处理它。通常,在消息使用者内部调用一些业务逻辑。当一个新组件想要被这些消息通知时,它必须适当地修改JmsConsumer。想象一下,Observable<Message>可以被任何人订阅,此外,还可以使用整个RxJava的操作符,允许映射、过滤和组合功能。将一个基于回调的和push的API转换为一个Observable的最简单的方法就是使用Subjects。每次发送新的JMS消息时,我们都将该消息推送到一个外表看起来只是普通Hot类型Observable的PublishSubject 上。
private final PublishSubject<Message> subject = PublishSubject.create();
@JmsListener(destination = "orders", concurrency="1")
public void newOrder(Message msg) {
subject.onNext(msg);
}
Observable<Message> observe() {
return subject;
}
记住,Observable< Message >是Hot类型的;它一旦被消费就开始发出JMS消息。如果此刻没有人订阅,消息就会丢失。ReplaySubject 是一个替代版本,但是因为它缓存了应用程序启动后的所有事件,它不适合长时间运行的流程。如果您的订阅者必须接收所有消息,请确保在初始化JMS消息侦听器之前订阅它。此外,我们的消息侦听器有一个concurrency="1"的参数,以确保不会从多个线程调用Subject。作为替代方案,您可以使用Subject.toSerialized() 。
作为附带说明,Subject 比较容易学习,但在一段时间之后就会出现问题。在这种特殊情况下,我们应该使用更习惯的Observable.create来替换掉Subject:
public Observable<Message> observe(
ConnectionFactory connectionFactory,
Topic topic) {
return Observable.create(subscriber -> {
try {
subscribeThrowing(subscriber, connectionFactory, topic);
} catch (JMSException e) {
subscriber.onError(e);
}
});
}
private void subscribeThrowing(
Subscriber<? super Message> subscriber,
ConnectionFactory connectionFactory,
Topic orders) throws JMSException {
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(true, AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(orders);
consumer.setMessageListener(subscriber::onNext);
subscriber.add(onUnsubscribe(connection));
connection.start();
}
private Subscription onUnsubscribe(Connection connection) {
return Subscriptions.create(() -> {
try {
connection.close();
} catch (Exception e) {
log.error("Can't close", e);
}
});
}
JMS API提供了从代理接收消息的两种方式:通过同步阻塞 receive()方法,非阻塞方式使用 MessageListener。非阻塞API有很多好处;例如,它拥有更少的资源,比如线程和堆栈内存。它也与Rx编程风格完美地结合在一起。与其创建MessageListener实例并从其中调用我们的订阅者,我们可以使用方法引用这个简洁的语法
consumer.setMessageListener(subscriber::onNext)
此外,我们必须注意资源清理和错误的处理。这个微小的转换层允许我们轻松地使用JMS消息,而不必担心API内部信息。这里有一个例子,使用流行的ActiveMQ消息传递代理在本地运行:
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory("tcp://localhost:61616");
Observable<String> txtMessages =
observe(connectionFactory, new ActiveMQTopic("orders"))
.cast(TextMessage.class)
.flatMap(m -> {
try {
return Observable.just(m.getText());
} catch (JMSException e) {
return Observable.error(e);
}
});
和JDBC一样,JMS也有大量使用检查异常JMSException的声明,即使在TextMessage上调用getText()时也是如此。要正确处理错误(请参阅第243页上的“错Error Handling”了解更多的细节),我们使用flatMap()并包装异常。从那时起,您就可以像任何其他异步和非阻塞流一样处理JMS消息。顺便说一下,我们使用了cast()操作符,它乐观地将上游事件转换为给定类型,否则,失败的话会使用onError()。cast()基本上是一个特殊的map()操作符,它的行为像map(x - >(TextMessage)x)