Skip to content

Commit 44bc1de

Browse files
committed
Explicitly reject invalid aggregate event registrations during publishing.
We now detect that the consumption of the events published during a persistence operation has produced new event instances that would go unpublished and raise an explaining exception. Previously such a scenario would've resulted in a ConcurrentModificationException. We primarily reject such a scenario as handling the additional event would extend our convenience mechanism over the publishing scope a direct 1:1 replacement with ApplicationEventPublisher would've achieved. Fixes GH-3116.
1 parent 1f347dc commit 44bc1de

File tree

2 files changed

+67
-11
lines changed

2 files changed

+67
-11
lines changed

Diff for: src/main/java/org/springframework/data/repository/core/support/EventPublishingRepositoryProxyPostProcessor.java

+40-11
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.lang.annotation.Annotation;
1919
import java.lang.reflect.Method;
20+
import java.util.ArrayList;
2021
import java.util.Collection;
2122
import java.util.Collections;
2223
import java.util.Map;
@@ -28,7 +29,6 @@
2829
import org.springframework.context.ApplicationEventPublisher;
2930
import org.springframework.data.domain.AfterDomainEventPublication;
3031
import org.springframework.data.domain.DomainEvents;
31-
import org.springframework.data.repository.CrudRepository;
3232
import org.springframework.data.repository.core.RepositoryInformation;
3333
import org.springframework.data.util.AnnotationDetectionMethodCallback;
3434
import org.springframework.lang.Nullable;
@@ -111,7 +111,7 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
111111
return result;
112112
}
113113

114-
Iterable<?> arguments = asCollection(invocation.getArguments()[0], invocation.getMethod());
114+
Iterable<?> arguments = asIterable(invocation.getArguments()[0], invocation.getMethod());
115115

116116
eventMethod.publishEventsFrom(arguments, publisher);
117117

@@ -144,6 +144,9 @@ static class EventPublishingMethod {
144144
private static Map<Class<?>, EventPublishingMethod> cache = new ConcurrentReferenceHashMap<>();
145145
private static @SuppressWarnings("null") EventPublishingMethod NONE = new EventPublishingMethod(Object.class, null,
146146
null);
147+
private static String ILLEGAL_MODIFCATION = "Aggregate's events were modified during event publication. "
148+
+ "Make sure event listeners obtain a fresh instance of the aggregate before adding further events. "
149+
+ "Additional events found: %s.";
147150

148151
private final Class<?> type;
149152
private final Method publishingMethod;
@@ -188,18 +191,33 @@ public static EventPublishingMethod of(Class<?> type) {
188191
* @param aggregates can be {@literal null}.
189192
* @param publisher must not be {@literal null}.
190193
*/
191-
public void publishEventsFrom(Iterable<?> aggregates, ApplicationEventPublisher publisher) {
194+
public void publishEventsFrom(@Nullable Iterable<?> aggregates, ApplicationEventPublisher publisher) {
195+
196+
if (aggregates == null) {
197+
return;
198+
}
192199

193200
for (Object aggregateRoot : aggregates) {
194201

195202
if (!type.isInstance(aggregateRoot)) {
196203
continue;
197204
}
198205

199-
for (Object event : asCollection(ReflectionUtils.invokeMethod(publishingMethod, aggregateRoot), null)) {
206+
var events = asCollection(ReflectionUtils.invokeMethod(publishingMethod, aggregateRoot));
207+
208+
for (Object event : events) {
200209
publisher.publishEvent(event);
201210
}
202211

212+
var postPublication = asCollection(ReflectionUtils.invokeMethod(publishingMethod, aggregateRoot));
213+
214+
if (events.size() != postPublication.size()) {
215+
216+
postPublication.removeAll(events);
217+
218+
throw new IllegalStateException(ILLEGAL_MODIFCATION.formatted(postPublication));
219+
}
220+
203221
if (clearingMethod != null) {
204222
ReflectionUtils.invokeMethod(clearingMethod, aggregateRoot);
205223
}
@@ -272,23 +290,34 @@ private static Method getClearingMethod(AnnotationDetectionMethodCallback<?> cle
272290
* one-element collection, {@literal null} will become an empty collection.
273291
*
274292
* @param source can be {@literal null}.
275-
* @return
293+
* @return will never be {@literal null}.
276294
*/
277295
@SuppressWarnings("unchecked")
278-
private static Iterable<Object> asCollection(@Nullable Object source, @Nullable Method method) {
296+
private static Collection<Object> asCollection(@Nullable Object source) {
279297

280298
if (source == null) {
281299
return Collections.emptyList();
282300
}
283301

284-
if (method != null && method.getName().startsWith("saveAll")) {
285-
return (Iterable<Object>) source;
286-
}
287-
288302
if (Collection.class.isInstance(source)) {
289-
return (Collection<Object>) source;
303+
return new ArrayList<>((Collection<Object>) source);
290304
}
291305

292306
return Collections.singletonList(source);
293307
}
308+
309+
/**
310+
* Returns the given source object as {@link Iterable}.
311+
*
312+
* @param source can be {@literal null}.
313+
* @return will never be {@literal null}.
314+
*/
315+
@Nullable
316+
@SuppressWarnings("unchecked")
317+
private static Iterable<Object> asIterable(@Nullable Object source, @Nullable Method method) {
318+
319+
return method != null && method.getName().startsWith("saveAll")
320+
? (Iterable<Object>) source
321+
: asCollection(source);
322+
}
294323
}

Diff for: src/test/java/org/springframework/data/repository/core/support/EventPublishingRepositoryProxyPostProcessorUnitTests.java

+27
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.mockito.Mockito.*;
2121

2222
import java.lang.reflect.Method;
23+
import java.util.ArrayList;
2324
import java.util.Arrays;
2425
import java.util.Collection;
2526
import java.util.Collections;
@@ -324,6 +325,32 @@ void doesNotEmitEventsFromPrimitiveValue() throws Throwable {
324325
verify(publisher, never()).publishEvent(any());
325326
}
326327

328+
@Test // GH-3116
329+
void rejectsEventAddedDuringProcessing() throws Throwable {
330+
331+
var originalEvent = new SomeEvent();
332+
var eventToBeAdded = new SomeEvent();
333+
334+
var events = new ArrayList<Object>();
335+
events.add(originalEvent);
336+
337+
var aggregate = MultipleEvents.of(events);
338+
339+
doAnswer(invocation -> {
340+
341+
events.add(eventToBeAdded);
342+
return null;
343+
344+
}).when(publisher).publishEvent(any(Object.class));
345+
346+
var method = EventPublishingMethod.of(MultipleEvents.class);
347+
348+
assertThatIllegalStateException()
349+
.isThrownBy(() -> method.publishEventsFrom(List.of(aggregate), publisher))
350+
.withMessageContaining(eventToBeAdded.toString())
351+
.withMessageNotContaining(originalEvent.toString());
352+
}
353+
327354
private static void mockInvocation(MethodInvocation invocation, Method method, Object parameterAndReturnValue)
328355
throws Throwable {
329356

0 commit comments

Comments
 (0)