Skip to content

Fix reactive blocking calls. #1825

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ protected void initialize(ElasticsearchConverter elasticsearchConverter) {
this.routingResolver = new DefaultRoutingResolver((SimpleElasticsearchMappingContext) mappingContext);

requestFactory = new RequestFactory(elasticsearchConverter);
VersionInfo.logVersions(getClusterVersion());

// initialize the VersionInfo class in the initialization phase
// noinspection ResultOfMethodCallIgnored
VersionInfo.versionProperties();
}

/**
Expand Down Expand Up @@ -166,6 +169,16 @@ public void setRefreshPolicy(@Nullable RefreshPolicy refreshPolicy) {
public RefreshPolicy getRefreshPolicy() {
return refreshPolicy;
}

/**
* logs the versions of the different Elasticsearch components.
*
* @since 4.3
*/
public void logVersions() {
VersionInfo.logVersions(getClusterVersion());
}

// endregion

// region DocumentOperations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
import org.springframework.data.elasticsearch.core.index.DeleteTemplateRequest;
import org.springframework.data.elasticsearch.core.index.ExistsTemplateRequest;
import org.springframework.data.elasticsearch.core.index.GetTemplateRequest;
import org.springframework.data.elasticsearch.core.index.MappingBuilder;
import org.springframework.data.elasticsearch.core.index.PutTemplateRequest;
import org.springframework.data.elasticsearch.core.index.ReactiveMappingBuilder;
import org.springframework.data.elasticsearch.core.index.Settings;
import org.springframework.data.elasticsearch.core.index.TemplateData;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
Expand Down Expand Up @@ -190,8 +190,7 @@ public Mono<Document> createMapping(Class<?> clazz) {
}
}

String mapping = new MappingBuilder(converter).buildPropertyMapping(clazz);
return Mono.just(Document.parse(mapping));
return new ReactiveMappingBuilder(converter).buildReactivePropertyMapping(clazz).map(Document::parse);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ public ReactiveElasticsearchTemplate(ReactiveElasticsearchClient client, Elastic
this.operations = new EntityOperations(this.mappingContext);
this.requestFactory = new RequestFactory(converter);

logVersions();
// initialize the VersionInfo class in the initialization phase
// noinspection ResultOfMethodCallIgnored
VersionInfo.versionProperties();
}

