有许多方法可以构造一个支持下游背压请求的Observable。最简单的解决方案是使用像range()或者from(Iterable<T>)这样的内置工厂方法。后者创建一个依靠Iterable的源,但是内置了背压。这意味着,这样一个Observable不会一次从Iterable发射出所有的值;更确切地说,它将会随着消费者需求的逐渐增加而逐渐增加。注意,这并不意味着要首先将所有数据都加载到List<T>中(扩展Iterable<T>)。Iterable基本上是一个Iterator的工厂,因此我们可以安全动态的装载数据。

一个令人感兴趣的支持背压的Observable的例子是,将JDBC获得的Result Set(结果集)包装为一个流。注意,ResultSet是基于拉的,就像启用背压的Observable。但是它不是一个Iterable或Iterator,因此我们必须首先将它转换为 Iterator<Object[]>—一个Object[]是一个数据库中对单个行进行松散类型的表示:

public class ResultSetIterator implements Iterator<Object[]> {
    private final ResultSet rs;
    public ResultSetIterator(ResultSet rs) {
        this.rs = rs;
    }
    @Override
    public boolean hasNext() {
        return !rs.isLast();
    }
    @Override
    public Object[] next() {
        rs.next();
        return toArray(rs);
    }
}

上面的转换器是从Apache Commons DbUtils开放源码库中提取的ResultSetIterator ,这是一个非常简化的版本,没有错误处理。这个类还提供了对Iterable<Object[]> 的简单转换:

public static Iterable<Object[]> iterable(final ResultSet rs) {
    return new Iterable<Object[]>() {
        @Override
        public Iterator<Object[]> iterator() {
            return new ResultSetIterator(rs);
        }
    };
}

PS:ResultSet handling

请记住,将ResultSet作为一个Iterator迭代器(特别是Iterable)是一个有漏洞的抽象。首先,ResultSet像Iterator,但与Iterable不同。您只能遍历Iterator迭代器一次,通常这也适用于ResultSet。其次,Iterable是一个新生Iterator 的工厂,而前面的转换器总是返回一个由相同ResultSet 支持的Iterator 。这意味着调用iterator()两次将不会产生相同的值—-----两个Iterator 将在相同的ResultSet上进行竞争。最后,ResultSet 必须在完成时关闭,但是Iterator 没有这样的生命周期。完全依赖客户端代码读取Iterator 来完成清理工作是过于乐观的。

所有这些转换器就绪后,我们终于可以构建Observable<Object[]>,它由ResultSet和背压所支持:

Connection connection = //...
PreparedStatement statement =
    connection.prepareStatement("SELECT ...");
statement.setFetchSize(1000);
ResultSet rs = statement.executeQuery();
Observable<Object[]> result =
    Observable
        .from(ResultSetIterator.iterable(rs))
        .doAfterTerminate(() -> {
            try {
                rs.close();
                statement.close();
                connection.close();
            } catch (SQLException e) {
                log.warn("Unable to close", e);
            }
});

结果Observable支持背压,因为内置的from()操作符支持背压。因此,Subscriber的吞吐量是不再相关,我们将不再看到MissingBackpressureException。注意,setFetchSize()是必需的;否则,一些JDBC驱动程序可能会尝试将所有记录加载到内存中,如果我们想要在将一个大的结果集上进行流化,则会非常低效。

正如我们已经提到的,支持反压力的底层机制是一个Producer 的自定义实现。然而,这个任务非常容易出错,因此创建了一个helper类,它名字是“SyncOnSubscribe”。Observable.OnSubscribe的实现是基于拉(pull)的,,并且透明的内置了背压 。让我们从最简单的无状态Observable 例子开始-------这在现实生活中是很难找到的。这种类型的Observable在onNext()调用之间不具有任何状态。但即使是最简单的range()或just()也必须记住哪些项已经发出。下面是发射随机数的无状态Ibservable:

import rx.observables.SyncOnSubscribe;
Observable.OnSubscribe<Double> onSubscribe =
    SyncOnSubscribe.createStateless(
        observer -> observer.onNext(Math.random())
    );
Observable<Double> rand = Observable.create(onSubscribe);

变量名为rand的Observable是一个普通的Observable,您可以转换、合并和订阅。但在底部,它有成熟完整的背压支持。如果订阅者或管道中的任何其他操作符请求的事件数量有限,那么这个Observable 将正确地服从命令。我们必须提供给createStateless()的惟一东西是为每个请求的事件调用的lambda表达式;因此,如果下游调用request(3),假设每次调用只发出一个事件,这个自定义的lambda表达式将被调用三次。在调用之间没有上下文(状态),因此称为无状态。

现在,让我们构建一个有状态的操作符。这种SyncOnSubscribe的变体允许在调用之间传递一个不可变的状态变量。此外,每个调用必须返回一个新的状态值。作为一个例子,我们将建立一个无界的自然数生成器,从0开始。如果你想用单调递增的自然数压缩一个任意长的序列,这样的运算符实际上是非常有用的。range()也会起作用,但它需要提供一个上限,这并不总是实用的:

