Vert.x提供了一个事件总线,允许应用程序的不同组件使用消息进行交互。消息被发送到某个地址上,并有一组headers和一个body。地址是一个表示目的地的不透明字符串。消息消费者将自己注册到该地址以接收消息。事件总线也是集群的,这意味着它可以通过分布式发送者和消费者之间的网络发送消息。以cluster模式启动一个Vert.x应用程序,节点被连接以允许共享数据结构、故障检测挡板和负载平衡组通信。事件总线可以在集群中的所有节点之间分派消息。要创建这样的集群配置,可以使用Apache Ignite、Apache Zookeeper、Infinispan或Hazelcast。在本报告中,我们将使用Infinispan,但我们不会进入高级配置。为此,请参考Infinispan文档(http://infinispan.org)。当Infinispan(或您选择的技术)管理节点发现和目录时,事件总线使用直接的对等TCP连接来通信。
事件总线提供了三种类型的传递语义。首先,send方法允许一个组件向某个地址发送消息。一个单独的消费者将收到消息。如果在这个地址上注册了不止一个消费者,那么Vert.x采用循环策略来选择消费者:
// Consumer
vertx.eventBus().consumer("address", message -> {
System.out.println("Received: '" + message.body() + "'");
});
// Sender
vertx.eventBus().send("address", "hello");
与send相反,您可以使用publish方法将消息传递给在某个地址上注册的所有消费者。最后,send方法可以与reply处理器一起使用。这个request/response机制允许在两个组件之间实现基于消息的异步交互:
// Consumer
vertx.eventBus().consumer("address", message -> {
message.reply("pong");
});
// Sender
vertx.eventBus().send("address", "ping", reply -> {
if (reply.succeeded()) {
System.out.println("Received: " + reply.result().body());
} else {
// No reply or failure
reply.cause().printStackTrace();
}
});
如果您使用的是Rx-ified API,您可以使用rxSend方法,它返回一个Single。当接收到应答时,此Single接收到一个值。我们很快就会看到这种方法。