Couchbase服务器是NoSQL家族中的一个现代文档数据库。有趣的是,Couchbase在其客户端API中支持RxJava作为一等公民。响应式扩展不仅用作包装器,而且在与数据库交互时得到了官方支持和习惯用法。许多其他存储引擎都有非阻塞的异步API,但是Couchbase的创建者选择了RxJava作为客户端层的最佳基础。
例如,让我们查询一个名为travel- sample的示例数据集,它恰好有一个ID route_14197的文档。在一个示例数据集中,route文档如下:
{
"id": 14197,
"type": "route",
"airline": "B6",
"airlineid": "airline_3029",
"sourceairport": "PHX",
"destinationairport": "BOS",
"stops": 0,
"equipment": "320",
"schedule": [
{
"day": 0,
"utc": "22:12:00",
"flight": "B6928"
},
{
"day": 0,
"utc": "06:40:00",
"flight": "B6387"
},
...
{
"day": 1,
"utc": "08:16:00",
"flight": "B6922"
}
...
每个查询返回一个Observable,从这一点来说,我们可以安全地转换检索到的记录,无论我们以什么方式:
CouchbaseCluster cluster = CouchbaseCluster.create();
cluster
.openBucket("travel-sample")
.get("route_14197")
.map(AbstractDocument::content)
.map(json -> json.getArray("schedule"))
.concatMapIterable(JsonArray::toList)
.cast(Map.class)
.filter(m -> ((Number)m.get("day")).intValue() == 0)
.map(m -> m.get("flight").toString())
.subscribe(flight -> System.out.println(flight));
get()返回Observable<JsonDocument>。JSON文档本质上是固有的松散类型,因此为了提取有意义的信息,我们必须事先了解它们的结构。
事先知道文档的样子,就很容易理解JsonDocument中的转换。转换的序列首先提取“schedule”元素,然后进一步拉取所有的“day”节点等于0的“flight”节点。Observable最终接收到“B6928”、“B6387”等字符串。令人惊讶的是,RxJava同样适用于以下几个方面:
数据检索,包括超时、缓存和错误处理
数据转换,如提取、过滤、钻进数据、聚合等
这显示了Observable的抽象的力量,您可以在非常不同的场景中使用,同时仍然暴露相同的简洁API