Skip to content

Improve multiget return. #1710

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,10 @@ Previously the reactive code initialized this to `IMMEDIATE`, now reactive and n
==== delete methods that take a Query

The reactive methods previously returned a `Mono<Long>` with the number of deleted documents, the non reactive versions were void. They now return a `Mono<ByQueryResponse>` which contains much more detailed information about the deleted documents and errors that might have occurred.

==== multiget methods

The implementations of _multiget_ previousl only returned the found entities in a `List<T>` for non-reactive implementations and in a `Flux<T>` for reactive implementations. If the request contained ids that were not found, the information that these are missing was not available. The user needed to compare the returned ids to the requested ones to find
which ones were missing.

Now the `multiget` methods return a `MultiGetItem` for every requested id. This contains information about failures (like non existing indices) and the information if the item existed (then it is contained in the `MultiGetItem) or not.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.handler.ssl.JdkSslContext;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import org.elasticsearch.action.get.MultiGetItemResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
Expand Down Expand Up @@ -330,18 +331,12 @@ public Mono<GetResult> get(HttpHeaders headers, GetRequest getRequest) {
.next();
}

/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#multiGet(org.springframework.http.HttpHeaders, org.elasticsearch.action.get.MultiGetRequest)
*/
@Override
public Flux<GetResult> multiGet(HttpHeaders headers, MultiGetRequest multiGetRequest) {
public Flux<MultiGetItemResponse> multiGet(HttpHeaders headers, MultiGetRequest multiGetRequest) {

return sendRequest(multiGetRequest, requestCreator.multiGet(), MultiGetResponse.class, headers)
.map(MultiGetResponse::getResponses) //
.flatMap(Flux::fromArray) //
.filter(it -> !it.isFailed() && it.getResponse().isExists()) //
.map(it -> DefaultReactiveElasticsearchClient.getResponseToGetResult(it.getResponse()));
.flatMap(Flux::fromArray); //
}

/*
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ public <T> T get(String id, Class<T> clazz) {
}

@Override
public <T> List<T> multiGet(Query query, Class<T> clazz) {
public <T> List<MultiGetItem<T>> multiGet(Query query, Class<T> clazz) {
return multiGet(query, clazz, getIndexCoordinatesFor(clazz));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,20 +120,20 @@ public interface DocumentOperations {
*
* @param query the query defining the ids of the objects to get
* @param clazz the type of the object to be returned
* @return list of objects, contains null values for ids that are not found
* @return list of {@link MultiGetItem}s
* @since 4.1
*/
<T> List<T> multiGet(Query query, Class<T> clazz);
<T> List<MultiGetItem<T>> multiGet(Query query, Class<T> clazz);

/**
* Execute a multiGet against elasticsearch for the given ids.
*
* @param query the query defining the ids of the objects to get
* @param clazz the type of the object to be returned
* @param index the index(es) from which the objects are read.
* @return list of objects, contains null values for ids that are not found
* @return list of {@link MultiGetItem}s
*/
<T> List<T> multiGet(Query query, Class<T> clazz, IndexCoordinates index);
<T> List<MultiGetItem<T>> multiGet(Query query, Class<T> clazz, IndexCoordinates index);

/**
* Check if an entity with given {@literal id} exists.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public <T> T get(String id, Class<T> clazz, IndexCoordinates index) {
}

@Override
public <T> List<T> multiGet(Query query, Class<T> clazz, IndexCoordinates index) {
public <T> List<MultiGetItem<T>> multiGet(Query query, Class<T> clazz, IndexCoordinates index) {

Assert.notNull(index, "index must not be null");
Assert.notEmpty(query.getIds(), "No Id defined for Query");
Expand All @@ -177,7 +177,10 @@ public <T> List<T> multiGet(Query query, Class<T> clazz, IndexCoordinates index)
MultiGetResponse result = execute(client -> client.mget(request, RequestOptions.DEFAULT));

DocumentCallback<T> callback = new ReadDocumentCallback<>(elasticsearchConverter, clazz, index);
return DocumentAdapters.from(result).stream().map(callback::doWith).collect(Collectors.toList());
return DocumentAdapters.from(result).stream() //
.map(multiGetItem -> MultiGetItem.of( //
multiGetItem.isFailed() ? null : callback.doWith(multiGetItem.getItem()), multiGetItem.getFailure())) //
.collect(Collectors.toList());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.document.DocumentAdapters;
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
Expand Down Expand Up @@ -190,16 +189,19 @@ public <T> T get(String id, Class<T> clazz, IndexCoordinates index) {
}

@Override
public <T> List<T> multiGet(Query query, Class<T> clazz, IndexCoordinates index) {
public <T> List<MultiGetItem<T>> multiGet(Query query, Class<T> clazz, IndexCoordinates index) {

Assert.notNull(index, "index must not be null");
Assert.notEmpty(query.getIds(), "No Ids defined for Query");

MultiGetRequestBuilder builder = requestFactory.multiGetRequestBuilder(client, query, clazz, index);

DocumentCallback<T> callback = new ReadDocumentCallback<>(elasticsearchConverter, clazz, index);
List<Document> documents = DocumentAdapters.from(builder.execute().actionGet());
return documents.stream().map(callback::doWith).collect(Collectors.toList());

return DocumentAdapters.from(builder.execute().actionGet()).stream() //
.map(multiGetItem -> MultiGetItem.of(multiGetItem.isFailed() ? null : callback.doWith(multiGetItem.getItem()),
multiGetItem.getFailure()))
.collect(Collectors.toList());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core;

import org.springframework.lang.Nullable;

/**
* Response object for items returned from multiget requests, encapsulating the returned data and potential error
* information.
*
* @param <T> the entity type
* @author Peter-Josef Meisch
* @since 4.2
*/
public class MultiGetItem<T> {
@Nullable private final T item;
@Nullable private final Failure failure;

private MultiGetItem(@Nullable T item, @Nullable Failure failure) {
this.item = item;
this.failure = failure;
}

public static <T> MultiGetItem<T> of(@Nullable T item, @Nullable Failure failure) {
return new MultiGetItem<>(item, failure);
}

public boolean hasItem() {
return item != null;
}

@Nullable
public T getItem() {
return item;
}

public boolean isFailed() {
return failure != null;
}

@Nullable
public Failure getFailure() {
return failure;
}

public static class Failure {
@Nullable private final String index;
@Nullable private final String type;
@Nullable private final String id;
@Nullable private final Exception exception;

private Failure(@Nullable String index, @Nullable String type, @Nullable String id, @Nullable Exception exception) {
this.index = index;
this.type = type;
this.id = id;
this.exception = exception;
}

public static Failure of(String index, String type, String id, Exception exception) {
return new Failure(index, type, id, exception);
}

@Nullable
public String getIndex() {
return index;
}

@Nullable
public String getType() {
return type;
}

@Nullable
public String getId() {
return id;
}

@Nullable
public Exception getException() {
return exception;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,21 +147,21 @@ default <T> Flux<T> saveAll(Iterable<T> entities, IndexCoordinates index) {
*
* @param query the query defining the ids of the objects to get
* @param clazz the type of the object to be returned, used to determine the index
* @return flux with list of nullable objects
* @return flux with list of {@link MultiGetItem}s that contain the entities
* @since 4.1
*/
<T> Flux<T> multiGet(Query query, Class<T> clazz);
<T> Flux<MultiGetItem<T>> multiGet(Query query, Class<T> clazz);

/**
* Execute a multiGet against elasticsearch for the given ids.
*
* @param query the query defining the ids of the objects to get
* @param clazz the type of the object to be returned
* @param index the index(es) from which the objects are read.
* @return flux with list of nullable objects
* @return flux with list of {@link MultiGetItem}s that contain the entities
* @since 4.0
*/
<T> Flux<T> multiGet(Query query, Class<T> clazz, IndexCoordinates index);
<T> Flux<MultiGetItem<T>> multiGet(Query query, Class<T> clazz, IndexCoordinates index);

/**
* Bulk update all objects. Will do update.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
import org.springframework.data.elasticsearch.core.query.BulkOptions;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.SeqNoPrimaryTerm;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.data.elasticsearch.core.query.UpdateResponse;
import org.springframework.data.elasticsearch.core.routing.DefaultRoutingResolver;
Expand Down Expand Up @@ -298,12 +298,12 @@ private <T> T updateIndexedObject(T entity, IndexedObjectInformation indexedObje
}

@Override
public <T> Flux<T> multiGet(Query query, Class<T> clazz) {
public <T> Flux<MultiGetItem<T>> multiGet(Query query, Class<T> clazz) {
return multiGet(query, clazz, getIndexCoordinatesFor(clazz));
}

@Override
public <T> Flux<T> multiGet(Query query, Class<T> clazz, IndexCoordinates index) {
public <T> Flux<MultiGetItem<T>> multiGet(Query query, Class<T> clazz, IndexCoordinates index) {

Assert.notNull(index, "Index must not be null");
Assert.notNull(clazz, "Class must not be null");
Expand All @@ -314,7 +314,12 @@ public <T> Flux<T> multiGet(Query query, Class<T> clazz, IndexCoordinates index)

MultiGetRequest request = requestFactory.multiGetRequest(query, clazz, index);
return Flux.from(execute(client -> client.multiGet(request))) //
.concatMap(result -> callback.toEntity(DocumentAdapters.from(result)));
.map(DocumentAdapters::from) //
.flatMap(multiGetItem -> multiGetItem.isFailed() //
? Mono.just(MultiGetItem.of(null, multiGetItem.getFailure())) //
: callback.toEntity(multiGetItem.getItem())
.map((T item) -> MultiGetItem.of(item, multiGetItem.getFailure())) //
);
}

@Override
Expand Down
Loading