Skip to content

GH-8582:Transactional support in PostgresSubscribableChannel #8587

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from

Conversation

igorlovich
Copy link
Contributor

@pivotal-cla
Copy link

@igorlovich Please sign the Contributor License Agreement!

Click here to manually synchronize the status of this Pull Request.

See the FAQ for frequently asked questions.

@pivotal-cla
Copy link

@igorlovich Thank you for signing the Contributor License Agreement!

@igorlovich igorlovich force-pushed the GH-8582 branch 2 times, most recently from 6ad6722 to 1fd9694 Compare March 28, 2023 08:05
@igorlovich igorlovich marked this pull request as ready for review March 28, 2023 08:49
@igorlovich
Copy link
Contributor Author

The failing test has nothing to do with the MR

private void notifyAll(Set<Subscription> subscriptions) {
subscriptions.forEach(it -> {
try {
this.executor.submit(it::notifyUpdate);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why cannot we move this executor logic into a channel implementation?
Imaging we have several channels subscribed to this publisher, so it is really that channel responsibility to handle a notification its own way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can; my thinking was that since the Subscription interface is public, we can potentially have poorly behaving Subscriptions blocking the notification thread.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is already our responsibility.
We can warn, though, in that Subscription Javadocs.

From here I wonder if that would be really better to have this feature implemented against this library: https://impossibl.github.io/pgjdbc-ng/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have moved the executor into the channel.

*
* @param pool The pool to shut down
*/
private void shutdownAndAwaitTermination(@Nullable ExecutorService pool) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we come back to a single task in this class as it was before, then we don't need this logic any more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this logic would go into the channel

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably no. The default SimpleAsyncTaskExecutor we use over there has no hook for shutdown.
It is really recommended to provide externally managed Executor, which in most cases is a org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is very close to what we all can agree in the end.

while ((message = this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId)) != null) {
this.dispatcher.dispatch(message);
}
this.executor.execute(() -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we look into the way to schedule a task per message?
Kinda perform an execute for single poll, if it was not null, then schedule the next one.

If there is a FIFO requirements, that's where this task executor can be customized externally.

Copy link
Contributor Author

@igorlovich igorlovich Mar 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By injecting a AsyncTaskExecutor and not an Executor we can get a CompletableFuture with the result. However getting the result (to determine whether to continue the loop) implies blocking, and is therefore equivalent to doing the loop as a single task, no matter what kind of an executor is passed in.

I can see 2 approaches (there may be more ) to achieve what you're proposing

  1. Have an AtomicReference outside the loop that is set inside the task to the result of the poll. The exit condition for the while loop is the reference != null. Disadvantage of this approach is that in the case of a multi-threaded executor, depending on the order of tasks finishing, we can get useless iterations through the loop if the null from a later submitted, but earlier finishing task is overridden.

  2. Ignore what is happening inside the task. The condition to exit from the while is this.jdbcChannelMessageStore.messageGroupSize(groupId) == 0 . There would need to be a sleep(X ms) to avoid hammering the message store. Disadvantage is that the call to notifyUpdate will take an X ms latency penalty.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. My suggestion is something like:

public void notifyUpdate() {
     this.executor.execute(this::pollAndSendMessage);
}
void pollAndSendMessage() {
    Message<?> message = this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId));
    if (message != null) {
          // Process message
          this.executor.execute(this::pollAndSendMessage);
    }
}

Do you see anything wrong with such an async stacking?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this not equally imply sequential message processing, irrespective of the Executor , since each task only submits the next one after it finishes?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah... I see now. This is indeed the same what you have just explained with a CompletableFuture approach.
We could swap an new task submitting with the message processing. This way we can try to pull a new message in a new thread while we process the current one.
But is it really a goal for us? We would just eliminate any FIFO from this channel logic.
It might be as an option though, or can be implemented downstream as a bridge().channel(new ExecutorChannel()) to not care about FIFO.
But that's fully different story.

I think it is time for me to step back and agree with your current solution with a single-threaded process for the whole loop.

I'm going to pull your PR locally for the final review if you don't have anything else to add.

Thank you for your patience!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No worries it is important to get it right (at least with our current understanding of "right"). I will address your feedback on the documentation

}

/**
* Sets the transaction manager to use for message processing. Each message will be processed in a
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

/**
* Sets the transaction manager to use for message processing. Each message will be processed in a
* separate transaction
* @param transactionManager The transaction manager to use
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@since 6.0.5

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

private final JdbcChannelMessageStore jdbcChannelMessageStore;

private TransactionTemplate transactionTemplate;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, move this non-final property to respective section of props.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -632,6 +632,14 @@ Such connection pools do normally expect that issued connections are closed with
For this need of an exclusive connection, it is also recommended that a JVM only runs a single `PostgresChannelMessageTableSubscriber` which can be used to register any number of subscriptions.
====

==== Transaction support
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need a dedicated section id for these paragraphs. It is fully OK to have this info withing that top-level postgresql-push.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -632,6 +632,14 @@ Such connection pools do normally expect that issued connections are closed with
For this need of an exclusive connection, it is also recommended that a JVM only runs a single `PostgresChannelMessageTableSubscriber` which can be used to register any number of subscriptions.
====

==== Transaction support
Specifying a `PlatformTransactionManager` on `PostgresSubscribableChannel` will notify subscribers in a transaction.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need to mention that this feature is available starting with version 6.0.5

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Transactional support is not active by default.

==== Retries
A retry policy can be controlled by providing a `RetryTemplate` to `PostgresSubscribableChannel`. By default, no retries are performed.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: I prefer to not have these kind of comments in the PR.
If you agree with me, just push the change.
Otherwise feel free to discuss.
In the end this done looks like a spam in my e-mail box 😄

@artembilan
Copy link
Member

Merged as e39449b after some minor clean up.
And cherry-picked to 6.0.x.

@igorlovich ,

thank you very much for contribution; looking forward for more!

@artembilan artembilan closed this Mar 29, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants