Skip to content

Adds documentation on usage of the PostgresChannelMessageTableSubscriber. #3890

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions src/reference/asciidoc/jdbc.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,8 @@ Instead, if possible, consider using either JMS- or AMQP-backed channels instead
For further reference, see the following resource:

* https://mikehadlow.blogspot.com/2012/04/database-as-queue-anti-pattern.html[The Database As Queue Anti-Pattern].

If you are still planning to use your database as a queue, consider using PostgreSQL and its notification mechanism which is described <<postgresql-push,in a subsequent section>>.
====

===== Concurrent Polling
Expand Down Expand Up @@ -575,6 +577,58 @@ The message data for a persistent channel is keyed in the store on the channel n
Consequently, if the channel names are not globally unique, the channels can pick up data that is not intended for them.
To avoid this danger, you can use the message store `region` to keep data separate for different physical channels that have the same logical name.


[[postgresql-push]]
==== PostgreSQL: receiving push notifications

PostgreSQL offers a listen and notification framework for receiving push notifications upon database table manipulations.
Spring Integration leverages this mechanism to allow for receiving push notifications when new messages are added to a `JdbcChannelMessageStore`.
When using this feature, a database trigger must be defined, which can be found as part of the comments of the `schema-postgresql.sql` file which is included in the JDBC module of Spring Integration.

Push notifications are received via the `PostgresChannelMessageTableSubscriber` class which allows its subscribers to receive a callback upon the arrival of new messages for any given `region` and `groupId`.
These notifications are received even if a message was appended on a different JVM, but to the same database.
To read messages when they arrive, a `PostgresSubscribableChannel` can delegate new messages to a `MessageHandler` what allows to implement an effective pub-sub mechanism based on JDBC.

For example, push notifications for `some group` can be received as follows:

====
[source,java]
----
@Bean
public JdbcChannelMessageStore messageStore(
DataSource dataSource) {
JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
return messageStore;
}

@Bean
public PostgresChannelMessageTableSubscriber subscriber(
@Value("${spring.datasource.url}") String url,
@Value("${spring.datasource.username}") String username,
@Value("${spring.datasource.password}") String password) {
return new PostgresChannelMessageTableSubscriber(() -> DriverManager.getConnection(
url, username, password).unwrap(PgConnection.class));
}

@Bean
public PostgresSubscribableChannel channel(
PostgresChannelMessageTableSubscriber subscriber,
JdbcChannelMessageStore messageStore) {
return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}
----
====

[IMPORTANT]
====
Any active `PostgresChannelMessageTableSubscriber` occupies an exclusive JDBC `Connection` for the duration of its active life cycle.
It is therefore important that this connection does not originate from a pooling `DataSource`.
Such connection pools do normally expect that issued connections are closed within a predefined timeout window.

For this need of an exclusive connection, it is also recommended that a JVM only runs a single `PostgresChannelMessageTableSubscriber` which can be used to register any number of subscriptions.
====

[[stored-procedures]]
=== Stored Procedures

Expand Down
4 changes: 4 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ The `DefaultLockRepository` can now be supplied with a `PlatformTransactionManag

See <<./jdbc.adoc#jdbc-lock-registry,JDBC Lock Registry>> for more information.

Furthermore, a `PostgresChannelMessageTableSubscriber` allows to receive push notifications upon new messages to a `JdbcChannelMessageStore` if using PostgreSQL as a database.

See <<./jdbc.adoc#postgresql-push,PostgreSQL: receiving push notifications>> for more information.

=== TCP/IP Changes

The `lookupHost` property of the `AbstractConnectionFactory` and `DatagramPacketMessageMapper` is now set to `false` by default to avoid delays in the environments where DNS is not configured.
Expand Down