在本节中,我们将创建另一个微服务来调用消息驱动的hello微服务,通过向“hello”这个地址发送一条消息并得到一个回复。微服务将重新实现与前一章相同的逻辑,并两次调用服务(一次使用Luke,一次使用Leia)。
像往常一样,让我们创建一个新项目。:
mkdir hello-consumer-microservice-message
cd hello-consumer-microservice-message
mvn io.fabric8:vertx-maven-plugin:1.0.5:setup \
-DprojectGroupId=io.vertx.microservice \
-DprojectArtifactId=hello-consumer-microservice-message \
-Dverticle=io.vertx.book.message.HelloConsumerMicroservice \
-Ddependencies=infinispan,rx
在这里,我们也加入了这个Vert.x RxJava支持,以从Vert.x提供的RX-ified API中获益。如果您在前一节中更新了Infinispan配置,则需要将其复制到这个新项目中。
现在编辑io.vertx.book.message.HelloConsumerMicroservice。因为我们要使用RxJava,所以要修改import语句以匹配 io.vertx.rxjava.core.AbstractVerticle。然后用以下方法实现start方法:
@Override
public void start() {
EventBus bus = vertx.eventBus();
Single<JsonObject> obs1 = bus
.<JsonObject>rxSend("hello", "Luke")
.map(Message::body);
Single<JsonObject> obs2 = bus
.<JsonObject>rxSend("hello", "Leia")
.map(Message::body);
Single
.zip(obs1, obs2, (luke, leia) ->
new JsonObject()
.put("Luke", luke.getString("message"))
.put("Leia", leia.getString("message"))
)
.subscribe(
x -> System.out.println(x.encode()),
Throwable::printStackTrace);
}
这段代码非常类似于前一章的代码。我们将使用事件总线将消息发送到到"hello"地址上并提取应答的主体,而不是使用WebClient调用HTTP端点。我们使用zip操作符来检索两个响应并构建最终结果。在subscribe方法中,我们将最终结果打印到控制台或打印错误的堆栈跟踪。
让我们将其与HTTP服务器结合起来。当收到HTTP请求时,我们会两次调用hello服务,并将构建的结果作为响应返回:
import io.vertx.core.json.JsonObject;
import io.vertx.rxjava.core.AbstractVerticle;
import io.vertx.rxjava.core.eventbus.Message;
import rx.Single;
public class HelloConsumerMicroservice extends AbstractVerticle {
@Override
public void start() {
vertx.createHttpServer()
.requestHandler(
req -> {
Single<JsonObject> obs1 = vertx.eventBus()
.<JsonObject>rxSend("hello", "Luke")
.map(Message::body);
Single<JsonObject> obs2 = vertx.eventBus()
.<JsonObject>rxSend("hello", "Leia")
.map(Message::body);
Single
.zip(obs1, obs2, (luke, leia) ->
new JsonObject()
.put("Luke", luke.getString("message")
+ " from " + luke.getString("served-by"))
.put("Leia", leia.getString("message")
+ " from " + leia.getString("served-by"))
)
.subscribe(
x -> req.response().end(x.encodePrettily()),
t -> req.response().setStatusCode(500).end(t.getMessage())
);
})
.listen(8082);
}
最后一段代码只是将事件总线交互封装到一个requestHandler中,并处理HTTP响应。如果出现故障,我们返回一个包含错误消息的JSON对象。
如果你是使用mvn compile vertx:run -Dvertx.runArgs="-cluster -Djava.net.preferIPv4Stack=true"启动的话,那么打开浏览器 http://localhost:8082,你会看到这样的显示:
{
"Luke" : "hello Luke from ...HelloMicroservice@39721ab",
"Leia" : "hello Leia from ...HelloMicroservice@39721ab"
}