diff --git a/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java index 03921beae..cc6400f0a 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java @@ -758,6 +758,12 @@ public ElasticsearchPersistentEntity getPersistentEntityFor(@Nullable Class getClusterVersion(); + @Nullable + public String getEntityRouting(Object entity) { + return entityOperations.forEntity(entity, converter.getConversionService(), routingResolver) + .getRouting(); + } + /** * Value class to capture client independent information from a response to an index request. */ diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchOperations.java index e9304a5f0..b5add5ef5 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchOperations.java @@ -77,6 +77,16 @@ public interface ReactiveElasticsearchOperations */ ReactiveClusterOperations cluster(); + /** + * gets the routing for an entity. + * + * @param entity the entity + * @return the routing, may be null if not set. + * @since 5.2 + */ + @Nullable + String getEntityRouting(Object entity); + // region customizations /** * Returns a copy of this instance with the same configuration, but that uses a different {@link RoutingResolver} to diff --git a/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleElasticsearchRepository.java b/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleElasticsearchRepository.java index dfcc56219..a12342846 100644 --- a/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleElasticsearchRepository.java +++ b/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleElasticsearchRepository.java @@ -40,6 +40,7 @@ import org.springframework.data.elasticsearch.core.query.BaseQuery; import org.springframework.data.elasticsearch.core.query.MoreLikeThisQuery; import org.springframework.data.elasticsearch.core.query.Query; +import org.springframework.data.elasticsearch.core.routing.RoutingResolver; import org.springframework.data.elasticsearch.repository.ElasticsearchRepository; import org.springframework.data.util.StreamUtils; import org.springframework.data.util.Streamable; @@ -257,7 +258,7 @@ public void deleteById(ID id) { Assert.notNull(id, "Cannot delete entity with id 'null'."); - doDelete(id, getIndexCoordinates()); + doDelete(id, null, getIndexCoordinates()); } @Override @@ -265,7 +266,7 @@ public void deleteById(ID id, @Nullable RefreshPolicy refreshPolicy) { Assert.notNull(id, "Cannot delete entity with id 'null'."); - doDelete(id, getIndexCoordinates(), refreshPolicy); + doDelete(id, null, getIndexCoordinates(), refreshPolicy); } @Override @@ -273,7 +274,7 @@ public void delete(T entity) { Assert.notNull(entity, "Cannot delete 'null' entity."); - doDelete(extractIdFromBean(entity), getIndexCoordinates()); + doDelete(extractIdFromBean(entity), operations.getEntityRouting(entity), getIndexCoordinates()); } @Override @@ -281,7 +282,7 @@ public void delete(T entity, @Nullable RefreshPolicy refreshPolicy) { Assert.notNull(entity, "Cannot delete 'null' entity."); - doDelete(extractIdFromBean(entity), getIndexCoordinates(), refreshPolicy); + doDelete(extractIdFromBean(entity), operations.getEntityRouting(entity), getIndexCoordinates(), refreshPolicy); } @Override @@ -352,17 +353,26 @@ private List getEntityIds(Iterable entities) { return ids; } - private void doDelete(@Nullable ID id, IndexCoordinates indexCoordinates) { + private void doDelete(@Nullable ID id, @Nullable String routing, IndexCoordinates indexCoordinates) { if (id != null) { - executeAndRefresh(operations -> operations.delete(stringIdRepresentation(id), indexCoordinates)); + executeAndRefresh(operations -> { + var ops = routing != null ? operations.withRouting(RoutingResolver.just(routing)) : operations; + // noinspection DataFlowIssue + return ops.delete(stringIdRepresentation(id), indexCoordinates); + }); } } - private void doDelete(@Nullable ID id, IndexCoordinates indexCoordinates, @Nullable RefreshPolicy refreshPolicy) { + private void doDelete(@Nullable ID id, @Nullable String routing, IndexCoordinates indexCoordinates, + @Nullable RefreshPolicy refreshPolicy) { if (id != null) { - executeAndRefresh(operations -> operations.delete(stringIdRepresentation(id), indexCoordinates), refreshPolicy); + executeAndRefresh(operations -> { + var ops = routing != null ? operations.withRouting(RoutingResolver.just(routing)) : operations; + // noinspection DataFlowIssue + return ops.delete(stringIdRepresentation(id), indexCoordinates); + }, refreshPolicy); } } diff --git a/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java b/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java index ccfdd4f66..4a75c356c 100644 --- a/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java +++ b/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java @@ -33,6 +33,7 @@ import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.BaseQuery; import org.springframework.data.elasticsearch.core.query.Query; +import org.springframework.data.elasticsearch.core.routing.RoutingResolver; import org.springframework.data.elasticsearch.repository.ReactiveElasticsearchRepository; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -256,7 +257,9 @@ public Mono delete(T entity) { Assert.notNull(entity, "Entity must not be null!"); - return operations.delete(entity, entityInformation.getIndexCoordinates()) // + var routing = operations.getEntityRouting(entity); + var ops = routing != null ? operations.withRouting(RoutingResolver.just(routing)) : operations; + return ops.delete(entity, entityInformation.getIndexCoordinates()) // .then(doRefresh()); } @@ -265,7 +268,9 @@ public Mono delete(T entity, @Nullable RefreshPolicy refreshPolicy) { Assert.notNull(entity, "Entity must not be null!"); - return operations.withRefreshPolicy(refreshPolicy).delete(entity, entityInformation.getIndexCoordinates()) // + var routing = operations.getEntityRouting(entity); + var ops = routing != null ? operations.withRouting(RoutingResolver.just(routing)) : operations; + return ops.withRefreshPolicy(refreshPolicy).delete(entity, entityInformation.getIndexCoordinates()) // .then(doRefresh()); }