Skip to content

Commit 44c5c84

Browse files
committed
Enable DisableDirectStreaming on a per request basis
Closes #2166 (cherry picked from commit 8196a60)
1 parent dde789b commit 44c5c84

File tree

8 files changed

+78
-43
lines changed

8 files changed

+78
-43
lines changed

src/Elasticsearch.Net/Configuration/RequestConfiguration.cs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ public interface IRequestConfiguration
4848
/// </summary>
4949
bool? DisablePing { get; set; }
5050

51+
/// <summary>
52+
/// Whether to buffer the request and response bytes for the call
53+
/// </summary>
54+
bool? DisableDirectStreaming { get; set; }
55+
5156
/// <summary>
5257
/// Treat the following statuses (on top of the 200 range) NOT as error.
5358
/// </summary>
@@ -81,6 +86,7 @@ public class RequestConfiguration : IRequestConfiguration
8186
public Uri ForceNode { get; set; }
8287
public bool? DisableSniff { get; set; }
8388
public bool? DisablePing { get; set; }
89+
public bool? DisableDirectStreaming { get; set; }
8490
public IEnumerable<int> AllowedStatusCodes { get; set; }
8591
public BasicAuthenticationCredentials BasicAuthenticationCredentials { get; set; }
8692
public bool EnableHttpPipelining { get; set; } = true;
@@ -94,7 +100,6 @@ public class RequestConfiguration : IRequestConfiguration
94100

95101
public class RequestConfigurationDescriptor : IRequestConfiguration
96102
{
97-
98103
private IRequestConfiguration Self => this;
99104
TimeSpan? IRequestConfiguration.RequestTimeout { get; set; }
100105
TimeSpan? IRequestConfiguration.PingTimeout { get; set; }
@@ -105,6 +110,7 @@ public class RequestConfigurationDescriptor : IRequestConfiguration
105110
Uri IRequestConfiguration.ForceNode { get; set; }
106111
bool? IRequestConfiguration.DisableSniff { get; set; }
107112
bool? IRequestConfiguration.DisablePing { get; set; }
113+
bool? IRequestConfiguration.DisableDirectStreaming { get; set; }
108114
IEnumerable<int> IRequestConfiguration.AllowedStatusCodes { get; set; }
109115
BasicAuthenticationCredentials IRequestConfiguration.BasicAuthenticationCredentials { get; set; }
110116
bool IRequestConfiguration.EnableHttpPipelining { get; set; } = true;
@@ -120,6 +126,7 @@ public RequestConfigurationDescriptor(IRequestConfiguration config)
120126
Self.ForceNode = config?.ForceNode;
121127
Self.DisableSniff = config?.DisableSniff;
122128
Self.DisablePing = config?.DisablePing;
129+
Self.DisableDirectStreaming = config?.DisableDirectStreaming;
123130
Self.AllowedStatusCodes = config?.AllowedStatusCodes;
124131
Self.BasicAuthenticationCredentials = config?.BasicAuthenticationCredentials;
125132
Self.EnableHttpPipelining = config?.EnableHttpPipelining ?? true;
@@ -184,6 +191,12 @@ public RequestConfigurationDescriptor DisablePing(bool? disable = true)
184191
return this;
185192
}
186193

194+
public RequestConfigurationDescriptor DisableDirectStreaming(bool? disable = true)
195+
{
196+
Self.DisableDirectStreaming = disable;
197+
return this;
198+
}
199+
187200
public RequestConfigurationDescriptor ForceNode(Uri uri)
188201
{
189202
Self.ForceNode = uri;

src/Elasticsearch.Net/Configuration/Security/BasicAuthenticationCredentials.cs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@ public class BasicAuthenticationCredentials
88

99
public string Password { get; set; }
1010

11-
public override string ToString()
12-
{
13-
return this.Username + ":" + this.Password;
14-
}
11+
public override string ToString() => $"{this.Username}:{this.Password}";
1512
}
1613
}

src/Elasticsearch.Net/Domain/RequestParameters/IRequestParameters.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,5 @@ public interface IRequestParameters
2727
TOut GetQueryStringValue<TOut>(string name);
2828

2929
void AddQueryStringValue(string name, object value);
30-
31-
3230
}
3331
}

src/Elasticsearch.Net/Transport/Pipeline/RequestData.cs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
using System.Collections.Specialized;
44
using System.IO;
55
using System.Linq;
6-
using System.Threading;
76
using Purify;
87

98
namespace Elasticsearch.Net
@@ -39,16 +38,15 @@ public class RequestData
3938
public string ProxyUsername { get; }
4039
public string ProxyPassword { get; }
4140
public bool DisableAutomaticProxyDetection { get; }
41+
4242
public BasicAuthenticationCredentials BasicAuthorizationCredentials { get; }
4343
public IEnumerable<int> AllowedStatusCodes { get; }
4444
public Func<IApiCallDetails, Stream, object> CustomConverter { get; private set; }
4545
public IConnectionConfigurationValues ConnectionSettings { get; }
4646
public IMemoryStreamFactory MemoryStreamFactory { get; }
4747

4848
public RequestData(HttpMethod method, string path, PostData<object> data, IConnectionConfigurationValues global, IRequestParameters local, IMemoryStreamFactory memoryStreamFactory)
49-
#pragma warning disable CS0618 // Type or member is obsolete
50-
: this(method, path, data, global, (IRequestConfiguration)local?.RequestConfiguration, memoryStreamFactory)
51-
#pragma warning restore CS0618 // Type or member is obsolete
49+
: this(method, path, data, global, local?.RequestConfiguration, memoryStreamFactory)
5250
{
5351
this.CustomConverter = local?.DeserializationOverride;
5452
this.Path = this.CreatePathWithQueryStrings(path, this.ConnectionSettings, local);
@@ -66,13 +64,17 @@ private RequestData(
6664
this.MemoryStreamFactory = memoryStreamFactory;
6765
this.Method = method;
6866
this.PostData = data;
67+
68+
if (data != null)
69+
data.DisableDirectStreaming = local?.DisableDirectStreaming ?? global.DisableDirectStreaming;
70+
6971
this.Path = this.CreatePathWithQueryStrings(path, this.ConnectionSettings, null);
7072

71-
this.Pipelined = global.HttpPipeliningEnabled || (local?.EnableHttpPipelining).GetValueOrDefault(false);
73+
this.Pipelined = local?.EnableHttpPipelining ?? global.HttpPipeliningEnabled;
7274
this.HttpCompression = global.EnableHttpCompression;
7375
this.ContentType = local?.ContentType ?? MimeType;
7476
this.Accept = local?.Accept ?? MimeType;
75-
this.Headers = new NameValueCollection(global.Headers ?? new NameValueCollection());
77+
this.Headers = global.Headers != null ? new NameValueCollection(global.Headers) : new NameValueCollection();
7678
this.RunAs = local?.RunAs;
7779

7880
this.RequestTimeout = local?.RequestTimeout ?? global.RequestTimeout;

src/Elasticsearch.Net/Transport/Pipeline/ResponseBuilder.cs

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
using System;
2-
using System.Collections.Generic;
32
using System.IO;
4-
using System.Linq;
5-
using System.Text;
63
using System.Threading;
74
using System.Threading.Tasks;
85

@@ -12,18 +9,23 @@ public class ResponseBuilder<TReturn>
129
where TReturn : class
1310
{
1411
public const int BufferSize = 8096;
12+
private static readonly VoidResponse Void = new VoidResponse();
13+
private static readonly IDisposable EmptyDisposable = new MemoryStream();
14+
15+
private readonly RequestData _requestData;
16+
private readonly CancellationToken _cancellationToken;
17+
private readonly bool _disableDirectStreaming;
1518

1619
public Exception Exception { get; set; }
1720
public int? StatusCode { get; set; }
1821
public Stream Stream { get; set; }
1922

20-
private RequestData _requestData;
21-
private CancellationToken _cancellationToken;
22-
2323
public ResponseBuilder(RequestData requestData, CancellationToken cancellationToken = default(CancellationToken))
2424
{
2525
_requestData = requestData;
2626
_cancellationToken = cancellationToken;
27+
_disableDirectStreaming =
28+
this._requestData.PostData?.DisableDirectStreaming ?? this._requestData.ConnectionSettings.DisableDirectStreaming;
2729
}
2830

2931
public ElasticsearchResponse<TReturn> ToResponse()
@@ -56,8 +58,6 @@ private ElasticsearchResponse<TReturn> Initialize(int? statusCode, Exception exc
5658
return response;
5759
}
5860

59-
private static IDisposable EmptyDisposable = new MemoryStream();
60-
6161
private void SetBody(ElasticsearchResponse<TReturn> response, Stream stream)
6262
{
6363
byte[] bytes = null;
@@ -84,7 +84,7 @@ private void SetBody(ElasticsearchResponse<TReturn> response, Stream stream)
8484
ServerError serverError;
8585
if (ServerError.TryCreate(stream, out serverError))
8686
response.ServerError = serverError;
87-
if (this._requestData.ConnectionSettings.DisableDirectStreaming)
87+
if (_disableDirectStreaming)
8888
response.ResponseBodyInBytes = bytes;
8989
}
9090
}
@@ -114,7 +114,7 @@ private async Task SetBodyAsync(ElasticsearchResponse<TReturn> response, Stream
114114
else if (response.HttpStatusCode != null)
115115
{
116116
response.ServerError = await ServerError.TryCreateAsync(stream, this._cancellationToken).ConfigureAwait(false);
117-
if (this._requestData.ConnectionSettings.DisableDirectStreaming)
117+
if (_disableDirectStreaming)
118118
response.ResponseBodyInBytes = bytes;
119119
}
120120
}
@@ -130,7 +130,7 @@ private void Finalize(ElasticsearchResponse<TReturn> response)
130130
}
131131

132132
private bool NeedsToEagerReadStream() =>
133-
this._requestData.ConnectionSettings.DisableDirectStreaming || typeof(TReturn) == typeof(string) || typeof(TReturn) == typeof(byte[]);
133+
_disableDirectStreaming || typeof(TReturn) == typeof(string) || typeof(TReturn) == typeof(byte[]);
134134

135135
private byte[] SwapStreams(ref Stream responseStream, ref MemoryStream ms)
136136
{
@@ -139,13 +139,12 @@ private byte[] SwapStreams(ref Stream responseStream, ref MemoryStream ms)
139139
responseStream = ms;
140140
responseStream.Position = 0;
141141
return bytes;
142-
143142
}
144143

145144
private bool SetSpecialTypes(Stream responseStream, ElasticsearchResponse<TReturn> cs, byte[] bytes)
146145
{
147146
var setSpecial = true;
148-
if (this._requestData.ConnectionSettings.DisableDirectStreaming)
147+
if (_disableDirectStreaming)
149148
cs.ResponseBodyInBytes = bytes;
150149
var returnType = typeof(TReturn);
151150
if (returnType == typeof(string))
@@ -167,12 +166,10 @@ private bool SetSpecialTypes(Stream responseStream, ElasticsearchResponse<TRetur
167166

168167
private void SetStreamResult(ElasticsearchResponse<Stream> result, Stream response) => result.Body = response;
169168

170-
private static VoidResponse _void = new VoidResponse();
171-
172169
private void SetVoidResult(ElasticsearchResponse<VoidResponse> result, Stream response)
173170
{
174171
response.Dispose();
175-
result.Body = _void;
172+
result.Body = Void;
176173
}
177174
}
178175
}

src/Elasticsearch.Net/Transport/PostData.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
using System.Collections.Generic;
2-
using Purify;
32
using System.IO;
4-
using System.Reflection;
53
using System.Threading;
64
using System.Threading.Tasks;
75

@@ -29,6 +27,7 @@ public class PostData<T> : IPostData<T>
2927
private readonly IEnumerable<object> _enumerableOfObject;
3028
private readonly T _serializable;
3129

30+
public bool? DisableDirectStreaming { get; set; }
3231
public byte[] WrittenBytes { get; private set; }
3332
public PostType Type { get; }
3433

@@ -64,7 +63,8 @@ public PostData(T item)
6463
public void Write(Stream writableStream, IConnectionConfigurationValues settings)
6564
{
6665
var indent = settings.PrettyJson ? SerializationFormatting.Indented : SerializationFormatting.None;
67-
MemoryStream ms = null; Stream stream = null;
66+
MemoryStream ms = null;
67+
Stream stream = null;
6868
switch (Type)
6969
{
7070
case PostType.ByteArray:
@@ -79,7 +79,7 @@ public void Write(Stream writableStream, IConnectionConfigurationValues settings
7979
case PostType.EnumerableOfObject:
8080
if (!_enumerableOfObject.HasAny()) return;
8181

82-
if (settings.DisableDirectStreaming)
82+
if (this.DisableDirectStreaming ?? settings.DisableDirectStreaming)
8383
{
8484
ms = new MemoryStream();
8585
stream = ms;
@@ -93,7 +93,7 @@ public void Write(Stream writableStream, IConnectionConfigurationValues settings
9393
break;
9494
case PostType.Serializable:
9595
stream = writableStream;
96-
if (settings.DisableDirectStreaming)
96+
if (this.DisableDirectStreaming ?? settings.DisableDirectStreaming)
9797
{
9898
ms = new MemoryStream();
9999
stream = ms;
@@ -127,7 +127,7 @@ public async Task WriteAsync(Stream writableStream, IConnectionConfigurationValu
127127
break;
128128
case PostType.EnumerableOfObject:
129129
if (!_enumerableOfObject.HasAny()) return;
130-
if (settings.DisableDirectStreaming)
130+
if (this.DisableDirectStreaming ?? settings.DisableDirectStreaming)
131131
{
132132
ms = new MemoryStream();
133133
stream = ms;
@@ -141,7 +141,7 @@ public async Task WriteAsync(Stream writableStream, IConnectionConfigurationValu
141141
break;
142142
case PostType.Serializable:
143143
stream = writableStream;
144-
if (settings.DisableDirectStreaming)
144+
if (this.DisableDirectStreaming ?? settings.DisableDirectStreaming)
145145
{
146146
ms = new MemoryStream();
147147
stream = ms;

src/Nest/CommonAbstractions/Request/RequestBase.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ public IRequestConfiguration RequestConfiguration
6969
}
7070
}
7171

72-
7372
public abstract class RequestDescriptorBase<TDescriptor, TParameters, TInterface> : RequestBase<TParameters>, IDescriptor
7473
where TDescriptor : RequestDescriptorBase<TDescriptor, TParameters, TInterface>, TInterface
7574
where TParameters : FluentRequestParameters<TParameters>, new()

src/Tests/ClientConcepts/LowLevel/DirectStreaming.cs

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,6 @@
1-
using Elasticsearch.Net;
2-
using FluentAssertions;
1+
using FluentAssertions;
32
using Nest;
43
using System;
5-
using System.Collections.Generic;
6-
using System.Linq;
7-
using System.Text;
8-
using System.Threading.Tasks;
94
using Tests.Framework;
105

116
namespace Tests.ClientConcepts.LowLevel
@@ -79,5 +74,39 @@ public void EnableDirectStreamingOnSuccess()
7974
response = client.SearchAsync<object>(s => s).Result;
8075
assert(response);
8176
}
77+
78+
[U]
79+
public void DisableDirectStreamingOnRequest()
80+
{
81+
Action<IResponse> assert = r =>
82+
{
83+
r.ApiCall.Should().NotBeNull();
84+
r.ApiCall.RequestBodyInBytes.Should().NotBeNull();
85+
r.ApiCall.ResponseBodyInBytes.Should().NotBeNull();
86+
};
87+
88+
var client = TestClient.GetFixedReturnClient(new { });
89+
var response = client.Search<object>(s => s.RequestConfiguration(r => r.DisableDirectStreaming()));
90+
assert(response);
91+
response = client.SearchAsync<object>(s => s.RequestConfiguration(r => r.DisableDirectStreaming())).Result;
92+
assert(response);
93+
}
94+
95+
[U]
96+
public void EnableDirectStreamingOnRequest()
97+
{
98+
Action<IResponse> assert = r =>
99+
{
100+
r.ApiCall.Should().NotBeNull();
101+
r.ApiCall.RequestBodyInBytes.Should().BeNull();
102+
r.ApiCall.ResponseBodyInBytes.Should().BeNull();
103+
};
104+
105+
var client = TestClient.GetFixedReturnClient(new { }, 200, c => c.DisableDirectStreaming());
106+
var response = client.Search<object>(s => s.RequestConfiguration(r => r.DisableDirectStreaming(false)));
107+
assert(response);
108+
response = client.SearchAsync<object>(s => s.RequestConfiguration(r => r.DisableDirectStreaming(false))).Result;
109+
assert(response);
110+
}
82111
}
83112
}

0 commit comments

Comments
 (0)