RxJava是与并发无关的,事实上,它本身并不引入并发性。但是,一些处理线程的抽象会暴露给最终用户。而且,某些操作符在没有并发的情况下不能正常工作;请参阅第163页的“Other Uses for Scheduler”。幸运的是,Scheduler类,您必须注意的唯一一个,它相当简单。原则上,它的工作方式与java . util。concurrent中的ScheduledExecutorService类似——它可以执行任意代码块,也可能在将来的某个事件执行。但是,为了满足Rx契约,它提供了一些更细粒度的抽象,您可以在第146页的高级部分“Scheduler implementation details overview”中看到更多内容。

Scheduler与subscribeOn()和observeOn()操作符一起使用,并且在创建特定类型的Observables时使用。调度器只创建负责调度和运行代码的Worker的实例。当RxJava需要调度一些代码时,它首先要求Scheduler提供一个Worker,并使用后者来调度后续的任务。稍后您将找到该API的示例,但首先要熟悉可用的内置调度器:

  • Schedulers.newThread()

每次通过subscribeOn()或observeOn()请求时,这个调度程序都会启动一个新线程。newThread()几乎不是一个好的选择,不仅是因为启动线程时所涉及的延迟,而且因为这个线程没有被重用。堆栈空间必须预先分配(通常在一个兆字节左右,由JVM的- Xss参数控制),操作系统必须启动新的本地线程。当这个Worker完成时,线程就会终止。只有当任务是粗粒度的时,这个调度器才有用:完成它需要花费大量的时间,但是它们很少,所以线程不太可能被重用。请参阅第329页上的“Thread per Connection”。在实践中,遵循 Schedulers.io()几乎总是一个更好的选择。

  • Schedulers.io()

此调度器与newThread()类似,但已经启动了的线程可以回收,并可能从来处理将来的请求。这个实现类似于java.util.concurrent中的ThreadPoolExecutor,他有一个无界的线程池。每次请求新Worker时,要么启动一个新线程(稍后它将保持空闲一段时间),要么重新使用空闲的线程。

方法的名字叫io()并不是一个巧合。考虑使用这个Scheduler为I/O绑定要非常少的CPU资源的任务。然而,它们往往需要相当长的时间,等待网络或磁盘。因此,有一个相对较大的线程池是一个好主意。但是,要注意任何类型的无界资源----如果存在缓慢或无响应的外部依赖,如web服务,io()调度器可能会启动大量线程,导致您自己的应用程序也变得毫无响应。关于如何解决这个问题,请参阅第291页的“Hystrix管理失败”。

  • Schedulers.computation()

当任务完全受cpu限制时,您应该使用一个计算调度器;也就是说,它们需要的只是计算能力,并且没有阻塞代码(从磁盘、网络读取、sleep、等待锁定等)。因为在这个调度程序上执行的每一个任务都应该充分利用一个CPU核心,与现有的核心相比,并行执行更多这样的任务不会带来多大价值。因此,在默认情况下,computation()调度器会限制运行的线程数,使其与availableProcessors()的值相同步,正如Runtime.getRuntime()中发现的实用类。

如果由于某种原因你需要一个不同于默认数量的线程,你可以使用rx.scheduler.max-computation-threads系统属性。通过更少的线程可以确保总有一个或多个CPU核闲置,甚至在高负荷的情况下,computation()线程池不会使你的服务器饱和。不可能有比内核更多的计算线程。

computation()调度器在每个线程的前面使用无界队列,因此如果任务被调度,但是所有的内核都被占用,它们就会排队。在负载峰值情况下,此调度器将保持线程数量有限。但是,在每个线程之前的队列将保持增长。

幸运的是,内置的操作符,特别是我们将要在159页的“Declarative Concurrency with observeOn()”中研究的observeOn(),会确保此调度器没有超载。

PS-BUG:这一块因为源码也没有讲解,所以翻译的比较拗口,可以先跳过。

  • Schedulers.from(Executor executor)

