现在,我们将重点关注事件驱动的方法来编写HTTP服务器,这更有希望实现可伸缩性。一个连接对应一个县城的阻塞处理模型显然不具有伸缩性。我们需要一种方法,只通过几个线程,来管理多个客户端连接。这有很多好处:
减少内存消耗
更好的CPU和CPU缓存利用率
在单个节点上大大提高了可伸缩性
一个警告是失去的简单和清晰。线程不允许在任何操作上阻塞,我们不能和本地方法调用那样在网络上接收和发送数据报。延迟是不可预测的,响应时间也会增加。当你读到它的时候,这里很能会有相当多的旋转硬件在驱动着它,这甚至比局域网还慢。在本节中,我们将使用Netty框架开发一个小的事件驱动应用程序,然后再重构它以使用RxNetty。最后,我们给出了所有解决方案的基准。
Netty 完全是事件驱动的;我们从不阻塞等待被发送或接收的数据。相反,以ByteBuf实例形式的原始字节被推送到我们的处理管道(pipeline)中。TCP/ IP给了我们这么一种印象:建立连接,并且数据按照字节的形式在两台电脑之间流动传输。但实际上,TCP / IP是建立在IP之上的,它几乎不能传输数据块(即我们常说的数据包)。以正确的顺序组装它们,给我们一种流的感觉,而这其实是操作系统的工作。Netty放弃了这种抽象,并在字节序列层中工作,而不是流中。几个字节无论何时到达我们的应用程序,Netty将通知我们的处理程序。每当我们发送几个字节时,我们就会得到一个没有阻塞的ChannelFuture 。
我们的非阻塞HTTP服务器示例有三个组件。第一个简单地启动服务器并设置环境:
PS:Netty4.x
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
class HttpTcpNettyServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
new ServerBootstrap()
.option(ChannelOption.SO_BACKLOG, 50_000)
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new HttpInitializer())
.bind(8080)
.sync()
.channel()
.closeFuture()
.sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
这是Netty中最基本的HTTP服务器。关键部分是bossGroup池,它负责接收传入的连接和处理事件的workerGroup。这些池并不非常大:一个是bossGroup,另一个是workerGroup,但是对于编写一个良好的Netty服务器来说,这已经足够了。除了监听端口8080,我们还没有指定服务器应该做什么。这是通过ChannelInitializer可配置的:
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
class HttpInitializer extends ChannelInitializer<SocketChannel> {
private final HttpHandler httpHandler = new HttpHandler();
@Override
public void initChannel(SocketChannel ch) {
ch
.pipeline()
.addLast(new HttpServerCodec())
.addLast(httpHandler);
}
}
我们没有提供处理连接的单独函数,而是构建了一个管道(pipeline),该管道(pipeline)处理传入的ByteBuf实例。管道(pipeline)的第一步将将原始输入字节解码为更高级别的HTTP请求对象。这个处理器是内置的。它还用于将HTTP响应编码回原始字节。在更健壮的应用程序中,您常常会看到更多的集中在更小的功能上的处理器;例如,帧解码、协议解码、安全等。每一条数据和通知都流经这条管道(pipeline)。
您可能已经开始在这里看到与RxJava的相似之处了。我们管道(pipeline)的第二步:业务逻辑组件实际上是处理请求,而不是拦截或充实请求。尽管HttpServerCodec本质上是有状态的(它将传入的数据包转换为高级的HttpRequest实例),但是我们自定义的HttpHandler可以是无状态的单例:
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import java.nio.charset.Charset;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
@ChannelHandler.Sharable
class HttpHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpRequest) {
sendResponse(ctx);
}
}
private void sendResponse(ChannelHandlerContext ctx) {
final DefaultFullHttpResponse response = new DefaultFullHttpResponse(
HTTP_1_1,
HttpResponseStatus.OK,
Unpooled.wrappedBuffer("OK".getBytes(Charset.forName("UTF-8"))));
response.headers().add("Content-length", 2);
ctx.writeAndFlush(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// log.error("Error", cause);
System.out.println("error :" + cause);
ctx.close();
}
}
译者注:由于作者的代码,引用的包没有全面,以及某些工具类并没有标识,所以做了一些替换。
在构建响应对象之后,我们write()回DefaultFullHttpResponse。然而,write()不会像普通的套接字那样阻塞。相反,它返回一个ChannelFuture,我们可以通过addListener()订阅它,并异步关闭该通道:
ctx
.writeAndFlush(response)
.addListener(ChannelFutureListener.CLOSE);
Channel 是关于通信链路(例如,HTTP连接)的抽象,因此关闭了Channel就是关闭了连接。同样,为了实现持久连接,我们不希望这样做。
Netty只使用少量线程来处理数千个连接。对于每个连接,我们不保留任何重量级数据结构或者线程。计算机接收到一个IP包,并在目标端口上唤醒进程监听。TCP/ IP连接只是一个经常使用线程实现的抽象。但是,当应用程序在负载和连接数量上要求更高时,在包级别上直接操作就更加健壮了。我们还有通道(轻量级代表)和有状态处理器的管道。
Observable server with RxNetty
Netty是许多成功产品和框架的重要骨干,如Akka、Elasticsearch, 、HornetQ、Play framework、Ratpack和Vert.x。我们也有一个围绕Netty的包装器,他是Netty的API与我们的RxJava的桥梁,让我们将非阻塞Netty服务器重写为RxNetty。但我们将从一个简单的服务器开始,以熟悉API:
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.reactivex.netty.protocol.tcp.server.TcpServer;
class EurUsdCurrencyTcpServer {
private static final BigDecimal RATE = new BigDecimal("1.06448");
public static void main(final String[] args) {
TcpServer
.newServer(8080)
.<String, String>pipelineConfigurator(pipeline -> {
pipeline.addLast(new LineBasedFrameDecoder(1024));
pipeline.addLast(new StringDecoder(UTF_8));
})
.start(connection -> {
Observable<String> output = connection
.getInput()
.map(BigDecimal::new)
.flatMap(eur -> eurToUsd(eur));
return connection.writeAndFlushOnEach(output);
})
.awaitShutdown();
}
static Observable<String> eurToUsd(BigDecimal eur) {
return Observable
.just(eur.multiply(RATE))
.map(amount -> eur + " EUR is " + amount + " USD\n")
.delay(1, TimeUnit.SECONDS);
}
}
这是一个独立的TCP / IP服务器,构建在RxNetty之上。你应该对它的主要部分有一个粗略的了解。首先,我们创建一个新的TCP / IP服务器监听端口8080。Netty提供了通过管道传递消息的的ByteBuf低级抽象。我们还必须配置这样的管道。第一个处理器将ByteBuf序列通过内置的LineBasedFrameDecoder 解码器重新排列。第二,解码器将包含完整行的ByteBuf转换为实际的字符串实例。从这里开始,我们只使用字符串。
每当有新的连接到达时,都会执行回调。连接对象允许我们异步的发送和接收数据。首先,我们从connection.getInput()开始。该对象类型为Observable<String> ,并在每次在服务器上出现客户端请求的新行时,都会发出一个值。getInput() Observable异步的向我们通知新的输入。首先,我们将字符串解析为BigDecimal。然后,使用辅助方法eurToUsd(),我们假装调用了一个货币交换服务。为了使示例更加实际,我们人为地应用了delay(),因此我们必须等待一些响应。显然,delay()是异步的,不涉及任何sleep。与此同时,我们不断接收和转换请求。
在所有这些转换之后, output Observable被直接输入到writeAndFlushOnEach()中。我相信这是完全可理解的——我们接收一系列输入,转换它们,并将转换后的序列作为输出序列。现在,让我们使用telnet与此服务器交互。注意,由于我们伪造的货币服务器具有延迟,所以会看见响应是在消费了几个请求之后才出现的:
$ telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
2.5
2.5 EUR is 2.661200 USD
0.99
0.99 EUR is 1.0538352 USD
0.94
0.94 EUR is 1.0006112 USD
20
30
40
20 EUR is 21.28960 USD
30 EUR is 31.93440 USD
40 EUR is 42.57920 USD
我们将服务器当作一个从请求数据到响应数据的函数。因为TCP / IP连接不仅仅是一个简单的函数,而是一些相互依赖的数据块的流,RxJava在这个场景中非常出色。一组丰富的运算符可以以非凡的方式,很容易地将输入转换为输出。当然,输出流不需要基于输入;例如,如果您正在实现服务器发送的事件,服务器就会简单地发布数据,而不考虑传入的数据。
EurUsdCurrencyTcpServer是响应式的,因为它只在数据出现时才起作用。对于每个客户机,我们没有专门的线程。这个实现可以很容易地承受数千个并发连接,而垂直的可伸缩性只受它必须处理的流量的限制,而不是有多少空闲连接的数量。
了解了RxNetty大体是如何工作的,我们可以返回返回之前那个OK响应的原始HTTP服务器。RxNetty内置了对HTTP客户端和服务器的支持,但我们将从基于TCP / IP的简单实现开始:
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.reactivex.netty.examples.AbstractServerExample;
import io.reactivex.netty.protocol.tcp.server.TcpServer;
import static java.nio.charset.StandardCharsets.UTF_8;
class HttpTcpRxNettyServer {
public static final Observable<String> RESPONSE = Observable.just(
"HTTP/1.1 200 OK\r\n" +
"Content-length: 2\r\n" +
"\r\n" +
"OK");
public static void main(final String[] args) {
TcpServer
.newServer(8080)
.<String, String>pipelineConfigurator(pipeline -> {
pipeline.addLast(new LineBasedFrameDecoder(128));
pipeline.addLast(new StringDecoder(UTF_8));
})
.start(connection -> {
Observable<String> output = connection
.getInput()
.flatMap(line -> {
if (line.isEmpty()) {
return RESPONSE;
} else {
return Observable.empty();
}
});
return connection.writeAndFlushOnEach(output);
})
.awaitShutdown();
}
}
脑中有了EurUsdCurrencyTcpServere 概念,理解HttpTcpRxNettyServer就简单了。因为出于教学的目的,我们总是返回静态的200 OK响应,而且解析请求是没有意义的。然而,一个行为良好的服务器在读取请求之前不应该发送响应。因此,我们首先查找getInput()中的空行,标记HTTP请求的结束。只有这样我们才能产生200 OK的行。通过这种方式构建的输出Observable 被传递给connection.writeString()。换句话说,当请求包含第一个空行时(空行标识一个请求结束),然后响应将被发送到客户端。
使用TCP / IP实现HTTP服务器是一种有趣的练习,可以帮助您理解HTTP的复杂性。幸运的是,我们不需要一直使用TCP / IP抽象去实现HTTP和RESTful web服务。与Netty类似,RxNetty也有一些内置组件来为HTTP服务服务:
import io.reactivex.netty.protocol.http.server.HttpServer;
class RxNettyHttpServer {
private static final Observable<String> RESPONSE_OK =
Observable.just("OK");
public static void main(String[] args) {
HttpServer
.newServer(8086)
.start((req, resp) ->
resp
.setHeader(CONTENT_LENGTH, 2)
.writeStringAndFlushOnEach(RESPONSE_OK)
).awaitShutdown();
}
}
如果您厌倦了仅仅返回一个静态的200 OK,那么我们可以相对轻松地构建非阻塞RESTful web服务:
class RestCurrencyServer {
private static final BigDecimal RATE = new BigDecimal("1.06448");
public static void main(final String[] args) {
HttpServer
.newServer(8080)
.start((req, resp) -> {
String amountStr = req.getDecodedPath().substring(1);
BigDecimal amount = new BigDecimal(amountStr);
Observable<String> response = Observable
.just(amount)
.map(eur -> eur.multiply(RATE))
.map(usd ->
"{\"EUR\": " + amount + ", " +
"\"USD\": " + usd + "}");
return resp.writeString(response);
})
.awaitShutdown();
}
}
我们可以使用web浏览器或curl来与此服务器交互。初始的subString(1)需要从请求中去掉第一个斜杠:
$ curl -v localhost:8080/10.99
> GET /10.99 HTTP/1.1
> User-Agent: curl/7.35.0
> Host: localhost:8080
> Accept: */*
>
< HTTP/1.1 200 OK
< transfer-encoding: chunked
<
{"EUR": 10.99, "USD": 11.6986352}
通过使用这个简单的HTTP服务器的一些实现,我们可以在性能、可伸缩性和吞吐量上进行比较。这就是我们放弃熟悉的基于线程的模型,并开始使用RxJava和异步api的原因。