private ReactiveElasticsearchTemplate copy() {
Expand All @@ -155,11 +157,14 @@ private ReactiveElasticsearchTemplate copy() {
return copy;
}

private void logVersions() {
getClusterVersion() //
.doOnSuccess(VersionInfo::logVersions) //
.doOnError(e -> VersionInfo.logVersions(null)) //
.subscribe();
/**
* logs the versions of the different Elasticsearch components.
*
* @return a Mono signalling finished execution
* @since 4.3
*/
public Mono<Void> logVersions() {
return getClusterVersion().doOnNext(VersionInfo::logVersions).then();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static Mono<String> readFileFromClasspath(String url) {

String line;
while ((line = br.readLine()) != null) {
sb.append(line);
sb.append(line).append('\n');
}

sink.next(sb.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public class MappingBuilder {
private static final String DYNAMIC_DATE_FORMATS = "dynamic_date_formats";
private static final String RUNTIME = "runtime";

private final ElasticsearchConverter elasticsearchConverter;
protected final ElasticsearchConverter elasticsearchConverter;

private boolean writeTypeHints = true;

Expand All @@ -113,9 +113,16 @@ public MappingBuilder(ElasticsearchConverter elasticsearchConverter) {
*/
public String buildPropertyMapping(Class<?> clazz) throws MappingException {

ElasticsearchPersistentEntity<?> entity = elasticsearchConverter.getMappingContext()
.getRequiredPersistentEntity(clazz);

return buildPropertyMapping(entity, getRuntimeFields(entity));
}

protected String buildPropertyMapping(ElasticsearchPersistentEntity<?> entity,
@Nullable org.springframework.data.elasticsearch.core.document.Document runtimeFields) {

try {
ElasticsearchPersistentEntity<?> entity = elasticsearchConverter.getMappingContext()
.getRequiredPersistentEntity(clazz);

writeTypeHints = entity.writeTypeHints();

Expand All @@ -124,7 +131,8 @@ public String buildPropertyMapping(Class<?> clazz) throws MappingException {
// Dynamic templates
addDynamicTemplatesMapping(builder, entity);

mapEntity(builder, entity, true, "", false, FieldType.Auto, null, entity.findAnnotation(DynamicMapping.class));
mapEntity(builder, entity, true, "", false, FieldType.Auto, null, entity.findAnnotation(DynamicMapping.class),
runtimeFields);

builder.endObject() // root object
.close();
Expand All @@ -148,7 +156,8 @@ private void writeTypeHintMapping(XContentBuilder builder) throws IOException {

private void mapEntity(XContentBuilder builder, @Nullable ElasticsearchPersistentEntity<?> entity,
boolean isRootObject, String nestedObjectFieldName, boolean nestedOrObjectField, FieldType fieldType,
@Nullable Field parentFieldAnnotation, @Nullable DynamicMapping dynamicMapping) throws IOException {
@Nullable Field parentFieldAnnotation, @Nullable DynamicMapping dynamicMapping,
@Nullable org.springframework.data.elasticsearch.core.document.Document runtimeFields) throws IOException {

if (entity != null && entity.isAnnotationPresent(Mapping.class)) {
Mapping mappingAnnotation = entity.getRequiredAnnotation(Mapping.class);
Expand All @@ -170,8 +179,8 @@ private void mapEntity(XContentBuilder builder, @Nullable ElasticsearchPersisten
builder.field(DYNAMIC_DATE_FORMATS, mappingAnnotation.dynamicDateFormats());
}

if (StringUtils.hasText(mappingAnnotation.runtimeFieldsPath())) {
addRuntimeFields(builder, mappingAnnotation.runtimeFieldsPath());
if (runtimeFields != null) {
builder.field(RUNTIME, runtimeFields);
}
}

Expand Down Expand Up @@ -227,13 +236,22 @@ private void mapEntity(XContentBuilder builder, @Nullable ElasticsearchPersisten

}

private void addRuntimeFields(XContentBuilder builder, String runtimeFieldsPath) throws IOException {
@Nullable
private org.springframework.data.elasticsearch.core.document.Document getRuntimeFields(
@Nullable ElasticsearchPersistentEntity<?> entity) {

ClassPathResource runtimeFields = new ClassPathResource(runtimeFieldsPath);
if (entity != null) {
Mapping mappingAnnotation = entity.findAnnotation(Mapping.class);
if (mappingAnnotation != null) {
String runtimeFieldsPath = mappingAnnotation.runtimeFieldsPath();

if (runtimeFields.exists()) {
builder.rawField(RUNTIME, runtimeFields.getInputStream(), XContentType.JSON);
if (hasText(runtimeFieldsPath)) {
String jsonString = ResourceUtil.readFileFromClasspath(runtimeFieldsPath);
return org.springframework.data.elasticsearch.core.document.Document.parse(jsonString);
}
}
}
return null;
}

private void buildPropertyMapping(XContentBuilder builder, boolean isRootObject,
Expand Down Expand Up @@ -291,7 +309,7 @@ private void buildPropertyMapping(XContentBuilder builder, boolean isRootObject,
: null;

mapEntity(builder, persistentEntity, false, property.getFieldName(), true, fieldAnnotation.type(),
fieldAnnotation, dynamicMapping);
fieldAnnotation, dynamicMapping, null);
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.index;

import static org.springframework.util.StringUtils.*;

import reactor.core.publisher.Mono;

import org.springframework.data.elasticsearch.annotations.Mapping;
import org.springframework.data.elasticsearch.core.ReactiveResourceUtil;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.mapping.MappingException;
import org.springframework.lang.Nullable;

/**
* Subclass of {@link MappingBuilder} with specialized methods TO inhibit blocking CALLS
*
* @author Peter-Josef Meisch
* @since 4.3
*/
public class ReactiveMappingBuilder extends MappingBuilder {

public ReactiveMappingBuilder(ElasticsearchConverter elasticsearchConverter) {
super(elasticsearchConverter);
}

@Override
public String buildPropertyMapping(Class<?> clazz) throws MappingException {
throw new UnsupportedOperationException(
"Use ReactiveMappingBuilder.buildReactivePropertyMapping() instead of buildPropertyMapping()");
}

public Mono<String> buildReactivePropertyMapping(Class<?> clazz) throws MappingException {
ElasticsearchPersistentEntity<?> entity = elasticsearchConverter.getMappingContext()
.getRequiredPersistentEntity(clazz);

return getRuntimeFields(entity) //
.switchIfEmpty(Mono.just(Document.create())) //
.map(document -> {
if (document.isEmpty()) {
return buildPropertyMapping(entity, null);
} else {
return buildPropertyMapping(entity, document);
}
});
}

private Mono<Document> getRuntimeFields(@Nullable ElasticsearchPersistentEntity<?> entity) {

if (entity != null) {
Mapping mappingAnnotation = entity.findAnnotation(Mapping.class);
if (mappingAnnotation != null) {
String runtimeFieldsPath = mappingAnnotation.runtimeFieldsPath();

if (hasText(runtimeFieldsPath)) {
return ReactiveResourceUtil.readFileFromClasspath(runtimeFieldsPath).map(Document::parse);
}
}
}

return Mono.empty();
}
}
Loading