diff --git a/src/reference/asciidoc/jdbc.adoc b/src/reference/asciidoc/jdbc.adoc index 90abcab395d..6c9a5bcf8bb 100644 --- a/src/reference/asciidoc/jdbc.adoc +++ b/src/reference/asciidoc/jdbc.adoc @@ -480,6 +480,8 @@ Instead, if possible, consider using either JMS- or AMQP-backed channels instead For further reference, see the following resource: * https://mikehadlow.blogspot.com/2012/04/database-as-queue-anti-pattern.html[The Database As Queue Anti-Pattern]. + +If you are still planning to use your database as a queue, consider using PostgreSQL and its notification mechanism which is described <>. ==== ===== Concurrent Polling @@ -575,6 +577,58 @@ The message data for a persistent channel is keyed in the store on the channel n Consequently, if the channel names are not globally unique, the channels can pick up data that is not intended for them. To avoid this danger, you can use the message store `region` to keep data separate for different physical channels that have the same logical name. + +[[postgresql-push]] +==== PostgreSQL: receiving push notifications + +PostgreSQL offers a listen and notification framework for receiving push notifications upon database table manipulations. +Spring Integration leverages this mechanism to allow for receiving push notifications when new messages are added to a `JdbcChannelMessageStore`. +When using this feature, a database trigger must be defined, which can be found as part of the comments of the `schema-postgresql.sql` file which is included in the JDBC module of Spring Integration. + +Push notifications are received via the `PostgresChannelMessageTableSubscriber` class which allows its subscribers to receive a callback upon the arrival of new messages for any given `region` and `groupId`. +These notifications are received even if a message was appended on a different JVM, but to the same database. +To read messages when they arrive, a `PostgresSubscribableChannel` can delegate new messages to a `MessageHandler` what allows to implement an effective pub-sub mechanism based on JDBC. + +For example, push notifications for `some group` can be received as follows: + +==== +[source,java] +---- +@Bean +public JdbcChannelMessageStore messageStore( + DataSource dataSource) { + JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource); + messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider()); + return messageStore; +} + +@Bean +public PostgresChannelMessageTableSubscriber subscriber( + @Value("${spring.datasource.url}") String url, + @Value("${spring.datasource.username}") String username, + @Value("${spring.datasource.password}") String password) { + return new PostgresChannelMessageTableSubscriber(() -> DriverManager.getConnection( + url, username, password).unwrap(PgConnection.class)); +} + +@Bean +public PostgresSubscribableChannel channel( + PostgresChannelMessageTableSubscriber subscriber, + JdbcChannelMessageStore messageStore) { + return new PostgresSubscribableChannel(messageStore, "some group", subscriber); +} +---- +==== + +[IMPORTANT] +==== +Any active `PostgresChannelMessageTableSubscriber` occupies an exclusive JDBC `Connection` for the duration of its active life cycle. +It is therefore important that this connection does not originate from a pooling `DataSource`. +Such connection pools do normally expect that issued connections are closed within a predefined timeout window. + +For this need of an exclusive connection, it is also recommended that a JVM only runs a single `PostgresChannelMessageTableSubscriber` which can be used to register any number of subscriptions. +==== + [[stored-procedures]] === Stored Procedures diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index dc5b3e9d803..1d056b8e9b2 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -88,6 +88,10 @@ The `DefaultLockRepository` can now be supplied with a `PlatformTransactionManag See <<./jdbc.adoc#jdbc-lock-registry,JDBC Lock Registry>> for more information. +Furthermore, a `PostgresChannelMessageTableSubscriber` allows to receive push notifications upon new messages to a `JdbcChannelMessageStore` if using PostgreSQL as a database. + +See <<./jdbc.adoc#postgresql-push,PostgreSQL: receiving push notifications>> for more information. + === TCP/IP Changes The `lookupHost` property of the `AbstractConnectionFactory` and `DatagramPacketMessageMapper` is now set to `false` by default to avoid delays in the environments where DNS is not configured.