我们可以使用RxJava的Observables和操作符,以相同的、统一的方式集成数百个系统。例如,假设您希望监视文件系统的新文件(与第138页上的“定期轮询”进行比较)。有了Camel对RxJava的支持,这个任务非常简单:

CamelContext camel = new DefaultCamelContext();
ReactiveCamel reactiveCamel = new ReactiveCamel(camel);
reactiveCamel
    .toObservable("file:/home/user/tmp")
    .subscribe(e ->
        log.info("New file: {}", e));

就是这样啦。创建DefaultCamelContext和ReactiveCamel之后,我们就可以开始消费消息了。由Camel支持的每个集成平台都通过URI编码的:在我们的例子中是file:/home/user。通过使用这样的URI来调用toObservable(),我们创建了Observable<Message>,每次在指定的目录中出现新文件时它都会发出事件。

results matching ""

    No results matching ""