Hystrix最先进的特性之一是批处理请求。假设在处理一个上游请求的过程中,您正在处理几个小的下游请求。例如,假设您正在显示一个图书列表,而对于每本书,您必须向外部系统请求其评级:

Observable<Book> allBooks() { /* ... */ }
Observable<Rating> fetchRating(Book book) { /* ... */ }

allBooks()方法返回我们想要处理的Book的流,而fetchRating()检索每本书的评分。幼稚的实现只会遍历所有的图书,并逐个检索评级。幸运的是,在RxJava中异步运行子任务非常简单:

Observable<Rating> ratings = allBooks()
    .flatMap(this::fetchRating);

下面的图表是不使用flatMap()调用fetchRating()的流程图。Send阶段表示发送请求,proc表示服务端处理过程,recv表示传输响应。 服务器端处理的proc和用于传输响应的recv。下面的图片说明了抓取数据的顺序:

PS-IMAGE

下图演示了如何使用flatMap()进行抓取:

PS-IMAGE

这很好,通常我们会看到令人满意的性能表现。所有fetchRating()调用都是并发执行的,大大提高了延迟。但是,如果您认为每次调用fetchRating()都意味着一定数量的网络延迟,那么像这样对几十本书分别调用就是一种浪费了。对所有的书籍发出一个批处理的请求并收到一个包含所有书籍评级的响应,这听起来更有效率:

PS-IMAGE

请注意所有的阶段:发送、处理和接收都会稍微慢一些。所有这些都可以传输或处理更多的数据,所以这是可以理解的。因此,与多个小请求相比,总延迟实际上更高。改善是有问题的。但你必须看更大的图景。

尽管单个请求的延迟增加了,但是系统吞吐量可能大大提高了。我们可以执行的并发连接数量、网络吞吐量和JVM线程是有限的和稀缺的资源。如果您请求的依赖项具有有限的吞吐量,而利用相对较少的并发性事务就可以使其饱和。擅自使用flatMap()可以提高单个请求的延迟,但是它会通过饱和资源来降低所有其他请求的性能。因此,我们可能需要牺牲一些延迟来实现更好的总体吞吐量,而且不会对下游依赖项产生过多的负载。最后,延迟实际上也得到了改进:请求在共享资源方面更加公平,因此延迟更容易预测。

那么我们如何实现批处理呢?Hystrix知道您执行的每个单独的命令。当它发现您将要同时启动两个类似的命令时(例如,取两个图书评级),它可以将这两个命令折叠成一个更大的批处理命令。这个批处理命令被调用,当批处理响应到达时,响应被映射回单个请求。首先,我们需要一个批处理命令的实现,它可以同时检索多个图书的评级:

class FetchManyRatings extends HystrixObservableCommand<Rating> {
    private final Collection<Book> books;
    protected FetchManyRatings(Collection<Book> books) {
        super(HystrixCommandGroupKey.Factory.asKey("Books"));
        this.books = books;
    }
    @Override
    protected Observable<Rating> construct() {
        return fetchManyRatings(books);
    }
}

fetchManyRatings()方法以Book集合作为参数,并发射出多个Rating(即图书的评级)实例。在内部,它可以生成一个请求多个图书评级的的批操作HTTP请求,而不是像 fetchRating(book)方法那样,一次只能接受一个值。索要多个Rating肯定会慢,但是肯定比串行索要图书评级要快。但是,我们不希望通过手动批处理几个单独的请求,然后解压缩批处理的响应。在处理单个事务时,这可能很简单,但是如果我们有多个并发客户端,每个客户都索要一些图书评级,那该怎么办?当两个浏览器的两个独立的请求击中我们的服务器时,我们仍然希望将这两个请求一起批量处理,并只调用一个下游调用。然而,这需要线程间同步和所有请求的全局注册表。假设一个线程试图调用一个给定的命令,另一个线程在几毫秒后调用相同的命令(使用不同的参数)。我们希望在第一个请求尝试启动一个命令后稍等片刻,以防另一个线程在此后不久调用相同的命令。在这种情况下,我们希望捕获这两个请求,将它们合并在一起,只生成一个批处理请求,并将批处理响应映射回单个请求。这正是Hystrix在我们的帮助下可以做到的:

public class FetchRatingsCollapser
    extends HystrixObservableCollapser<Book, Rating,
        Rating, Book> {
    private final Book book;
    public FetchRatingsCollapser(Book book) {
        //Explained below
    }
    public Book getRequestArgument() {
        return book;
    }
    protected HystrixObservableCommand<Rating> createCommand(
        Collection<HystrixCollapser.CollapsedRequest<Rating, Book>> requests) {
        //Explained below
    }
protected void onMissingResponse(
    HystrixCollapser.CollapsedRequest<Rating, Book> r)
{
    r.setException(new RuntimeException("Not found for: "
+ r.getArgument()));
}
    protected Func1<Book, Book> getRequestArgumentKeySelector() {
        return x -> x;
    }
    protected Func1<Rating, Rating> getBatchReturnTypeToResponseTypeMapper() {
        return x -> x;
    }
    protected Func1<Rating, Book> getBatchReturnTypeKeySelector() {
        return Rating::getBook;
    }
}

这里有很多代码,让我们一步一步地分析它。当我们想要检索某本书的一个评级时,我们会创建一个FetchRatingsCollapser的实例:

Observable<Rating> ratingObservable =
    new FetchRatingsCollapser(book).toObservable();

客户端代码完全忽略了正在发生的批处理和崩溃,这要感谢HystrixObservableCollapser。从外部看,我们使用它就好像它在为一本书检索一个评级。但在内部,有一些有趣的细节允许批处理。首先,在构造函数中,除了为这个请求存储Book之外,我们还配置崩溃的请求:

public FetchRatingsCollapser(Book book) {
    super(withCollapserKey(HystrixCollapserKey.Factory.asKey("Books"))
        .andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter()
            .withTimerDelayInMilliseconds(20)
            .withMaxRequestsInBatch(50)
            )
        .andScope(Scope.GLOBAL));
    this.book = book;
}

withTimerDelayInMilliseconds()配置的20毫秒是指发生崩溃的时间窗口长度是多少(默认值是10毫秒)。当第一个单独的请求发生时,一个20毫秒的计时器延迟它的实际调用。在这段时间内,Hystrix等待其他的请求,可能来自其他线程。Hystrix延迟第一个请求,以查看是否有更多相同类型的命令到达。当这个时间流逝或多达50个请求已经排队(参考withMaxRequestsInBatch(50)的参数)时,网关就会被打开。此时,该库应该在单个批处理中调用所有排队的命令。但是,Hystrix不会神奇地将你的命令变成一个;你必须指导它怎么做。下面是如何做到这一点:

protected HystrixObservableCommand<Rating>
createCommand(
    Collection<HystrixCollapser.CollapsedRequest<Rating,
Book>> requests) {
    List<Book> books = requests.stream()
        .map(c -> c.getArgument())
        .collect(toList());
    return new FetchManyRatings(books);
}

createCommand()方法的职责是将单个请求转换成一个批处理命令。它接收在20毫秒时间的事件窗口内收集的所有请求的集合,现在应该将其合并成单个的批处理请求。在我们的例子中,我们构造了一个FetchManyRatings命令的实例,该命令接收所有请求Ratings的Book。请注意,HystrixObservableCommand允许返回多个值,这正是我们正在寻找的。

当值开始从fFetchManyRatings 中出现时,我们必须以某种方式将Rating实例映射回独立的请求。记住,此时我们可能有几个单独的线程和事务,每个线程都在等待一个Rating。在以下方法的帮助下,对小请求的批处理的响应的路由和分派或多或少地自动发生:

  • getRequestArgumentKeySelector()

将一个单独的请求参数(Book)映射到一个用于映射批处理响应的一个键。在我们的例子中,我们只是使用了相同的Book实例,因此只需要使用恒等式x->x转换即可。

  • getBatchReturnTypeToResponseTypeMapper()

这将一个条目从批处理响应映射到单个响应。同样,在我们的例子中x -> x恒等式就够了

  • getBatchReturnTypeKeySelector()

你可以使用这个方法来指示Hystrix,该响应( Rating )是针对哪个键(Book)的应答。为了简化起见,批处理返回的每个Rating都有一个getBook()方法,该方法指示它与哪一本书相关。

有了这所有的方法(尤其是最后一个:getBatchReturnTypeKeySelector()),Hystrix通过请求键(Book)准备相应个人请求的映射,每当一个新的Rating出现在批处理响应中,它就可以自动映射该响应回相应的请求。

这是相当多的管道工作来使得批处理运行起来,但它的回报快速的。当多个客户机访问相同的下游依赖项时,例如缓存服务器----我们可以将许多请求收集到一个里。这大大降低了带宽成本。当我们的依赖项成为瓶颈时,并且吞吐量是有限的,崩溃请求大大减少了对依赖项的负载。然而,批处理为客户端带来了额外的延迟。通过默认配置为10毫秒(withTimerDelayInMilliseconds(10)),在高负载下,每个请求平均延迟5毫秒。实际的延迟取决于请求是否刚刚启动一个新的计时器,或者请求是在当前批处理即将崩溃之前出现。

注意,在低负载下,批处理是没有意义的。如果很少有多个请求形成批处理,那么您几乎不会为每个请求添加10毫秒的额外延迟。这是Hystrix毫无意义的等待,期待着其他的请求,但是实际上请求频率很小。因此,请求批处理的调优很重要。首先,如果您的计时器延迟是10毫秒,那么只有当您每秒至少发出100个请求时,批处理才有意义。否则,很少会有多个请求组成批处理。

PS:Tuning withTimerDelayInMilliseconds

人们很容易取一个很长的定时器延迟,让尽可能多的请求变成单个的批处理。100毫秒甚至1秒的值是很好的,但是这在产生大量的流量和延迟不是问题的离线系统中会更好。

批处理是在高负载下工作最好的特性。因此,Hystrix提供了非常全面的监控机制,帮助您了解整个系统的性能。

results matching ""

    No results matching ""