现在让我们改变当前的行为,用两个不同的(路径)参数调用hello microservice两次:
HttpRequest<JsonObject> request1 = client
.get(8080, "localhost", "/Luke")
.as(BodyCodec.jsonObject());
HttpRequest<JsonObject> request2 = client
.get(8080, "localhost", "/Leia")
.as(BodyCodec.jsonObject());
这两个请求是独立的,可以并发执行。但在这里我们要写一个装配这两个调用结果的响应。目前代码需要调用服务两次,然后将两个结果组合在一起,这会使得问题变得复杂。我们需要在收到一个响应时检查是否已经完成了另一个请求。虽然这段代码对于两个请求仍然是可管理的,但是当我们需要处理更多的时候,它就变得过于复杂了。幸运的是,正如前面的章节所指出的,我们可以使用响应式编程和RxJava来简化代码。
我们指示vertx-maven-plugin导入这个Vert.x RxJava API。在HelloConsumerMicroservice中,我们将导入语句替换为:
import io.vertx.core.json.JsonObject;
import io.vertx.rxjava.core.AbstractVerticle;
import io.vertx.rxjava.ext.web.*;
import io.vertx.rxjava.ext.web.client.*;
import io.vertx.rxjava.ext.web.codec.BodyCodec;
import rx.Single;
使用RX,我们可以将之前的复杂代码简化:
private void invokeMyFirstMicroservice(RoutingContext rc) {
HttpRequest<JsonObject> request1 = client
.get(8080, "localhost", "/Luke")
.as(BodyCodec.jsonObject());
HttpRequest<JsonObject> request2 = client
.get(8080, "localhost", "/Leia")
.as(BodyCodec.jsonObject());
Single<JsonObject> s1 = request1.rxSend()
.map(HttpResponse::body);
Single<JsonObject> s2 = request2.rxSend()
.map(HttpResponse::body);
Single
.zip(s1, s2, (luke, leia) -> {
// We have the results of both requests in Luke and Leia
return new JsonObject()
.put("Luke", luke.getString("message"))
.put("Leia", leia.getString("message"));
})
.subscribe(
result -> rc.response().end(result.encodePrettily()),
error -> {
error.printStackTrace();
rc.response()
.setStatusCode(500).end(error.getMessage());
}
);
}
注意rxSend方法调用。来自Vert.x的RxJava方法前缀是rx,很易于识别。rxSend的结果是一个Single,即:只有一个元素的Observable,表示操作的延迟结果。single.zip 方法将接收一系列的 Single,并且一旦所有的他们已经收到他们的值的时候,会使用各个Single的值去调用一个函数。single.zip生成另一个包含函数结果的Single。最后,我们调用subscribe来订阅。该方法以两个函数为参数:
1.第一个函数由zip函数的结果调用(即一个JSON对象)。我们将接收的JSON有效负载写入HTTP响应中。
2.第二个函数式在发生故障时候调用的(超时、异常等)。在本例中,我们使用一个空的JSON对象进行响应。
使用此代码,如果我们打开http://localhost:8081,而hello microservice仍在运行,我们将看到:
{
"Luke" : "hello Luke",
"Leia" : "hello Leia"
}