diff --git a/pom.xml b/pom.xml index 3cb3ee86a..d44db2f0c 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ org.springframework.data spring-data-cassandra-parent - 4.0.0-SNAPSHOT + 4.0.0-GH-1286-SNAPSHOT pom Spring Data for Apache Cassandra diff --git a/spring-data-cassandra-distribution/pom.xml b/spring-data-cassandra-distribution/pom.xml index 31302cd59..a82febb76 100644 --- a/spring-data-cassandra-distribution/pom.xml +++ b/spring-data-cassandra-distribution/pom.xml @@ -8,7 +8,7 @@ org.springframework.data spring-data-cassandra-parent - 4.0.0-SNAPSHOT + 4.0.0-GH-1286-SNAPSHOT ../pom.xml diff --git a/spring-data-cassandra/pom.xml b/spring-data-cassandra/pom.xml index 422c23983..53396cde8 100644 --- a/spring-data-cassandra/pom.xml +++ b/spring-data-cassandra/pom.xml @@ -8,7 +8,7 @@ org.springframework.data spring-data-cassandra-parent - 4.0.0-SNAPSHOT + 4.0.0-GH-1286-SNAPSHOT ../pom.xml diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/AsyncCassandraTemplate.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/AsyncCassandraTemplate.java index 82a7089d0..d57d545d7 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/AsyncCassandraTemplate.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/AsyncCassandraTemplate.java @@ -20,6 +20,7 @@ import java.util.concurrent.CompletionStage; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -124,7 +125,7 @@ public class AsyncCassandraTemplate private final StatementFactory statementFactory; - private @Nullable ApplicationEventPublisher eventPublisher; + private final EntityLifecycleEventDelegate eventDelegate; private @Nullable EntityCallbacks entityCallbacks; @@ -190,11 +191,12 @@ public AsyncCassandraTemplate(AsyncCqlTemplate asyncCqlTemplate, CassandraConver this.entityOperations = new EntityOperations(converter); this.exceptionTranslator = asyncCqlTemplate.getExceptionTranslator(); this.statementFactory = new StatementFactory(converter); + this.eventDelegate = new EntityLifecycleEventDelegate(); } @Override public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { - this.eventPublisher = applicationEventPublisher; + this.eventDelegate.setPublisher(applicationEventPublisher); } @Override @@ -214,6 +216,18 @@ public void setEntityCallbacks(@Nullable EntityCallbacks entityCallbacks) { this.entityCallbacks = entityCallbacks; } + /** + * Configure whether lifecycle events such as {@link AfterLoadEvent}, {@link BeforeSaveEvent}, etc. should be + * published or whether emission should be suppressed. Enabled by default. + * + * @param enabled {@code true} to enable entity lifecycle events; {@code false} to disable entity lifecycle events. + * @since 4.0 + * @see CassandraMappingEvent + */ + public void setEntityLifecycleEventsEnabled(boolean enabled) { + this.eventDelegate.setEventsEnabled(enabled); + } + @Override public AsyncCqlOperations getAsyncCqlOperations() { return this.cqlOperations; @@ -456,11 +470,12 @@ private ListenableFuture doDelete(Query query, Class entityClass, Cq tableName); SimpleStatement delete = builder.build(); - maybeEmitEvent(new BeforeDeleteEvent<>(delete, entityClass, tableName)); + maybeEmitEvent(() -> new BeforeDeleteEvent<>(delete, entityClass, tableName)); ListenableFuture future = doExecute(delete, AsyncResultSet::wasApplied); - future.addCallback(success -> maybeEmitEvent(new AfterDeleteEvent<>(delete, entityClass, tableName)), e -> {}); + future.addCallback(success -> maybeEmitEvent(() -> new AfterDeleteEvent<>(delete, entityClass, tableName)), + e -> {}); return future; } @@ -677,8 +692,8 @@ private ListenableFuture doDeleteVersioned(Object entity, QueryOpti if (!result.wasApplied()) { throw new OptimisticLockingFailureException( - String.format("Cannot delete entity %s with version %s in table %s; Has it been modified meanwhile", - entity, source.getVersion(), tableName)); + String.format("Cannot delete entity %s with version %s in table %s; Has it been modified meanwhile", entity, + source.getVersion(), tableName)); } }); } @@ -702,10 +717,11 @@ public ListenableFuture deleteById(Object id, Class entityClass) { StatementBuilder builder = getStatementFactory().deleteById(id, entity, tableName); SimpleStatement delete = builder.build(); - maybeEmitEvent(new BeforeDeleteEvent<>(delete, entityClass, tableName)); + maybeEmitEvent(() -> new BeforeDeleteEvent<>(delete, entityClass, tableName)); ListenableFuture future = doExecute(delete, AsyncResultSet::wasApplied); - future.addCallback(success -> maybeEmitEvent(new AfterDeleteEvent<>(delete, entityClass, tableName)), e -> {}); + future.addCallback(success -> maybeEmitEvent(() -> new AfterDeleteEvent<>(delete, entityClass, tableName)), + e -> {}); return future; } @@ -719,10 +735,11 @@ public ListenableFuture truncate(Class entityClass) { Truncate truncate = QueryBuilder.truncate(tableName); SimpleStatement statement = truncate.build(); - maybeEmitEvent(new BeforeDeleteEvent<>(statement, entityClass, tableName)); + maybeEmitEvent(() -> new BeforeDeleteEvent<>(statement, entityClass, tableName)); ListenableFuture future = doExecute(statement, AsyncResultSet::wasApplied); - future.addCallback(success -> maybeEmitEvent(new AfterDeleteEvent<>(statement, entityClass, tableName)), e -> {}); + future.addCallback(success -> maybeEmitEvent(() -> new AfterDeleteEvent<>(statement, entityClass, tableName)), + e -> {}); return new MappingListenableFutureAdapter<>(future, aBoolean -> null); } @@ -753,7 +770,7 @@ private ListenableFuture> executeSave(T entity, CqlIden private ListenableFuture> executeSave(T entity, CqlIdentifier tableName, SimpleStatement statement, Consumer beforeAfterSaveEvent) { - maybeEmitEvent(new BeforeSaveEvent<>(entity, tableName, statement)); + maybeEmitEvent(() -> new BeforeSaveEvent<>(entity, tableName, statement)); T entityToSave = maybeCallBeforeSave(entity, tableName, statement); ListenableFuture result = doQueryForResultSet(statement); @@ -766,7 +783,7 @@ private ListenableFuture> executeSave(T entity, CqlIden beforeAfterSaveEvent.accept(writeResult); - maybeEmitEvent(new AfterSaveEvent<>(entityToSave, tableName)); + maybeEmitEvent(() -> new AfterSaveEvent<>(entityToSave, tableName)); return writeResult; }); @@ -775,7 +792,7 @@ private ListenableFuture> executeSave(T entity, CqlIden private ListenableFuture executeDelete(Object entity, CqlIdentifier tableName, SimpleStatement statement, Consumer resultConsumer) { - maybeEmitEvent(new BeforeDeleteEvent<>(statement, entity.getClass(), tableName)); + maybeEmitEvent(() -> new BeforeDeleteEvent<>(statement, entity.getClass(), tableName)); ListenableFuture result = doQueryForResultSet(statement); @@ -786,7 +803,7 @@ private ListenableFuture executeDelete(Object entity, CqlIdentifier resultConsumer.accept(writeResult); - maybeEmitEvent(new AfterDeleteEvent<>(statement, entity.getClass(), tableName)); + maybeEmitEvent(() -> new AfterDeleteEvent<>(statement, entity.getClass(), tableName)); return writeResult; }); @@ -864,9 +881,7 @@ public String getCql() { } } - return getAsyncCqlOperations() - .execute(new GetConfiguredPageSize()) - .completable().join(); + return getAsyncCqlOperations().execute(new GetConfiguredPageSize()).completable().join(); } @SuppressWarnings("unchecked") @@ -876,12 +891,12 @@ private Function getMapper(Class entityType, Class targetType, return row -> { - maybeEmitEvent(new AfterLoadEvent<>(row, targetType, tableName)); + maybeEmitEvent(() -> new AfterLoadEvent<>(row, targetType, tableName)); T result = getConverter().project(projection, row); if (result != null) { - maybeEmitEvent(new AfterConvertEvent<>(row, result, tableName)); + maybeEmitEvent(() -> new AfterConvertEvent<>(row, result, tableName)); } return result; @@ -899,11 +914,8 @@ private static MappingCassandraConverter newConverter(CqlSession session) { return converter; } - protected , T> void maybeEmitEvent(E event) { - - if (this.eventPublisher != null) { - this.eventPublisher.publishEvent(event); - } + protected , T> void maybeEmitEvent(Supplier event) { + this.eventDelegate.publishEvent(event); } protected T maybeCallBeforeConvert(T object, CqlIdentifier tableName) { diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraTemplate.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraTemplate.java index 6970962da..93adfcd05 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraTemplate.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraTemplate.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Stream; import org.apache.commons.logging.Log; @@ -118,7 +119,7 @@ public class CassandraTemplate implements CassandraOperations, ApplicationEventP private final StatementFactory statementFactory; - private @Nullable ApplicationEventPublisher eventPublisher; + private final EntityLifecycleEventDelegate eventDelegate; private @Nullable EntityCallbacks entityCallbacks; @@ -183,6 +184,7 @@ public CassandraTemplate(CqlOperations cqlOperations, CassandraConverter convert this.cqlOperations = cqlOperations; this.entityOperations = new EntityOperations(converter); this.statementFactory = new StatementFactory(new QueryMapper(converter), new UpdateMapper(converter)); + this.eventDelegate = new EntityLifecycleEventDelegate(); } @Override @@ -192,7 +194,7 @@ public CassandraBatchOperations batchOps(BatchType batchType) { @Override public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { - this.eventPublisher = applicationEventPublisher; + this.eventDelegate.setPublisher(applicationEventPublisher); } @Override @@ -212,6 +214,18 @@ public void setEntityCallbacks(@Nullable EntityCallbacks entityCallbacks) { this.entityCallbacks = entityCallbacks; } + /** + * Configure whether lifecycle events such as {@link AfterLoadEvent}, {@link BeforeSaveEvent}, etc. should be + * published or whether emission should be suppressed. Enabled by default. + * + * @param enabled {@code true} to enable entity lifecycle events; {@code false} to disable entity lifecycle events. + * @since 4.0 + * @see CassandraMappingEvent + */ + public void setEntityLifecycleEventsEnabled(boolean enabled) { + this.eventDelegate.setEventsEnabled(enabled); + } + @Override public CqlOperations getCqlOperations() { return this.cqlOperations; @@ -488,11 +502,11 @@ WriteResult doDelete(Query query, Class entityClass, CqlIdentifier tableName) tableName); SimpleStatement statement = delete.build(); - maybeEmitEvent(new BeforeDeleteEvent<>(statement, entityClass, tableName)); + maybeEmitEvent(() -> new BeforeDeleteEvent<>(statement, entityClass, tableName)); WriteResult writeResult = doExecute(statement); - maybeEmitEvent(new AfterDeleteEvent<>(statement, entityClass, tableName)); + maybeEmitEvent(() -> new AfterDeleteEvent<>(statement, entityClass, tableName)); return writeResult; } @@ -700,8 +714,8 @@ private WriteResult doDeleteVersioned(SimpleStatement statement, Object entity, if (!result.wasApplied()) { throw new OptimisticLockingFailureException( - String.format("Cannot delete entity %s with version %s in table %s; Has it been modified meanwhile", - entity, source.getVersion(), tableName)); + String.format("Cannot delete entity %s with version %s in table %s; Has it been modified meanwhile", entity, + source.getVersion(), tableName)); } }); } @@ -722,11 +736,11 @@ public boolean deleteById(Object id, Class entityClass) { StatementBuilder delete = getStatementFactory().deleteById(id, entity, tableName); SimpleStatement statement = delete.build(); - maybeEmitEvent(new BeforeDeleteEvent<>(statement, entityClass, tableName)); + maybeEmitEvent(() -> new BeforeDeleteEvent<>(statement, entityClass, tableName)); boolean result = doExecute(statement).wasApplied(); - maybeEmitEvent(new AfterDeleteEvent<>(statement, entityClass, tableName)); + maybeEmitEvent(() -> new AfterDeleteEvent<>(statement, entityClass, tableName)); return result; } @@ -740,11 +754,11 @@ public void truncate(Class entityClass) { Truncate truncate = QueryBuilder.truncate(tableName); SimpleStatement statement = truncate.build(); - maybeEmitEvent(new BeforeDeleteEvent<>(statement, entityClass, tableName)); + maybeEmitEvent(() -> new BeforeDeleteEvent<>(statement, entityClass, tableName)); doExecute(statement); - maybeEmitEvent(new AfterDeleteEvent<>(statement, entityClass, tableName)); + maybeEmitEvent(() -> new AfterDeleteEvent<>(statement, entityClass, tableName)); } // ------------------------------------------------------------------------- @@ -795,13 +809,13 @@ private EntityWriteResult executeSave(T entity, CqlIdentifier tableName, private EntityWriteResult executeSave(T entity, CqlIdentifier tableName, SimpleStatement statement, Consumer resultConsumer) { - maybeEmitEvent(new BeforeSaveEvent<>(entity, tableName, statement)); + maybeEmitEvent(() -> new BeforeSaveEvent<>(entity, tableName, statement)); T entityToSave = maybeCallBeforeSave(entity, tableName, statement); WriteResult result = doExecute(statement); resultConsumer.accept(result); - maybeEmitEvent(new AfterSaveEvent<>(entityToSave, tableName)); + maybeEmitEvent(() -> new AfterSaveEvent<>(entityToSave, tableName)); return EntityWriteResult.of(result, entityToSave); } @@ -809,13 +823,13 @@ private EntityWriteResult executeSave(T entity, CqlIdentifier tableName, private WriteResult executeDelete(Object entity, CqlIdentifier tableName, SimpleStatement statement, Consumer resultConsumer) { - maybeEmitEvent(new BeforeDeleteEvent<>(statement, entity.getClass(), tableName)); + maybeEmitEvent(() -> new BeforeDeleteEvent<>(statement, entity.getClass(), tableName)); WriteResult result = doExecute(statement); resultConsumer.accept(result); - maybeEmitEvent(new AfterDeleteEvent<>(statement, entity.getClass(), tableName)); + maybeEmitEvent(() -> new AfterDeleteEvent<>(statement, entity.getClass(), tableName)); return result; } @@ -907,12 +921,12 @@ private Function getMapper(EntityProjection projection, CqlIde return row -> { - maybeEmitEvent(new AfterLoadEvent<>(row, targetType, tableName)); + maybeEmitEvent(() -> new AfterLoadEvent<>(row, targetType, tableName)); T result = getConverter().project(projection, row); if (result != null) { - maybeEmitEvent(new AfterConvertEvent<>(row, result, tableName)); + maybeEmitEvent(() -> new AfterConvertEvent<>(row, result, tableName)); } return result; @@ -930,11 +944,8 @@ private static MappingCassandraConverter newConverter(CqlSession session) { return converter; } - protected , T> void maybeEmitEvent(E event) { - - if (this.eventPublisher != null) { - this.eventPublisher.publishEvent(event); - } + protected , T> void maybeEmitEvent(Supplier event) { + this.eventDelegate.publishEvent(event); } protected T maybeCallBeforeConvert(T object, CqlIdentifier tableName) { diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/EntityLifecycleEventDelegate.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/EntityLifecycleEventDelegate.java new file mode 100644 index 000000000..cff6fe2f1 --- /dev/null +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/EntityLifecycleEventDelegate.java @@ -0,0 +1,63 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.cassandra.core; + +import java.util.function.Supplier; + +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.lang.Nullable; + +/** + * Delegate class to encapsulate lifecycle event configuration and publishing. Event creation is deferred within an + * event {@link Supplier} to delay the actual event object creation. + * + * @author Mark Paluch + * @since 4.0 + * @see ApplicationEventPublisher + */ +class EntityLifecycleEventDelegate { + + private @Nullable ApplicationEventPublisher publisher; + private boolean eventsEnabled = true; + + public void setPublisher(@Nullable ApplicationEventPublisher publisher) { + this.publisher = publisher; + } + + public boolean isEventsEnabled() { + return eventsEnabled; + } + + public void setEventsEnabled(boolean eventsEnabled) { + this.eventsEnabled = eventsEnabled; + } + + /** + * Publish an application event if event publishing is enabled. + * + * @param eventSupplier the supplier for application events. + */ + public void publishEvent(Supplier eventSupplier) { + + if (canPublishEvent()) { + publisher.publishEvent(eventSupplier.get()); + } + } + + private boolean canPublishEvent() { + return publisher != null && eventsEnabled; + } +} diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraTemplate.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraTemplate.java index 94cdc1b54..9c78cfb9e 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraTemplate.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraTemplate.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.function.Supplier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -124,7 +125,7 @@ public class ReactiveCassandraTemplate private final StatementFactory statementFactory; - private @Nullable ApplicationEventPublisher eventPublisher; + private final EntityLifecycleEventDelegate eventDelegate; private @Nullable ReactiveEntityCallbacks entityCallbacks; @@ -190,6 +191,7 @@ public ReactiveCassandraTemplate(ReactiveCqlOperations reactiveCqlOperations, Ca this.cqlOperations = reactiveCqlOperations; this.entityOperations = new EntityOperations(converter); this.statementFactory = new StatementFactory(converter); + this.eventDelegate = new EntityLifecycleEventDelegate(); } @Override @@ -199,7 +201,7 @@ public ReactiveCassandraBatchOperations batchOps(BatchType batchType) { @Override public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { - this.eventPublisher = applicationEventPublisher; + this.eventDelegate.setPublisher(applicationEventPublisher); } @Override @@ -219,6 +221,18 @@ public void setEntityCallbacks(@Nullable ReactiveEntityCallbacks entityCallbacks this.entityCallbacks = entityCallbacks; } + /** + * Configure whether lifecycle events such as {@link AfterLoadEvent}, {@link BeforeSaveEvent}, etc. should be + * published or whether emission should be suppressed. Enabled by default. + * + * @param enabled {@code true} to enable entity lifecycle events; {@code false} to disable entity lifecycle events. + * @since 4.0 + * @see CassandraMappingEvent + */ + public void setEntityLifecycleEventsEnabled(boolean enabled) { + this.eventDelegate.setEventsEnabled(enabled); + } + @Override public ReactiveCqlOperations getReactiveCqlOperations() { return this.cqlOperations; @@ -385,8 +399,7 @@ Flux doSelect(Query query, Class entityClass, CqlIdentifier tableName, CassandraPersistentEntity persistentEntity = getRequiredPersistentEntity(entityClass); EntityProjection projection = entityOperations.introspectProjection(returnType, entityClass); Columns columns = getStatementFactory().computeColumnsForProjection(projection, query.getColumns(), - persistentEntity, - returnType); + persistentEntity, returnType); Query queryToUse = query.columns(columns); @@ -453,9 +466,9 @@ Mono doDelete(Query query, Class entityClass, CqlIdentifier tabl SimpleStatement delete = builder.build(); Mono writeResult = doExecuteAndFlatMap(delete, ReactiveCassandraTemplate::toWriteResult) - .doOnSubscribe(it -> maybeEmitEvent(new BeforeDeleteEvent<>(delete, entityClass, tableName))); + .doOnSubscribe(it -> maybeEmitEvent(() -> new BeforeDeleteEvent<>(delete, entityClass, tableName))); - return writeResult.doOnNext(it -> maybeEmitEvent(new AfterDeleteEvent<>(delete, entityClass, tableName))); + return writeResult.doOnNext(it -> maybeEmitEvent(() -> new AfterDeleteEvent<>(delete, entityClass, tableName))); } // ------------------------------------------------------------------------- @@ -678,8 +691,8 @@ private Mono doDeleteVersioned(SimpleStatement delete, Object entit if (!result.wasApplied()) { sink.error(new OptimisticLockingFailureException( - String.format("Cannot delete entity %s with version %s in table %s; Has it been modified meanwhile", - entity, source.getVersion(), tableName))); + String.format("Cannot delete entity %s with version %s in table %s; Has it been modified meanwhile", entity, + source.getVersion(), tableName))); return; } @@ -705,9 +718,9 @@ public Mono deleteById(Object id, Class entityClass) { SimpleStatement delete = builder.build(); Mono result = doExecute(delete, ReactiveResultSet::wasApplied) - .doOnSubscribe(it -> maybeEmitEvent(new BeforeDeleteEvent<>(delete, entityClass, tableName))); + .doOnSubscribe(it -> maybeEmitEvent(() -> new BeforeDeleteEvent<>(delete, entityClass, tableName))); - return result.doOnNext(it -> maybeEmitEvent(new AfterDeleteEvent<>(delete, entityClass, tableName))); + return result.doOnNext(it -> maybeEmitEvent(() -> new AfterDeleteEvent<>(delete, entityClass, tableName))); } @Override @@ -720,9 +733,10 @@ public Mono truncate(Class entityClass) { SimpleStatement statement = truncate.build(); Mono result = doExecute(statement, ReactiveResultSet::wasApplied) - .doOnSubscribe(it -> maybeEmitEvent(new BeforeDeleteEvent<>(statement, entityClass, tableName))); + .doOnSubscribe(it -> maybeEmitEvent(() -> new BeforeDeleteEvent<>(statement, entityClass, tableName))); - return result.doOnNext(it -> maybeEmitEvent(new AfterDeleteEvent<>(statement, entityClass, tableName))).then(); + return result.doOnNext(it -> maybeEmitEvent(() -> new AfterDeleteEvent<>(statement, entityClass, tableName))) + .then(); } // ------------------------------------------------------------------------- @@ -775,13 +789,13 @@ private Mono> executeSave(T entity, CqlIdentifier table return Mono.defer(() -> { - maybeEmitEvent(new BeforeSaveEvent<>(entity, tableName, statement)); + maybeEmitEvent(() -> new BeforeSaveEvent<>(entity, tableName, statement)); return maybeCallBeforeSave(entity, tableName, statement).flatMapMany(entityToSave -> { Mono execute = doExecuteAndFlatMap(statement, ReactiveCassandraTemplate::toWriteResult); return execute.map(it -> EntityWriteResult.of(it, entityToSave)).handle(handler) // - .doOnNext(it -> maybeEmitEvent(new AfterSaveEvent<>(entityToSave, tableName))); + .doOnNext(it -> maybeEmitEvent(() -> new AfterSaveEvent<>(entityToSave, tableName))); }).next(); }); } @@ -789,13 +803,13 @@ private Mono> executeSave(T entity, CqlIdentifier table private Mono executeDelete(Object entity, CqlIdentifier tableName, SimpleStatement statement, BiConsumer> handler) { - maybeEmitEvent(new BeforeDeleteEvent<>(statement, entity.getClass(), tableName)); + maybeEmitEvent(() -> new BeforeDeleteEvent<>(statement, entity.getClass(), tableName)); Mono execute = doExecuteAndFlatMap(statement, ReactiveCassandraTemplate::toWriteResult); return execute.map(it -> EntityWriteResult.of(it, entity)).handle(handler) // - .doOnSubscribe(it -> maybeEmitEvent(new BeforeSaveEvent<>(entity, tableName, statement))) // - .doOnNext(it -> maybeEmitEvent(new AfterDeleteEvent<>(statement, entity.getClass(), tableName))); + .doOnSubscribe(it -> maybeEmitEvent(() -> new BeforeSaveEvent<>(entity, tableName, statement))) // + .doOnNext(it -> maybeEmitEvent(() -> new AfterDeleteEvent<>(statement, entity.getClass(), tableName))); } private Flux doQuery(Statement statement, RowMapper rowMapper) { @@ -862,9 +876,7 @@ public String getCql() { } } - return getReactiveCqlOperations() - .execute(new GetConfiguredPageSize()) - .single(); + return getReactiveCqlOperations().execute(new GetConfiguredPageSize()).single(); } @SuppressWarnings("unchecked") @@ -874,12 +886,12 @@ private Function getMapper(EntityProjection projection, CqlIde return row -> { - maybeEmitEvent(new AfterLoadEvent<>(row, targetType, tableName)); + maybeEmitEvent(() -> new AfterLoadEvent<>(row, targetType, tableName)); T result = getConverter().project(projection, row); if (result != null) { - maybeEmitEvent(new AfterConvertEvent<>(row, result, tableName)); + maybeEmitEvent(() -> new AfterConvertEvent<>(row, result, tableName)); } return result; @@ -903,11 +915,8 @@ private static MappingCassandraConverter newConverter(ReactiveSession session) { return converter; } - protected , T> void maybeEmitEvent(E event) { - - if (this.eventPublisher != null) { - this.eventPublisher.publishEvent(event); - } + protected , T> void maybeEmitEvent(Supplier event) { + this.eventDelegate.publishEvent(event); } protected Mono maybeCallBeforeConvert(T object, CqlIdentifier tableName) { diff --git a/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/core/mapping/event/CassandraTemplateEventIntegrationTests.java b/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/core/mapping/event/CassandraTemplateEventIntegrationTests.java index a720157ae..54967c9cb 100644 --- a/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/core/mapping/event/CassandraTemplateEventIntegrationTests.java +++ b/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/core/mapping/event/CassandraTemplateEventIntegrationTests.java @@ -58,6 +58,16 @@ void streamShouldEmitEvents() { assertThat(getListener().getAfterConvert()).extracting(CassandraMappingEvent::getSource).containsOnly(firstUser); } + @Test // GH-1286 + void disablingEventsShouldSuppressEvents() { + + template.setEntityLifecycleEventsEnabled(false); + template.stream("SELECT * FROM users;", User.class).collect(Collectors.toList()); // Just load entire stream. + + assertThat(getListener().getAfterLoad()).isEmpty(); + assertThat(getListener().getAfterConvert()).isEmpty(); + } + @Test // DATACASS-106 void sliceShouldEmitEvents() { diff --git a/src/main/asciidoc/reference/mapping.adoc b/src/main/asciidoc/reference/mapping.adoc index 7394ecae8..c83db5b0a 100644 --- a/src/main/asciidoc/reference/mapping.adoc +++ b/src/main/asciidoc/reference/mapping.adoc @@ -520,7 +520,10 @@ include::./converters.adoc[] include::../{spring-data-commons-docs}/is-new-state-detection.adoc[leveloffset=+1] -NOTE: Cassandra provides no means to generate identifiers upon inserting data. As consequence, entities must be associated with identifier values. Spring Data defaults to identifier inspection to determine whether an entity is new. If you want to use <> make sure to either use <> or implement `Persistable` for proper entity state detection. +NOTE: Cassandra provides no means to generate identifiers upon inserting data. +As consequence, entities must be associated with identifier values. +Spring Data defaults to identifier inspection to determine whether an entity is new. +If you want to use <> make sure to either use <> or implement `Persistable` for proper entity state detection. [[cassandra.mapping-usage.events]] == Lifecycle Events @@ -530,6 +533,8 @@ Being based on Spring's application context event infrastructure lets other prod To intercept an object before it goes into the database, you can register a subclass of `org.springframework.data.cassandra.core.mapping.event.AbstractCassandraEventListener` that overrides the `onBeforeSave(…)` method. When the event is dispatched, your listener is called and passed the domain object (which is a Java entity). +Entity lifecycle events can be costly and you may notice a change in the performance profile when loading large result sets. +You can disable lifecycle events on the Template API object. The following example uses the `onBeforeSave` method: ====