20
20
import java .util .concurrent .CompletionStage ;
21
21
import java .util .function .Consumer ;
22
22
import java .util .function .Function ;
23
+ import java .util .function .Supplier ;
23
24
import java .util .stream .Collectors ;
24
25
import java .util .stream .StreamSupport ;
25
26
@@ -124,7 +125,7 @@ public class AsyncCassandraTemplate
124
125
125
126
private final StatementFactory statementFactory ;
126
127
127
- private @ Nullable ApplicationEventPublisher eventPublisher ;
128
+ private final EntityLifecycleEventDelegate eventDelegate ;
128
129
129
130
private @ Nullable EntityCallbacks entityCallbacks ;
130
131
@@ -190,11 +191,12 @@ public AsyncCassandraTemplate(AsyncCqlTemplate asyncCqlTemplate, CassandraConver
190
191
this .entityOperations = new EntityOperations (converter );
191
192
this .exceptionTranslator = asyncCqlTemplate .getExceptionTranslator ();
192
193
this .statementFactory = new StatementFactory (converter );
194
+ this .eventDelegate = new EntityLifecycleEventDelegate ();
193
195
}
194
196
195
197
@ Override
196
198
public void setApplicationEventPublisher (ApplicationEventPublisher applicationEventPublisher ) {
197
- this .eventPublisher = applicationEventPublisher ;
199
+ this .eventDelegate . setPublisher ( applicationEventPublisher ) ;
198
200
}
199
201
200
202
@ Override
@@ -214,6 +216,18 @@ public void setEntityCallbacks(@Nullable EntityCallbacks entityCallbacks) {
214
216
this .entityCallbacks = entityCallbacks ;
215
217
}
216
218
219
+ /**
220
+ * Configure whether lifecycle events such as {@link AfterLoadEvent}, {@link BeforeSaveEvent}, etc. should be
221
+ * published or whether emission should be suppressed. Enabled by default.
222
+ *
223
+ * @param enabled {@code true} to enable entity lifecycle events; {@code false} to disable entity lifecycle events.
224
+ * @since 4.0
225
+ * @see CassandraMappingEvent
226
+ */
227
+ public void setEntityLifecycleEventsEnabled (boolean enabled ) {
228
+ this .eventDelegate .setEventsEnabled (enabled );
229
+ }
230
+
217
231
@ Override
218
232
public AsyncCqlOperations getAsyncCqlOperations () {
219
233
return this .cqlOperations ;
@@ -456,11 +470,12 @@ private ListenableFuture<Boolean> doDelete(Query query, Class<?> entityClass, Cq
456
470
tableName );
457
471
SimpleStatement delete = builder .build ();
458
472
459
- maybeEmitEvent (new BeforeDeleteEvent <>(delete , entityClass , tableName ));
473
+ maybeEmitEvent (() -> new BeforeDeleteEvent <>(delete , entityClass , tableName ));
460
474
461
475
ListenableFuture <Boolean > future = doExecute (delete , AsyncResultSet ::wasApplied );
462
476
463
- future .addCallback (success -> maybeEmitEvent (new AfterDeleteEvent <>(delete , entityClass , tableName )), e -> {});
477
+ future .addCallback (success -> maybeEmitEvent (() -> new AfterDeleteEvent <>(delete , entityClass , tableName )),
478
+ e -> {});
464
479
465
480
return future ;
466
481
}
@@ -677,8 +692,8 @@ private ListenableFuture<WriteResult> doDeleteVersioned(Object entity, QueryOpti
677
692
678
693
if (!result .wasApplied ()) {
679
694
throw new OptimisticLockingFailureException (
680
- String .format ("Cannot delete entity %s with version %s in table %s; Has it been modified meanwhile" ,
681
- entity , source .getVersion (), tableName ));
695
+ String .format ("Cannot delete entity %s with version %s in table %s; Has it been modified meanwhile" , entity ,
696
+ source .getVersion (), tableName ));
682
697
}
683
698
});
684
699
}
@@ -702,10 +717,11 @@ public ListenableFuture<Boolean> deleteById(Object id, Class<?> entityClass) {
702
717
StatementBuilder <Delete > builder = getStatementFactory ().deleteById (id , entity , tableName );
703
718
SimpleStatement delete = builder .build ();
704
719
705
- maybeEmitEvent (new BeforeDeleteEvent <>(delete , entityClass , tableName ));
720
+ maybeEmitEvent (() -> new BeforeDeleteEvent <>(delete , entityClass , tableName ));
706
721
707
722
ListenableFuture <Boolean > future = doExecute (delete , AsyncResultSet ::wasApplied );
708
- future .addCallback (success -> maybeEmitEvent (new AfterDeleteEvent <>(delete , entityClass , tableName )), e -> {});
723
+ future .addCallback (success -> maybeEmitEvent (() -> new AfterDeleteEvent <>(delete , entityClass , tableName )),
724
+ e -> {});
709
725
710
726
return future ;
711
727
}
@@ -719,10 +735,11 @@ public ListenableFuture<Void> truncate(Class<?> entityClass) {
719
735
Truncate truncate = QueryBuilder .truncate (tableName );
720
736
SimpleStatement statement = truncate .build ();
721
737
722
- maybeEmitEvent (new BeforeDeleteEvent <>(statement , entityClass , tableName ));
738
+ maybeEmitEvent (() -> new BeforeDeleteEvent <>(statement , entityClass , tableName ));
723
739
724
740
ListenableFuture <Boolean > future = doExecute (statement , AsyncResultSet ::wasApplied );
725
- future .addCallback (success -> maybeEmitEvent (new AfterDeleteEvent <>(statement , entityClass , tableName )), e -> {});
741
+ future .addCallback (success -> maybeEmitEvent (() -> new AfterDeleteEvent <>(statement , entityClass , tableName )),
742
+ e -> {});
726
743
727
744
return new MappingListenableFutureAdapter <>(future , aBoolean -> null );
728
745
}
@@ -753,7 +770,7 @@ private <T> ListenableFuture<EntityWriteResult<T>> executeSave(T entity, CqlIden
753
770
private <T > ListenableFuture <EntityWriteResult <T >> executeSave (T entity , CqlIdentifier tableName ,
754
771
SimpleStatement statement , Consumer <WriteResult > beforeAfterSaveEvent ) {
755
772
756
- maybeEmitEvent (new BeforeSaveEvent <>(entity , tableName , statement ));
773
+ maybeEmitEvent (() -> new BeforeSaveEvent <>(entity , tableName , statement ));
757
774
T entityToSave = maybeCallBeforeSave (entity , tableName , statement );
758
775
759
776
ListenableFuture <AsyncResultSet > result = doQueryForResultSet (statement );
@@ -766,7 +783,7 @@ private <T> ListenableFuture<EntityWriteResult<T>> executeSave(T entity, CqlIden
766
783
767
784
beforeAfterSaveEvent .accept (writeResult );
768
785
769
- maybeEmitEvent (new AfterSaveEvent <>(entityToSave , tableName ));
786
+ maybeEmitEvent (() -> new AfterSaveEvent <>(entityToSave , tableName ));
770
787
771
788
return writeResult ;
772
789
});
@@ -775,7 +792,7 @@ private <T> ListenableFuture<EntityWriteResult<T>> executeSave(T entity, CqlIden
775
792
private ListenableFuture <WriteResult > executeDelete (Object entity , CqlIdentifier tableName , SimpleStatement statement ,
776
793
Consumer <WriteResult > resultConsumer ) {
777
794
778
- maybeEmitEvent (new BeforeDeleteEvent <>(statement , entity .getClass (), tableName ));
795
+ maybeEmitEvent (() -> new BeforeDeleteEvent <>(statement , entity .getClass (), tableName ));
779
796
780
797
ListenableFuture <AsyncResultSet > result = doQueryForResultSet (statement );
781
798
@@ -786,7 +803,7 @@ private ListenableFuture<WriteResult> executeDelete(Object entity, CqlIdentifier
786
803
787
804
resultConsumer .accept (writeResult );
788
805
789
- maybeEmitEvent (new AfterDeleteEvent <>(statement , entity .getClass (), tableName ));
806
+ maybeEmitEvent (() -> new AfterDeleteEvent <>(statement , entity .getClass (), tableName ));
790
807
791
808
return writeResult ;
792
809
});
@@ -864,9 +881,7 @@ public String getCql() {
864
881
}
865
882
}
866
883
867
- return getAsyncCqlOperations ()
868
- .execute (new GetConfiguredPageSize ())
869
- .completable ().join ();
884
+ return getAsyncCqlOperations ().execute (new GetConfiguredPageSize ()).completable ().join ();
870
885
}
871
886
872
887
@ SuppressWarnings ("unchecked" )
@@ -876,12 +891,12 @@ private <T> Function<Row, T> getMapper(Class<?> entityType, Class<T> targetType,
876
891
877
892
return row -> {
878
893
879
- maybeEmitEvent (new AfterLoadEvent <>(row , targetType , tableName ));
894
+ maybeEmitEvent (() -> new AfterLoadEvent <>(row , targetType , tableName ));
880
895
881
896
T result = getConverter ().project (projection , row );
882
897
883
898
if (result != null ) {
884
- maybeEmitEvent (new AfterConvertEvent <>(row , result , tableName ));
899
+ maybeEmitEvent (() -> new AfterConvertEvent <>(row , result , tableName ));
885
900
}
886
901
887
902
return result ;
@@ -899,11 +914,8 @@ private static MappingCassandraConverter newConverter(CqlSession session) {
899
914
return converter ;
900
915
}
901
916
902
- protected <E extends CassandraMappingEvent <T >, T > void maybeEmitEvent (E event ) {
903
-
904
- if (this .eventPublisher != null ) {
905
- this .eventPublisher .publishEvent (event );
906
- }
917
+ protected <E extends CassandraMappingEvent <T >, T > void maybeEmitEvent (Supplier <E > event ) {
918
+ this .eventDelegate .publishEvent (event );
907
919
}
908
920
909
921
protected <T > T maybeCallBeforeConvert (T object , CqlIdentifier tableName ) {
0 commit comments