Netty结合RxJava提供了一种非常接近网络工作方式的抽象。它并不是假装HTTP请求类似于JVM中的普通方法调用,而且它还支持异步。此外,我们不能再假装HTTP只是一个请求-响应协议。server-sent event(服务器发送事件:一个请求、多个响应)、WebSockets(全双工通信)的出现,以及最终的HTTP / 2(许多并行请求,并在同一条线上响应,彼此交错)揭示了HTTP的许多不同的使用场景。
客户端的RxNetty为最简单的用例提供了相当简洁的API。你提出了一个请求,并得到了容易组合的Observable结果:
Observable<ByteBuf> response = HttpClient
.newClient("example.com", 80)
.createGet("/")
.flatMap(HttpClientResponse::getContent);
response
.map(bb -> bb.toString(UTF_8))
.subscribe(System.out::println);
调用createGet()方法返回Observable<HttpClientResponse>的子类。显然,客户端不会阻塞等待响应,所以 Observable看起来是个不错的选择。但这仅仅是个开始。HttpClientResponse本身有一个getContent()方法,它返回 Observable<ByteByf>。如果您从第169页的“Netty和RxNetty非阻塞HTTP服务器”中回想起,ByteBuf是对接收到的数据块的抽象。从客户端的角度来看,这是响应的一部分。这是正确的,与其他非阻塞HTTP客户端相比,RxNetty要更进一步,因为那些非阻塞的客户端在整个响应完全到达时候,它不会简单地通知我们。相反,我们得到的是ByteBuf消息流,在服务器决定放弃连接时,随后可选的的是Observable的Completion通知。
这样的模型更接近于TCP / IP堆栈的工作方式,并且在使用方面有更好的扩展性。它可以处理简单的请求/响应流动以及复杂的流场景。但是要注意,即使是在单个响应的情况下------例如,一个包含html的响应----它很可能会以多个数据块的形式出现。当然,RxJava有很多方法可以将它们组合到一起,比如 Observable.toList()或Observable.reduce()。但这是你的选择: if you want to consume data as it comes in small bits,那是绝对可以的。在这方面,RxNetty是相当低级的,但因为抽象不会造成像过度缓冲或阻塞这样的主要性能瓶颈,这提高了可伸缩性。如果您正在寻找一个具有响应性且健壮但更高级的HTTP客户端,参考280页的“Retrofit with Native RxJava Support”。
相对于基于回调的响应式API,RxNetty与其他Observables很好地结合在一起,您可以很容易地并行化、合并和分割工作。例如,假设您有一个URL的流,您必须实时地连接和使用数据。这个流可以是固定的(比如只是一个 List<URL>),或者是动态的,新的URL总是随时出现。如果你想要一个流过所有这些来来源的源源不断的数据包,您可以简单地将它们( flatMap():
Observable<URL> sources = //...
Observable<ByteBuf> packets =
sources
.flatMap(url -> HttpClient
.newClient(url.getHost(), url.getPort())
.createGet(url.getPath()))
.flatMap(HttpClientResponse::getContent);
这是一个稍微有点做作的例子,因为它混合了来自不同来源的ByteBuf消息,但你懂的。对于上游Observable的每个URL,我们从该URL生成ByteByf实例的异步流。如果您想要首先转换传入的数据,比如将数据块组合成一个事件——您可以轻松地完成这一任务,例如使用reduce()。结果是:您可以轻松地拥有成千上万的开放HTTP连接,它们或者空闲或者在接收数据。限制因素不再是内存,而是CPU的处理能力和网络带宽。JVM不需要消耗gb的内存来处理合理数量的事务。
HTTP APIs是现代应用程序的主要瓶颈之一。它们在CPU方面并不昂贵,但是阻塞HTTP的行为就像一个普通的过程调用,极大地限制了可伸缩性。即使您小心地删除了阻塞HTTP、通信,同步代码,也会出现在最令人惊奇的地方。这是 java.net.URL中的equals()方法的陷阱,它会发出一个网络请求。这是正确的:当您比较URL类的两个实例时,这个看似快速的方法会进行网络往返(调用序列,从头到尾读取):
java.net.URL.equals(URL.java)
java.net.URLStreamHandler.equals(URLStreamHandler.java)
java.net.URLStreamHandler.sameFile(URLStreamHandler.java)
java.net.URLStreamHandler.hostsEqual(URLStreamHandler.java)
java.net.URLStreamHandler.getHostAddress(URLStreamHandler.java)
java.net.InetAddress.getByName(InetAddress.java)
java.net.InetAddress.getAllByName(InetAddress.java)
java.net.InetAddress.getAllByName0(InetAddress.java)
java.net.InetAddress.getAddressesFromNameService(InetAddress.java)
java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java)
[native code]
为了确定两个URL是否相等,JVM调用lookupAllHostAddr(),它会调用gethostbyname方法(或类似的方法),它可以对DNS服务器发出同步请求。这可能会造成灾难性的影响,因为您只有少数线程,并且其中一些线程意外地被阻塞。还记得我们RxNetty-based服务器吗?他们最多使用了几十个线程。另一个可能会发生灾难的地方是URL.equals()经常被调用,例如在Set <URL >中。URL的这种意外行为是众所周知的,就像它的equals()实际上可以根据Internet连接产生不同的结果。
我们提出这个事实只是为了说明编写完全响应式的应用程序是困难的,并且充满了陷阱。在下一节中,我们将看到另一个更明显的阻塞源:数据库访问代码。