在本节中,我们将创建另一个微服务来调用消息驱动的hello微服务,通过向“hello”这个地址发送一条消息并得到一个回复。微服务将重新实现与前一章相同的逻辑,并两次调用服务(一次使用Luke,一次使用Leia)。

像往常一样,让我们创建一个新项目。:

mkdir hello-consumer-microservice-message
cd hello-consumer-microservice-message
mvn io.fabric8:vertx-maven-plugin:1.0.5:setup \
    -DprojectGroupId=io.vertx.microservice \
    -DprojectArtifactId=hello-consumer-microservice-message \
    -Dverticle=io.vertx.book.message.HelloConsumerMicroservice \
    -Ddependencies=infinispan,rx

在这里,我们也加入了这个Vert.x RxJava支持,以从Vert.x提供的RX-ified API中获益。如果您在前一节中更新了Infinispan配置,则需要将其复制到这个新项目中。

现在编辑io.vertx.book.message.HelloConsumerMicroservice。因为我们要使用RxJava,所以要修改import语句以匹配 io.vertx.rxjava.core.AbstractVerticle。然后用以下方法实现start方法:

@Override
public void start() {
    EventBus bus = vertx.eventBus();
    Single<JsonObject> obs1 = bus
        .<JsonObject>rxSend("hello", "Luke")
        .map(Message::body);
    Single<JsonObject> obs2 = bus
        .<JsonObject>rxSend("hello", "Leia")
        .map(Message::body);
    Single
        .zip(obs1, obs2, (luke, leia) ->
            new JsonObject()
            .put("Luke", luke.getString("message"))
            .put("Leia", leia.getString("message"))
        )
        .subscribe(
            x -> System.out.println(x.encode()),
                Throwable::printStackTrace);
    }

这段代码非常类似于前一章的代码。我们将使用事件总线将消息发送到到"hello"地址上并提取应答的主体,而不是使用WebClient调用HTTP端点。我们使用zip操作符来检索两个响应并构建最终结果。在subscribe方法中,我们将最终结果打印到控制台或打印错误的堆栈跟踪。

让我们将其与HTTP服务器结合起来。当收到HTTP请求时,我们会两次调用hello服务,并将构建的结果作为响应返回:

import io.vertx.core.json.JsonObject;
import io.vertx.rxjava.core.AbstractVerticle;
import io.vertx.rxjava.core.eventbus.Message;
import rx.Single;

public class HelloConsumerMicroservice extends AbstractVerticle {

    @Override
    public void start() {
        vertx.createHttpServer()
                .requestHandler(
                        req -> {
                            Single<JsonObject> obs1 = vertx.eventBus()
                                    .<JsonObject>rxSend("hello", "Luke")
                                    .map(Message::body);
                            Single<JsonObject> obs2 = vertx.eventBus()
                                    .<JsonObject>rxSend("hello", "Leia")
                                    .map(Message::body);

                            Single
                                    .zip(obs1, obs2, (luke, leia) ->
                                            new JsonObject()
                                                    .put("Luke", luke.getString("message")
                                                            + " from " + luke.getString("served-by"))
                                                    .put("Leia", leia.getString("message")
                                                            + " from " + leia.getString("served-by"))
                                    )
                                    .subscribe(
                                            x -> req.response().end(x.encodePrettily()),
                                            t -> req.response().setStatusCode(500).end(t.getMessage())
                                    );
                        })
                .listen(8082);
    }

最后一段代码只是将事件总线交互封装到一个requestHandler中,并处理HTTP响应。如果出现故障,我们返回一个包含错误消息的JSON对象。

如果你是使用mvn compile vertx:run -Dvertx.runArgs="-cluster -Djava.net.preferIPv4Stack=true"启动的话,那么打开浏览器 http://localhost:8082,你会看到这样的显示:

{
"Luke" : "hello Luke from ...HelloMicroservice@39721ab",
"Leia" : "hello Leia from ...HelloMicroservice@39721ab"
}

results matching ""

    No results matching ""