-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Adds a listener for Postgres-based channel message stores. #3884
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
41c6066
to
c1a723f
Compare
I added this as a draft for creating a better environment for discussing this. I addressed your last comments, however not the point about the dataSource.getConnection().unwrap(PgConnection.class) for getting a connection, the |
So, you mean that the logic like this |
...on-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PgConnectionSupplier.java
Show resolved
Hide resolved
import org.springframework.messaging.Message; | ||
import org.springframework.messaging.MessageHandler; | ||
|
||
public final class PostgresChannelMessageTableSubscription { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think that I can accept this abstraction:
- It is probably better to have a
JdbcChannelMessageStore
as a ctor for the subscriber. - Have getter for it over there as well
- The Postgres message channel should expect the subscriber as its ctor and take that store from there.
This way we would avoid the problem when subscriber and store look into different DBs or tables. - The subscription should be an inner interface of the subscriber to avoid classes tangle.
- This way a
region
would be just an internal of the subscriber since both subscriber and channel would rely on the same store, therefore on the same region. - I don't believe in external
Runnable onPossibleUpdate
: it is really has to be anonUpdate()
impl details. That's why I'm asking about aSubscribableChannel
impl together with the mentioned subscription contract. - Somehow "subscriber" and "subscription" words don't work for me here: they are clashing with Reactive Streams. Not sure yet in other names though...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if I follow, you suggest to make the JdbcChannelMessageStore a constructor argument to the subscriber? Then it would no longer be possible to reuse the subscriber for different regions. This would require multiple connections to block for each region, would it not? Ideally, I wanted to avoid this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah... Sorry, I forgot that JdbcChannelMessageStore
is restricted to specific region.
We will have to document it properly (and in JavaDocs as well), that JdbcChannelMessageStore
on the channel we want to subscribe from and PgConnectionSupplier
must belong to the same table.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think this will need some extensive javadoc, but I wanted to get the API right before I started on writing anything there. Due to the sharing requirement, the API will get slightly more complicated than it otherwise would have been, but I think it's worth that performance-wise.
For example Spring Boot would always auto-configure a data source where you could not use it without constant reconnects. And I have rarely seen unpooled data sources being used. I think that's the right approach. |
...java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java
Outdated
Show resolved
Hide resolved
...tegration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-postgresql.sql
Outdated
Show resolved
Hide resolved
import org.springframework.messaging.Message; | ||
import org.springframework.messaging.MessageHandler; | ||
|
||
public final class PostgresChannelMessageTableSubscription { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah... Sorry, I forgot that JdbcChannelMessageStore
is restricted to specific region.
We will have to document it properly (and in JavaDocs as well), that JdbcChannelMessageStore
on the channel we want to subscribe from and PgConnectionSupplier
must belong to the same table.
} | ||
// Avoid that stop/start sequence reactivates previously stopped thread. | ||
Object current = new Object(); | ||
identity = current; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You just avoid this via making those start()
and stop()
as synchronized
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That would not work as the method returns before the executor thread might have ended. If the executor's thread dangles around, it could pick up a subsequent start. By switching the identity, it is certain that this additional listener should no longer listen or produce events, even if it takes the thread some time to complete.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably need to wait in the stop()
until that thread is done with its loop before returning from the method. Something like CountDownLatch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed that to use a latch now.
c1a723f
to
0197628
Compare
} | ||
// Avoid that stop/start sequence reactivates previously stopped thread. | ||
Object current = new Object(); | ||
identity = current; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably need to wait in the stop()
until that thread is done with its loop before returning from the method. Something like CountDownLatch
-- RETURNS TRIGGER AS | ||
-- $BODY$ | ||
-- BEGIN | ||
-- EXECUTE 'NOTIFY int_channel_message_notify, ''' || QUOTE_LITERAL(NEW.REGION || NEW.GROUP_KEY) || ''''; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why there is no some readable concatenator between region and group?
What if I have them like this: myregion
and mygroup
?
So, with this I'll end up as myregionmygroup
. something what is not end-user friendly 😢
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought that, since it's not exposed to the user, it does not matter much, but it's probably nice for debugging purposes. I added a space now.
8f2b3db
to
5d39c70
Compare
...java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java
Show resolved
Hide resolved
...n-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java
Show resolved
Hide resolved
...tegration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-postgresql.sql
Outdated
Show resolved
Hide resolved
this.onPossibleUpdate.run(); | ||
} | ||
|
||
public static PostgresChannelMessageTableSubscription ofMessageStore( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm still against this solution.
I'd like to see a subscription interface and some SubscribableChannel
implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I redid it to yield a SubscribableChannel
. Of course, the implementation could be changed to only allow for this abstraction, but this would take away the possibility to being informed that there are new messages without polling them off the queue.
f0c3a32
to
a6396ba
Compare
return result; | ||
} | ||
|
||
public static SubscribableChannel asSubscribableChannel( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you still insist on this factory?
This is not Spring-friendly and I doubt anyone is going to use a subscription outside of the channel.
The PostgresSubscribableChannel
must be a top-level API and no any extra classes for some onPossibleUpdate
.
and I doubt in this onPossibleUpdate()
name yet. Why just not notify()
?
What is wrong with extracting contract Subscription
interface and implement it on the channel?
The PostgresChannelMessageTableSubscriber
must be a ctor argument for that channel.
Also dispatching must be handled by the MessageDispatcher
abstraction.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My intuition is to expose the most primitive abstraction on the base level, which is receiving a notification whenever a message was added to the group. If you wanted to use this information to read a message from a worker thread or directly, should be up to the user. I made a few changes now to expose a different API that always goes via a SubscribableChannel
, if you'd prefer not introducing more API. I think this would satisfy the most common use.
a6396ba
to
ab121ed
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One more not forgot to say:
No reason to squash your commits all the time: we are going to do that on merge.
Meanwhile, during review and feedback process, it is better to keep commits history.
This way we know what was before we started and concluded some discussion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you are not going to implement a MessageChannel
as a first-class citizen for Spring Integration functionality and not going to add tests, I cannot accept the PR as it is right now and take it over for original expectations to follow up. We have some other priorities at the moment, so it may take some time until we incorporate this feature.
Thank you for understanding!
@@ -0,0 +1,198 @@ | |||
/* | |||
* Copyright 2002-2022 the original author or authors. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a new class, so only the current year applies.
import java.sql.Statement; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.concurrent.*; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No asterisk imports, please.
And also this is wrong order.
You simply can run :spring-integration-jdbc:check
Gradle to see what is wrong with Checkstyle we require.
return subscriptions.add(subscription); | ||
} | ||
|
||
boolean unsubscribe(PostgresChannelMessageTableSubscription subscription) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think these two methods must be public
import org.springframework.messaging.SubscribableChannel; | ||
import org.springframework.util.Assert; | ||
|
||
final class PostgresChannelMessageTableSubscription { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still don't see this class as a Spring Integration structure.
Your original concern was exactly about a MessageChannel
to be ready for notification from the PostgreSQL and there is no a MessageChannel
as a top-level Spring Integration API in your PR.
It is fully not clean why you resist to follow a simple Subscription
contract interface and a MessageChannel
.
Plus I said before that without an inner interface in the subscriber there is going to be a class tangle, which you exactly have just introduced with that asSubscribableChannel()
factory method.
There is also no tests yet for this PR.
We have in the jdbc
module a MySqlContainerTest
to test against MySQL container in Docker.
Probably we can have something similar for Postgres to be sure in the functionality you introduce.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still don't understand what API you are looking for. Before that, I would not want to spend time on cleaning up the implementation or adding tests as this would require additional effort for every API suggestion. Once we find out that we're on the same page regarding the API, it's easy to add tests and clean up, that's why I set this PR as a draft.
I made the subscription class package-private to indicate that it's not the API I would go for but rather expose a factory for a SubscribableChannel. We have not discussed ChannelDispatcher before, only subscribable channel, would you rather have it as a factory for such a dispatcher?
What do you mean by "inner interface"? That the PostgresChannelMessageTableSubscriber should implement one? There's no interface in Spring integration that fits the necessary API, so I would need to create one. This goes back to what we discussed that it needs to be shared across regions and group ids as it fully occupies a physical Postgres connection.
@raphw , my idea about an implementation and API is like this: artembilan@92de4ca |
I have changed the implementation, cleaned up and added javadoc and a test. The test is still failing, somehow the notification does never arrive at the listener. Is the setup so that it wraps everything in a separate transaction? It is really strange that the listen does not trigger here. |
As an example: a very similar implementation using Testcontainers that works: https://github.com/Skatteetaten/skatteprosessen-task/blob/main/task-processor-jdbc/src/main/java/no/skatteetaten/fastsetting/formueinntekt/felles/task/processor/jdbc/PostgresProcessorNotifier.java |
For some reason, Postgres does not like the inline concatenation when |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is good progress.
This probably won't make it into today's release, but definitely next milestone.
Thank you!
...on-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PgConnectionSupplier.java
Outdated
Show resolved
Hide resolved
...java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java
Outdated
Show resolved
Hide resolved
* @return {@code true} if the subscription was not already added. | ||
*/ | ||
public boolean subscribe(Subscription subscription) { | ||
Set<Subscription> subscriptions = this.subscriptions.computeIfAbsent(subscription.getRegion() + " " + getKey(subscription.getGroupId()), ignored -> ConcurrentHashMap.newKeySet()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We prefer code like length not more than 120
symbols.
...java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java
Show resolved
Hide resolved
.../org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTest.java
Outdated
Show resolved
Hide resolved
...n-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresContainerTest.java
Outdated
Show resolved
Hide resolved
...ringframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTest-context.xml
Outdated
Show resolved
Hide resolved
.../org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTest.java
Outdated
Show resolved
Hide resolved
.isTrue(); | ||
} | ||
finally { | ||
postgresChannelMessageTableSubscriber.stop(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After making this as a bean, the ApplicationContext
will take care about its lifecycle
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted to control the life cycle explicitly since one test checks a registration prior to starting the life cycle and one while the subscriber is already running.
.../org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTest.java
Outdated
Show resolved
Hide resolved
I addressed all comments but am left with not having the subscriber being registered as a bean to allow for two tests where subscribers are registered in a started and in a stopped condition. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good.
Would you mind to rebase your branch to the latest main
and I'll it locally for final review.
Perhaps it may make it into today's release after all 😃 .
...n-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresContainerTest.java
Show resolved
Hide resolved
… for a JdbcChannelMessageStore.
…ract * Implement a `PostgresSubscribableChannel`
ef54d8d
to
a03d082
Compare
I rebased on master. |
Of course, thank you for your feedback along the way. As for the documentation: #3890 |
Addresses #3872.
This listener allows for push notifications upon the arrival of new messages for a given group id and region.