传统的api大部分时间都在阻塞,这意味着它们迫使您同步等待结果。这种方法工作的很好,至少在你 听说过RxJava之前。但是,当需要将数据从API生产者推给消费者时,阻塞的API尤其成问题——这是RxJava真正发挥作用的领域。这样的例子不胜枚举,API设计人员采用了各种方法。通常,我们需要提供一些API调用的回调,通常称为事件监听器。最常见的场景之一是Java Message Service (JMS)。使用JMS通常需要实现应用程序服务器或容器通知每个传入消息的类。我们可以用相对容易的Observable来替换这些侦听器,它更加健壮和通用。传统的侦听器与下面这个类很类似,下面是在Spring框架中使用JMS的支持,但是我们的解决方案是技术无关的:

@Component
class JmsConsumer {
    @JmsListener(destination = "orders")
    public void newOrder(Message message) {
        //...
    }
}

当收到JMS消息时,JmsConsumer类必须决定如何处理它。通常,在消息使用者内部调用一些业务逻辑。当一个新组件想要被这些消息通知时,它必须适当地修改JmsConsumer。想象一下,Observable<Message>可以被任何人订阅,此外,还可以使用整个RxJava的操作符,允许映射、过滤和组合功能。将一个基于回调的和push的API转换为一个Observable的最简单的方法就是使用Subjects。每次发送新的JMS消息时,我们都将该消息推送到一个外表看起来只是普通Hot类型Observable的PublishSubject 上。

private final PublishSubject<Message> subject = PublishSubject.create();
    @JmsListener(destination = "orders", concurrency="1")
    public void newOrder(Message msg) {
        subject.onNext(msg);
        }
    Observable<Message> observe() {
        return subject;
    }

记住,Observable< Message >是Hot类型的;它一旦被消费就开始发出JMS消息。如果此刻没有人订阅,消息就会丢失。ReplaySubject 是一个替代版本,但是因为它缓存了应用程序启动后的所有事件,它不适合长时间运行的流程。如果您的订阅者必须接收所有消息,请确保在初始化JMS消息侦听器之前订阅它。此外,我们的消息侦听器有一个concurrency="1"的参数,以确保不会从多个线程调用Subject。作为替代方案,您可以使用Subject.toSerialized() 。

作为附带说明,Subject 比较容易学习,但在一段时间之后就会出现问题。在这种特殊情况下,我们应该使用更习惯的Observable.create来替换掉Subject:

public Observable<Message> observe(
    ConnectionFactory connectionFactory,
    Topic topic) {
    return Observable.create(subscriber -> {
        try {
            subscribeThrowing(subscriber, connectionFactory, topic);
        } catch (JMSException e) {
            subscriber.onError(e);
        }
    });
}
private void subscribeThrowing(
        Subscriber<? super Message> subscriber,
        ConnectionFactory connectionFactory,
        Topic orders) throws JMSException {
    Connection connection = connectionFactory.createConnection();
    Session session = connection.createSession(true, AUTO_ACKNOWLEDGE);
    MessageConsumer consumer = session.createConsumer(orders);
    consumer.setMessageListener(subscriber::onNext);
    subscriber.add(onUnsubscribe(connection));
    connection.start();
}
private Subscription onUnsubscribe(Connection connection) {
    return Subscriptions.create(() -> {
        try {
            connection.close();
        } catch (Exception e) {
            log.error("Can't close", e);
        }
    });
}

JMS API提供了从代理接收消息的两种方式:通过同步阻塞 receive()方法,非阻塞方式使用 MessageListener。非阻塞API有很多好处;例如,它拥有更少的资源,比如线程和堆栈内存。它也与Rx编程风格完美地结合在一起。与其创建MessageListener实例并从其中调用我们的订阅者,我们可以使用方法引用这个简洁的语法

consumer.setMessageListener(subscriber::onNext)

此外,我们必须注意资源清理和错误的处理。这个微小的转换层允许我们轻松地使用JMS消息,而不必担心API内部信息。这里有一个例子,使用流行的ActiveMQ消息传递代理在本地运行:

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
ConnectionFactory connectionFactory =
    new ActiveMQConnectionFactory("tcp://localhost:61616");
Observable<String> txtMessages =
    observe(connectionFactory, new ActiveMQTopic("orders"))
    .cast(TextMessage.class)
    .flatMap(m -> {
        try {
            return Observable.just(m.getText());
        } catch (JMSException e) {
            return Observable.error(e);
        }
    });

和JDBC一样,JMS也有大量使用检查异常JMSException的声明,即使在TextMessage上调用getText()时也是如此。要正确处理错误(请参阅第243页上的“错Error Handling”了解更多的细节),我们使用flatMap()并包装异常。从那时起,您就可以像任何其他异步和非阻塞流一样处理JMS消息。顺便说一下,我们使用了cast()操作符,它乐观地将上游事件转换为给定类型,否则,失败的话会使用onError()。cast()基本上是一个特殊的map()操作符,它的行为像map(x - >(TextMessage)x)

results matching ""

    No results matching ""