Adds a listener for Postgres-based channel message stores. #3884

Closed

raphw
Contributor

@raphw raphw commented Sep 8, 2022

Addresses #3872.

This listener allows for push notifications upon the arrival of new messages for a given group id and region.

@raphw raphw force-pushed the postgres-notification-listener branch from 41c6066 to c1a723f Compare September 8, 2022 08:41
@raphw raphw marked this pull request as draft September 8, 2022 08:41
@raphw
Contributor Author

raphw commented Sep 8, 2022

I added this as a draft for creating a better environment for discussing this. I addressed your last comments, however not the point about the DataSource as it will most likely not work. If one used

dataSource.getConnection().unwrap(PgConnection.class)

for getting a connection, the DataSource would likely be a pooled one. Pools normally expect a connection to be returned within a given timeout; otherwise, they close the physical connection and report a connection leak. This would abort the listener, and the process would repeat as soon as a new connection was checked out. Therefore, a connection must normally be obtained directly from the driver, often via DriverManager.getConnection, which is why the interface is added.

@artembilan
Member

the DataSource would likely be a pooled one

So, you mean that logic like dataSource.getConnection().unwrap(PgConnection.class) is not always correct because we cannot be sure what type of DataSource is provided.
If that is the case, then we must not tempt fate and should avoid problems as much as possible.
Therefore, an approach with a Supplier<PgConnection> is the way to go, and we leave the choice up to the end-user. We can still show dataSource.getConnection().unwrap(PgConnection.class) as a sample.
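The supplier idea discussed above can be sketched with plain JDBC types. This is a minimal sketch, not the API from the PR: `DedicatedConnectionSketch`, `ConnectionSupplier`, and `direct` are hypothetical names, and the target type is passed in as a `Class<T>` so that the example compiles without the PostgreSQL driver on the classpath.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

final class DedicatedConnectionSketch {

    /** Hypothetical stand-in for the connection supplier interface discussed above. */
    @FunctionalInterface
    interface ConnectionSupplier<T> {

        T get() throws SQLException;

    }

    /**
     * Returns a supplier that opens a new physical connection through the driver on
     * every call, bypassing any connection pool, and unwraps it to the requested type.
     */
    static <T> ConnectionSupplier<T> direct(String url, String user, String password, Class<T> type) {
        return () -> {
            Connection connection = DriverManager.getConnection(url, user, password);
            try {
                return connection.unwrap(type);
            }
            catch (SQLException ex) {
                // Do not leak the physical connection if it cannot be unwrapped.
                connection.close();
                throw ex;
            }
        };
    }

}
```

With the PostgreSQL driver available, a caller would presumably pass `PgConnection.class` as the type. Nothing is opened until `get()` is called, so the listener decides when the dedicated connection is created.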

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;

public final class PostgresChannelMessageTableSubscription {
Member

I don't think that I can accept this abstraction:

  1. It is probably better to take a JdbcChannelMessageStore as a ctor argument for the subscriber.
  2. Have a getter for it there as well.
  3. The Postgres message channel should expect the subscriber as its ctor argument and take the store from there.
    This way we would avoid the problem of the subscriber and the store looking into different DBs or tables.
  4. The subscription should be an inner interface of the subscriber to avoid a class tangle.
  5. This way a region would be just an internal detail of the subscriber, since both subscriber and channel would rely on the same store and therefore on the same region.
  6. I don't believe in an external Runnable onPossibleUpdate: it really has to be an onUpdate() implementation detail. That's why I'm asking for a SubscribableChannel impl together with the mentioned subscription contract.
  7. Somehow the words "subscriber" and "subscription" don't work for me here: they clash with Reactive Streams. I'm not sure about other names yet, though...

Contributor Author

Not sure if I follow, you suggest to make the JdbcChannelMessageStore a constructor argument to the subscriber? Then it would no longer be possible to reuse the subscriber for different regions. This would require multiple connections to block for each region, would it not? Ideally, I wanted to avoid this.

Member

Yeah... Sorry, I forgot that JdbcChannelMessageStore is restricted to a specific region.
We will have to document properly (in the JavaDocs as well) that the JdbcChannelMessageStore on the channel we want to subscribe to and the PgConnectionSupplier must point to the same table.

Contributor Author

Yeah, I think this will need some extensive javadoc, but I wanted to get the API right before I started on writing anything there. Due to the sharing requirement, the API will get slightly more complicated than it otherwise would have been, but I think it's worth that performance-wise.

@raphw
Contributor Author

raphw commented Sep 8, 2022

the DataSource would likely be a pooled one

So, you mean that logic like dataSource.getConnection().unwrap(PgConnection.class) is not always correct because we cannot be sure what type of DataSource is provided. If that is the case, then we must not tempt fate and should avoid problems as much as possible. Therefore, an approach with a Supplier<PgConnection> is the way to go, and we leave the choice up to the end-user. We can still show dataSource.getConnection().unwrap(PgConnection.class) as a sample.

Spring Boot, for example, auto-configures a pooled data source that could not be used here without constant reconnects. And I have rarely seen unpooled data sources being used. I think that's the right approach.

}
// Avoid that stop/start sequence reactivates previously stopped thread.
Object current = new Object();
identity = current;
Member

You can avoid this simply by making start() and stop() synchronized.

Contributor Author

That would not work as the method returns before the executor thread might have ended. If the executor's thread dangles around, it could pick up a subsequent start. By switching the identity, it is certain that this additional listener should no longer listen or produce events, even if it takes the thread some time to complete.

Member

We probably need to wait in stop() until that thread is done with its loop before returning from the method. Something like a CountDownLatch.

Contributor Author

I changed that to use a latch now.
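The latch-based stop discussed in this thread could look roughly like the following. It is a minimal sketch under stated assumptions, not the code from the PR: `LatchedListener` and `isWorkerDone` are hypothetical names, and a short sleep stands in for blocking on the database connection.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class LatchedListener {

    // Daemon thread so that a forgotten stop() does not keep the JVM alive.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable, "notification-listener");
        thread.setDaemon(true);
        return thread;
    });

    private volatile boolean running;

    // Starts at zero: there is no worker to wait for before the first start().
    private CountDownLatch finished = new CountDownLatch(0);

    synchronized void start() {
        if (this.running) {
            return;
        }
        this.running = true;
        CountDownLatch done = new CountDownLatch(1);
        this.finished = done;
        this.executor.submit(() -> {
            try {
                while (this.running) {
                    // Placeholder for waiting on notifications from the connection.
                    TimeUnit.MILLISECONDS.sleep(10);
                }
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
            finally {
                // Signal stop() that this worker has left its loop.
                done.countDown();
            }
        });
    }

    synchronized void stop() {
        this.running = false;
        try {
            // Block until the worker is done, so a later start() cannot revive it.
            this.finished.await();
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    synchronized boolean isWorkerDone() {
        return this.finished.getCount() == 0;
    }

}
```

Because stop() only returns once the worker has counted down its latch, and both methods are synchronized, a subsequent start() always begins with a fresh worker. A timed await would reintroduce the reactivation problem whenever the timeout elapses first.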

@raphw raphw force-pushed the postgres-notification-listener branch from c1a723f to 0197628 Compare September 12, 2022 17:14
-- RETURNS TRIGGER AS
-- $BODY$
-- BEGIN
-- EXECUTE 'NOTIFY int_channel_message_notify, ''' || QUOTE_LITERAL(NEW.REGION || NEW.GROUP_KEY) || '''';
Member

Why is there no readable separator between region and group?
What if I have them like this: myregion and mygroup?
With this I'll end up with myregionmygroup, which is not end-user friendly 😢

Contributor Author

I thought that, since it's not exposed to the user, it does not matter much, but it's probably nice for debugging purposes. I added a space now.
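Besides readability, a separator also makes the payload less ambiguous. The following minimal sketch uses hypothetical helper names (`NotificationPayloads`, `withoutSeparator`, `withSeparator`) to show how two different region and group pairs collapse into the same payload when they are concatenated directly.

```java
final class NotificationPayloads {

    private NotificationPayloads() {
    }

    /** Direct concatenation: different pairs can produce the same payload. */
    static String withoutSeparator(String region, String groupKey) {
        return region + groupKey;
    }

    /** A single space between the parts, as adopted in the reply above. */
    static String withSeparator(String region, String groupKey) {
        return region + " " + groupKey;
    }

}
```

For example, the pairs ("my", "regiongroup") and ("myregion", "group") are indistinguishable without the separator. A region name that itself contains a space could still cause ambiguity, so this assumes region names without spaces.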

@raphw raphw force-pushed the postgres-notification-listener branch 10 times, most recently from 8f2b3db to 5d39c70 Compare September 12, 2022 21:49
this.onPossibleUpdate.run();
}

public static PostgresChannelMessageTableSubscription ofMessageStore(
Member

I'm still against this solution.
I'd like to see a subscription interface and some SubscribableChannel implementation.

Contributor Author

I redid it to yield a SubscribableChannel. Of course, the implementation could be changed to only allow for this abstraction, but this would take away the possibility of being informed that there are new messages without polling them off the queue.

@raphw raphw force-pushed the postgres-notification-listener branch 3 times, most recently from f0c3a32 to a6396ba Compare September 13, 2022 16:00
return result;
}

public static SubscribableChannel asSubscribableChannel(
Member

Why do you still insist on this factory?
This is not Spring-friendly, and I doubt anyone is going to use a subscription outside of the channel.
The PostgresSubscribableChannel must be a top-level API, without any extra classes for some onPossibleUpdate.
I also still have doubts about the onPossibleUpdate() name. Why not just notify()?
What is wrong with extracting a Subscription contract interface and implementing it on the channel?
The PostgresChannelMessageTableSubscriber must be a ctor argument for that channel.
Also, dispatching must be handled by the MessageDispatcher abstraction.

Contributor Author

My intuition is to expose the most primitive abstraction at the base level, which is receiving a notification whenever a message is added to the group. Whether to use this information to read a message from a worker thread or directly should be up to the user. I have now made a few changes to expose a different API that always goes via a SubscribableChannel, if you'd prefer not to introduce more API. I think this would satisfy the most common use.

@raphw raphw force-pushed the postgres-notification-listener branch from a6396ba to ab121ed Compare September 13, 2022 20:11
Member

@artembilan artembilan left a comment

One more thing I forgot to say:

There is no reason to squash your commits all the time: we are going to do that on merge.
Meanwhile, during the review and feedback process, it is better to keep the commit history.
This way we know what was there before we started and concluded some discussion.

Member

@artembilan artembilan left a comment

If you are not going to implement a MessageChannel as a first-class citizen of Spring Integration and are not going to add tests, I cannot accept the PR as it is right now; I would take it over and follow up to meet the original expectations. We have some other priorities at the moment, so it may take some time until we incorporate this feature.

Thank you for understanding!

@@ -0,0 +1,198 @@
/*
* Copyright 2002-2022 the original author or authors.
Member

This is a new class, so only the current year applies.

import java.sql.Statement;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;
Member

No asterisk imports, please.
Also, the import order is wrong.
You can simply run the :spring-integration-jdbc:check Gradle task to see what violates the Checkstyle rules we require.

return subscriptions.add(subscription);
}

boolean unsubscribe(PostgresChannelMessageTableSubscription subscription) {
Member

I think these two methods must be public

import org.springframework.messaging.SubscribableChannel;
import org.springframework.util.Assert;

final class PostgresChannelMessageTableSubscription {
Member

I still don't see this class as a Spring Integration structure.
Your original concern was exactly about a MessageChannel that is ready for notifications from PostgreSQL, and there is no MessageChannel as a top-level Spring Integration API in your PR.
It is not at all clear why you resist following a simple Subscription contract interface and a MessageChannel.
Plus, I said before that without an inner interface in the subscriber there is going to be a class tangle, which is exactly what you have just introduced with that asSubscribableChannel() factory method.

There are also no tests yet for this PR.
In the jdbc module we have a MySqlContainerTest that tests against a MySQL container in Docker.
We can probably have something similar for Postgres to verify the functionality you introduce.

Contributor Author

I still don't understand what API you are looking for. Before that, I would not want to spend time on cleaning up the implementation or adding tests as this would require additional effort for every API suggestion. Once we find out that we're on the same page regarding the API, it's easy to add tests and clean up, that's why I set this PR as a draft.

I made the subscription class package-private to indicate that it's not the API I would go for but rather expose a factory for a SubscribableChannel. We have not discussed ChannelDispatcher before, only subscribable channel, would you rather have it as a factory for such a dispatcher?

What do you mean by "inner interface"? That the PostgresChannelMessageTableSubscriber should implement one? There's no interface in Spring Integration that fits the necessary API, so I would need to create one. This goes back to what we discussed: it needs to be shared across regions and group ids, as it fully occupies a physical Postgres connection.

@artembilan
Member

@raphw ,

my idea about an implementation and API is like this: artembilan@92de4ca

@raphw
Contributor Author

raphw commented Sep 16, 2022

I have changed the implementation, cleaned up and added javadoc and a test.

The test is still failing; somehow the notification never arrives at the listener. Is the setup such that it wraps everything in a separate transaction? It is really strange that the LISTEN does not trigger here.

@raphw
Contributor Author

raphw commented Sep 19, 2022

For some reason, Postgres does not like the inline concatenation when NOTIFY is used, but accepts it via the pg_notify function. The integration test is now passing and I would consider this a working solution.
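PostgreSQL provides the function pg_notify(text, text), which takes the channel name and the payload as ordinary arguments and therefore accepts non-constant payloads. The following minimal sketch builds the DDL strings one might run over JDBC. The channel name and the REGION and GROUP_KEY columns come from the snippet reviewed earlier; the table name INT_CHANNEL_MESSAGE and the function and trigger names are assumptions, and the SQL has not been run against a database here.

```java
final class NotifyTriggerDdl {

    /** Channel name taken from the trigger snippet reviewed above. */
    static final String CHANNEL = "int_channel_message_notify";

    private NotifyTriggerDdl() {
    }

    /** DDL for a trigger function; the function name is hypothetical. */
    static String functionDdl() {
        return String.join("\n",
                "CREATE OR REPLACE FUNCTION int_channel_message_notify_fct()",
                "RETURNS TRIGGER AS",
                "$BODY$",
                "BEGIN",
                "    PERFORM pg_notify('" + CHANNEL + "', NEW.REGION || ' ' || NEW.GROUP_KEY);",
                "    RETURN NEW;",
                "END;",
                "$BODY$",
                "LANGUAGE PLPGSQL;");
    }

    /** DDL for the trigger; the table and trigger names are assumptions. */
    static String triggerDdl() {
        return String.join("\n",
                "CREATE TRIGGER int_channel_message_notify_trg",
                "AFTER INSERT ON INT_CHANNEL_MESSAGE",
                "FOR EACH ROW",
                "EXECUTE PROCEDURE int_channel_message_notify_fct();");
    }

}
```

Both strings could be passed to `java.sql.Statement.execute` during schema setup. The payload uses the same "region, space, group key" format that the listener side uses as its lookup key.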

@raphw raphw marked this pull request as ready for review September 19, 2022 18:34
@raphw raphw requested a review from artembilan September 19, 2022 18:34
Member

@artembilan artembilan left a comment

This is good progress.
This probably won't make it into today's release, but definitely into the next milestone.
Thank you!

* @return {@code true} if the subscription was not already added.
*/
public boolean subscribe(Subscription subscription) {
Set<Subscription> subscriptions = this.subscriptions.computeIfAbsent(subscription.getRegion() + " " + getKey(subscription.getGroupId()), ignored -> ConcurrentHashMap.newKeySet());
Member

We prefer a line length of no more than 120 characters.
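The subscription bookkeeping in the snippet above can be sketched without any Spring types. This is a minimal sketch, not the merged implementation: `SubscriptionRegistry`, `notifyUpdate`, and `dispatch` are hypothetical names, the getKey(...) helper from the original line is left out, and the computeIfAbsent call is wrapped to stay within 120 characters.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class SubscriptionRegistry {

    /** Named after the Subscription type in the snippet; notifyUpdate() is an assumption. */
    interface Subscription {

        String getRegion();

        Object getGroupId();

        void notifyUpdate();

    }

    private final Map<String, Set<Subscription>> subscriptions = new ConcurrentHashMap<>();

    private static String key(Subscription subscription) {
        return subscription.getRegion() + " " + subscription.getGroupId();
    }

    /** Returns true if the subscription was not already added. */
    boolean subscribe(Subscription subscription) {
        Set<Subscription> group =
                this.subscriptions.computeIfAbsent(key(subscription), ignored -> ConcurrentHashMap.newKeySet());
        return group.add(subscription);
    }

    /** Returns true if the subscription was registered and has been removed. */
    boolean unsubscribe(Subscription subscription) {
        Set<Subscription> group = this.subscriptions.get(key(subscription));
        return group != null && group.remove(subscription);
    }

    /** Notifies every subscription registered for the payload and returns how many were notified. */
    int dispatch(String payload) {
        Set<Subscription> group = this.subscriptions.getOrDefault(payload, Collections.emptySet());
        group.forEach(Subscription::notifyUpdate);
        return group.size();
    }

}
```

Keying the map by the notification payload lets one listener connection serve many regions and group ids, which is the sharing requirement raised earlier in the review.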

.isTrue();
}
finally {
postgresChannelMessageTableSubscriber.stop();
Member

Once this is a bean, the ApplicationContext will take care of its lifecycle.

Contributor Author

I wanted to control the life cycle explicitly since one test checks a registration prior to starting the life cycle and one while the subscriber is already running.

@raphw
Contributor Author

raphw commented Sep 20, 2022

I addressed all comments except one: the subscriber is still not registered as a bean, which allows for two tests where subscriptions are registered with the subscriber in a started and in a stopped state.

@raphw raphw requested a review from artembilan September 20, 2022 17:58
Member

@artembilan artembilan left a comment

Good.
Would you mind rebasing your branch onto the latest main? I'll pull it locally for a final review.
Perhaps it may make it into today's release after all 😃 .

@raphw raphw force-pushed the postgres-notification-listener branch from ef54d8d to a03d082 Compare September 20, 2022 18:33
@raphw
Contributor Author

raphw commented Sep 20, 2022

I rebased on master.

@raphw raphw requested a review from artembilan September 20, 2022 18:37
@artembilan
Member

Merged as 14d8597.

Unfortunately, I don't have time today to add docs for this new feature, but we are OK to go with them in the next milestone.

@raphw ,

thank you very much for the contribution; looking forward to more! For example, those docs for this new feature 😄

@artembilan artembilan closed this Sep 20, 2022
@raphw
Contributor Author

raphw commented Sep 20, 2022

Of course, thank you for your feedback along the way.

As for the documentation: #3890
