在这本书中,有几个理由来研究Hystrix。首先,它是建立在RxJava之上的,这使得它成为现实生活中响应式拓展的一个很好的实例。其次,我们可以调用Hystrix命令,并获得一个Observable的返回值。最后,但最重要的是,Hystrix支持非阻塞命令(参见294页上的“带HystrixObservableCommand的非阻塞命令”)。
在继续之前,让我们探讨一下如何在最简单的阻塞场景中使用Hystrix。一个经验法则是,在执行过程或机器运行时,使用Hystrix。进行网络调用(同时访问I / O)会显著增加失败的风险:风险可能包括不可预测的延迟、网络分区和包丢失。当我们识别出这样一个潜在的危险代码块时,我们将它封装在一个HystrixCommand中:
import org.apache.commons.io.IOUtils;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
class BlockingCmd extends HystrixCommand<String> {
public BlockingCmd() {
super(HystrixCommandGroupKey.Factory.asKey("SomeGroup"));
}
@Override
protected String run() throws IOException {
final URL url = new URL("http://www.example.com");
try (InputStream input = url.openStream()) {
return IOUtils.toString(input, StandardCharsets.UTF_8);
}
}
}
在run()方法中包装可能失败的阻塞代码。该方法的结果类型T由HystrixCommand <T >的泛型来定义的。如果我们想要参数化操作(例如,使用不同的URL),它必须通过构造函数传递。HystrixCommand是命令设计模式的一个实现。
有了HystrixCommand的实例,我们现在必须以某种方式执行它。有两种方法可以阻塞:调用execute或通过获取的一个Future来调用,这两种方法对我们来说都不是很有趣:
String string = new BlockingCmd().execute();
Future<String> future = new BlockingCmd().queue();
execute()方法通过超时、高级错误处理等的安全网络间接调用run()方法。你可以在第295页的“隔板模式和快速失败”中了解更多。该方法会阻塞,只有当底层run()完成或抛出异常时,方法才会返回。在这种情况下,将异常传播给调用者。另一方面,queue()是非阻塞的,但是返回的是Future <T>。旧的Future 接口并不是特别具有响应性,因此我们在本书中对execute()和queue()都不是很感兴趣。
PS:请注意,我们总是创建一个BlockingCmd的新实例——您不能为多次执行重用一个命令实例。HystrixCommand应该在执行之前直接创建,而不能重用它。在实践中,您通常会在构造上参数化命令,因此实例的可重用性仍然值得怀疑。
Hystrix支持Observable作为一流公民,可以以流的形式返回命令的结果:
Observable<String> eager = new BlockingCmd().observe();
Observable<String> lazy = new BlockingCmd().toObservable();
observe()和toObservable()之间的语义差异非常重要。toObservable()将命令转换为惰性和Cold类型的Observable——命令将不会执行,直到有人真正订阅了这个Observable。此外,Observable是不支持缓存的,因此每个 subscribe()都将触发命令执行。 相比之下,observe()异步调用命令,返回一个Hot类型的Observable,而且支持缓存。正如我们在121页的“Embracing Laziness”中学到的 ,惰性的Observables非常方便;例如,我们可以在任何时候创建它们,但是跳过订阅,避免任何类似于网络调用的副作用。我们还可以非常有效地批处理请求。但是,在有多个订阅者的情况下,Cold类型的Observable 可能会出现多次调用某个操作的风险。cache()操作符可以在这些情况下提供帮助。在一般情况下,惰性允许最高效的并发性,因此您应该更倾向于 toObservable()而不是 observable(),除非您有一个非常有效的点来急切地调用一个命令。
有一个Observable ,你可以对其应用各种操作;例如,您可以使用retry()重新尝试失败的命令:
Observable<String> retried = new BlockingCmd()
.toObservable()
.doOnError(ex -> log.warn("Error ", ex))
.retryWhen(ex -> ex.delay(500, MILLISECONDS))
.timeout(3, SECONDS);
前面的管道调用一个命令,但是一旦失败的话,会在500毫秒后重试。然而,重试可能需要3秒钟;在上面这个TimeoutException被抛出(参见第251页“Timing Out When Events Do Not Occur”)。稍后我们将了解如何在Hystrix中构建超时才会有所帮助。