Observable.OnSubscribe<Long> onSubscribe =
                SyncOnSubscribe.createStateful(
                        () -> 0L,
                        (cur, observer) -> {
                            observer.onNext(cur);
                            return cur + 1;
                        }
                );
Observable<Long> naturals = Observable.create(onSubscribe);

这一次,我们为createStateful()工厂方法提供两个lambda表达式。在这种情况下,第一个惰性的初始化状态值-----在我们例子中该初始化值是0。第二个表达式更重要:它应该基于当前状态将一个条目推到下游,并返回新的状态值。这个状态被期望是不可变的,因此这个方法允许返回一个新的状态,而不是改变它。您可以很容易地重写自然Observable ,以便返回BigInteger,并防止假设溢出。这种Observable 可以产生无限数量的自然数字,但完全支持背压。这意味着它可以根据Subscribers的喜好调整产生事件的速度。将其与幼稚的实现相比较,这无疑要简单得多,但在Subcribers速度太慢的情况下并不适合:

Observable<Long> naturals = Observable.create(subscriber -> {
    long cur = 0;
    while (!subscriber.isUnsubscribed()) {
        System.out.println("Produced: " + cur);
        subscriber.onNext(cur++);
    }
});

如果您喜欢使用单独的一个状态变量,在遍历它(比如JDBC的ResultSet)的过程中会发生改变这个状态变量,那么SyncOnSubscribe也为您提供了一种方法。下面的代码不能编译通过,因为有检查异常,但是我们想首先强调整体的使用模式:

ResultSet resultSet = //...
Observable.OnSubscribe<Object[]> onSubscribe = SyncOnSubscribe.createSingleState(
    () -> resultSet,
    (rs, observer) -> {
        if (rs.next()) {
        observer.onNext(toArray(rs));
        } else {
        observer.onCompleted();
        }
        observer.onNext(toArray(rs));
    },
    ResultSet::close
);
Observable<Object[]> records = Observable.create(onSubscribe);

有三个回调要实现:

  • 状态生成器(Generator)。这个lambda被调用一次,以产生状态变量,它将作为参数传递给后续的表达式。
  • 生成下一个值的回调(Callback),通常是基于状态来做的。这个回调可以自由地改变第一个参数传递来的状态值。
  • 昂取消订阅时候会调用第三个回调函数。这是清理ResultSet的地方

带有错误处理的更完整的实现看起来如下。注意,在取消订阅期间发生的错误很难在下游正常传播:

Observable.OnSubscribe<Object[]> onSubscribe = SyncOnSubscribe.createSingleState(
    () -> resultSet,
    (rs, observer) -> {
        try {
            rs.next();
            observer.onNext(toArray(rs));
        } catch (SQLException e) {
            observer.onError(e);
        }
    },
    rs -> {
        try {
            //Also close Statement, Connection, etc.
            rs.close();
        } catch (SQLException e) {
            log.warn("Unable to close", e);
        }
    }
)

SyncOnSubscribe是一个方便的实用工具,它允许您编写支持背压的Observable .与 Observable.create()相比,他可能要复杂一点,但是由Subscribe控制背压的好处很难被低估。您应该避免直接使用create()操作符,而是应该考虑内置的工厂方法,比如from()或SyncOnSubscribe。

Backpressure是一种功能强大的机制,Subscriber可以控制Observable的节流。反馈通道显然带来了一些开销,但松散耦合可管理的生产者和消费者是有巨大的优势的。Backpressure经常被批处理,因此开销很小,但是如果Subscriber真的很慢(甚至是短暂的),那么这个缓慢就会立即反映出来,整个系统的稳定性就会得到保护。通过使用onBackpressure *()方法,可以在一定程度上减轻没有背压的缺陷,但这不是长远之计。

在创建Observables时,请考虑正确处理背压请求。毕竟,您无法控制Subscribers的吞吐量。另一种方法是避免在Subscribe做重量级的工作,而是将其卸载到flatMap()。例如,与其在subscribe()中存储数据库中的事件,不如尝试这样做:

source.subscribe(this::store);

考虑让store具有更多的响应性(让它返回保存记录的Observable<UUID >),并让订阅只会触发订阅操作和其他的副作用:

source
    .flatMap(this::store)
    .subscribe(uuid -> log.debug("Stored: {}", uuid));

甚至,批处理UUID以减少日志框架的开销:

source
    .flatMap(this::store)
    .buffer(100)
    .subscribe(
        hundredUuids -> log.debug("Stored: {}", hundredUuids))

通过避免在subscribe()中处理中处理长时间的工作,我们减少了对背压的需求,但是提前考虑它仍然是个好主意。请参阅JavaDocs,以了解操作符是否支持背压。如果缺少这样的信息,操作符很可能不会受到背压的影响,比如map()操作符

results matching ""

    No results matching ""