-
Notifications
You must be signed in to change notification settings - Fork 1.1k
GH-8582:Transactional support in PostgresSubscribableChannel #8587
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
@igorlovich Please sign the Contributor License Agreement! Click here to manually synchronize the status of this Pull Request. See the FAQ for frequently asked questions. |
@igorlovich Thank you for signing the Contributor License Agreement! |
6ad6722
to
1fd9694
Compare
The failing test has nothing to do with the MR |
private void notifyAll(Set<Subscription> subscriptions) { | ||
subscriptions.forEach(it -> { | ||
try { | ||
this.executor.submit(it::notifyUpdate); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why cannot we move this executor logic into a channel implementation?
Imaging we have several channels subscribed to this publisher, so it is really that channel responsibility to handle a notification its own way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can; my thinking was that since the Subscription interface is public, we can potentially have poorly behaving Subscriptions blocking the notification thread.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is already our responsibility.
We can warn, though, in that Subscription
Javadocs.
From here I wonder if that would be really better to have this feature implemented against this library: https://impossibl.github.io/pgjdbc-ng/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have moved the executor into the channel.
* | ||
* @param pool The pool to shut down | ||
*/ | ||
private void shutdownAndAwaitTermination(@Nullable ExecutorService pool) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we come back to a single task in this class as it was before, then we don't need this logic any more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this logic would go into the channel
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably no. The default SimpleAsyncTaskExecutor
we use over there has no hook for shutdown.
It is really recommended to provide externally managed Executor
, which in most cases is a org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
.
...java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is very close to what we all can agree in the end.
while ((message = this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId)) != null) { | ||
this.dispatcher.dispatch(message); | ||
} | ||
this.executor.execute(() -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we look into the way to schedule a task per message?
Kinda perform an execute for single poll, if it was not null, then schedule the next one.
If there is a FIFO requirements, that's where this task executor can be customized externally.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By injecting a AsyncTaskExecutor
and not an Executor
we can get a CompletableFuture
with the result. However getting the result (to determine whether to continue the loop) implies blocking, and is therefore equivalent to doing the loop as a single task, no matter what kind of an executor is passed in.
I can see 2 approaches (there may be more ) to achieve what you're proposing
-
Have an
AtomicReference
outside the loop that is set inside the task to the result of the poll. The exit condition for the while loop is the reference != null. Disadvantage of this approach is that in the case of a multi-threaded executor, depending on the order of tasks finishing, we can get useless iterations through the loop if the null from a later submitted, but earlier finishing task is overridden. -
Ignore what is happening inside the task. The condition to exit from the while is
this.jdbcChannelMessageStore.messageGroupSize(groupId) == 0
. There would need to be a sleep(X ms) to avoid hammering the message store. Disadvantage is that the call to notifyUpdate will take an X ms latency penalty.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. My suggestion is something like:
public void notifyUpdate() {
this.executor.execute(this::pollAndSendMessage);
}
void pollAndSendMessage() {
Message<?> message = this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId));
if (message != null) {
// Process message
this.executor.execute(this::pollAndSendMessage);
}
}
Do you see anything wrong with such an async stacking?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this not equally imply sequential message processing, irrespective of the Executor
, since each task only submits the next one after it finishes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah... I see now. This is indeed the same what you have just explained with a CompletableFuture
approach.
We could swap an new task submitting with the message processing. This way we can try to pull a new message in a new thread while we process the current one.
But is it really a goal for us? We would just eliminate any FIFO from this channel logic.
It might be as an option though, or can be implemented downstream as a bridge().channel(new ExecutorChannel())
to not care about FIFO.
But that's fully different story.
I think it is time for me to step back and agree with your current solution with a single-threaded process for the whole loop.
I'm going to pull your PR locally for the final review if you don't have anything else to add.
Thank you for your patience!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No worries it is important to get it right (at least with our current understanding of "right"). I will address your feedback on the documentation
} | ||
|
||
/** | ||
* Sets the transaction manager to use for message processing. Each message will be processed in a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Imperative language in method Javadocs, please: https://github.com/spring-projects/spring-framework/wiki/Code-Style#javadoc-formatting
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
/** | ||
* Sets the transaction manager to use for message processing. Each message will be processed in a | ||
* separate transaction | ||
* @param transactionManager The transaction manager to use |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@since 6.0.5
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
private final JdbcChannelMessageStore jdbcChannelMessageStore; | ||
|
||
private TransactionTemplate transactionTemplate; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, move this non-final
property to respective section of props.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/reference/asciidoc/jdbc.adoc
Outdated
@@ -632,6 +632,14 @@ Such connection pools do normally expect that issued connections are closed with | |||
For this need of an exclusive connection, it is also recommended that a JVM only runs a single `PostgresChannelMessageTableSubscriber` which can be used to register any number of subscriptions. | |||
==== | |||
|
|||
==== Transaction support |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need a dedicated section id for these paragraphs. It is fully OK to have this info withing that top-level postgresql-push
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/reference/asciidoc/jdbc.adoc
Outdated
@@ -632,6 +632,14 @@ Such connection pools do normally expect that issued connections are closed with | |||
For this need of an exclusive connection, it is also recommended that a JVM only runs a single `PostgresChannelMessageTableSubscriber` which can be used to register any number of subscriptions. | |||
==== | |||
|
|||
==== Transaction support | |||
Specifying a `PlatformTransactionManager` on `PostgresSubscribableChannel` will notify subscribers in a transaction. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We also need to mention that this feature is available starting with version 6.0.5
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/reference/asciidoc/jdbc.adoc
Outdated
Transactional support is not active by default. | ||
|
||
==== Retries | ||
A retry policy can be controlled by providing a `RetryTemplate` to `PostgresSubscribableChannel`. By default, no retries are performed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One sentence per line, please: https://asciidoctor.org/docs/asciidoc-recommended-practices/#one-sentence-per-line
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI: I prefer to not have these kind of comments in the PR.
If you agree with me, just push the change.
Otherwise feel free to discuss.
In the end this done
looks like a spam in my e-mail box 😄
Merged as e39449b after some minor clean up. thank you very much for contribution; looking forward for more! |
As discussed in Transactional support in PostgresSubscribableChannel