就像Couchbase一样,MongoDB允许在没有预定义模式的情况下存储任意json格式的文档。客户端库对RxJava提供了头等类的支持,允许异步存储和查询数据。下面的例子说明了这两点。它首先将12个文档插入到数据库中;一旦批量插入完成,它就会查询它们:
import com.mongodb.rx.client.*;
import org.bson.Document;
import java.time.Month;
MongoCollection<Document> monthsColl = MongoClients
.create()
.getDatabase("rx")
.getCollection("months");
Observable
.from(Month.values())
.map(month -> new Document()
.append("name", month.name())
.append("days_not_leap", month.length(false))
.append("days_leap", month.length(true))
)
.toList()
.flatMap(monthsColl::insertMany)
.flatMap(s -> monthsColl.find().toObservable())
.toBlocking()
.subscribe(System.out::println);
Month类是有从1月到12月值的Enum类。此外,我们可以轻易地获得任何一个月的长度,无论是在闰年还是不在闰年。首先,我们创建 12个BSON(二进制JSON)文档,每个文档代表一个月的长度。然后我们在MongoCollection中使用insertMany()批量处理插入 List<Document>。这产生一个Observable<Success>(值本身不包含任何有意义的信息);这是一个单例对象)。当Success 事件出现时,我们可以通过调用find(). toobservable()来查询数据库。我们希望刚刚插入的12个文档被找到。为了清楚期间,没有包括自动指定的_id属性,下面这就是在最后打印的内容:
Document{{name=JANUARY, days_not_leap=31, days_leap=31}}
Document{{name=FEBRUARY, days_not_leap=28, days_leap=29}}
Document{{name=MARCH, days_not_leap=31, days_leap=31}}
...
同样,真正的力量来自组合。通过MongoDB的RxJava驱动,您可以轻松地同时查询多个集合,并实现并发,却不需要真正地考虑它。下面的代码片段将两个并发请求发送给MongoDB,另一个请求发给定价服务。请注意, first()不是Observable的操作符;相反,它是一个MongoDB操作符,它在构造查询后返回Observable的结果。find()相当于SQL中的WHERE子句,而projection()表示SELECT . first() is like LIMIT 1 :
Observable<Integer> days = db.getCollection("months")
.find(Filters.eq("name", APRIL.name()))
.projection(Projections.include("days_not_leap"))
.first()
.map(doc -> doc.getInteger("days_not_leap"));
Observable<Instant> carManufactured = db.getCollection("cars")
.find(Filters.eq("owner.name", "Smith"))
.first()
.map(doc -> doc.getDate("manufactured"))
.map(Date::toInstant);
Observable<BigDecimal> pricePerDay = dailyPrice(LocalDateTime.now());
Observable<Insurance> insurance = Observable
.zip(days, carManufactured, pricePerDay,
(d, man, price) -> {
//Create insurance
});
从技术上讲,你可以混合和匹配任何Observables,不管它们的性质和来源。前面的示例对MongoDB进行了两个不同集合的查询,并在dailyPrice()中做了另一个查询--------例如,通过Retrofit 做一个HTTP请求返回回来一个Observable。归根结底:Observable的来源无关紧要,您可以以任何方式编写异步计算和请求。您计划查询多个数据库,并结合web服务和本地文件系统操作吗?所有这些都可以并发地运行,并且可以轻松地组合在一起。掌握了RxJava的一般行为之后,每一个Observable的源在表面上都是相同的。