Add property to @KafkaListener to allow filtering expression in RecordFilterStrategy #2134
Comments
Resolves spring-projects#2134 Also clean up parsing of `errorHandler` and `contentTypeConverter`.
Resolves spring-projects#2134 Also clean up parsing of `errorHandler` and `contentTypeConverter`. **cherry-pick to 2.8.x**
Besides the possibility of overriding the filter, I think it would be useful to have a property (or set of properties) that the filter can read, so that, rather than overriding the filter for each listener, the main filter could do the processing.
So that in the filter I could do something like
Additionally, the current workaround can cause issues, since the id has to be unique but I might want multiple listeners with the same pattern.
Good idea, but I think it has wider applicability. I am thinking of something like this:

    /**
     * Static information that will be added as a header with key
     * {@link KafkaHeaders#LISTENER_INFO}. This can be used, for example, in a
     * {@link RecordInterceptor}, {@link RecordFilterStrategy} or the listener itself, for
     * any purposes.
     * <p>
     * SpEL {@code #{...}} and property place holders {@code ${...}} are supported, but it
     * must resolve to a String or `byte[]`.
     * <p>
     * This header will be stripped out if an outbound record is created with the headers
     * from an input record.
     * @return the group id.
     * @since 2.8.4
     */
    String info() default "";
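To make the proposal concrete, here is a minimal sketch of the comparison a shared filter could make once each record carries the listener's `info` value. It deliberately avoids Spring Kafka types so that it runs on its own: the header map, the key names, and the class `ListenerInfoFilterSketch` are all hypothetical stand-ins. In a real filter the value would presumably be read from the header keyed by `KafkaHeaders.LISTENER_INFO`, and the discriminator header name (`which` here) is whatever the producer uses.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class ListenerInfoFilterSketch {

    // Hypothetical stand-in keys for this sketch only. In Spring Kafka the info
    // value would be looked up under KafkaHeaders.LISTENER_INFO instead.
    public static final String LISTENER_INFO = "listenerInfo";
    public static final String WHICH = "which";

    // Builds a header map resembling what one listener might see on a record.
    public static Map<String, byte[]> headers(String info, String which) {
        Map<String, byte[]> headers = new HashMap<>();
        if (info != null) {
            headers.put(LISTENER_INFO, info.getBytes(StandardCharsets.UTF_8));
        }
        if (which != null) {
            headers.put(WHICH, which.getBytes(StandardCharsets.UTF_8));
        }
        return headers;
    }

    // Same convention as RecordFilterStrategy.filter(): true means "discard".
    // A record is kept only when both headers exist and their bytes match.
    public static boolean filter(Map<String, byte[]> headers) {
        byte[] info = headers.get(LISTENER_INFO);
        byte[] which = headers.get(WHICH);
        return info == null || which == null || !Arrays.equals(info, which);
    }

    public static void main(String[] args) {
        System.out.println(filter(headers("xxx", "xxx"))); // false: kept
        System.out.println(filter(headers("xxx", "yyy"))); // true: discarded
        System.out.println(filter(headers("xxx", null)));  // true: discarded
    }
}
```

Because the value travels with the record, one filter instance can serve every listener, and two listeners may share the same `info` value even though their ids must stay unique.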
Even better; yes, as you said, it could have a wider scope.
Thanks; yes, that was just a mock-up (copy/paste). You cannot currently access the id directly, only the consumer. Having said that, you can get it via a circuitous route and a bit of reflection:
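This will only work if the listeners are in different classes.

    import java.util.Arrays;
    import java.util.Iterator;

    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.header.Header;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.DirectFieldAccessor;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.event.EventListener;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
    import org.springframework.kafka.config.TopicBuilder;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.event.ConsumerStartingEvent;
    import org.springframework.kafka.event.ConsumerStoppedEvent;
    import org.springframework.kafka.listener.MessageListenerContainer;
    import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
    import org.springframework.stereotype.Component;

    @SpringBootApplication
    public class So71237300Application {

        public static void main(String[] args) {
            SpringApplication.run(So71237300Application.class, args);
        }

        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("so71237300").partitions(2).replicas(1).build();
        }

        // Sends one record per discriminator value; each listener should see only its own.
        @Bean
        ApplicationRunner runner(KafkaTemplate<String, String> template) {
            return args -> {
                ProducerRecord<String, String> record = new ProducerRecord<>("so71237300", "test.to.xxx");
                record.headers().add("which", "xxx".getBytes());
                template.send(record);
                record = new ProducerRecord<>("so71237300", "test.to.yyy");
                record.headers().add("which", "yyy".getBytes());
                template.send(record);
            };
        }

    }

    @Component
    class Discriminator implements RecordFilterStrategy<String, String> {

        // Listener id for the current consumer thread, set when the consumer starts.
        private static final ThreadLocal<byte[]> ids = new ThreadLocal<>();

        Discriminator(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
            factory.setRecordFilterStrategy(this);
        }

        // Returning true discards the record: no "which" header, or not this thread's id.
        @Override
        public boolean filter(ConsumerRecord<String, String> rec) {
            Header which = rec.headers().lastHeader("which");
            return which == null || !Arrays.equals(which.value(), ids.get());
        }

        public static void setId(String id) {
            ids.set(id.getBytes());
        }

        public static void clearId() {
            ids.remove();
        }

    }

    abstract class Listener {

        private static final Logger log = LoggerFactory.getLogger(Listener.class);

        private final KafkaListenerEndpointRegistry registry;

        public Listener(KafkaListenerEndpointRegistry registry) {
            this.registry = registry;
        }

        // Relies on the event arriving on the consumer thread, whose name contains the listener id.
        @EventListener
        public void starting(ConsumerStartingEvent starting) {
            Iterator<MessageListenerContainer> iterator = this.registry.getListenerContainers().iterator();
            while (iterator.hasNext()) {
                MessageListenerContainer next = iterator.next();
                // Reflection: dig the listener bean out of the container's message listener.
                Object messageListener = new DirectFieldAccessor(next.getContainerProperties().getMessageListener())
                        .getPropertyValue("delegate.handlerMethod.invokerHandlerMethod.bean");
                String listenerId = next.getListenerId();
                if (messageListener.equals(this) && Thread.currentThread().getName().contains(listenerId)) {
                    Discriminator.setId(listenerId);
                    log.info("{} Setting id {}", getClass().getSimpleName(), next.getListenerId());
                    break;
                }
            }
        }

        // Mirror of starting(): clears the thread-local id when the consumer stops.
        @EventListener
        public void stopping(ConsumerStoppedEvent stopped) {
            Iterator<MessageListenerContainer> iterator = this.registry.getListenerContainers().iterator();
            while (iterator.hasNext()) {
                MessageListenerContainer next = iterator.next();
                Object messageListener = new DirectFieldAccessor(next.getContainerProperties().getMessageListener())
                        .getPropertyValue("delegate.handlerMethod.invokerHandlerMethod.bean");
                String listenerId = next.getListenerId();
                if (messageListener.equals(this) && Thread.currentThread().getName().contains(listenerId)) {
                    Discriminator.clearId();
                    log.info("{} Clearing id {}", getClass().getSimpleName(), next.getListenerId());
                    break;
                }
            }
        }

    }

    @Component
    class Listener1 extends Listener {

        Listener1(KafkaListenerEndpointRegistry registry) {
            super(registry);
        }

        @KafkaListener(id = "xxx", topics = "so71237300", groupId = "group1", concurrency = "2")
        void listen1(String in) {
            System.out.println("1(xxx):" + in);
        }

    }

    @Component
    class Listener2 extends Listener {

        Listener2(KafkaListenerEndpointRegistry registry) {
            super(registry);
        }

        @KafkaListener(id = "yyy", topics = "so71237300", groupId = "group2", concurrency = "2")
        void listen2(String in) {
            System.out.println("2(yyy):" + in);
        }

    }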
Great, thanks!
Resolves #2134 Also clean up parsing of `errorHandler` and `contentTypeConverter`. **cherry-pick to 2.8.x** * Polishing per review comments. * Add info property to KafkaListener. * Rebase; add since.
Expected Behavior
Add an extra property on the `@KafkaListener` annotation, for example `recordFilterDiscriminator`, that would allow filtering a given message based on the annotated listener handling it.
Current Behavior
Currently it is not possible to filter a message based on the listener.
Context
In some cases it would be useful to filter a message based on the method handling it; for example, when multiple message types are sent to a central topic with a discriminator value in a header, and listeners within the application handle only the messages matching that discriminator and ignore the others.
For example
Initially requested on Stack Overflow here