PostgreSQL有一个特殊的内置消息传递机制,通过LISTEN 和NOTIFY 扩展SQL语句。每个PostgreSQL客户机都可以通过一条SQL语句发送通知到虚拟通道,如下所示:
NOTIFY my_channel;
NOTIFY my_channel, '{"answer": 42}';
在这个例子中,我们将一个空的通知,然后是一个任意的字符串(它可以是JSON、XML或任何其他编码中的数据),发送到以my_channel命名的通道。通道基本上是在PostgreSQL数据库引擎中管理的队列。有趣的是,发送通知是事务的一部分,所以交付在commit之后发生,在回滚的情况下,消息被丢弃。
要使用特定通道的通知,我们首先必须LISTEN该通道。当我们开始监听给定的连接时,获取通知的唯一方法是使用getNotifications()方法定期轮询。这将引入随机延迟和不必要的CPU负载和上下文切换;不幸的是,这就是API的设计方式。完整的阻塞示例如下:
try (Connection connection =
DriverManager.getConnection("jdbc:postgresql:db")) {
try (Statement statement = connection.createStatement()) {
statement.execute("LISTEN my_channel");
}
Jdbc4Connection pgConn = (Jdbc4Connection) connection;
pollForNotifications(pgConn);
}
}
//...
void pollForNotifications(Jdbc4Connection pgConn) throws Exception {
while (!Thread.currentThread().isInterrupted()) {
final PGNotification[] notifications = pgConn.getNotifications();
if (notifications != null) {
for (final PGNotification notification : notifications) {
System.out.println(
notification.getName() + ": " +
notification.getParameter());
}
}
TimeUnit.MILLISECONDS.sleep(100);
}
}
我们不仅阻塞了客户端线程,还被迫保持了一个JDBC连接,因为监听与特定的连接有关。至少我们可以同时收听多个channels。前面的代码相当冗长,但是很简单。在调用LISTEN之后,我们进入一个无限循环,请求新的通知。调用getNotifications()是破坏性的,这意味着它丢弃了返回的通知,因此调用它两次将不会返回相同的事件。getName()是通道名称(例如,my_channel),而getParameter()返回可选的事件内容,例如JSON格式的有效负载。
该API非常过时,使用null来表示没有挂起的通知和,返回值还是数组,而不是集合。让我们来让它具有响应性一点。在没有任何基于推送的通知机制的情况下,我们被迫使用非阻塞interval()操作符来重新实现轮询。有许多微小的细节允许我们的自定义Observable 可以正确的表现,我们将在示例之后进一步讨论(目前尚不完整):
Observable<PGNotification> observe(String channel, long pollingPeriod) {
return Observable.<PGNotification>create(subscriber -> {
try {
Connection connection = DriverManager
.getConnection("jdbc:postgresql:db");
subscriber.add(Subscriptions.create(() ->
closeQuietly(connection)));
listenOn(connection, channel);
Jdbc4Connection pgConn = (Jdbc4Connection) connection;
pollForNotifications(pollingPeriod, pgConn)
.subscribe(Subscribers.wrap(subscriber));
} catch (Exception e) {
subscriber.onError(e);
}
}).share();
}
void listenOn(Connection connection, String channel) throws SQLException {
try (Statement statement = connection.createStatement()) {
statement.execute("LISTEN " + channel);
}
}
void closeQuietly(Connection connection) {
try {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
令人惊讶的是,这个例子本可以短得多,如果没有IOException的话。我们的目标是产生强劲的 Observable<PGNotification>。首先,我们延迟打开到数据库的连接,直到有人真正订阅。此外,为了避免连接泄漏(任何直接处理JDBC的应用程序中的严重问题),我们确保当订阅方取消订阅时,连接会被关闭。此外,当流中出现错误时,取消订阅,从而关闭连接。
现在我们准备调用listenOn(),并开始通过一个开放的连接来接收通知。如果在执行此语句时抛出异常,则通过调用subscriber.onError(e)来捕获和处理该异常。它不仅将错误无缝地传播给订阅者,而且还强制关闭连接。但是,如果LISTEN请求成功,则下一个调用getNotifications()将返回随后发送的所有事件。
我们不想阻止任何线程所以,我们在 pollForNotifications()方法内部hinterval()创建了一个内部Observable。我们用相同的Subscriber 去订阅那个Observable,但是用Subscribers.wrap()包装了一下,因此,onStart()不会在该Subscriber 上执行两次
Observable<PGNotification> pollForNotifications(
long pollingPeriod,
AbstractJdbc2Connection pgConn) {
return Observable
.interval(0, pollingPeriod, TimeUnit.MILLISECONDS)
.flatMap(x -> tryGetNotification(pgConn))
.filter(arr -> arr != null)
.flatMapIterable(Arrays::asList);
}
Observable<PGNotification[]> tryGetNotification(
AbstractJdbc2Connection pgConn) {
try {
return Observable.just(pgConn.getNotifications());
} catch (SQLException e) {
return Observable.error(e);
}
}
我们会定期检查getnotification()的内容,首先将其封装在一个笨拙的 Observable<PGNotification[]>中。因为返回的数组PGNotification[]可以是null,然后我们filter()出null,然后通过flatMapIterable()打开数组,首先将它转换为一个List<PGNotification>
,通过Arrays::asList。我鼓励您仔细地遍历所有这些步骤,跟踪中间Observables的类型。包含closeQuietly()和tryGetNotification()的唯一原因是为了处理SQLException异常检查。注意,我们在closeQuietly()中吞下了这个异常,因为它是由我们所无能为力的上下文中调用的。例如,当某人取消订阅时,没有办法转发它。
最后一小部分实现是publish()和refCount(),接近第一个方法的末尾。这两种方法使得在多个订阅者之间共享一个JDBC连接成为可能。没有它们,每一个新用户都将打开一个新的连接并监听它,这是非常浪费的。此外,refCount()跟踪订阅者的数量,当最后一个用户取消订阅时,物理上关闭数据库连接。请参阅第54页的“Single Subscription with publish().refCount()”,以获得关于publish()和refCount()的更多细节,特别是它如何改变一个lambda表达式在observable . create()中的行为。
记住一个连接可以监听多个channel,作为练习来实现observe(),以便它重用在所有订阅者和所有他们感兴趣的渠道上的相同的连接。如果您调用observe()一次并订阅多次,那么当前实现共享一个连接,它可以很容易地重用相同的连接,即使是对不同channel感兴趣的订阅者。
在PostgreSQL中探索LISTEN和NOTIFY没有什么实际的理由;市场上有更快、更健壮、更可靠的消息队列。但是这个案例研究展示了如何在更响应式的场景中使用JDBC,即使它仍然需要一点阻塞或轮询。