调度器在内部比java.util.concurrent中的Executor 更复杂。因此,因此需要一个单独的抽象。但是因为它们在概念上非常相似,因此毫不奇怪,有一个包装器可以使用from()工厂方法将Executor转换为Scheduler :

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import rx.Scheduler;
import rx.schedulers.Schedulers;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
//...
ThreadFactory threadFactory = new ThreadFactoryBuilder()
    .setNameFormat("MyPool-%d")
    .build();
Executor executor = new ThreadPoolExecutor(
    10, //corePoolSize
    10, //maximumPoolSize
    0L, TimeUnit.MILLISECONDS, //keepAliveTime, unit
    new LinkedBlockingQueue<>(1000), //workQueue
    threadFactory
);
Scheduler scheduler = Schedulers.from(executor);

我故意使用这种冗长的语法来创建ExecutorService,而不是更简单的版本:

import java.util.concurrent.Executors;
//...
ExecutorService executor = Executors.newFixedThreadPool(10);

虽然很诱人,但是Executors工厂类硬编码了一些在企业应用程序中不实用甚至是危险的缺省值。例如,它使用无限制的LinkedBlockingQueue,它可以无限地增长,从而导致OutOfMemoryError错误,而其中还有大量未完成的任务。此外, 默认ThreadFactory使用一些无意义的线程名称,如pool -5-thread- 3。在剖析或分析线程转储时,正确地命名线程是一种非常有用的工具。从头开始实现ThreadFactory有点麻烦,所以我们使用了来自Guav的ThreadFactoryBuilder。如果您有兴趣调优,以进一步的利用线程池,请参阅第331页上的“Thread Pool of Connections”和第291页的“Hystrix管理失败”。从我们有意识配置的执行器中创建调度程序,为处理高负载的项目提供建议。但是,由于RxJava无法控制Executor中独立创建的线程,因此它不能确定线程(也就是说,尝试在相同的线程上保持同一任务的工作,以提高缓存位置)。这个Scheduler几乎不确保证单个Scheduler.Worker可以按顺序处理事件(参见146页的“Scheduler implementation details overview”)。

  • Schedulers.immediate()

Schedulers.immediate()是一个特殊的调度器,它以阻塞的方式在客户机线程内调用任务,而不是异步地调用。使用它是没有意义的,除非您的API的某些部分需要提供调度器,而您完全可以使用Observable 的默认行为,而不涉及任何线程。实际上,通过immediate() Scheduler订阅一个Observable(更多的信息),通常与不使用任何调度器来订阅Observable具有相同的效果。通常,避免使用此调度器,它会阻塞调用线程,并且使用有限。

  • Schedulers.trampoline()

trampoline()调度器与immediate()非常类似,因为它也在同一个线程中调度任务,有效地阻塞了任务。但是,与immediate()相反,在所有先前调度的任务完成时,即将到来的任务才会被执行。 immediate()立即调用给定的任务,而trampoline() 等待当前任务完成。Trampoline是函数式编程中的一个模式,它允许实现递归,而不需要无限地增长调用堆栈。这是最好的例子,首先涉及到immediate()。顺便说一下,注意我们不直接与Scheduler实例交互,而是先创建一个Worker。这在第146页的“Scheduler implementation details overview”中很快就能看到。

Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();
log("Main start");
worker.schedule(() -> {
    log(" Outer start");
    sleepOneSecond();
    worker.schedule(() -> {
        log(" Inner start");
        sleepOneSecond();
        log(" Inner end");
    });
    log(" Outer end");
    });
log("Main end");
worker.unsubscribe();

输出与预期相同;实际上,您可以使用简单的方法调用来替换schedule():

1044 | main | Main start
1094 | main | Outer start
2097 | main | Inner start
3097 | main | Inner end
3100 | main | Outer end
3100 | main | Main end

在外部块中,我们对内部块schedule(),它立即被调用,中断外部任务。当内部完成后,控制就会回到外部。同样,这只是通过 immediate() Scheduler间接地以阻塞方式调用任务的一种复杂方式。但是如果我们用Schedulers.trampoline() 替换Schedulers.immediate(),会发生什么呢?其输出是完全不同的:

1030 | main | Main start
1096 | main | Outer start
2101 | main | Outer end
2101 | main | Inner start
3101 | main | Inner end
3101 | main | Main end

你知道为什么外部的任务会先于内部的任务先结束呢?这是因为内部任务在trampoline()调度程序中排队,而该调度程序已经被外部任务占用了。当外部完成时,队列(内)的第一个任务开始。我们还可以更进一步,以确保你们理解其中的区别:

log("Main start");
worker.schedule(() -> {
    log(" Outer start");
    sleepOneSecond();
    worker.schedule(() -> {
        log(" Middle start");
        sleepOneSecond();
        worker.schedule(() -> {
            log(" Inner start");
            sleepOneSecond();
            log(" Inner end");
        });
        log(" Middle end");
    });
    log(" Outer end");
    });
log("Main end");

来自 immediate()的Scheduler的Worker输出如下:

1029 | main | Main start
1091 | main | Outer start
2093 | main | Middle start
3095 | main | Inner start
4096 | main | Inner end
4099 | main | Middle end
4099 | main | Outer end
4099 | main | Main end

而trampoline的输出如下:

1041 | main | Main start
1095 | main | Outer start
2099 | main | Outer end
2099 | main | Middle start
3101 | main | Middle end
3101 | main | Inner start
4102 | main | Inner end
4102 | main | Main end
  • Schedulers.test()

此调度器仅用于测试目的,您在生产代码中看不到它。它的主要优点是能够任意地推进时钟,模拟时间经过。TestScheduler在第260页的“Schedulers in Unit Testing”中有很大的描述。调度器本身并不是很有趣。如果您想了解它们是如何在内部工作的,以及如何实现您自己的,请查看下一节。

PS:Scheduler implementation details overview

这部分是完全可选的,如果您对实现细节不感兴趣的话,可以直接跳到第150页上的“Declarative Subscription with subscribeOn()”。

译者注:建议有一定能力的开发还是看一下,陪着我~~

调度器不仅分离了任务和任务的执行(通常是在另一个线程中运行它们),而且它还会抽象出时钟,因为我们将在第258页的“Virtual Time”中学习。与ScheduledExecutorService相比,调度器的API要简单一些:

abstract class Scheduler {
    abstract Worker createWorker();
    long now();
    abstract static class Worker implements Subscription {
        abstract Subscription schedule(Action0 action);
        abstract Subscription schedule(Action0 action,
                                        long delayTime, TimeUnit unit);
        long now();
    }
}

当RxJava想要安排一个任务(大概是,但不一定在后台)时,它必须首先请求一个Worker实例。就是这个Worker,允许在没有任何延迟的情况下或在某个时间点调度任务。Scheduler 和Worker都有一个可覆盖的时间源(即now()方法),用于确定给定任务的运行的时间。您可以天真的将Scheduler 想像成线程池,而Worker则是线程池中的线程。

Scheduler 和Worker之间的分离对于轻松实现Rx契约执行的一些指导方针是必要的,即按顺序调用订阅者的方法,而不是同时调用。Worker的契约提供的只是:同一个Worker上调度的两个任务不会同时运行。但是,来自相同Scheduler的独立Worker可以同时运行任务。

让我们来分析一下现有的Scheduler(即HandlerScheduler)的源代码,而不是通过这个API,就像在RxAndroid项目中发现的那样。此Scheduler 仅在Android UI线程上运行所有调度任务。只允许从该线程更新用户界面(参见277页的“Android Development with RxJava”获得更多详细信息)。这类似于Swing中的Event Dispatch Thread(事件派发线程EDT),其中大多数对windows和组件的更新必须在专用线程(EDT)中执行。毫无意外,还有一个RxSwing项目。

