正如前面几节所看到的,这个Vert.x开发模型使用回调。在协调多个异步操作时,这个基于callback的开发模型往往会产生复杂的代码。例如,让我们看看如何从数据库中检索数据。首先,我们需要连接到数据库,然后向数据库发送一个从查询,处理结果,并释放连接。所有这些操作都是异步的。使用回调函数,您可以使用Vert.x JDBC客户端写出如下的代码:

client.getConnection(conn -> {
    if (conn.failed()) {/* failure handling */}
    else {
        SQLConnection connection = conn.result();
        connection.query("SELECT * from PRODUCTS", rs -> {
            if (rs.failed()) {/* failure handling */}
            else {
                List<JsonArray> lines =
                rs.result().getResults();
                for (JsonArray l : lines) {
                System.out.println(new Product(l));
                }
                connection.close(done -> {
                    if (done.failed()) {/* failure handling */}
                });
            }
        });
    }
})

虽然仍然可以管理,但示例显示回调可以很快导致不可读的代码。你也可以使用Vert.t 的Future来处理异步操作。不像Java的Futures,Vert。x Futures是非阻塞的。Futures提供更高级别的复合操作符来构建操作序列或并行执行操作。通常,如下一个代码片段所示,我们组合future,来构建异步的操作序列:

Future<SQLConnection> future = getConnection();
future
    .compose(conn -> {
        connection.set(conn);
        // Return a future of ResultSet
        return selectProduct(conn);
    })
    // Return a collection of products by mapping
    // each row to a Product
    .map(result -> toProducts(result.getResults()))
    .setHandler(ar -> {
        if (ar.failed()) { /* failure handling */ }
        else {
            ar.result().forEach(System.out::println);
        }
        connection.get().close(done -> {
            if (done.failed()) { /* failure handling */ }
        });
    });

然而,尽管Futures 使得代码变得更具有声明性时,但是我们将在一个批处理中检索所有行并处理它们。这个结果可能很大,需要大量的时间来检索。同时,您不需要整个结果都检索完毕才来开始处理它。我们可以一个接一个地处理每一行。幸运的是,Vert.x为这个开发模型挑战提供了答案,并为您提供了一种使用响应式编程开发模型实现响应式微服务的方法。Vert.x提供了RxJava API来:

  • 合并和协调异步任务

  • 对以流式输入形式传入的消息作出响应/反应

让我们使用RxJava api重写前面的代码。

// We retrieve a connection and cache it,
// so we can retrieve the value later.
Single<SQLConnection> connection = client
    .rxGetConnection();
connection
    .flatMapObservable(conn ->
        conn
        // Execute the query
        .rxQueryStream("SELECT * from PRODUCTS")
        // Publish the rows one by one in a new Observable
        .flatMapObservable(SQLRowStream::toObservable)
        // Don't forget to close the connection
        .doAfterTerminate(conn::close)
)
// Map every row to a Product
.map(Product::new)
// Display the result one by one
.subscribe(System.out::println);

除了提高可读性之外,响应式编程还允许您可以到结果流上,并在它们可用的时候处理条目。在Vert.x中您可以选择您喜欢的开发模式。在本报告中,我们将使用回调和RxJava。

results matching ""

    No results matching ""