Skip to content

Add property to @KafkaListener to allow filtering expression in RecordFilterStrategy #2134

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
ufasoli opened this issue Feb 24, 2022 · 6 comments · Fixed by #2138
Closed

Comments

@ufasoli
Copy link

ufasoli commented Feb 24, 2022

Expected Behavior

Add an extra property on the @KafkaListener property, for example recordFilterDiscriminator that would allow to filter a given message based on the annotated Listener of the message

Current Behavior

Currently it's not possible to filter a message based on the Listener

Context

In same cases it would be useful to be able to filter a message based on the method handling the message; for example in a case where multiple message types are sent to a central topic with a discriminator value in the header and listeners within the application would only handle messages based on that discriminator and ignore others.

For example

@KafkaListener(recordFilterDiscriminator ="xxx", topic="central-topic")
public void consumerMessagesXXX(ConsumerRecord r){
// reads only XXXX messages skip all others
}


@KafkaListener(recordFilterDiscriminator ="yyy", topic="central-topic")
public void consumerMessagesYYY(ConsumerRecord r){
// reads only YYY messages skip all others
}

initially requested in stackoverflow here

@garyrussell garyrussell added this to the 3.0.0-M3 milestone Feb 24, 2022
@garyrussell garyrussell self-assigned this Feb 24, 2022
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Feb 24, 2022
Resolves spring-projects#2134

Also clean up parsing of `errorHandler` and `contentTypeConverter`.
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Feb 24, 2022
Resolves spring-projects#2134

Also clean up parsing of `errorHandler` and `contentTypeConverter`.

**cherry-pick to 2.8.x**
@ufasoli
Copy link
Author

ufasoli commented Mar 2, 2022

besides the possibility of overriding the filter I think it could be useful to have a set of properties or property that could be read by the filter so that rather than overriding the filter for each listener the main filter could do some processing

@KafkaListener(filterProperties="key1=value1,key2=value2")

So that in the filter I could do something like

   public boolean filter(ConsumerRecord consumerRecord) {
       Header source = consumerRecord.headers().lastHeader("source");
      var properties = KafkaUtils.getFilterProperties().getBytes().split(",");
     // process and filter message based on conditions
}

Additionally when using the current workaround it can cause issues since id has to be unique but I could want multiple listeners with the same pattern

@garyrussell
Copy link
Contributor

Good idea, but I think it has wider applicability; I am thinking something like this:

/**
 * Static information that will be added as a header with key
 * {@link KafkaHeaders#LISTENER_INFO}. This can be used, for example, in a
 * {@link RecordInterceptor}, {@link RecordFilterStrategy} or the listener itself, for
 * any purposes.
 * <p>
 * SpEL {@code #{...}} and property place holders {@code ${...}} are supported, but it
 * must resolve to a String or `byte[]`.
 * <p>
 * This header will be stripped out if an outbound record is created with the headers
 * from an input record.
 * @return the group id.
 * @since 2.8.4
 */
String info() default "";

@ufasoli
Copy link
Author

ufasoli commented Mar 2, 2022

Even better yes as you said it could have a wider scope..
In the meantime is there a way I can access the consumerId from the filter when I define it as idIsGroup = false ?
just a heads up there's a typo in the code as the @return javadoc says it returns the group id

@garyrussell
Copy link
Contributor

Thanks; yes, that was just a mock-up (copy/paste).

You cannot currently access the id directly, only the consumer group.id.

Having said that; you can get it via a circuitous route and a bit of reflection:

  • add an @EventListener method to your listener class to consume ConsumerStartingEvents
  • call the RabbitListenerEndpointRegistry.getListenerContainers()
  • iterate over the containers to find which one is for this listener
  • capture the id and store it in a ThreadLocal

This will only work if the listeners are in different classes.

@SpringBootApplication
public class So71237300Application {

	public static void main(String[] args) {
		SpringApplication.run(So71237300Application.class, args);
	}

	@Bean
	public NewTopic topic() {
		return TopicBuilder.name("so71237300").partitions(2).replicas(1).build();
	}

	@Bean
	ApplicationRunner runner(KafkaTemplate<String, String> template) {

		return args -> {
			ProducerRecord<String, String> record = new ProducerRecord<>("so71237300", "test.to.xxx");
			record.headers().add("which", "xxx".getBytes());
			template.send(record);
			record = new ProducerRecord<>("so71237300", "test.to.yyy");
			record.headers().add("which", "yyy".getBytes());
			template.send(record);
		};
	}

}