package rx.android.schedulers;
import android.os.Handler;
import android.os.Looper;
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.internal.schedulers.ScheduledAction;
import rx.subscriptions.Subscriptions;
import java.util.concurrent.TimeUnit;
public final class SimplifiedHandlerScheduler extends Scheduler {
    @Override
    public Worker createWorker() {
        return new HandlerWorker();
    }
    static class HandlerWorker extends Worker {
        private final Handler handler = new Handler(Looper.getMainLooper());
        @Override
        public void unsubscribe() {
            //Implementation coming soon...
        }
        @Override
        public boolean isUnsubscribed() {
            //Implementation coming soon...
            return false;
        }
        @Override
        public Subscription schedule(final Action0 action) {
            return schedule(action, 0, TimeUnit.MILLISECONDS);
        }
        @Override
        public Subscription schedule(
                Action0 action, long delayTime, TimeUnit unit) {
            ScheduledAction scheduledAction = new ScheduledAction(action); //包装action
            handler.postDelayed(scheduledAction, unit.toMillis(delayTime));  //注意这行
            scheduledAction.add(Subscriptions.create(() ->
            handler.removeCallbacks(scheduledAction)));
            return scheduledAction;
        }
    }
}

目前Android API的细节并不重要。这里发生的情况是,每当我们在一个HandlerWorker上调度某个东西时,一块代码块就会传递给postDelayed(...)方法,该方法会在一个专用的Android线程上执行这个代码块。这里只有一个这样的线程,所以事件不仅在内部被串行化,而且还跨越Worker之间。

在我们传递要执行的action之前,我们用ScheduledAction来包装它,它实现了Runnable和Subscription。无论何时,RxJava都是惰性的——这也适用于调度任务。如果出于任何原因,您决定一个给定的action不应该被执行(当action被安排进来在未来的某个时间执行,而不是立即执行的话,这是有意义的),您只需要在从schedule()返回的Subscription 中运行unsubscribe()。正确处理取消预订是Worker 的责任。

客户端代码还可以决定从整个Worker中unsubscribe()。这将取消所有排队的任务以及释放 Worker,这样底层线程就可以在以后重用。下面的代码片段通过增加Worker退订流,增强SimplifiedHandlerScheduler(仅包括修改方法):

private CompositeSubscription compositeSubscription =
    new CompositeSubscription();
@Override
public void unsubscribe() {
    compositeSubscription.unsubscribe();
}
@Override
public boolean isUnsubscribed() {
    return compositeSubscription.isUnsubscribed();
}
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
    if (compositeSubscription.isUnsubscribed()) {
        return Subscriptions.unsubscribed();
    }
    final ScheduledAction scheduledAction = new ScheduledAction(action);
    scheduledAction.addParent(compositeSubscription);
    compositeSubscription.add(scheduledAction);
    handler.postDelayed(scheduledAction, unit.toMillis(delayTime));
    scheduledAction.add(Subscriptions.create(() ->
        handler.removeCallbacks(scheduledAction)));
    return scheduledAction;
}

在32页的“Controlling Listeners by Using Subscription and Subscriber<T>”,我们探索了SSubscriptionubsc接口,但从未真正研究过实现细节。CompositeSubscription是众多实现中的一个,它本身只是一个用于子Subscription的容器(一个Composite设计模式)。从CompositeSubscription中取消订阅意味着从它的所有子Subscription上解除订阅,当然你也可以从CompositeSubscription中删除或者添加子Subscription。

在我们的自定义Schaeduler中,CompositeSubscription用于跟踪所有从之前的schedule()调用来的 Subscription(参见compositeSubscription.add(scheduledAction))。另一方面,子ScheduledAction需要知道它的父进程(参见:addParent()),以便在操作完成或取消时它可以自行移除自己。否则,Worker就会永远积累不新鲜的子Subscription。当客户端代码决定不再需要Handler Worker实例时,它就会取消订阅。取消订阅将传播到所有(如果有的话)子Subscription上。

这是对RxJava中调度器的简要介绍。他们内部的细节在日常工作中并不是很有用;事实上,它们被这样设计的原因是可以使RxJava的使用更加直观和可预测。这就是说,让我们快速了解一下调度器是如何解决Rx中的许多并发问题的。

results matching ""

    No results matching ""