diff --git a/java-client/build.gradle.kts b/java-client/build.gradle.kts index 774badfb4..437bace65 100644 --- a/java-client/build.gradle.kts +++ b/java-client/build.gradle.kts @@ -177,7 +177,7 @@ publishing { dependencies { // Compile and test with the last 7.x version to make sure transition scenarios where // the Java API client coexists with a 7.x HLRC work fine - val elasticsearchVersion = "7.17.7" + val elasticsearchVersion = "7.17.18" val jacksonVersion = "2.17.0" // Apache 2.0 diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_types/ElasticsearchException.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_types/ElasticsearchException.java index 1aacc1749..bad5ca106 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_types/ElasticsearchException.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_types/ElasticsearchException.java @@ -19,6 +19,8 @@ package co.elastic.clients.elasticsearch._types; +import co.elastic.clients.transport.http.TransportHttpClient; + import javax.annotation.Nullable; /** @@ -33,11 +35,19 @@ public class ElasticsearchException extends RuntimeException { private final ErrorResponse response; private final String endpointId; + @Nullable + private final TransportHttpClient.Response httpResponse; - public ElasticsearchException(String endpointId, ErrorResponse response) { + public ElasticsearchException(String endpointId, ErrorResponse response, + @Nullable TransportHttpClient.Response httpResponse) { super("[" + endpointId + "] failed: [" + response.error().type() + "] " + response.error().reason()); this.response = response; this.endpointId = endpointId; + this.httpResponse = httpResponse; + } + + public ElasticsearchException(String endpointId, ErrorResponse response) { + this(endpointId, response, null); } /** @@ -68,4 +78,12 @@ public ErrorCause error() { public int status() { return this.response.status(); } + + /** + * The underlying http response, if available. + */ + @Nullable + public TransportHttpClient.Response httpResponse() { + return this.httpResponse; + } } diff --git a/java-client/src/main/java/co/elastic/clients/transport/DefaultTransportOptions.java b/java-client/src/main/java/co/elastic/clients/transport/DefaultTransportOptions.java new file mode 100644 index 000000000..f199705c1 --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/transport/DefaultTransportOptions.java @@ -0,0 +1,209 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.transport; + +import co.elastic.clients.transport.http.HeaderMap; +import co.elastic.clients.util.ObjectBuilderBase; + +import javax.annotation.Nullable; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +/** + * Default implementation of {@link TransportOptions}. Extensions can use it as a base class to provide additional features. + */ +public class DefaultTransportOptions implements TransportOptions { + private final HeaderMap headers; + private final Map parameters; + private final Function, Boolean> onWarnings; + + public static final DefaultTransportOptions EMPTY = new DefaultTransportOptions(); + + public DefaultTransportOptions() { + this(new HeaderMap(), Collections.emptyMap(), null); + } + + public DefaultTransportOptions( + @Nullable HeaderMap headers, + @Nullable Map parameters, + @Nullable Function, Boolean> onWarnings + ) { + this.headers = headers == null ? HeaderMap.EMPTY : headers; + this.parameters = (parameters == null || parameters.isEmpty()) ? + Collections.emptyMap() : Collections.unmodifiableMap(parameters); + this.onWarnings = onWarnings; + } + + protected DefaultTransportOptions(AbstractBuilder builder) { + this(builder.headers, builder.parameters, builder.onWarnings); + } + + public static DefaultTransportOptions of(@Nullable TransportOptions options) { + if (options == null) { + return new DefaultTransportOptions(null, null, null); + } + if (options instanceof DefaultTransportOptions) { + return (DefaultTransportOptions) options; + } + return new DefaultTransportOptions( + new HeaderMap(entriesToMap(options.headers())), + options.queryParameters(), + options.onWarnings() + ); + } + + @Override + public Collection> headers() { + return Collections.unmodifiableSet(headers.entrySet()); + } + + @Override + public Map queryParameters() { + return parameters; + } + + @Override + public Function, Boolean> onWarnings() { + return onWarnings; + } + + @Override + public Builder toBuilder() { + return new Builder(this); + } + + private static Map entriesToMap(Collection> entries) { + if (entries.isEmpty()) { + return Collections.emptyMap(); + } else { + HashMap map = new HashMap<>(); + for (Map.Entry entry: entries) { + map.put(entry.getKey(), entry.getValue()); + } + return map; + } + } + + public abstract static class AbstractBuilder> + extends ObjectBuilderBase implements TransportOptions.Builder { + + private HeaderMap headers; + private Map parameters; + private Function, Boolean> onWarnings; + + public AbstractBuilder() { + } + + public AbstractBuilder(DefaultTransportOptions options) { + this.headers = new HeaderMap(options.headers); + this.parameters = copyOrNull(options.parameters); + this.onWarnings = options.onWarnings; + } + + protected abstract BuilderT self(); + + @Override + public BuilderT addHeader(String name, String value) { + if (name.equalsIgnoreCase(HeaderMap.CLIENT_META)) { + // Not overridable + return self(); + } + if (this.headers == null) { + this.headers = new HeaderMap(); + } + headers.add(name, value); + return self(); + } + + @Override + public BuilderT setHeader(String name, String value) { + if (name.equalsIgnoreCase(HeaderMap.CLIENT_META)) { + // Not overridable + return self(); + } + if (this.headers == null) { + this.headers = new HeaderMap(); + } + headers.put(name, value); + return self(); + } + + @Override + public BuilderT removeHeader(String name) { + if (this.headers != null) { + headers.remove(name); + } + return self(); + } + + @Override + public BuilderT setParameter(String name, String value) { + if (parameters == null) { + parameters = new HashMap<>(); + } + parameters.put(name, value); + return self(); + } + + @Override + public BuilderT removeParameter(String name) { + if (parameters != null) { + parameters.remove(name); + }; + return self(); + } + + @Override + public BuilderT onWarnings(Function, Boolean> listener) { + this.onWarnings = listener; + return self(); + } + + private Map copyOrNull(Map map) { + return map.isEmpty() ? null : new HashMap<>(map); + } + } + + public static class Builder extends AbstractBuilder { + + public Builder() { + super(); + } + + public Builder(DefaultTransportOptions options) { + super(options); + } + + @Override + protected Builder self() { + return this; + } + + @Override + public TransportOptions build() { + _checkSingleUse(); + return new DefaultTransportOptions(this); + } + } +} diff --git a/java-client/src/main/java/co/elastic/clients/transport/ElasticsearchTransportBase.java b/java-client/src/main/java/co/elastic/clients/transport/ElasticsearchTransportBase.java new file mode 100644 index 000000000..8318ec7f5 --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/transport/ElasticsearchTransportBase.java @@ -0,0 +1,513 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.transport; + +import co.elastic.clients.elasticsearch._types.ElasticsearchException; +import co.elastic.clients.elasticsearch._types.ErrorResponse; +import co.elastic.clients.json.JsonpDeserializer; +import co.elastic.clients.json.JsonpMapper; +import co.elastic.clients.json.NdJsonpSerializable; +import co.elastic.clients.transport.endpoints.BinaryDataResponse; +import co.elastic.clients.transport.endpoints.BinaryEndpoint; +import co.elastic.clients.transport.endpoints.BooleanEndpoint; +import co.elastic.clients.transport.endpoints.BooleanResponse; +import co.elastic.clients.transport.http.HeaderMap; +import co.elastic.clients.transport.http.TransportHttpClient; +import co.elastic.clients.transport.instrumentation.Instrumentation; +import co.elastic.clients.transport.instrumentation.NoopInstrumentation; +import co.elastic.clients.util.LanguageRuntimeVersions; +import co.elastic.clients.util.ApiTypeHelper; +import co.elastic.clients.util.BinaryData; +import co.elastic.clients.util.ByteArrayBinaryData; +import co.elastic.clients.util.ContentType; +import co.elastic.clients.util.MissingRequiredPropertyException; +import co.elastic.clients.util.NoCopyByteArrayOutputStream; +import jakarta.json.JsonException; +import jakarta.json.stream.JsonGenerator; +import jakarta.json.stream.JsonParser; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +public abstract class ElasticsearchTransportBase implements ElasticsearchTransport { + + private static final String USER_AGENT_VALUE = getUserAgent(); + private static final String CLIENT_META_VALUE = getClientMeta(); + private static final String ELASTIC_API_VERSION; + public static final String JSON_CONTENT_TYPE; + + static { + if (VersionInfo.FLAVOR.equals("serverless")) { + JSON_CONTENT_TYPE = ContentType.APPLICATION_JSON; + ELASTIC_API_VERSION = "2023-10-31"; + } + else if (Version.VERSION == null) { + JSON_CONTENT_TYPE = ContentType.APPLICATION_JSON; + ELASTIC_API_VERSION = null; + } else { + JSON_CONTENT_TYPE = + "application/vnd.elasticsearch+json; compatible-with=" + + Version.VERSION.major(); + ELASTIC_API_VERSION = null; + } + } + + private final TransportHttpClient httpClient; + private final Instrumentation instrumentation; + + @Override + public void close() throws IOException { + httpClient.close(); + } + + private final JsonpMapper mapper; + protected final TransportOptions transportOptions; + + public ElasticsearchTransportBase(TransportHttpClient httpClient, TransportOptions options, JsonpMapper jsonpMapper) { + this(httpClient, options, jsonpMapper, null); + } + + public ElasticsearchTransportBase( + TransportHttpClient httpClient, + TransportOptions options, + JsonpMapper jsonpMapper, + @Nullable Instrumentation instrumentation + ) { + this.mapper = jsonpMapper; + this.httpClient = httpClient; + this.transportOptions = httpClient.createOptions(options); + + // If no instrumentation is provided, fallback to noop + if (instrumentation == null) { + instrumentation = NoopInstrumentation.INSTANCE; + } + this.instrumentation = instrumentation; + } + + @Override + public final JsonpMapper jsonpMapper() { + return mapper; + } + + @Override + public final TransportOptions options() { + return transportOptions; + } + + @Override + public final ResponseT performRequest( + RequestT request, + Endpoint endpoint, + @Nullable TransportOptions options + ) throws IOException { + try (Instrumentation.Context ctx = instrumentation.newContext(request, endpoint)) { + try (Instrumentation.ThreadScope ts = ctx.makeCurrent()) { + + TransportOptions opts = options == null ? transportOptions : options; + TransportHttpClient.Request req = prepareTransportRequest(request, endpoint); + ctx.beforeSendingHttpRequest(req, options); + + TransportHttpClient.Response resp = httpClient.performRequest(endpoint.id(), null, req, opts); + ctx.afterReceivingHttpResponse(resp); + + ResponseT apiResponse = getApiResponse(resp, endpoint); + ctx.afterDecodingApiResponse(apiResponse); + + return apiResponse; + } catch (Throwable throwable){ + ctx.recordException(throwable); + throw throwable; + } + } + } + + @Override + public final CompletableFuture performRequestAsync( + RequestT request, + Endpoint endpoint, + @Nullable TransportOptions options + ) { + Instrumentation.Context ctx = instrumentation.newContext(request, endpoint); + + TransportOptions opts = options == null ? transportOptions : options; + TransportHttpClient.Request clientReq; + try (Instrumentation.ThreadScope ss = ctx.makeCurrent()) { + clientReq = prepareTransportRequest(request, endpoint); + ctx.beforeSendingHttpRequest(clientReq, options); + } catch (Exception e) { + // Terminate early + ctx.recordException(e); + ctx.close(); + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(e); + return future; + } + + // Propagate required property checks to the thread that will decode the response + boolean disableRequiredChecks = ApiTypeHelper.requiredPropertiesCheckDisabled(); + + CompletableFuture clientFuture = httpClient.performRequestAsync( + endpoint.id(), null, clientReq, opts + ); + + // Cancelling the result will cancel the upstream future created by the http client, allowing to stop in-flight requests + CompletableFuture future = new CompletableFuture() { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + boolean cancelled = super.cancel(mayInterruptIfRunning); + if (cancelled) { + clientFuture.cancel(mayInterruptIfRunning); + } + return cancelled; + } + }; + + clientFuture.handle((clientResp, thr) -> { + try (Instrumentation.ThreadScope ts = ctx.makeCurrent()) { + if (thr != null) { + // Exception executing the http request + ctx.recordException(thr); + ctx.close(); + future.completeExceptionally(thr); + + } else { + try (ApiTypeHelper.DisabledChecksHandle h = + ApiTypeHelper.DANGEROUS_disableRequiredPropertiesCheck(disableRequiredChecks)) { + ctx.afterReceivingHttpResponse(clientResp); + ResponseT response = getApiResponse(clientResp, endpoint); + ctx.afterDecodingApiResponse(response); + future.complete(response); + + } catch (Throwable e) { + ctx.recordException(e); + future.completeExceptionally(e); + } finally { + ctx.close(); + } + } + } + return null; + }); + + return future; + } + + private TransportHttpClient.Request prepareTransportRequest( + RequestT request, + Endpoint endpoint + ) throws IOException { + String method = endpoint.method(request); + String path = endpoint.requestUrl(request); + Map params = endpoint.queryParameters(request); + + List bodyBuffers = null; + HeaderMap headers = DefaultHeaders; + + Object body = endpoint.body(request); + if (body != null) { + // Request has a body + if (body instanceof NdJsonpSerializable) { + bodyBuffers = new ArrayList<>(); + collectNdJsonLines(bodyBuffers, (NdJsonpSerializable) request); + headers = JsonContentTypeHeaders; + + } else if (body instanceof BinaryData) { + BinaryData data = (BinaryData)body; + + // ES expects the Accept and Content-Type headers to be consistent. + String dataContentType = data.contentType(); + if (ContentType.APPLICATION_JSON.equals(dataContentType)) { + // Fast path + headers = JsonContentTypeHeaders; + } else { + headers = new HeaderMap(DefaultHeaders); + headers.put(HeaderMap.CONTENT_TYPE, dataContentType); + } + bodyBuffers = Collections.singletonList(data.asByteBuffer()); + + } else { + NoCopyByteArrayOutputStream baos = new NoCopyByteArrayOutputStream(); + JsonGenerator generator = mapper.jsonProvider().createGenerator(baos); + mapper.serialize(body, generator); + generator.close(); + bodyBuffers = Collections.singletonList(baos.asByteBuffer()); + headers = JsonContentTypeHeaders; + } + } + + return new TransportHttpClient.Request(method, path, params, headers, bodyBuffers); + } + + private static final HeaderMap JsonContentTypeHeaders = new HeaderMap(); + private static final HeaderMap DefaultHeaders = new HeaderMap(); + static { + addStandardHeaders(DefaultHeaders); + addStandardHeaders(JsonContentTypeHeaders); + JsonContentTypeHeaders.put(HeaderMap.CONTENT_TYPE, JSON_CONTENT_TYPE); + } + + private static final ByteBuffer NdJsonSeparator = ByteBuffer.wrap("\n".getBytes(StandardCharsets.UTF_8)); + + private void collectNdJsonLines(List lines, NdJsonpSerializable value) throws IOException { + Iterator values = value._serializables(); + while(values.hasNext()) { + Object item = values.next(); + if (item == null) { + // Skip + } else if (item instanceof NdJsonpSerializable && item != value) { // do not recurse on the item itself + collectNdJsonLines(lines, (NdJsonpSerializable)item); + } else { + // TODO: items that aren't already BinaryData could be serialized to ByteBuffers lazily + // to reduce the number of buffers to keep in memory + lines.add(BinaryData.of(item, this.mapper).asByteBuffer()); + lines.add(NdJsonSeparator); + } + } + } + + private ResponseT getApiResponse( + TransportHttpClient.Response clientResp, + Endpoint endpoint + ) throws IOException { + + int statusCode = clientResp.statusCode(); + + try { + if (statusCode == 200) { + checkProductHeader(clientResp, endpoint); + } + + if (endpoint.isError(statusCode)) { + + JsonpDeserializer errorDeserializer = endpoint.errorDeserializer(statusCode); + if (errorDeserializer == null) { + throw new TransportException( + clientResp, + "Request failed with status code '" + statusCode + "'", + endpoint.id() + ); + } + + BinaryData entity = clientResp.body(); + if (entity == null) { + throw new TransportException( + clientResp, + "Expecting a response body, but none was sent", + endpoint.id() + ); + } + + checkJsonContentType(entity.contentType(), clientResp, endpoint); + + // We may have to replay it. + if (!entity.isRepeatable()) { + entity = new ByteArrayBinaryData(entity); + } + + try (InputStream content = entity.asInputStream()) { + try (JsonParser parser = mapper.jsonProvider().createParser(content)) { + ErrorT error = errorDeserializer.deserialize(parser, mapper); + // TODO: have the endpoint provide the exception constructor + throw new ElasticsearchException(endpoint.id(), (ErrorResponse) error, clientResp); + } + } catch(JsonException | MissingRequiredPropertyException errorEx) { + // Could not decode exception, try the response type + try { + ResponseT response = decodeTransportResponse(statusCode, entity, clientResp, endpoint); + return response; + } catch(Exception respEx) { + // No better luck: throw the original error decoding exception + throw new TransportException( + clientResp, + "Failed to decode error response, check exception cause for additional details", + endpoint.id(), + errorEx + ); + } + } + } else { + return decodeTransportResponse(statusCode, clientResp.body(), clientResp, endpoint); + } + + + } finally { + // Consume the entity unless this is a successful binary endpoint, where the user must consume the entity + if (!(endpoint instanceof BinaryEndpoint && !endpoint.isError(statusCode))) { + clientResp.close(); + } + } + } + + private ResponseT decodeTransportResponse( + int statusCode, @Nullable BinaryData entity, TransportHttpClient.Response clientResp, Endpoint endpoint + ) throws IOException { + + if (endpoint instanceof JsonEndpoint) { + @SuppressWarnings("unchecked") + JsonEndpoint jsonEndpoint = (JsonEndpoint) endpoint; + // Successful response + ResponseT response = null; + JsonpDeserializer responseParser = jsonEndpoint.responseDeserializer(); + if (responseParser != null) { + // Expecting a body + if (entity == null) { + throw new TransportException( + clientResp, + "Expecting a response body, but none was sent", + endpoint.id() + ); + } + checkJsonContentType(entity.contentType(), clientResp, endpoint); + try ( + InputStream content = entity.asInputStream(); + JsonParser parser = mapper.jsonProvider().createParser(content) + ) { + response = responseParser.deserialize(parser, mapper); + } catch (Exception e) { + throw new TransportException( + clientResp, + "Failed to decode response", + endpoint.id(), + e + ); + } + } + return response; + + } else if(endpoint instanceof BooleanEndpoint) { + BooleanEndpoint bep = (BooleanEndpoint) endpoint; + + @SuppressWarnings("unchecked") + ResponseT response = (ResponseT) new BooleanResponse(bep.getResult(statusCode)); + return response; + + + } else if (endpoint instanceof BinaryEndpoint) { + @SuppressWarnings("unchecked") + ResponseT response = (ResponseT) new BinaryDataResponse(entity); + return response; + + } else { + throw new TransportException( + clientResp, + "Unhandled endpoint type: '" + endpoint.getClass().getName() + "'", endpoint.id() + ); + } + } + + // Endpoints that (incorrectly) do not return the Elastic product header + private static final Set endpointsMissingProductHeader = new HashSet<>(Arrays.asList( + "es/snapshot.create" // #74 / elastic/elasticsearch#82358 + )); + + private void checkProductHeader(TransportHttpClient.Response clientResp, Endpoint endpoint) throws IOException { + String header = clientResp.header("X-Elastic-Product"); + if (header == null) { + if (endpointsMissingProductHeader.contains(endpoint.id())) { + return; + } + throw new TransportException( + clientResp, + "Missing [X-Elastic-Product] header. Please check that you are connecting to an Elasticsearch " + + "instance, and that any networking filters are preserving that header.", + endpoint.id() + ); + } + + if (!"Elasticsearch".equals(header)) { + throw new TransportException( + clientResp, + "Invalid value '" + header + "' for 'X-Elastic-Product' header.", + endpoint.id() + ); + } + } + + private void checkJsonContentType( + String contentType, TransportHttpClient.Response clientResp, Endpoint endpoint + ) throws IOException { + if (contentType == null) { + throw new TransportException(clientResp, "Response has no content-type", endpoint.id()); + } + + if (contentType.startsWith("application/json") || contentType.startsWith("application/vnd.elasticsearch+json")) { + return; + } + + throw new TransportException(clientResp, "Expecting JSON data but response content-type is: " + contentType, endpoint.id()); + } + + private static void addStandardHeaders(HeaderMap headers) { + headers.put(HeaderMap.USER_AGENT, USER_AGENT_VALUE); + headers.put(HeaderMap.CLIENT_META, CLIENT_META_VALUE); + headers.put(HeaderMap.ACCEPT, JSON_CONTENT_TYPE); + if (ELASTIC_API_VERSION != null) { + headers.put("Elastic-Api-Version", ELASTIC_API_VERSION); + } + } + + private static String getUserAgent() { + return String.format( + Locale.ROOT, + "elastic-java/%s (Java/%s)", + Version.VERSION == null ? "Unknown" : Version.VERSION.toString(), + System.getProperty("java.version") + ); + } + + // visible for testing + static String getClientMeta() { + String flavorKey; + String transportVersion; + + if (VersionInfo.FLAVOR.equals("serverless")) { + flavorKey = "esv="; + int pos = VersionInfo.VERSION.indexOf('+'); + // Strip API version from the transport version + transportVersion = pos > 0 ? VersionInfo.VERSION.substring(0, pos) : VersionInfo.VERSION; + } else { + flavorKey = "es="; + transportVersion = VersionInfo.VERSION; + } + + // service, language, transport, followed by additional information + return flavorKey + + VersionInfo.VERSION + + ",jv=" + + System.getProperty("java.specification.version") + + ",t=" + + transportVersion + + ",hl=2" + + LanguageRuntimeVersions.getRuntimeMetadata(); + } +} diff --git a/java-client/src/main/java/co/elastic/clients/transport/Endpoint.java b/java-client/src/main/java/co/elastic/clients/transport/Endpoint.java index 2ef0a0989..43435278f 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/Endpoint.java +++ b/java-client/src/main/java/co/elastic/clients/transport/Endpoint.java @@ -19,12 +19,15 @@ package co.elastic.clients.transport; +import co.elastic.clients.ApiClient; import co.elastic.clients.json.JsonpDeserializer; import co.elastic.clients.transport.endpoints.BinaryEndpoint; import javax.annotation.Nullable; +import java.io.IOException; import java.util.Collections; import java.util.Map; +import java.util.concurrent.CompletableFuture; /** * An endpoint links requests and responses to HTTP protocol encoding. It also defines the error response @@ -100,14 +103,30 @@ default Map headers(RequestT request) { JsonpDeserializer errorDeserializer(int statusCode); default BinaryEndpoint withBinaryResponse() { - return new BinaryEndpoint<>( - this.id(), - this::method, - this::requestUrl, - this::queryParameters, - this::headers, - this::body, - null - ); + return new BinaryEndpoint<>( + this.id(), + this::method, + this::requestUrl, + this::queryParameters, + this::headers, + this::body, + null + ); + } + + default ResponseT call(RequestT request, Transport transport) throws IOException { + return transport.performRequest(request, this, null); + } + + default ResponseT call(RequestT request, ApiClient client) throws IOException { + return client._transport().performRequest(request, this, null); + } + + default CompletableFuture callAsync(RequestT request, Transport transport) throws IOException { + return transport.performRequestAsync(request, this, null); + } + + default CompletableFuture callAsync(RequestT request, ApiClient client) throws IOException { + return client._transport().performRequestAsync(request, this, null); } } diff --git a/java-client/src/main/java/co/elastic/clients/transport/ResponseBase.java b/java-client/src/main/java/co/elastic/clients/transport/ResponseBase.java new file mode 100644 index 000000000..58fcfec86 --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/transport/ResponseBase.java @@ -0,0 +1,28 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.transport; + +public abstract class ResponseBase { + + public abstract TransportInfo _transportInfo(); + + public abstract void _transportInfo(TransportInfo info); + +} diff --git a/java-client/src/main/java/co/elastic/clients/transport/TransportException.java b/java-client/src/main/java/co/elastic/clients/transport/TransportException.java index b1fbaa9e4..d7d65128a 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/TransportException.java +++ b/java-client/src/main/java/co/elastic/clients/transport/TransportException.java @@ -19,29 +19,42 @@ package co.elastic.clients.transport; +import co.elastic.clients.transport.http.TransportHttpClient; + import javax.annotation.Nullable; import java.io.IOException; public class TransportException extends IOException { - private final int statusCode; private final String endpointId; + private final TransportHttpClient.Response response; - public TransportException(int statusCode, String message, String endpointId) { - this(statusCode, message, endpointId, null); + public TransportException(TransportHttpClient.Response response, String message, String endpointId) { + this(response, message, endpointId, null); } - public TransportException(int statusCode, String message, String endpointId, Throwable cause) { - super("status: " + statusCode + ", " + (endpointId == null ? message : "[" + endpointId + "] " + message), cause); - this.statusCode = statusCode; + public TransportException(TransportHttpClient.Response response, String message, String endpointId, Throwable cause) { + super( + "node: " + response.node() + ", status: " + response.statusCode() + ", " + + (endpointId == null ? message : "[" + endpointId + "] " + message), + cause + ); + this.response = response; this.endpointId = endpointId; + + // Make sure the response is closed to free up resources. + try { + response.close(); + } catch (Exception e) { + this.addSuppressed(e); + } } /** * Status code returned by the http resquest */ public int statusCode() { - return statusCode; + return response.statusCode(); } /** @@ -52,4 +65,7 @@ public String endpointId() { return endpointId; } + public TransportHttpClient.Response response() { + return response; + } } diff --git a/java-client/src/main/java/co/elastic/clients/transport/TransportInfo.java b/java-client/src/main/java/co/elastic/clients/transport/TransportInfo.java new file mode 100644 index 000000000..5ee80d769 --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/transport/TransportInfo.java @@ -0,0 +1,37 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.transport; + +import java.util.List; + +public interface TransportInfo { + + String requestUrl(); + byte[] requestBody(); + String requestBodyText(); + // request headers + + int responseStatusCode(); + byte[] responseBody(); + String responseBodyText(); + // response headers + + List warnings(); +} diff --git a/java-client/src/main/java/co/elastic/clients/transport/TransportOptions.java b/java-client/src/main/java/co/elastic/clients/transport/TransportOptions.java index d9a6ddc88..d6c41f490 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/TransportOptions.java +++ b/java-client/src/main/java/co/elastic/clients/transport/TransportOptions.java @@ -50,8 +50,14 @@ interface Builder extends ObjectBuilder { Builder addHeader(String name, String value); + Builder setHeader(String name, String value); + + Builder removeHeader(String name); + Builder setParameter(String name, String value); + Builder removeParameter(String name); + Builder onWarnings(Function, Boolean> listener); } } diff --git a/java-client/src/main/java/co/elastic/clients/transport/TransportUtils.java b/java-client/src/main/java/co/elastic/clients/transport/TransportUtils.java new file mode 100644 index 000000000..6fd088f59 --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/transport/TransportUtils.java @@ -0,0 +1,148 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.transport; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; +import javax.net.ssl.X509TrustManager; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.security.KeyManagementException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.security.cert.Certificate; +import java.security.cert.CertificateException; +import java.security.cert.CertificateFactory; +import java.security.cert.X509Certificate; +import java.util.Arrays; + +public class TransportUtils { + + /** + * Creates an SSLContext from the self-signed http_ca.crt certificate created by Elasticsearch during + * its first start. + * + * @see Elasticsearch + * documentation + */ + public static SSLContext sslContextFromHttpCaCrt(File file) throws IOException { + try(InputStream in = new FileInputStream(file)) { + return sslContextFromHttpCaCrt(in); + } + } + + /** + * Creates an SSLContext from the self-signed http_ca.crt certificate created by Elasticsearch during + * its first start. + * + * @see Elasticsearch + * documentation + */ + public static SSLContext sslContextFromHttpCaCrt(InputStream in) { + try { + CertificateFactory cf = CertificateFactory.getInstance("X.509"); + Certificate certificate = cf.generateCertificate(in); + + final KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType()); + keyStore.load(null, null); + keyStore.setCertificateEntry("elasticsearch-ca", certificate); + + TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(keyStore); + + SSLContext sslContext = SSLContext.getInstance("TLS"); + sslContext.init(null, tmf.getTrustManagers(), null); + return sslContext; + + } catch (CertificateException | NoSuchAlgorithmException | KeyManagementException | + KeyStoreException | IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Creates an SSLContext from the SHA-256 fingerprint of self-signed http_ca.crt certificate output by + * Elasticsearch at startup time. + * + * @param fingerPrint the SHA-256 fingerprint. Can be uppercase or lowercase, with or without colons separating bytes + * @see Elasticsearch + * documentation + */ + public static SSLContext sslContextFromCaFingerprint(String fingerPrint) { + + fingerPrint = fingerPrint.replace(":", ""); + int len = fingerPrint.length(); + byte[] fpBytes = new byte[len / 2]; + for (int i = 0; i < len; i += 2) { + fpBytes[i / 2] = (byte) ( + (Character.digit(fingerPrint.charAt(i), 16) << 4) + + Character.digit(fingerPrint.charAt(i+1), 16) + ); + } + + try { + X509TrustManager tm = new X509TrustManager() { + @Override + public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { + throw new CertificateException("This is a client-side only trust manager"); + } + + @Override + public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { + + // The CA root is the last element of the chain + X509Certificate anchor = chain[chain.length - 1]; + + byte[] bytes; + try { + MessageDigest md = MessageDigest.getInstance("SHA-256"); + md.update(anchor.getEncoded()); + bytes = md.digest(); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + + if (Arrays.equals(fpBytes, bytes)) { + return; + } + + throw new CertificateException("Untrusted certificate: " + anchor.getSubjectX500Principal()); + } + + @Override + public X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[0]; + } + }; + + SSLContext sslContext = SSLContext.getInstance("TLS"); + sslContext.init(null, new X509TrustManager[] { tm }, null); + return sslContext; + + } catch (NoSuchAlgorithmException | KeyManagementException e) { + // Exceptions that should normally not occur + throw new RuntimeException(e); + } + } +} diff --git a/java-client/src/main/java/co/elastic/clients/transport/Version.java b/java-client/src/main/java/co/elastic/clients/transport/Version.java index 18caab31b..142e608eb 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/Version.java +++ b/java-client/src/main/java/co/elastic/clients/transport/Version.java @@ -19,12 +19,8 @@ package co.elastic.clients.transport; -import co.elastic.clients.ApiClient; - import javax.annotation.Nullable; -import java.io.InputStream; import java.util.Objects; -import java.util.Properties; /** * This class represents a SemVer version, with an optional patch revision. @@ -34,7 +30,8 @@ public class Version { private final int major; private final int minor; private final int maintenance; - private final boolean isPreRelease; + private final String prerelease; + private final String build; /** * Parse a version string formatted using the standard Maven version format. @@ -42,14 +39,27 @@ public class Version { * @return the version, or {@code null} if the version could not be parsed. */ public static Version parse(String version) { + String prerelease = null; + String build = null; + int hyphen = version.indexOf('-'); - boolean isPreRelease; if (hyphen >= 0) { + // Has prerelease. May be followed buy build information + prerelease = version.substring(hyphen+1); version = version.substring(0, hyphen); - isPreRelease = true; + + int plus = prerelease.indexOf('+'); + if (plus >= 0) { + build = prerelease.substring(0, plus+1); + prerelease = prerelease.substring(0, plus); + } } - else { - isPreRelease = false; + + int plus = version.indexOf('+'); + if (plus >= 0) { + // Has build information + build = version.substring(0, plus+1); + version = version.substring(0, plus); } String[] bits = version.split("\\."); @@ -57,7 +67,7 @@ public static Version parse(String version) { int major = (bits.length >= 1) ? Integer.parseInt(bits[0]) : 0; int minor = (bits.length >= 2) ? Integer.parseInt(bits[1]) : 0; int maintenance = (bits.length >= 3) ? Integer.parseInt(bits[2]) : -1; - return new Version(major, minor, maintenance, isPreRelease); + return new Version(major, minor, maintenance, prerelease, build); } catch(NumberFormatException ex) { return null; @@ -65,10 +75,15 @@ public static Version parse(String version) { } public Version(int major, int minor, int maintenance, boolean isPreRelease) { + this(major, minor, maintenance, isPreRelease ? "p" : null, null); + } + + public Version(int major, int minor, int maintenance, @Nullable String prerelease, @Nullable String build) { this.major = major; this.minor = minor; this.maintenance = maintenance; - this.isPreRelease = isPreRelease; + this.prerelease = prerelease; + this.build = build; } public int major() { @@ -84,7 +99,7 @@ public int maintenance() { } public boolean isPreRelease() { - return isPreRelease; + return prerelease != null; } @Override @@ -95,12 +110,13 @@ public boolean equals(Object other) { return (major == that.major && minor == that.minor && maintenance == that.maintenance && - isPreRelease == that.isPreRelease); + Objects.equals(prerelease, that.prerelease) && + Objects.equals(build, that.build)); } @Override public int hashCode() { - return Objects.hash(major, minor, maintenance, isPreRelease); + return Objects.hash(major, minor, maintenance, prerelease, build); } @Override @@ -113,32 +129,27 @@ public String toString() { s.append('.'); s.append(maintenance); } - if (isPreRelease) { - s.append('p'); + if (prerelease != null) { + s.append('-').append(prerelease); + } + if (build != null) { + s.append('+').append(build); } return s.toString(); } /** - * This library's version, read from the classpath. Can be {@code null} if the version resource could not be read. + * This library's version. Can be {@code null} if the version could not be determined. */ @Nullable public static final Version VERSION; static { Version version = null; - InputStream in = ApiClient.class.getResourceAsStream("version.properties"); - if (in != null) { - Properties properties = new Properties(); - try { - properties.load(in); - String versionStr = properties.getProperty("version"); - if (versionStr != null) { - version = Version.parse(versionStr); - } - } catch (Exception e) { - // Failed to read version.properties file - } + try { + version = Version.parse(VersionInfo.VERSION); + } catch (Exception e) { + // Failed to parse version } VERSION = version; } diff --git a/java-client/src/main/java/co/elastic/clients/transport/VersionInfo.java b/java-client/src/main/java/co/elastic/clients/transport/VersionInfo.java new file mode 100644 index 000000000..66babaf40 --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/transport/VersionInfo.java @@ -0,0 +1,26 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.transport; + +// Package private +class VersionInfo { + static final String FLAVOR = "stack"; + static final String VERSION = "7.17.20"; +} diff --git a/java-client/src/main/java/co/elastic/clients/transport/WithUriParameter.java b/java-client/src/main/java/co/elastic/clients/transport/WithUriParameter.java new file mode 100644 index 000000000..047271f57 --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/transport/WithUriParameter.java @@ -0,0 +1,36 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.transport; + +/** + * Base interface for request builders that can load properties found in URI paths segments and query parameters. + */ +public interface WithUriParameter { + + /** + * Set a URI path segment or query parameter property on this object. + * + * @param name the property name + * @param value the property value + * @return {@code true} if the property was found, {@code false} if the property is unknown + * @throws RuntimeException if the value cannot be parsed to a valid property value + */ + boolean withUriParameter(String name, String value); +} diff --git a/java-client/src/main/java/co/elastic/clients/transport/rest_client/HttpClientBinaryResponse.java b/java-client/src/main/java/co/elastic/clients/transport/endpoints/BinaryDataResponse.java similarity index 55% rename from java-client/src/main/java/co/elastic/clients/transport/rest_client/HttpClientBinaryResponse.java rename to java-client/src/main/java/co/elastic/clients/transport/endpoints/BinaryDataResponse.java index 27e4dad15..f75662a25 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/rest_client/HttpClientBinaryResponse.java +++ b/java-client/src/main/java/co/elastic/clients/transport/endpoints/BinaryDataResponse.java @@ -17,48 +17,42 @@ * under the License. */ -package co.elastic.clients.transport.rest_client; +package co.elastic.clients.transport.endpoints; -import co.elastic.clients.transport.endpoints.BinaryResponse; -import org.apache.http.Header; -import org.apache.http.HttpEntity; -import org.apache.http.util.EntityUtils; +import co.elastic.clients.util.BinaryData; +import co.elastic.clients.util.ByteArrayBinaryData; import java.io.IOException; import java.io.InputStream; -class HttpClientBinaryResponse implements BinaryResponse { - private final HttpEntity entity; - private boolean consumed = false; +public class BinaryDataResponse implements BinaryResponse { - HttpClientBinaryResponse(HttpEntity entity) { - this.entity = entity; + private final BinaryData data; + + public BinaryDataResponse(BinaryData data) { + this.data = data; } @Override public String contentType() { - Header h = entity.getContentType(); - return h == null ? "application/octet-stream" : h.getValue(); + return data.contentType(); } @Override public long contentLength() { - long len = entity.getContentLength(); - return len < 0 ? -1 : entity.getContentLength(); + return data.size(); } @Override public InputStream content() throws IOException { - if (consumed) { - throw new IllegalStateException("Response content has already been consumed"); - } - consumed = true; - return entity.getContent(); + return data.asInputStream(); } @Override public void close() throws IOException { - consumed = true; - EntityUtils.consume(entity); + } + + public static BinaryDataResponse of(byte[] data, String contentType) { + return new BinaryDataResponse(new ByteArrayBinaryData(data, contentType)); } } diff --git a/java-client/src/main/java/co/elastic/clients/transport/endpoints/EndpointBase.java b/java-client/src/main/java/co/elastic/clients/transport/endpoints/EndpointBase.java index e845e0a35..27c65a99e 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/endpoints/EndpointBase.java +++ b/java-client/src/main/java/co/elastic/clients/transport/endpoints/EndpointBase.java @@ -19,15 +19,21 @@ package co.elastic.clients.transport.endpoints; +import co.elastic.clients.elasticsearch._types.ErrorCause; import co.elastic.clients.elasticsearch._types.ErrorResponse; import co.elastic.clients.json.JsonpDeserializer; +import co.elastic.clients.json.JsonpDeserializerBase; +import co.elastic.clients.json.JsonpMapper; +import co.elastic.clients.json.JsonpUtils; import co.elastic.clients.transport.Endpoint; +import jakarta.json.stream.JsonParser; import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.BitSet; import java.util.Collections; +import java.util.EnumSet; import java.util.Map; import java.util.function.Function; @@ -125,7 +131,37 @@ public boolean isError(int statusCode) { @Override public JsonpDeserializer errorDeserializer(int statusCode) { - return ErrorResponse._DESERIALIZER; + // Some errors (typically 404) only consist of a single "error" string (which is a shortcut for ErrorCause.reason) + // and no "status". So we need a deserializer that will set the status value if it's not in the payload + return new JsonpDeserializerBase(EnumSet.of(JsonParser.Event.START_OBJECT)) { + @Override + public ErrorResponse deserialize(JsonParser parser, JsonpMapper mapper, JsonParser.Event event) { + ErrorResponse.Builder builder = new ErrorResponse.Builder(); + builder.status(statusCode); + while ((event = parser.next()) != JsonParser.Event.END_OBJECT) { + JsonpUtils.expectEvent(parser, JsonParser.Event.KEY_NAME, event); + switch (parser.getString()) { + case "error": + switch (event = parser.next()) { + case VALUE_STRING: + builder.error(e -> e.reason(parser.getString()).type("http_status_" + statusCode)); + break; + default: + JsonpUtils.expectEvent(parser, JsonParser.Event.START_OBJECT, event); + builder.error(ErrorCause._DESERIALIZER.deserialize(parser, mapper, event)); + break; + } + break; + case "status": + JsonpUtils.expectNextEvent(parser, JsonParser.Event.VALUE_NUMBER); + builder.status(parser.getInt()); + break; + } + } + + return builder.build(); + } + }; } public SimpleEndpoint withResponseDeserializer( diff --git a/java-client/src/main/java/co/elastic/clients/transport/endpoints/SimpleEndpoint.java b/java-client/src/main/java/co/elastic/clients/transport/endpoints/SimpleEndpoint.java index b5df37c90..1dfd6b4e4 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/endpoints/SimpleEndpoint.java +++ b/java-client/src/main/java/co/elastic/clients/transport/endpoints/SimpleEndpoint.java @@ -69,11 +69,6 @@ public JsonpDeserializer responseDeserializer() { return this.responseParser; } - @Override - public JsonpDeserializer errorDeserializer(int statusCode) { - return ErrorResponse._DESERIALIZER; - } - public SimpleEndpoint withResponseDeserializer( JsonpDeserializer newResponseParser ) { diff --git a/java-client/src/main/java/co/elastic/clients/transport/endpoints/SimpleJsonEndpoint.java b/java-client/src/main/java/co/elastic/clients/transport/endpoints/SimpleJsonEndpoint.java index c97a03675..b32aae4a8 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/endpoints/SimpleJsonEndpoint.java +++ b/java-client/src/main/java/co/elastic/clients/transport/endpoints/SimpleJsonEndpoint.java @@ -34,7 +34,7 @@ public SimpleJsonEndpoint( Function method, Function requestUrl, Function> queryParameters, + Map> queryParameters, Function> headers, boolean hasRequestBody, JsonpDeserializer responseParser diff --git a/java-client/src/main/java/co/elastic/clients/transport/http/HeaderMap.java b/java-client/src/main/java/co/elastic/clients/transport/http/HeaderMap.java new file mode 100644 index 000000000..e7889eb68 --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/transport/http/HeaderMap.java @@ -0,0 +1,173 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.transport.http; + +import javax.annotation.Nullable; +import java.util.AbstractMap; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +/** + * A (string, string) map with case-insensitive keys. + */ +public class HeaderMap extends AbstractMap { + + public static final String ACCEPT = "Accept"; + public static final String CONTENT_TYPE = "Content-Type"; + public static final String USER_AGENT = "User-Agent"; + public static final String CLIENT_META = "X-Elastic-Client-Meta"; + + @Nullable + protected Map map; + + public static HeaderMap EMPTY = new HeaderMap(null).locked(); + + public HeaderMap() { + this.map = null; + } + + /** + * Copy constructor + */ + public HeaderMap(@Nullable Map map) { + if (map == null || map.isEmpty()) { + this.map = null; + } else if (map instanceof HeaderMap) { + Map hmap = ((HeaderMap) map).map; + this.map = hmap == null ? null : new HashMap<>(hmap); + } else { + this.map = new HashMap<>(map); + } + } + + @Override + public int size() { + return map == null ? 0 : map.size(); + } + + @Override + public Set> entrySet() { + return map == null ? Collections.emptySet() : map.entrySet(); + } + + @Override + public String get(Object object) { + String key = (String)object; // throwing ClassCastException is allowed + if (map == null) { + return null; + } + for (Entry entry : map.entrySet()) { + if (entry.getKey().equalsIgnoreCase(key)) { + return entry.getValue(); + } + } + return null; + } + + @Override + public String put(String key, String value) { + String result; + if (map == null) { + map = new HashMap<>(); + result = null; + } else { + result = remove(key); + } + map.put(key, value); + return result; + } + + public String add(String key, String value) { + if (map == null) { + map = new HashMap<>(); + } else { + for (Entry entry : map.entrySet()) { + if (entry.getKey().equalsIgnoreCase(key)) { + String current = entry.getValue(); + entry.setValue(current + "; " + value); + return current; + } + } + } + return map.put(key, value); + } + + @Override + public String remove(Object object) { + String key = (String)object; // throwing ClassCastException is allowed + if (map == null) { + return null; + } else { + Iterator> entries = map.entrySet().iterator(); + while(entries.hasNext()) { + Entry entry = entries.next(); + if (entry.getKey().equalsIgnoreCase(key)) { + entries.remove(); + return entry.getKey(); + } + } + } + return null; + } + + /** + * Return a locked copy of this header map that cannot be modified + */ + public HeaderMap locked() { + return new Locked(map); + } + + private static class Locked extends HeaderMap { + Locked(Map map) { + super(map); + } + + private String isLocked() { + throw new UnsupportedOperationException("HeaderMap is write locked"); + } + + @Override + public String put(String key, String value) { + return isLocked(); + } + + @Override + public String add(String key, String value) { + return isLocked(); + } + + @Override + public String remove(Object object) { + return isLocked(); + } + + @Override + public Set> entrySet() { + if (map == null) { + return Collections.emptySet(); + } else { + return Collections.unmodifiableSet(super.entrySet()); + } + } + } +} diff --git a/java-client/src/main/java/co/elastic/clients/transport/http/TransportHttpClient.java b/java-client/src/main/java/co/elastic/clients/transport/http/TransportHttpClient.java new file mode 100644 index 000000000..74ce8bd7a --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/transport/http/TransportHttpClient.java @@ -0,0 +1,261 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.transport.http; + +import co.elastic.clients.transport.DefaultTransportOptions; +import co.elastic.clients.transport.TransportOptions; +import co.elastic.clients.util.BinaryData; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** + * Minimal http client interface needed to implement an Elasticsearch transport. + */ +public interface TransportHttpClient { + + /** + * Create a client-specific options value from an existing option object. If {@code null}, this must + * create the default options to which additional options can be added. + *

+ * This method allows implementations to return subclasses with more features (that applications can use by downcasting the result). + * By default, it will use {@link DefaultTransportOptions}. + */ + default TransportOptions createOptions(@Nullable TransportOptions options) { + return options == null ? DefaultTransportOptions.EMPTY : options; + } + + /** + * Perform a blocking request. + * + * @param endpointId the endpoint identifier. Can be used to have specific strategies depending on the endpoint. + * @param node the node to send the request to. If {@code null}, the implementation has to choose which node to send the request to, + * or throw an {@code IllegalArgumentException}. + * @param request the request + * @param options additional options for the http client. Headers and request parameters set in the options have precedence over + * those defined by the request and should replace them in the final request sent. + * + * @return the response + * @throws IllegalArgumentException if {@code node} is {@code is null} and the implementation cannot decide of + * a node to use. + */ + Response performRequest(String endpointId, @Nullable Node node, Request request, TransportOptions options) throws IOException; + + /** + * Perform an asynchronous request. + *

+ * Implementations should return a {@code CompletableFuture} whose cancellation also cancels any http request in flight and frees + * the associated resources. This allows applications to implement scenarios like timeouts or "first to respond" fan-out without + * leaking resources. + * + * @param endpointId the endpoint identifier. Can be used to have specific strategies depending on the endpoint. + * @param node the node to send the request to. If {@code null}, the implementation has to choose which node to send the request to, + * or throw an {@code IllegalArgumentException}. + * @param request the request + * @param options additional options for the http client. Headers and request parameters set in the options have precedence over + * those defined by the request and should replace them in the final request sent. + * + * @return a future that will be completed with the response. + */ + CompletableFuture performRequestAsync(String endpointId, @Nullable Node node, Request request, TransportOptions options); + + /** + * Close this client, freeing associated resources. + */ + void close() throws IOException; + + /** + * A node/host to send requests to. + */ + class Node { + private final URI uri; + private final Set roles; + private final Map attributes; + + /** + * Create a node with its URI, roles and attributes. + *

+ * If the URI doesn't end with a '{@code /}', then one is added. + * + * @param uri the node's URI + * @param roles the node's roles (such as "master", "ingest", etc). This can be used for routing decisions by multi-node + * implementations. + * @param attributes the node's attributes. This can be used for routing decisions by multi-node implementations. + */ + public Node(URI uri, Set roles, Map attributes) { + if (!uri.isAbsolute()) { + throw new IllegalArgumentException("Node URIs must be absolute: " + uri); + } + + if (!uri.getRawPath().endsWith("/")) { + uri = uri.resolve(uri.getRawPath() + "/"); + } + + this.uri = uri; + this.roles = roles; + this.attributes = attributes; + } + + public Node(URI uri) { + this(uri, Collections.emptySet(), Collections.emptyMap()); + } + + public Node(String uri) { + this(URI.create(uri), Collections.emptySet(), Collections.emptyMap()); + } + + /** + * The URI of this node. This is an absolute URL with a path ending with a "/". + */ + public URI uri() { + return this.uri; + } + + @Override + public String toString() { + return uri.toString(); + } + + /** + * Two nodes are considered equal if their URIs are equal. Roles and attributes are ignored. + */ + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof Node)) return false; + Node node = (Node) o; + return Objects.equals(uri, node.uri); + } + + /** + * A node's hash code is that of its URI. Roles and attributes are ignored. + */ + @Override + public int hashCode() { + return Objects.hash(uri); + } + } + + /** + * An http request. + */ + class Request { + @Nullable + private final String method; + private final String path; + private final Map queryParams; + private final Map headers; + @Nullable + private final Iterable body; + + public Request( + String method, + String path, + Map queryParams, + Map headers, + @Nullable Iterable body + ) { + this.method = method; + this.path = path; + this.queryParams = queryParams; + this.headers = headers; + this.body = body; + } + + public String method() { + return method; + } + + public String path() { + return path; + } + + public Map queryParams() { + return queryParams; + } + + public Map headers() { + return headers; + } + + @Nullable + public Iterable body() { + return body; + } + } + + /** + * An http response. + */ + interface Response extends Closeable { + + /** + * The host/node that was used to send the request. It may be different from the one that was provided with the request + * if the http client has a multi-node retry strategy. + */ + Node node(); + + /** + * The response status code. + */ + int statusCode(); + + /** + * Get a header value, or the first value if the header has multiple values. + *

+ * Note: header names are case-insensitive + */ + @Nullable + String header(String name); + + /** + * Get all values for a given header name. + *

+ * Note: header names are case-insensitive + */ + List headers(String name); + + /** + * The response body, if any. + */ + @Nullable + BinaryData body() throws IOException; + + /** + * The original response of the underlying http library, if available. + */ + @Nullable + Object originalResponse(); + + /** + * Close this response, freeing its associated resources if needed, such as consuming the response body. + */ + void close() throws IOException; + } +} diff --git a/java-client/src/main/java/co/elastic/clients/transport/instrumentation/Instrumentation.java b/java-client/src/main/java/co/elastic/clients/transport/instrumentation/Instrumentation.java new file mode 100644 index 000000000..0adab2bfe --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/transport/instrumentation/Instrumentation.java @@ -0,0 +1,79 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.transport.instrumentation; + +import co.elastic.clients.transport.Endpoint; +import co.elastic.clients.transport.TransportOptions; +import co.elastic.clients.transport.http.TransportHttpClient; + +/** + * Instrumentation for an Elasticsearch client. It allows creating a {@link Context} for each request, + * with callbacks for the various stages of request and response processing. + */ +public interface Instrumentation { + + /** + * Create a context for a given request and the corresponding endpoint. + */ + Context newContext(TRequest request, Endpoint endpoint); + + /** + * A context with lifecycle callbacks for the various stages of request and response processing. Must be {@link #close()}d. + */ + interface Context extends AutoCloseable { + + /** + * Sets this context (or the underlying abstraction) as the current thread's scope, so that neste call can + * nest child contexts. + */ + ThreadScope makeCurrent(); + + /** + * Called once the initial API request has been serialized and the http request has been prepared. + */ + void beforeSendingHttpRequest(TransportHttpClient.Request httpRequest, TransportOptions options); + + /** + * Called after the http response has been received, and before analyzing it. + */ + void afterReceivingHttpResponse(TransportHttpClient.Response httpResponse); + + /** + * Called after the http response has been deserialized + */ + void afterDecodingApiResponse(TResponse apiResponse); + + /** + * Called when any stage of request processing caused a failure. + */ + void recordException(Throwable thr); + + @Override + void close(); + } + + /** + * A thread scope. Closing it will detach the scope from the current thread. + */ + interface ThreadScope extends AutoCloseable { + @Override + void close(); + } +} diff --git a/java-client/src/main/java/co/elastic/clients/transport/instrumentation/NoopInstrumentation.java b/java-client/src/main/java/co/elastic/clients/transport/instrumentation/NoopInstrumentation.java new file mode 100644 index 000000000..93b5555b1 --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/transport/instrumentation/NoopInstrumentation.java @@ -0,0 +1,70 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.transport.instrumentation; + +import co.elastic.clients.transport.Endpoint; +import co.elastic.clients.transport.TransportOptions; +import co.elastic.clients.transport.http.TransportHttpClient; + +/** + * A no-operation instrumentation. Used when no instrumentation has been set. It can also be used to + * bypass OpenTelemetry automatic discovery. + */ +public class NoopInstrumentation implements Instrumentation { + + public static NoopInstrumentation INSTANCE = new NoopInstrumentation(); + + private NoopInstrumentation() {} + + @Override + public Context newContext(TRequest request, Endpoint endpoint) { + return CONTEXT; + } + + private static final NoopContext CONTEXT = new NoopContext(); + private static final NoopScope SCOPE = new NoopScope(); + + private static class NoopContext implements Context { + @Override + public ThreadScope makeCurrent() { + return SCOPE; + } + + @Override + public void beforeSendingHttpRequest(TransportHttpClient.Request httpRequest, TransportOptions options) {} + + @Override + public void afterReceivingHttpResponse(TransportHttpClient.Response httpResponse) {} + + @Override + public void afterDecodingApiResponse(TResponse apiResponse) {} + + @Override + public void recordException(Throwable thr) {} + + @Override + public void close() {} + } + + private static class NoopScope implements ThreadScope { + @Override + public void close() {} + } +} diff --git a/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientHttpClient.java b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientHttpClient.java new file mode 100644 index 000000000..64b5aa08a --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientHttpClient.java @@ -0,0 +1,285 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.transport.rest_client; + +import co.elastic.clients.transport.TransportOptions; +import co.elastic.clients.transport.http.HeaderMap; +import co.elastic.clients.transport.http.TransportHttpClient; +import co.elastic.clients.util.BinaryData; +import co.elastic.clients.util.NoCopyByteArrayOutputStream; +import org.apache.http.Header; +import org.apache.http.HeaderElement; +import org.apache.http.HttpEntity; +import org.apache.http.entity.ContentType; +import org.apache.http.util.EntityUtils; +import org.elasticsearch.client.Cancellable; +import org.elasticsearch.client.ResponseListener; +import org.elasticsearch.client.RestClient; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.AbstractList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +public class RestClientHttpClient implements TransportHttpClient { + + private static final ConcurrentHashMap ContentTypeCache = new ConcurrentHashMap<>(); + + /** + * The {@code Future} implementation returned by async requests. + * It wraps the RestClient's cancellable and propagates cancellation. + */ + private static class RequestFuture extends CompletableFuture { + private volatile Cancellable cancellable; + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + boolean cancelled = super.cancel(mayInterruptIfRunning); + if (cancelled && cancellable != null) { + cancellable.cancel(); + } + return cancelled; + } + } + + private final RestClient restClient; + + public RestClientHttpClient(RestClient restClient) { + this.restClient = restClient; + } + + /** + * Returns the underlying low level Rest Client used by this transport. + */ + public RestClient restClient() { + return this.restClient; + } + + @Override + public RestClientOptions createOptions(@Nullable TransportOptions options) { + return RestClientOptions.of(options); + } + + @Override + public Response performRequest(String endpointId, @Nullable Node node, Request request, TransportOptions options) throws IOException { + RestClientOptions rcOptions = RestClientOptions.of(options); + org.elasticsearch.client.Request restRequest = createRestRequest(request, rcOptions); + org.elasticsearch.client.Response restResponse = restClient.performRequest(restRequest); + return new RestResponse(restResponse); + } + + @Override + public CompletableFuture performRequestAsync( + String endpointId, @Nullable Node node, Request request, TransportOptions options + ) { + + RequestFuture future = new RequestFuture<>(); + org.elasticsearch.client.Request restRequest; + + try { + RestClientOptions rcOptions = RestClientOptions.of(options); + restRequest = createRestRequest(request, rcOptions); + } catch(Throwable thr) { + // Terminate early + future.completeExceptionally(thr); + return future; + } + + future.cancellable = restClient.performRequestAsync(restRequest, new ResponseListener() { + @Override + public void onSuccess(org.elasticsearch.client.Response response) { + future.complete(new RestResponse(response)); + } + + @Override + public void onFailure(Exception exception) { + future.completeExceptionally(exception); + } + }); + + return future; + } + + @Override + public void close() throws IOException { + this.restClient.close(); + } + + private org.elasticsearch.client.Request createRestRequest(Request request, RestClientOptions options) { + org.elasticsearch.client.Request clientReq = new org.elasticsearch.client.Request( + request.method(), request.path() + ); + + Iterable body = request.body(); + + Map requestHeaders = request.headers(); + if (!requestHeaders.isEmpty()) { + + int headerCount = requestHeaders.size(); + if ((body == null && headerCount != 3) || headerCount != 4) { + if (options == null) { + options = RestClientOptions.initialOptions(); + } + + RestClientOptions.Builder builder = options.toBuilder(); + for (Map.Entry header : requestHeaders.entrySet()) { + builder.setHeader(header.getKey(), header.getValue()); + } + // Original option headers have precedence + for (Map.Entry header : options.headers()) { + builder.setHeader(header.getKey(), header.getValue()); + } + options = builder.build(); + } + } + + if (options != null) { + clientReq.setOptions(options.restClientRequestOptions()); + } + + clientReq.addParameters(request.queryParams()); + + if (body != null) { + ContentType ct = null; + String ctStr; + if (( ctStr = requestHeaders.get(HeaderMap.CONTENT_TYPE)) != null) { + ct = ContentTypeCache.computeIfAbsent(ctStr, ContentType::parse); + } + clientReq.setEntity(new MultiBufferEntity(body, ct)); + } + + // Request parameter intercepted by LLRC + clientReq.addParameter("ignore", "400,401,403,404,405"); + return clientReq; + } + + static class RestResponse implements Response { + private final org.elasticsearch.client.Response restResponse; + + RestResponse(org.elasticsearch.client.Response restResponse) { + this.restResponse = restResponse; + } + + @Override + public Node node() { + return new Node(restResponse.getHost().toURI()); + } + + @Override + public int statusCode() { + return restResponse.getStatusLine().getStatusCode(); + } + + @Override + public String header(String name) { + return restResponse.getHeader(name); + } + + @Override + public List headers(String name) { + Header[] headers = restResponse.getHeaders(); + for (int i = 0; i < headers.length; i++) { + Header header = headers[i]; + if (header.getName().equalsIgnoreCase(name)) { + HeaderElement[] elements = header.getElements(); + return new AbstractList() { + @Override + public String get(int index) { + return elements[index].getValue(); + } + + @Override + public int size() { + return elements.length; + } + }; + } + } + return Collections.emptyList(); + } + + @Nullable + @Override + public BinaryData body() throws IOException { + HttpEntity entity = restResponse.getEntity(); + return entity == null ? null : new HttpEntityBinaryData(restResponse.getEntity()); + } + + @Nullable + @Override + public org.elasticsearch.client.Response originalResponse() { + return this.restResponse; + } + + @Override + public void close() throws IOException { + EntityUtils.consume(restResponse.getEntity()); + } + } + + private static class HttpEntityBinaryData implements BinaryData { + private final HttpEntity entity; + + HttpEntityBinaryData(HttpEntity entity) { + this.entity = entity; + } + + @Override + public String contentType() { + Header h = entity.getContentType(); + return h == null ? "application/octet-stream" : h.getValue(); + } + + @Override + public void writeTo(OutputStream out) throws IOException { + entity.writeTo(out); + } + + @Override + public ByteBuffer asByteBuffer() throws IOException { + NoCopyByteArrayOutputStream out = new NoCopyByteArrayOutputStream(); + entity.writeTo(out); + return out.asByteBuffer(); + } + + @Override + public InputStream asInputStream() throws IOException { + return entity.getContent(); + } + + @Override + public boolean isRepeatable() { + return entity.isRepeatable(); + } + + @Override + public long size() { + long len = entity.getContentLength(); + return len < 0 ? -1 : entity.getContentLength(); + } + } +} diff --git a/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientOptions.java b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientOptions.java index 5c4edde77..9de6da07e 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientOptions.java +++ b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientOptions.java @@ -21,12 +21,15 @@ import co.elastic.clients.transport.TransportOptions; import co.elastic.clients.transport.Version; +import co.elastic.clients.transport.http.HeaderMap; +import co.elastic.clients.util.LanguageRuntimeVersions; import co.elastic.clients.util.VisibleForTesting; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.http.util.VersionInfo; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.WarningsHandler; +import javax.annotation.Nullable; import java.util.AbstractMap; import java.util.Collection; import java.util.List; @@ -39,25 +42,25 @@ public class RestClientOptions implements TransportOptions { private final RequestOptions options; - private static final String CLIENT_META_HEADER = "X-Elastic-Client-Meta"; - private static final String USER_AGENT_HEADER = "User-Agent"; - @VisibleForTesting static final String CLIENT_META_VALUE = getClientMeta(); @VisibleForTesting static final String USER_AGENT_VALUE = getUserAgent(); - static RestClientOptions of(TransportOptions options) { + static RestClientOptions of(@Nullable TransportOptions options) { + if (options == null) { + return initialOptions(); + } + if (options instanceof RestClientOptions) { return (RestClientOptions)options; - - } else { - final Builder builder = new Builder(RequestOptions.DEFAULT.toBuilder()); - options.headers().forEach(h -> builder.addHeader(h.getKey(), h.getValue())); - options.queryParameters().forEach(builder::setParameter); - builder.onWarnings(options.onWarnings()); - return builder.build(); } + + final Builder builder = new Builder(RequestOptions.DEFAULT.toBuilder()); + options.headers().forEach(h -> builder.addHeader(h.getKey(), h.getValue())); + options.queryParameters().forEach(builder::setParameter); + builder.onWarnings(options.onWarnings()); + return builder.build(); } public RestClientOptions(RequestOptions options) { @@ -118,24 +121,46 @@ public RequestOptions.Builder restClientRequestOptionsBuilder() { @Override public TransportOptions.Builder addHeader(String name, String value) { - if (name.equalsIgnoreCase(CLIENT_META_HEADER)) { + if (name.equalsIgnoreCase(HeaderMap.CLIENT_META)) { // Not overridable return this; } - if (name.equalsIgnoreCase(USER_AGENT_HEADER)) { + if (name.equalsIgnoreCase(HeaderMap.USER_AGENT)) { // We must remove our own user-agent from the options, or we'll end up with multiple values for the header - builder.removeHeader(USER_AGENT_HEADER); + builder.removeHeader(HeaderMap.USER_AGENT); } builder.addHeader(name, value); return this; } + @Override + public TransportOptions.Builder setHeader(String name, String value) { + if (name.equalsIgnoreCase(HeaderMap.CLIENT_META)) { + // Not overridable + return this; + } + builder.removeHeader(name).addHeader(name, value); + return this; + } + + @Override + public TransportOptions.Builder removeHeader(String name) { + builder.removeHeader(name); + return this; + } + @Override public TransportOptions.Builder setParameter(String name, String value) { + // Should be remove and add, but we can't remove. builder.addParameter(name, value); return this; } + @Override + public TransportOptions.Builder removeParameter(String name) { + throw new UnsupportedOperationException("This implementation does not support removing parameters"); + } + /** * Called if there are warnings to determine if those warnings should fail the request. */ @@ -163,17 +188,17 @@ public RestClientOptions build() { } static RestClientOptions initialOptions() { - return new RestClientOptions(RequestOptions.DEFAULT); + return new RestClientOptions(SafeResponseConsumer.DEFAULT_REQUEST_OPTIONS); } private static RequestOptions.Builder addBuiltinHeaders(RequestOptions.Builder builder) { - builder.removeHeader(CLIENT_META_HEADER); - builder.addHeader(CLIENT_META_HEADER, CLIENT_META_VALUE); - if (builder.getHeaders().stream().noneMatch(h -> h.getName().equalsIgnoreCase(USER_AGENT_HEADER))) { - builder.addHeader(USER_AGENT_HEADER, USER_AGENT_VALUE); + builder.removeHeader(HeaderMap.CLIENT_META); + builder.addHeader(HeaderMap.CLIENT_META, CLIENT_META_VALUE); + if (builder.getHeaders().stream().noneMatch(h -> h.getName().equalsIgnoreCase(HeaderMap.USER_AGENT))) { + builder.addHeader(HeaderMap.USER_AGENT, USER_AGENT_VALUE); } - if (builder.getHeaders().stream().noneMatch(h -> h.getName().equalsIgnoreCase("Accept"))) { - builder.addHeader("Accept", RestClientTransport.JsonContentType.toString()); + if (builder.getHeaders().stream().noneMatch(h -> h.getName().equalsIgnoreCase(HeaderMap.ACCEPT))) { + builder.addHeader(HeaderMap.ACCEPT, RestClientTransport.JSON_CONTENT_TYPE); } return builder; @@ -211,9 +236,9 @@ private static String getClientMeta() { + metaVersion + ",jv=" + System.getProperty("java.specification.version") - + ",hl=2" + ",t=" + metaVersion + + ",hl=2" + ",hc=" + (httpClientVersion == null ? "" : httpClientVersion.getRelease()) + LanguageRuntimeVersions.getRuntimeMetadata(); diff --git a/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientTransport.java b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientTransport.java index 7e4bfa3cc..dbadfbe1a 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientTransport.java +++ b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientTransport.java @@ -19,420 +19,30 @@ package co.elastic.clients.transport.rest_client; -import co.elastic.clients.elasticsearch._types.ElasticsearchException; -import co.elastic.clients.elasticsearch._types.ErrorResponse; -import co.elastic.clients.json.JsonpDeserializer; import co.elastic.clients.json.JsonpMapper; -import co.elastic.clients.json.NdJsonpSerializable; -import co.elastic.clients.transport.JsonEndpoint; -import co.elastic.clients.transport.TransportException; -import co.elastic.clients.transport.Version; -import co.elastic.clients.transport.endpoints.BinaryEndpoint; -import co.elastic.clients.transport.endpoints.BooleanEndpoint; -import co.elastic.clients.transport.endpoints.BooleanResponse; -import co.elastic.clients.transport.ElasticsearchTransport; -import co.elastic.clients.transport.Endpoint; -import co.elastic.clients.transport.TransportOptions; -import co.elastic.clients.util.ApiTypeHelper; -import co.elastic.clients.util.BinaryData; -import co.elastic.clients.util.MissingRequiredPropertyException; -import jakarta.json.JsonException; -import jakarta.json.stream.JsonGenerator; -import jakarta.json.stream.JsonParser; -import org.apache.http.HttpEntity; -import org.apache.http.entity.BufferedHttpEntity; -import org.apache.http.entity.ByteArrayEntity; -import org.apache.http.entity.ContentType; -import org.apache.http.message.BasicNameValuePair; -import org.apache.http.util.EntityUtils; -import org.elasticsearch.client.Cancellable; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.Response; -import org.elasticsearch.client.ResponseException; -import org.elasticsearch.client.ResponseListener; +import co.elastic.clients.transport.ElasticsearchTransportBase; +import co.elastic.clients.transport.instrumentation.Instrumentation; import org.elasticsearch.client.RestClient; -import javax.annotation.Nullable; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CompletableFuture; +public class RestClientTransport extends ElasticsearchTransportBase { -public class RestClientTransport implements ElasticsearchTransport { - - static final ContentType JsonContentType; - - static { - - if (Version.VERSION == null) { - JsonContentType = ContentType.APPLICATION_JSON; - } else { - JsonContentType = ContentType.create( - "application/vnd.elasticsearch+json", - new BasicNameValuePair("compatible-with", String.valueOf(Version.VERSION.major())) - ); - } - } - - /** - * The {@code Future} implementation returned by async requests. - * It wraps the RestClient's cancellable and propagates cancellation. - */ - private static class RequestFuture extends CompletableFuture { - private volatile Cancellable cancellable; + private final RestClient restClient; - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - boolean cancelled = super.cancel(mayInterruptIfRunning); - if (cancelled && cancellable != null) { - cancellable.cancel(); - } - return cancelled; - } + public RestClientTransport(RestClient restClient, JsonpMapper jsonpMapper) { + this(restClient, jsonpMapper, null); } - private final RestClient restClient; - private final JsonpMapper mapper; - private final RestClientOptions transportOptions; - - public RestClientTransport(RestClient restClient, JsonpMapper mapper, @Nullable TransportOptions options) { + public RestClientTransport(RestClient restClient, JsonpMapper jsonpMapper, RestClientOptions options) { + super(new RestClientHttpClient(restClient), options, jsonpMapper, null); this.restClient = restClient; - this.mapper = mapper; - this.transportOptions = options == null ? RestClientOptions.initialOptions() : RestClientOptions.of(options); } - public RestClientTransport(RestClient restClient, JsonpMapper mapper) { - this(restClient, mapper, null); + public RestClientTransport(RestClient restClient, JsonpMapper jsonpMapper, RestClientOptions options, Instrumentation instrumentation) { + super(new RestClientHttpClient(restClient), options, jsonpMapper, instrumentation); + this.restClient = restClient; } - /** - * Returns the underlying low level Rest Client used by this transport. - */ public RestClient restClient() { return this.restClient; } - - /** - * Copies this {@link #RestClientTransport} with specific request options. - */ - public RestClientTransport withRequestOptions(@Nullable TransportOptions options) { - return new RestClientTransport(this.restClient, this.mapper, options); - } - - @Override - public JsonpMapper jsonpMapper() { - return mapper; - } - - @Override - public TransportOptions options() { - return transportOptions; - } - - @Override - public void close() throws IOException { - this.restClient.close(); - } - - public ResponseT performRequest( - RequestT request, - Endpoint endpoint, - @Nullable TransportOptions options - ) throws IOException { - - org.elasticsearch.client.Request clientReq = prepareLowLevelRequest(request, endpoint, options); - org.elasticsearch.client.Response clientResp = restClient.performRequest(clientReq); - return getHighLevelResponse(clientResp, endpoint); - } - - public CompletableFuture performRequestAsync( - RequestT request, - Endpoint endpoint, - @Nullable TransportOptions options - ) { - RequestFuture future = new RequestFuture<>(); - org.elasticsearch.client.Request clientReq; - try { - clientReq = prepareLowLevelRequest(request, endpoint, options); - } catch (Exception e) { - // Terminate early - future.completeExceptionally(e); - return future; - } - - // Propagate required property checks to the thread that will decode the response - boolean disableRequiredChecks = ApiTypeHelper.requiredPropertiesCheckDisabled(); - - future.cancellable = restClient.performRequestAsync(clientReq, new ResponseListener() { - @Override - public void onSuccess(Response clientResp) { - try (ApiTypeHelper.DisabledChecksHandle h = - ApiTypeHelper.DANGEROUS_disableRequiredPropertiesCheck(disableRequiredChecks)) { - - ResponseT response = getHighLevelResponse(clientResp, endpoint); - future.complete(response); - - } catch (Exception e) { - future.completeExceptionally(e); - } - } - - @Override - public void onFailure(Exception e) { - future.completeExceptionally(e); - } - }); - - return future; - } - - private org.elasticsearch.client.Request prepareLowLevelRequest( - RequestT request, - Endpoint endpoint, - @Nullable TransportOptions options - ) throws IOException { - String method = endpoint.method(request); - String path = endpoint.requestUrl(request); - Map params = endpoint.queryParameters(request); - - org.elasticsearch.client.Request clientReq = new org.elasticsearch.client.Request(method, path); - - RequestOptions restOptions = options == null ? - transportOptions.restClientRequestOptions() : - RestClientOptions.of(options).restClientRequestOptions(); - - if (restOptions != null) { - clientReq.setOptions(restOptions); - } - - clientReq.addParameters(params); - - Object body = endpoint.body(request); - if (body != null) { - // Request has a body - if (body instanceof NdJsonpSerializable) { - List lines = new ArrayList<>(); - collectNdJsonLines(lines, (NdJsonpSerializable) request); - clientReq.setEntity(new MultiBufferEntity(lines, JsonContentType)); - - } else if (body instanceof BinaryData) { - BinaryData data = (BinaryData)body; - - // ES expects the Accept and Content-Type headers to be consistent. - ContentType contentType; - String dataContentType = data.contentType(); - if (co.elastic.clients.util.ContentType.APPLICATION_JSON.equals(dataContentType)) { - // Fast path - contentType = JsonContentType; - } else { - contentType = ContentType.parse(dataContentType); - } - - clientReq.setEntity(new MultiBufferEntity( - Collections.singletonList(data.asByteBuffer()), - contentType - )); - - } else { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - JsonGenerator generator = mapper.jsonProvider().createGenerator(baos); - mapper.serialize(body, generator); - generator.close(); - clientReq.setEntity(new ByteArrayEntity(baos.toByteArray(), JsonContentType)); - } - } - - // Request parameter intercepted by LLRC - clientReq.addParameter("ignore", "400,401,403,404,405"); - return clientReq; - } - - private static final ByteBuffer NdJsonSeparator = ByteBuffer.wrap("\n".getBytes(StandardCharsets.UTF_8)); - - private void collectNdJsonLines(List lines, NdJsonpSerializable value) { - Iterator values = value._serializables(); - while(values.hasNext()) { - Object item = values.next(); - if (item == null) { - // Skip - } else if (item instanceof NdJsonpSerializable && item != value) { // do not recurse on the item itself - collectNdJsonLines(lines, (NdJsonpSerializable)item); - } else { - // TODO: items that aren't already BinaryData could be serialized to ByteBuffers lazily - // to reduce the number of buffers to keep in memory - lines.add(BinaryData.of(item, this.mapper).asByteBuffer()); - lines.add(NdJsonSeparator); - } - } - } - - /** - * Write an nd-json value by serializing each of its items on a separate line, recursing if its items themselves implement - * {@link NdJsonpSerializable} to flattening nested structures. - */ - private void writeNdJson(NdJsonpSerializable value, ByteArrayOutputStream baos) throws IOException { - Iterator values = value._serializables(); - while(values.hasNext()) { - Object item = values.next(); - if (item instanceof NdJsonpSerializable && item != value) { // do not recurse on the item itself - writeNdJson((NdJsonpSerializable) item, baos); - } else { - JsonGenerator generator = mapper.jsonProvider().createGenerator(baos); - mapper.serialize(item, generator); - generator.close(); - baos.write('\n'); - } - } - } - - private ResponseT getHighLevelResponse( - org.elasticsearch.client.Response clientResp, - Endpoint endpoint - ) throws IOException { - - int statusCode = clientResp.getStatusLine().getStatusCode(); - try { - - if (statusCode == 200) { - checkProductHeader(clientResp, endpoint); - } - - if (endpoint.isError(statusCode)) { - JsonpDeserializer errorDeserializer = endpoint.errorDeserializer(statusCode); - if (errorDeserializer == null) { - throw new TransportException( - statusCode, - "Request failed with status code '" + statusCode + "'", - endpoint.id(), new ResponseException(clientResp) - ); - } - - HttpEntity entity = clientResp.getEntity(); - if (entity == null) { - throw new TransportException( - statusCode, - "Expecting a response body, but none was sent", - endpoint.id(), new ResponseException(clientResp) - ); - } - - // We may have to replay it. - entity = new BufferedHttpEntity(entity); - - try { - InputStream content = entity.getContent(); - try (JsonParser parser = mapper.jsonProvider().createParser(content)) { - ErrorT error = errorDeserializer.deserialize(parser, mapper); - // TODO: have the endpoint provide the exception constructor - throw new ElasticsearchException(endpoint.id(), (ErrorResponse) error); - } - } catch(JsonException | MissingRequiredPropertyException errorEx) { - // Could not decode exception, try the response type - try { - ResponseT response = decodeResponse(statusCode, entity, clientResp, endpoint); - return response; - } catch(Exception respEx) { - // No better luck: throw the original error decoding exception - throw new TransportException(statusCode, - "Failed to decode error response, check exception cause for additional details", endpoint.id(), - new ResponseException(clientResp) - ); - } - } - } else { - return decodeResponse(statusCode, clientResp.getEntity(), clientResp, endpoint); - } - } finally { - // Consume the entity unless this is a successful binary endpoint, where the user must consume the entity - if (!(endpoint instanceof BinaryEndpoint && !endpoint.isError(statusCode))) { - EntityUtils.consume(clientResp.getEntity()); - } - } - } - - private ResponseT decodeResponse( - int statusCode, @Nullable HttpEntity entity, Response clientResp, Endpoint endpoint - ) throws IOException { - - if (endpoint instanceof JsonEndpoint) { - @SuppressWarnings("unchecked") - JsonEndpoint jsonEndpoint = (JsonEndpoint) endpoint; - // Successful response - ResponseT response = null; - JsonpDeserializer responseParser = jsonEndpoint.responseDeserializer(); - if (responseParser != null) { - // Expecting a body - if (entity == null) { - throw new TransportException( - statusCode, - "Expecting a response body, but none was sent", - endpoint.id(), new ResponseException(clientResp) - ); - } - InputStream content = entity.getContent(); - try (JsonParser parser = mapper.jsonProvider().createParser(content)) { - response = responseParser.deserialize(parser, mapper); - } - } - return response; - - } else if(endpoint instanceof BooleanEndpoint) { - BooleanEndpoint bep = (BooleanEndpoint) endpoint; - - @SuppressWarnings("unchecked") - ResponseT response = (ResponseT) new BooleanResponse(bep.getResult(statusCode)); - return response; - - - } else if (endpoint instanceof BinaryEndpoint) { - BinaryEndpoint bep = (BinaryEndpoint) endpoint; - - @SuppressWarnings("unchecked") - ResponseT response = (ResponseT) new HttpClientBinaryResponse(entity); - return response; - - } else { - throw new TransportException(statusCode, "Unhandled endpoint type: '" + endpoint.getClass().getName() + "'", endpoint.id()); - } - } - - // Endpoints that (incorrectly) do not return the Elastic product header - private static final Set endpointsMissingProductHeader = new HashSet<>(Arrays.asList( - "es/snapshot.create" // #74 / elastic/elasticsearch#82358 - )); - - private void checkProductHeader(Response clientResp, Endpoint endpoint) throws IOException { - String header = clientResp.getHeader("X-Elastic-Product"); - if (header == null) { - if (endpointsMissingProductHeader.contains(endpoint.id())) { - return; - } - throw new TransportException( - clientResp.getStatusLine().getStatusCode(), - "Missing [X-Elastic-Product] header. Please check that you are connecting to an Elasticsearch " - + "instance, and that any networking filters are preserving that header.", - endpoint.id(), - new ResponseException(clientResp) - ); - } - - if (!"Elasticsearch".equals(header)) { - throw new TransportException( - clientResp.getStatusLine().getStatusCode(), - "Invalid value '" + header + "' for 'X-Elastic-Product' header.", - endpoint.id(), - new ResponseException(clientResp) - ); - } - } } diff --git a/java-client/src/main/java/co/elastic/clients/transport/rest_client/SafeResponseConsumer.java b/java-client/src/main/java/co/elastic/clients/transport/rest_client/SafeResponseConsumer.java new file mode 100644 index 000000000..c7d6cfde0 --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/transport/rest_client/SafeResponseConsumer.java @@ -0,0 +1,135 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.transport.rest_client; + +import org.apache.http.HttpException; +import org.apache.http.HttpResponse; +import org.apache.http.nio.ContentDecoder; +import org.apache.http.nio.IOControl; +import org.apache.http.nio.protocol.HttpAsyncResponseConsumer; +import org.apache.http.protocol.HttpContext; +import org.elasticsearch.client.HttpAsyncResponseConsumerFactory; +import org.elasticsearch.client.RequestOptions; + +import java.io.IOException; + +/** + * A response consumer that will propagate Errors as RuntimeExceptions to avoid crashing the IOReactor. + */ +public class SafeResponseConsumer implements HttpAsyncResponseConsumer { + + private final HttpAsyncResponseConsumer delegate; + + /** + * A consumer factory that safely wraps the one provided by {@code RequestOptions.DEFAULT}. + */ + public static final HttpAsyncResponseConsumerFactory DEFAULT_FACTORY = () -> new SafeResponseConsumer<>( + RequestOptions.DEFAULT.getHttpAsyncResponseConsumerFactory().createHttpAsyncResponseConsumer() + ); + + /** + * Same as {@code RequestOptions.DEFAULT} with a safe consumer factory + */ + public static final RequestOptions DEFAULT_REQUEST_OPTIONS; + + static { + RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder(); + builder.setHttpAsyncResponseConsumerFactory(DEFAULT_FACTORY); + DEFAULT_REQUEST_OPTIONS = builder.build(); + } + + public SafeResponseConsumer(HttpAsyncResponseConsumer delegate) { + this.delegate = delegate; + } + + @SuppressWarnings("unchecked") + private static void throwUnchecked(Throwable thr) throws T { + throw (T) thr; + } + + @Override + public void responseReceived(HttpResponse response) throws IOException, HttpException { + try { + delegate.responseReceived(response); + } catch(Exception e) { + throwUnchecked(e); + } catch(Throwable e) { + throw new RuntimeException("Error receiving response", e); + } + } + + @Override + public void consumeContent(ContentDecoder decoder, IOControl ioControl) throws IOException { + try { + delegate.consumeContent(decoder, ioControl); + } catch(Exception e) { + throwUnchecked(e); + } catch(Throwable e) { + throw new RuntimeException("Error consuming content", e); + } + } + + @Override + public void responseCompleted(HttpContext context) { + try { + delegate.responseCompleted(context); + } catch(Exception e) { + throwUnchecked(e); + } catch(Throwable e) { + throw new RuntimeException("Error completing response", e); + } + } + + @Override + public void failed(Exception ex) { + try { + delegate.failed(ex); + } catch(Exception e) { + throwUnchecked(e); + } catch(Throwable e) { + throw new RuntimeException("Error handling failure", e); + } + } + + @Override + public Exception getException() { + return delegate.getException(); + } + + @Override + public T getResult() { + return delegate.getResult(); + } + + @Override + public boolean isDone() { + return delegate.isDone(); + } + + @Override + public void close() throws IOException { + delegate.close(); + } + + @Override + public boolean cancel() { + return delegate.cancel(); + } +} diff --git a/java-client/src/main/java/co/elastic/clients/util/BinaryData.java b/java-client/src/main/java/co/elastic/clients/util/BinaryData.java index 02ec646b1..ddd9123b7 100644 --- a/java-client/src/main/java/co/elastic/clients/util/BinaryData.java +++ b/java-client/src/main/java/co/elastic/clients/util/BinaryData.java @@ -27,6 +27,7 @@ import jakarta.json.stream.JsonParser; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; @@ -43,13 +44,31 @@ public interface BinaryData { /** * Write this data to an output stream. + * @throws IllegalStateException if the content has already been consumed and the object + * isn't replayable. */ void writeTo(OutputStream out) throws IOException; /** - * Return this data as a {@code ByteBuffer} + * Return this data as a {@code ByteBuffer}. + * + * @throws IllegalStateException if the content has already been consumed and the object + * isn't replayable. + */ + ByteBuffer asByteBuffer() throws IOException; + + /** + * Return this data as an {@code InputStream}. + * + * @throws IllegalStateException if the content has already been consumed and the object + * isn't replayable. + */ + InputStream asInputStream() throws IOException; + + /** + * Can this object be consumed several times? */ - ByteBuffer asByteBuffer(); + boolean isRepeatable(); /** * Get the estimated size in bytes of the data. diff --git a/java-client/src/main/java/co/elastic/clients/util/ByteArrayBinaryData.java b/java-client/src/main/java/co/elastic/clients/util/ByteArrayBinaryData.java index 4b128a2e3..9641b013f 100644 --- a/java-client/src/main/java/co/elastic/clients/util/ByteArrayBinaryData.java +++ b/java-client/src/main/java/co/elastic/clients/util/ByteArrayBinaryData.java @@ -27,8 +27,10 @@ import jakarta.json.stream.JsonGenerator; import jakarta.json.stream.JsonParser; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.EnumSet; @@ -41,20 +43,32 @@ public class ByteArrayBinaryData implements BinaryData { private final int length; private final String contentType; - ByteArrayBinaryData(byte[] bytes, int offset, int length, String contentType) { + public ByteArrayBinaryData(byte[] bytes, int offset, int length, String contentType) { this.contentType = contentType; this.bytes = bytes; this.offset = offset; this.length = length; } - ByteArrayBinaryData(byte[] bytes, String contentType) { + public ByteArrayBinaryData(byte[] bytes, String contentType) { this.contentType = contentType; this.bytes = bytes; this.offset = 0; this.length = bytes.length; } + /** + * Copy another {@link BinaryData}. Typically used to make a replayable {@link BinaryData} + * from a non-replayable one. + */ + public ByteArrayBinaryData(BinaryData data) throws IOException { + NoCopyByteArrayOutputStream out = new NoCopyByteArrayOutputStream(); + data.writeTo(out); + this.contentType = data.contentType(); + this.bytes = out.array(); + this.offset = 0; + this.length = out.size(); + } @Override public String contentType() { @@ -76,6 +90,16 @@ public ByteBuffer asByteBuffer() { return ByteBuffer.wrap(bytes, offset, length); } + @Override + public InputStream asInputStream() { + return new ByteArrayInputStream(bytes, offset, length); + } + + @Override + public boolean isRepeatable() { + return true; + } + private static class Deserializer extends JsonpDeserializerBase { Deserializer() { diff --git a/java-client/src/main/java/co/elastic/clients/util/DuplicateResourceFinder.java b/java-client/src/main/java/co/elastic/clients/util/DuplicateResourceFinder.java new file mode 100644 index 000000000..8da11ab92 --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/util/DuplicateResourceFinder.java @@ -0,0 +1,80 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.util; + +import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.List; + +public class DuplicateResourceFinder { + + private static volatile boolean ENABLED = true; + + /** + * Disables the resource uniqueness checks. Use with caution, as it will mask problems that may hit later. + */ + public static void enableCheck(boolean enabled) { + ENABLED = enabled; + } + + /** + * Ensure a class is only defined once in this class' classpath + */ + public static void ensureClassUniqueness(Class clazz) { + String name = clazz.getName(); + String resource = clazz.getName().replace('.', '/') + ".class"; + ensureResourceUniqueness(resource, name, DuplicateResourceFinder.class.getClassLoader()); + } + + public static void ensureResourceUniqueness(String path) { + ensureResourceUniqueness(path, path, DuplicateResourceFinder.class.getClassLoader()); + } + + private static void ensureResourceUniqueness(String path, String name, ClassLoader classLoader) { + if (!ENABLED) { + return; + } + + // With Java9 modules, will work only with exported classes/resources. This is actually + // what we want, as non-exported classes/resources will not conflict. + List list = new ArrayList<>(); + try { + Enumeration resources = classLoader.getResources(path); + while (resources.hasMoreElements()) { + list.add(resources.nextElement()); + } + } catch (IOException ioe) { + // Ignore + } + + if (list.size() > 1) { + StringBuilder sb = new StringBuilder("Several versions of ") + .append(name) + .append(" were found. This can cause conflicts, please fix the classpath:\n"); + for (URL url: list) { + sb.append(" ").append(url.toString()).append("\n"); + } + sb.append(" See the Java API client's troubleshooting documentation for more information.\n"); + throw new RuntimeException(sb.toString()); + } + } +} diff --git a/java-client/src/main/java/co/elastic/clients/transport/rest_client/LanguageRuntimeVersions.java b/java-client/src/main/java/co/elastic/clients/util/LanguageRuntimeVersions.java similarity index 98% rename from java-client/src/main/java/co/elastic/clients/transport/rest_client/LanguageRuntimeVersions.java rename to java-client/src/main/java/co/elastic/clients/util/LanguageRuntimeVersions.java index 5c9008e83..356ceebe0 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/rest_client/LanguageRuntimeVersions.java +++ b/java-client/src/main/java/co/elastic/clients/util/LanguageRuntimeVersions.java @@ -17,14 +17,14 @@ * under the License. */ -package co.elastic.clients.transport.rest_client; +package co.elastic.clients.util; // Copied verbatim from https://github.com/elastic/jvm-languages-sniffer import java.lang.reflect.Field; import java.lang.reflect.Method; -class LanguageRuntimeVersions { +public class LanguageRuntimeVersions { /** * Returns runtime information by looking up classes identifying non-Java JVM diff --git a/java-client/src/main/java/co/elastic/clients/util/NoCopyByteArrayOutputStream.java b/java-client/src/main/java/co/elastic/clients/util/NoCopyByteArrayOutputStream.java index 5d91204a6..2021a50bc 100644 --- a/java-client/src/main/java/co/elastic/clients/util/NoCopyByteArrayOutputStream.java +++ b/java-client/src/main/java/co/elastic/clients/util/NoCopyByteArrayOutputStream.java @@ -21,6 +21,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; /** * A {@code ByteArrayOutputStream} that reduces copy operations of its underlying buffer. @@ -48,4 +49,11 @@ public byte[] array() { public ByteArrayInputStream asInputStream() { return new ByteArrayInputStream(this.buf, 0, this.count); } + + /** + * Get a {@code ByteBuffer} view on this object, based on the current buffer and size. + */ + public ByteBuffer asByteBuffer() { + return ByteBuffer.wrap(this.buf, 0, this.count); + } } diff --git a/java-client/src/main/java/co/elastic/clients/util/ObjectBuilderBase.java b/java-client/src/main/java/co/elastic/clients/util/ObjectBuilderBase.java index e61462418..96e76d206 100644 --- a/java-client/src/main/java/co/elastic/clients/util/ObjectBuilderBase.java +++ b/java-client/src/main/java/co/elastic/clients/util/ObjectBuilderBase.java @@ -19,6 +19,7 @@ package co.elastic.clients.util; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; diff --git a/java-client/src/test/java/co/elastic/clients/elasticsearch/ElasticsearchTestServer.java b/java-client/src/test/java/co/elastic/clients/elasticsearch/ElasticsearchTestServer.java index c4eee1b93..7457158e4 100644 --- a/java-client/src/test/java/co/elastic/clients/elasticsearch/ElasticsearchTestServer.java +++ b/java-client/src/test/java/co/elastic/clients/elasticsearch/ElasticsearchTestServer.java @@ -174,6 +174,10 @@ public int port() { return port; } + public ElasticsearchContainer container() { + return this.container; + } + public RestClient restClient() { return restClient; } diff --git a/java-client/src/test/java/co/elastic/clients/testkit/MockHttpClient.java b/java-client/src/test/java/co/elastic/clients/testkit/MockHttpClient.java new file mode 100644 index 000000000..27b3fba3e --- /dev/null +++ b/java-client/src/test/java/co/elastic/clients/testkit/MockHttpClient.java @@ -0,0 +1,156 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.testkit; + +import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.json.JsonpMapper; +import co.elastic.clients.transport.ElasticsearchTransportBase; +import co.elastic.clients.transport.TransportException; +import co.elastic.clients.transport.TransportOptions; +import co.elastic.clients.transport.http.TransportHttpClient; +import co.elastic.clients.util.BinaryData; +import org.jetbrains.annotations.Nullable; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +public class MockHttpClient implements TransportHttpClient { + + private static final Response NotFound = new MockResponse(404, null); + + Map responses = new ConcurrentHashMap<>(); + + public MockHttpClient add(String path, String contentType, byte[] data) { + responses.put(path, new MockResponse(200, BinaryData.of(data, contentType))); + return this; + } + + public MockHttpClient add(String path, String contentType, String text) { + responses.put(path, new MockResponse(200, BinaryData.of(text.getBytes(StandardCharsets.UTF_8), contentType))); + return this; + } + + public ElasticsearchClient client() { + return client(new ModelTestCase() {}.mapper); + } + + public ElasticsearchClient client(JsonpMapper mapper) { + return new ElasticsearchClient(new ElasticsearchTransportBase(this, null, mapper) { + @Override + public void close() throws IOException { + super.close(); + } + }); + } + + + @Override + public Response performRequest( + String endpointId, @Nullable TransportHttpClient.Node node, Request request, TransportOptions option + ) throws IOException { + Response response = responses.get(request.path()); + + if (response == null) { + throw new TransportException(NotFound, "Not found", endpointId); + } + + return response; + } + + @Override + public CompletableFuture performRequestAsync( + String endpointId, @Nullable TransportHttpClient.Node node, Request request, TransportOptions options + ) { + CompletableFuture result = new CompletableFuture<>(); + try { + Response response = performRequest(endpointId, node, request, options); + result.complete(response); + } catch (Exception e) { + result.completeExceptionally(e); + } + return result; + } + + @Override + public void close() throws IOException { + } + + private static class MockResponse implements Response { + + private final int statusCode; + private final BinaryData body; + private final Map headers; + + MockResponse(int statusCode, BinaryData body) { + this.statusCode = statusCode; + this.headers = new HashMap<>(); + this.body = body; + + if (body != null) { + headers.put("content-type", body.contentType()); + } + headers.put("x-elastic-product", "Elasticsearch"); + } + + @Override + public Node node() { + return null; + } + + @Override + public int statusCode() { + return statusCode; + } + + @Nullable + @Override + public String header(String name) { + return headers.get(name.toLowerCase()); + } + + @Override + public List headers(String name) { + String header = header(name); + return header == null ? null : Collections.singletonList(header); + } + + @Nullable + @Override + public BinaryData body() throws IOException { + return body; + } + + @Nullable + @Override + public Object originalResponse() { + return null; + } + + @Override + public void close() throws IOException { + } + } +} diff --git a/java-client/src/test/java/co/elastic/clients/testkit/ModelTestCase.java b/java-client/src/test/java/co/elastic/clients/testkit/ModelTestCase.java new file mode 100644 index 000000000..690ddad52 --- /dev/null +++ b/java-client/src/test/java/co/elastic/clients/testkit/ModelTestCase.java @@ -0,0 +1,161 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.testkit; + +import co.elastic.clients.json.JsonpDeserializer; +import co.elastic.clients.json.JsonpMapper; +import co.elastic.clients.json.SimpleJsonpMapper; +import co.elastic.clients.json.jackson.JacksonJsonpMapper; +import co.elastic.clients.json.jsonb.JsonbJsonpMapper; +import jakarta.json.spi.JsonProvider; +import jakarta.json.stream.JsonGenerator; +import jakarta.json.stream.JsonParser; +import org.junit.jupiter.api.Assertions; + +import java.io.StringReader; +import java.io.StringWriter; +import java.lang.reflect.Method; +import java.lang.reflect.Type; +import java.util.EnumSet; +import java.util.Random; + +/** + * Base class for tests that encode/decode json + */ +public abstract class ModelTestCase extends Assertions { + + protected enum JsonImpl { Jsonb, Jackson, Simple }; + + // Same value for all tests in a test run + private static final int RAND = new Random().nextInt(100); + + protected final JsonImpl jsonImpl; + protected final JsonpMapper mapper; + + private static JsonImpl chooseJsonImpl(EnumSet jsonImplCandidates, int rand) { + // Converting an EnumSet to an array always uses the same order. + return jsonImplCandidates.toArray(new JsonImpl[jsonImplCandidates.size()])[rand % jsonImplCandidates.size()]; + } + + private static JsonpMapper createMapper(JsonImpl jsonImpl, int rand) { + switch(jsonImpl) { + case Jsonb: + System.out.println("Using a JsonB mapper (rand = " + rand + ")."); + return new JsonbJsonpMapper() { + @Override + public boolean ignoreUnknownFields() { + return false; + } + }; + + case Jackson: + System.out.println("Using a Jackson mapper (rand = " + rand + ")."); + return new JacksonJsonpMapper() { + @Override + public boolean ignoreUnknownFields() { + return false; + } + }; + + default: + System.out.println("Using a simple mapper (rand = " + rand + ")."); + return SimpleJsonpMapper.INSTANCE_REJECT_UNKNOWN_FIELDS; + } + } + + protected ModelTestCase(EnumSet jsonImplCandidates, int rand) { + jsonImpl = chooseJsonImpl(jsonImplCandidates, rand); + mapper = createMapper(jsonImpl, rand); + } + + protected ModelTestCase(EnumSet jsonImplCandidates) { + this(jsonImplCandidates, RAND); + } + + protected ModelTestCase(JsonImpl jsonImpl) { + this(EnumSet.of(jsonImpl), RAND); + } + + protected ModelTestCase(int rand) { + this(EnumSet.allOf(JsonImpl.class), rand); + } + + protected ModelTestCase() { + this(EnumSet.allOf(JsonImpl.class), RAND); + } + + protected String toJson(T value) { + return toJson(value, mapper); + } + + public static String toJson(T value, JsonpMapper mapper) { + StringWriter sw = new StringWriter(); + JsonProvider provider = mapper.jsonProvider(); + JsonGenerator generator = provider.createGenerator(sw); + mapper.serialize(value, generator); + generator.close(); + return sw.toString(); + } + + public static T fromJson(String json, Class clazz, JsonpMapper mapper) { + return fromJson(json, (Type)clazz, mapper); + } + + public static T fromJson(String json, Type type, JsonpMapper mapper) { + JsonParser parser = mapper.jsonProvider().createParser(new StringReader(json)); + return mapper.deserialize(parser, type); + } + + protected T fromJson(String json, Class clazz) { + return fromJson(json, clazz, mapper); + } + + protected T fromJson(String json, Type type) { + return fromJson(json, type, mapper); + } + + @SuppressWarnings("unchecked") + protected T checkJsonRoundtrip(T value, String expectedJson) { + assertEquals(expectedJson, toJson(value)); + return fromJson(expectedJson, (Class)value.getClass()); + } + + protected T fromJson(String json, JsonpDeserializer deserializer) { + return fromJson(json, deserializer, mapper); + } + + protected T fromJson(String json, JsonpDeserializer deserializer, JsonpMapper mapper) { + JsonParser parser = mapper.jsonProvider().createParser(new StringReader(json)); + return deserializer.deserialize(parser, mapper); + } + + + public static void assertGetterType(Class expected, Class clazz, String name) { + Method method; + try { + method = clazz.getMethod(name); + } catch (NoSuchMethodException e) { + fail("Getter '" + clazz.getName() + "." + name + "' doesn't exist"); + return; + } + + assertSame(expected, method.getReturnType()); + } +} diff --git a/java-client/src/test/java/co/elastic/clients/transport/TransportTest.java b/java-client/src/test/java/co/elastic/clients/transport/TransportTest.java index 4d3449984..b79b03893 100644 --- a/java-client/src/test/java/co/elastic/clients/transport/TransportTest.java +++ b/java-client/src/test/java/co/elastic/clients/transport/TransportTest.java @@ -24,7 +24,7 @@ import co.elastic.clients.transport.rest_client.RestClientTransport; import com.sun.net.httpserver.HttpServer; import org.apache.http.HttpHost; -import org.elasticsearch.client.ResponseException; +import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -68,8 +68,8 @@ public void testXMLResponse() throws Exception { assertEquals(401, ex.statusCode()); assertEquals("es/cat.indices", ex.endpointId()); - // Cause is transport-dependent - ResponseException restException = (ResponseException) ex.getCause(); - assertEquals(401, restException.getResponse().getStatusLine().getStatusCode()); + // Original response is transport-dependent + Response restClientResponse = (Response)ex.response().originalResponse(); + assertEquals(401, restClientResponse.getStatusLine().getStatusCode()); } } diff --git a/java-client/src/test/java/co/elastic/clients/transport/endpoints/BooleanEndpointTest.java b/java-client/src/test/java/co/elastic/clients/transport/VersionInfoTest.java similarity index 52% rename from java-client/src/test/java/co/elastic/clients/transport/endpoints/BooleanEndpointTest.java rename to java-client/src/test/java/co/elastic/clients/transport/VersionInfoTest.java index c3fd3e2f1..9170923b9 100644 --- a/java-client/src/test/java/co/elastic/clients/transport/endpoints/BooleanEndpointTest.java +++ b/java-client/src/test/java/co/elastic/clients/transport/VersionInfoTest.java @@ -17,25 +17,20 @@ * under the License. */ -package co.elastic.clients.transport.endpoints; +package co.elastic.clients.transport; -import co.elastic.clients.elasticsearch.core.ExistsRequest; -import co.elastic.clients.elasticsearch.logstash.PutPipelineRequest; -import co.elastic.clients.util.ApiTypeHelper; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -public class BooleanEndpointTest extends Assertions { - +public class VersionInfoTest { @Test - public void testHasRequestBody() { - ExistsRequest er = ExistsRequest.of(r -> r.index("foo").id("1")); - assertNull(ExistsRequest._ENDPOINT.body(er)); + public void testFlavor() { + Assertions.assertEquals("stack", VersionInfo.FLAVOR); + } - // This type has a lot of required properties that aren't the purpose of this test - try (ApiTypeHelper.DisabledChecksHandle handle = ApiTypeHelper.DANGEROUS_disableRequiredPropertiesCheck(true)) { - PutPipelineRequest ppr = PutPipelineRequest.of(r -> r); - assertNotNull(PutPipelineRequest._ENDPOINT.body(ppr)); - } + @Test + public void testClientMeta() { + String version = VersionInfo.VERSION; + Assertions.assertTrue(ElasticsearchTransportBase.getClientMeta().startsWith("es=" + version + ",jv=")); } } diff --git a/java-client/src/test/java/co/elastic/clients/transport/endpoints/EndpointBaseTest.java b/java-client/src/test/java/co/elastic/clients/transport/endpoints/EndpointBaseTest.java index e47c11093..296bc3b20 100644 --- a/java-client/src/test/java/co/elastic/clients/transport/endpoints/EndpointBaseTest.java +++ b/java-client/src/test/java/co/elastic/clients/transport/endpoints/EndpointBaseTest.java @@ -19,10 +19,13 @@ package co.elastic.clients.transport.endpoints; -import org.junit.jupiter.api.Assertions; +import co.elastic.clients.elasticsearch._types.ErrorResponse; +import co.elastic.clients.elasticsearch.core.PingRequest; +import co.elastic.clients.testkit.ModelTestCase; +import co.elastic.clients.transport.Endpoint; import org.junit.jupiter.api.Test; -public class EndpointBaseTest extends Assertions { +public class EndpointBaseTest extends ModelTestCase { @Test public void testPathEncoding() { @@ -39,4 +42,27 @@ private String pathEncode(String s) { EndpointBase.pathEncode(s, sb); return sb.toString(); } + + @Test + public void testErrorDecoding() { + Endpoint endpoint = PingRequest._ENDPOINT; + + { + String json = "{\"error\":\"some error\"}"; + + ErrorResponse response = fromJson(json, endpoint.errorDeserializer(404)); + assertEquals(404, response.status()); + assertEquals("some error", response.error().reason()); + assertEquals("http_status_404", response.error().type()); + } + + { + String json = "{\"status\":401,\"error\":{\"type\":\"the_error_type\",\"reason\":\"some error\"}}"; + + ErrorResponse response = fromJson(json, endpoint.errorDeserializer(404)); + assertEquals(401, response.status()); // value in response body has precedence + assertEquals("some error", response.error().reason()); + assertEquals("the_error_type", response.error().type()); + } + } } diff --git a/java-client/src/test/java/co/elastic/clients/transport/http/HeaderMapTest.java b/java-client/src/test/java/co/elastic/clients/transport/http/HeaderMapTest.java new file mode 100644 index 000000000..83dee0f8e --- /dev/null +++ b/java-client/src/test/java/co/elastic/clients/transport/http/HeaderMapTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.transport.http; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Iterator; +import java.util.Map; + +class HeaderMapTest extends Assertions { + + @Test + public void testCaseSensitivity() { + HeaderMap headers = new HeaderMap(); + + headers.put("Foo", "bar"); + assertEquals("bar", headers.get("Foo")); + assertEquals("bar", headers.get("foo")); + assertEquals("bar", headers.get("fOO")); + + headers.put("foo", "baz"); + assertEquals("baz", headers.get("Foo")); + assertEquals("baz", headers.get("foo")); + assertEquals("baz", headers.get("fOO")); + } + + @Test + public void testLock() { + HeaderMap headers = new HeaderMap(); + + headers.put("foo", "bar"); + + HeaderMap locked = headers.locked(); + assertEquals("bar", headers.get("Foo")); + + assertThrows(UnsupportedOperationException.class, () -> { + locked.put("foo", "baz"); + }); + + assertThrows(UnsupportedOperationException.class, () -> { + Iterator> iterator = locked.entrySet().iterator(); + assertEquals("bar", iterator.next().getValue()); + iterator.remove(); + }); + + headers.put("foo", "baz"); + assertEquals("baz", headers.get("Foo")); + assertEquals("bar", locked.get("Foo")); + } + + @Test + public void testAdd() { + HeaderMap headers = new HeaderMap(); + + headers.add("Foo", "bar"); + headers.add("foo", "baz"); + + assertEquals("bar; baz", headers.get("Foo")); + } +} diff --git a/java-client/src/test/java/co/elastic/clients/transport/rest_client/RestClientOptionsTest.java b/java-client/src/test/java/co/elastic/clients/transport/rest_client/RestClientOptionsTest.java index 488e55ff1..cd6558a4f 100644 --- a/java-client/src/test/java/co/elastic/clients/transport/rest_client/RestClientOptionsTest.java +++ b/java-client/src/test/java/co/elastic/clients/transport/rest_client/RestClientOptionsTest.java @@ -20,7 +20,9 @@ package co.elastic.clients.transport.rest_client; import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.json.JsonpMapper; import co.elastic.clients.json.SimpleJsonpMapper; +import co.elastic.clients.transport.ElasticsearchTransport; import co.elastic.clients.transport.Version; import co.elastic.clients.transport.endpoints.BooleanResponse; import com.sun.net.httpserver.Headers; @@ -58,6 +60,8 @@ public static void setup() throws IOException { // Register a handler on the core.exists("capture-handler/{name}") endpoint that will capture request headers. httpServer.createContext("/capture-headers/_doc/", exchange -> { String testName = exchange.getRequestURI().getPath().substring("/capture-headers/_doc/".length()); + System.out.println(exchange.getResponseHeaders()); + System.out.println(); collectedHeaders.put(testName, exchange.getRequestHeaders()); // Reply with an empty 200 response @@ -75,6 +79,15 @@ public static void cleanup() { httpServer = null; collectedHeaders = null; } + + private ElasticsearchTransport newRestClientTransport(RestClient restClient, JsonpMapper mapper) { + return newRestClientTransport(restClient, mapper, null); + } + + private ElasticsearchTransport newRestClientTransport(RestClient restClient, JsonpMapper mapper, RestClientOptions options) { + return new RestClientTransport(restClient, mapper, options); + //return new RestClientMonolithTransport(restClient, mapper, options); + } /** * Make a server call, capture request headers and check their consistency. @@ -114,7 +127,7 @@ void testNoRequestOptions() throws Exception { new HttpHost(httpServer.getAddress().getHostString(), httpServer.getAddress().getPort(), "http") ).build(); - RestClientTransport transport = new RestClientTransport(llrc, new SimpleJsonpMapper()); + ElasticsearchTransport transport = newRestClientTransport(llrc, new SimpleJsonpMapper()); ElasticsearchClient esClient = new ElasticsearchClient(transport); String id = checkHeaders(esClient); @@ -127,7 +140,7 @@ void testTransportRequestOptions() throws Exception { new HttpHost(httpServer.getAddress().getHostString(), httpServer.getAddress().getPort(), "http") ).build(); - RestClientTransport transport = new RestClientTransport(llrc, new SimpleJsonpMapper(), + ElasticsearchTransport transport = newRestClientTransport(llrc, new SimpleJsonpMapper(), new RestClientOptions.Builder(RequestOptions.DEFAULT.toBuilder()).build() ); ElasticsearchClient esClient = new ElasticsearchClient(transport); @@ -142,7 +155,7 @@ void testClientRequestOptions() throws Exception { new HttpHost(httpServer.getAddress().getHostString(), httpServer.getAddress().getPort(), "http") ).build(); - RestClientTransport transport = new RestClientTransport(llrc, new SimpleJsonpMapper()); + ElasticsearchTransport transport = newRestClientTransport(llrc, new SimpleJsonpMapper()); ElasticsearchClient esClient = new ElasticsearchClient(transport).withTransportOptions( new RestClientOptions.Builder(RequestOptions.DEFAULT.toBuilder()).build() ); @@ -157,7 +170,7 @@ void testLambdaOptionsBuilder() throws Exception { new HttpHost(httpServer.getAddress().getHostString(), httpServer.getAddress().getPort(), "http") ).build(); - RestClientTransport transport = new RestClientTransport(llrc, new SimpleJsonpMapper()); + ElasticsearchTransport transport = newRestClientTransport(llrc, new SimpleJsonpMapper()); ElasticsearchClient esClient = new ElasticsearchClient(transport) .withTransportOptions(o -> o .addHeader("Foo", "bar") @@ -179,7 +192,7 @@ void testRequestOptionsOverridingBuiltin() throws Exception { new HttpHost(httpServer.getAddress().getHostString(), httpServer.getAddress().getPort(), "http") ).build(); - RestClientTransport transport = new RestClientTransport(llrc, new SimpleJsonpMapper(), new RestClientOptions(options)); + ElasticsearchTransport transport = newRestClientTransport(llrc, new SimpleJsonpMapper(), new RestClientOptions(options)); ElasticsearchClient esClient = new ElasticsearchClient(transport); // Should not override client meta String id = checkHeaders(esClient); diff --git a/java-client/src/test/java/co/elastic/clients/transport/rest_client/SafeResponseConsumerTest.java b/java-client/src/test/java/co/elastic/clients/transport/rest_client/SafeResponseConsumerTest.java new file mode 100644 index 000000000..2910ed5d5 --- /dev/null +++ b/java-client/src/test/java/co/elastic/clients/transport/rest_client/SafeResponseConsumerTest.java @@ -0,0 +1,206 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.transport.rest_client; + +import com.sun.net.httpserver.HttpServer; +import org.apache.http.HttpEntity; +import org.apache.http.HttpException; +import org.apache.http.HttpHost; +import org.apache.http.HttpResponse; +import org.apache.http.entity.ContentType; +import org.apache.http.nio.ContentDecoder; +import org.apache.http.nio.IOControl; +import org.elasticsearch.client.HeapBufferedAsyncResponseConsumer; +import org.elasticsearch.client.HttpAsyncResponseConsumerFactory; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; + +public class SafeResponseConsumerTest { + + static HttpServer Server; + static HttpHost ESHost; + + // A consumer factory that throws an Error, to simulate the effect of an OOME + static HttpAsyncResponseConsumerFactory FailingConsumerFactory = () -> new HeapBufferedAsyncResponseConsumer(100 * 1024 * 1024) { + @Override + protected void onResponseReceived(HttpResponse httpResponse) throws HttpException, IOException { + super.onResponseReceived(httpResponse); + } + + @Override + protected void onContentReceived(ContentDecoder decoder, IOControl ioctrl) throws IOException { + super.onContentReceived(decoder, ioctrl); + throw new Error("Error in onContentReceived"); + } + + @Override + protected void onEntityEnclosed(HttpEntity entity, ContentType contentType) throws IOException { + super.onEntityEnclosed(entity, contentType); + } + }; + + @BeforeAll + public static void setup() throws Exception { + Server = HttpServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0); + Server.start(); + + Server.createContext("/", exchange -> { + String path = exchange.getRequestURI().getPath(); + exchange.getResponseHeaders().set("Content-Type", "application/json"); + exchange.getResponseHeaders().set("X-Elastic-Product", "Elasticsearch"); + + if (path.equals("/")) { + byte[] bytes = Info.getBytes(StandardCharsets.UTF_8); + exchange.sendResponseHeaders(200, bytes.length); + exchange.getResponseBody().write(bytes); + exchange.close(); + return; + } + + exchange.sendResponseHeaders(404, -1); + exchange.close(); + }); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + Server.stop(1); + } catch (Exception e) { + // Ignore + } + })); + + ESHost = new HttpHost(Server.getAddress().getAddress(), Server.getAddress().getPort()); + } + + @AfterAll + public static void tearDown() { + Server.stop(0); + } + + @Test + public void testReactorDeath() throws Exception { + + // Request options that will simulate an OOME and cause the reactor to die + RequestOptions.Builder failingOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); + failingOptionsBuilder.setHttpAsyncResponseConsumerFactory(FailingConsumerFactory); + RequestOptions failingOptions = failingOptionsBuilder.build(); + + RestClient restClient = RestClient.builder(ESHost).build(); + + // First request, to warm things up. + // An "indice exists" request, that has no response body + Request existsReq = new Request("HEAD", "/index-name"); + restClient.performRequest(existsReq); + + try { + Request infoReq = new Request("GET", "/"); + infoReq.setOptions(failingOptions); + + restClient.performRequest(infoReq); + Assertions.fail("First request should not succeed"); + } catch(Exception t) { +// System.err.println("Request 1 error"); +// t.printStackTrace(); + } + + Thread.sleep(1000); + + try { + // 2nd request with no specific options + Request infoReq = new Request("GET", "/"); + restClient.performRequest(infoReq); + Assertions.fail("Second request should not succeed"); + } catch(Exception t) { +// System.err.println("Request 2 error"); +// t.printStackTrace(); + } + + restClient.close(); + } + + @Test + public void testReactorSurvival() throws Exception { + + // Request options that will simulate an OOME and wrapped in the safe consumer that will + // avoid the reactor's death + RequestOptions.Builder protectedFailingOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); + protectedFailingOptionsBuilder.setHttpAsyncResponseConsumerFactory(() -> + new SafeResponseConsumer<>(FailingConsumerFactory.createHttpAsyncResponseConsumer()) + ); + RequestOptions protectedFailingOptions = protectedFailingOptionsBuilder.build(); + + RestClient restClient = RestClient.builder(ESHost).build(); + + // First request, to warm things up. + // An "indice exists" request, that has no response body + Request existsReq = new Request("HEAD", "/index-name"); + restClient.performRequest(existsReq); + + try { + Request infoReq = new Request("GET", "/"); + infoReq.setOptions(protectedFailingOptions); + + restClient.performRequest(infoReq); + Assertions.fail("First request should not succeed"); + } catch(Exception t) { + // System.err.println("Request 1 error"); + // t.printStackTrace(); + } + + { + // 2nd request with no specific options + Request infoReq = new Request("GET", "/"); + + Response resp = restClient.performRequest(infoReq); + Assertions.assertEquals(200, resp.getStatusLine().getStatusCode()); + } + + restClient.close(); + } + + private static final String Info = "{\n" + + " \"cluster_name\": \"foo\",\n" + + " \"cluster_uuid\": \"bar\",\n" + + " \"version\": {\n" + + " \"build_date\": \"2022-01-28T08:36:04.875279988Z\",\n" + + " \"minimum_wire_compatibility_version\": \"6.8.0\",\n" + + " \"build_hash\": \"bee86328705acaa9a6daede7140defd4d9ec56bd\",\n" + + " \"number\": \"7.17.0\",\n" + + " \"lucene_version\": \"8.11.1\",\n" + + " \"minimum_index_compatibility_version\": \"6.0.0-beta1\",\n" + + " \"build_flavor\": \"default\",\n" + + " \"build_snapshot\": false,\n" + + " \"build_type\": \"docker\"\n" + + " },\n" + + " \"name\": \"instance-0000000000\",\n" + + " \"tagline\": \"You Know, for Search\"\n" + + "}"; +}