There is no easy way to set up the order of listening to topics #1827

Closed
BOOMeranGG opened this issue Jun 11, 2021 · 5 comments · Fixed by #1832

Comments

@BOOMeranGG

I use spring-kafka at work, and I needed to control the order in which listeners start consuming from topics: that is, to start listening to topic B only after all the messages from topic A have been processed.
As far as I can tell, there is currently no easy way to do this (for example, by setting an order in @KafkaListener). Please correct me if I'm wrong.

If you think that this functionality is worth implementing, I can try to do it.

@garyrussell
Contributor

You can add two @KafkaListener annotations; set autoStartup = "false" on the one for topic B.

For the first container, enable idle container events and add an event listener.

When the first container goes idle, stop the first container and start the second one from within the event listener.

@BOOMeranGG
Author

Yes, I implemented it the way you described. But it becomes inconvenient when there are several such dependencies: topic A waits for B, C, and D; topic E waits for F and G; and so on.
I recently had to implement this a second time, and I think it would be convenient to be able to specify the order in @KafkaListener.
For example, with initializationOrder=2, the listener would start only when all listeners with initializationOrder=1 have finished receiving data.

@garyrussell
Contributor

We already have a property containerGroup; I think a new component would be simpler, and easier to configure/understand. Something like this...

/**
 * Sequence the starting of container groups when all containers in the previous group are
 * idle.
 *
 * @author Gary Russell
 * @since 2.8
 *
 */
public abstract class ContainerGroupSequencer {

	/**
	 * Set containers in each group to not auto start. Start the containers in the first
	 * group. Start containers in group[n] when all containers in group[n-1] are idle;
	 * stop the containers in group[n-1].
	 * @param registry the registry.
	 * @param defaultIdleEventInterval the idle event interval if not already set.
	 * @param containerGroups The list of container groups, in order.
	 */
	public ContainerGroupSequencer(KafkaListenerEndpointRegistry registry, long defaultIdleEventInterval,
			String... containerGroups) {

		// ...
	}

}
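
To illustrate the rule described in the Javadoc above (start group[n] once every container in group[n-1] is idle, then stop group[n-1]), here is a minimal plain-Java sketch. It is a model only: `GroupSequencerSketch` and its `Container` class are hypothetical names that are not part of spring-kafka, and it assumes idleness is reported by direct `onIdle` calls, whereas a real implementation would react to idle events published by the listener containers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of the sequencing rule; not the spring-kafka implementation. */
public class GroupSequencerSketch {

    /** Hypothetical stand-in for a listener container. */
    public static final class Container {
        public final String id;
        public boolean running;
        public boolean idle;

        public Container(String id) {
            this.id = id;
        }
    }

    private final List<List<Container>> groups;
    private int current = -1; // index of the running group; -1 before start()

    public GroupSequencerSketch(List<List<Container>> groups) {
        this.groups = new ArrayList<>(groups);
    }

    /** Start only the first group; all later groups stay stopped. */
    public synchronized void start() {
        if (groups.isEmpty()) {
            return;
        }
        current = 0;
        startGroup(0);
    }

    /** Record that a container is idle; advance when its whole group is idle. */
    public synchronized void onIdle(Container container) {
        if (current < 0 || !groups.get(current).contains(container)) {
            return; // ignore containers outside the running group
        }
        container.idle = true;
        List<Container> group = groups.get(current);
        boolean allIdle = group.stream().allMatch(c -> c.idle);
        if (allIdle && current + 1 < groups.size()) {
            group.forEach(c -> c.running = false); // stop group[n-1]
            current++;
            startGroup(current);                   // start group[n]
        }
    }

    private void startGroup(int index) {
        for (Container c : groups.get(index)) {
            c.idle = false;
            c.running = true;
        }
    }

    public static void main(String[] args) {
        Container a1 = new Container("a1");
        Container a2 = new Container("a2");
        Container b1 = new Container("b1");
        GroupSequencerSketch sequencer = new GroupSequencerSketch(
                Arrays.asList(Arrays.asList(a1, a2), Arrays.asList(b1)));
        sequencer.start();
        sequencer.onIdle(a1);
        System.out.println(b1.running); // false: a2 is still busy
        sequencer.onIdle(a2);
        System.out.println(b1.running); // true: the whole first group went idle
        System.out.println(a1.running); // false: the first group was stopped
    }
}
```

In this sketch the last group is left running when it goes idle, because there is no following group to hand over to. The model also never clears a container's idle flag if it becomes busy again before the rest of its group goes idle, which a production version would have to handle.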

@BOOMeranGG
Author

BOOMeranGG commented Jun 14, 2021

Thanks! This looks like what I need.

@garyrussell garyrussell added this to the 2.7.3 milestone Jun 14, 2021
@garyrussell garyrussell self-assigned this Jun 14, 2021
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Jun 14, 2021
Resolves spring-projects#1827

Add a mechanism to automatically control a sequence of containers,
starting a group of containers when the current group goes idle.
@garyrussell
Contributor

I created a PR; sorry, I didn't notice your comment about submitting a change.

It turned out to be a bit more complicated than I thought it would be due to some initialization timing issues.

artembilan pushed a commit that referenced this issue Jun 15, 2021
Resolves #1827

Add a mechanism to automatically control a sequence of containers,
starting a group of containers when the current group goes idle.

* Improve test; fix race.

* Fix typo and revert test logging level.
garyrussell added a commit that referenced this issue Jun 16, 2021
garyrussell added a commit that referenced this issue Jul 1, 2021
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Aug 9, 2021
Set `autoStartup` to `false` so we see the `Starting first group` message.

**cherry-pick to 2.7.x**
artembilan pushed a commit that referenced this issue Aug 9, 2021
Set `autoStartup` to `false` so we see the `Starting first group` message.

**cherry-pick to 2.7.x**
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Aug 12, 2021
- increase idle event interval to prevent sequencer stopping the container too soon.
- tighten up synchronization in the sequencer

**cherry-pick to 2.7.x**
artembilan pushed a commit that referenced this issue Aug 12, 2021
- increase idle event interval to prevent sequencer stopping the container too soon.
- tighten up synchronization in the sequencer

**cherry-pick to 2.7.x**