@Component
class Discriminator implements RecordFilterStrategy<String, String> {

	private static final ThreadLocal<byte[]> ids = new ThreadLocal<>();

	Discriminator(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
		factory.setRecordFilterStrategy(this);
	}

	@Override
	public boolean filter(ConsumerRecord<String, String> rec) {
		Header which = rec.headers().lastHeader("which");
		return which == null || !Arrays.equals(which.value(), ids.get());
	}

	public static void setId(String id) {
		ids.set(id.getBytes());
	}

	public static void clearId() {
		ids.remove();
	}

}

abstract class Listener {

	private static final Logger log = LoggerFactory.getLogger(Listener.class);

	private final KafkaListenerEndpointRegistry registry;

	public Listener(KafkaListenerEndpointRegistry registry) {
		this.registry = registry;
	}

	@EventListener
	public void starting(ConsumerStartingEvent starting) {
		Iterator<MessageListenerContainer> iterator = this.registry.getListenerContainers().iterator();
		while (iterator.hasNext()) {
			MessageListenerContainer next = iterator.next();
			Object messageListener = new DirectFieldAccessor(next.getContainerProperties().getMessageListener())
					.getPropertyValue("delegate.handlerMethod.invokerHandlerMethod.bean");
			String listenerId = next.getListenerId();
			if (messageListener.equals(this) && Thread.currentThread().getName().contains(listenerId)) {
				Discriminator.setId(listenerId);
				log.info("{} Setting id {}", getClass().getSimpleName(), next.getListenerId());
				break;
			}
		}
	}

	@EventListener
	public void stopping(ConsumerStoppedEvent starting) {
		Iterator<MessageListenerContainer> iterator = this.registry.getListenerContainers().iterator();
		while (iterator.hasNext()) {
			MessageListenerContainer next = iterator.next();
			Object messageListener = new DirectFieldAccessor(next.getContainerProperties().getMessageListener())
					.getPropertyValue("delegate.handlerMethod.invokerHandlerMethod.bean");
			String listenerId = next.getListenerId();
			if (messageListener.equals(this) && Thread.currentThread().getName().contains(listenerId)) {
				Discriminator.clearId();
				log.info("{} Clearing id {}", getClass().getSimpleName(), next.getListenerId());
				break;
			}
		}
	}

}

@Component
class Listener1 extends Listener {

	Listener1(KafkaListenerEndpointRegistry registry) {
		super(registry);
	}

	@KafkaListener(id = "xxx", topics = "so71237300", groupId = "group1", concurrency = "2")
	void listen1(String in) {
		System.out.println("1(xxx):" + in);
	}

}

@Component
class Listener2 extends Listener {

	Listener2(KafkaListenerEndpointRegistry registry) {
		super(registry);
	}

	@KafkaListener(id = "yyy", topics = "so71237300", groupId = "group2", concurrency = "2")
	void listen2(String in) {
		System.out.println("2(yyy):" + in);
	}

}
1(xxx):test.to.xxx
2(yyy):test.to.yyy

garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Mar 2, 2022
Resolves spring-projects#2134

Also clean up parsing of `errorHandler` and `contentTypeConverter`.

**cherry-pick to 2.8.x**
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Mar 2, 2022
Resolves spring-projects#2134

Also clean up parsing of `errorHandler` and `contentTypeConverter`.

**cherry-pick to 2.8.x**
@garyrussell
Copy link
Contributor

@ufasoli I have added the info attribute to the PR #2138

@ufasoli
Copy link
Author

ufasoli commented Mar 3, 2022

Great thanks!

garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Mar 7, 2022
Resolves spring-projects#2134

Also clean up parsing of `errorHandler` and `contentTypeConverter`.

**cherry-pick to 2.8.x**
artembilan pushed a commit that referenced this issue Mar 7, 2022
Resolves #2134

Also clean up parsing of `errorHandler` and `contentTypeConverter`.

**cherry-pick to 2.8.x**

* Polishing per review comments.

* Add info property to KafkaListener.

* Rebase; add since.
artembilan pushed a commit that referenced this issue Mar 7, 2022
Resolves #2134

Also clean up parsing of `errorHandler` and `contentTypeConverter`.

**cherry-pick to 2.8.x**

* Polishing per review comments.

* Add info property to KafkaListener.

* Rebase; add since.
garyrussell added a commit that referenced this issue Mar 7, 2022
garyrussell added a commit that referenced this issue Mar 7, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants