Skip to content

Commit 658329a

Browse files
authored
fix #2311 null GetResponseStream() on HttpWebException (#2314)
* fix #2311 GetResponseStream() on HttpWebException can return null and we should close the response stream instead in those cases because we can not rely on closing the stream. * make sure we did not break ElasticsearchResponse<Stream> Conflicts: src/Elasticsearch.Net/Transport/Pipeline/ResponseBuilder.cs
1 parent 21561e6 commit 658329a

File tree

11 files changed

+301
-140
lines changed

11 files changed

+301
-140
lines changed

src/Elasticsearch.Net/Connection/HttpConnection.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.IO;
23
using System.IO.Compression;
34
using System.Net;
45
using System.Text;
@@ -51,7 +52,7 @@ protected virtual HttpWebRequest CreateWebRequest(RequestData requestData)
5152
request.Headers.Add("Content-Encoding", "gzip");
5253
}
5354
if (!requestData.RunAs.IsNullOrEmpty())
54-
request.Headers.Add("es-security-runas-user", requestData.RunAs);
55+
request.Headers.Add("es-shield-runas-user", requestData.RunAs);
5556

5657
if (requestData.Headers != null && requestData.Headers.HasKeys())
5758
request.Headers.Add(requestData.Headers);
@@ -142,6 +143,9 @@ public virtual ElasticsearchResponse<TReturn> Request<TReturn>(RequestData reque
142143
var response = (HttpWebResponse)request.GetResponse();
143144
builder.StatusCode = (int)response.StatusCode;
144145
builder.Stream = response.GetResponseStream();
146+
// https://github.com/elastic/elasticsearch-net/issues/2311
147+
// if stream is null call dispose on response instead.
148+
if (builder.Stream == null || builder.Stream == Stream.Null) response.Dispose();
145149
}
146150
catch (WebException e)
147151
{
@@ -179,6 +183,9 @@ public virtual async Task<ElasticsearchResponse<TReturn>> RequestAsync<TReturn>(
179183
var response = (HttpWebResponse)(await request.GetResponseAsync().ConfigureAwait(false));
180184
builder.StatusCode = (int)response.StatusCode;
181185
builder.Stream = response.GetResponseStream();
186+
// https://github.com/elastic/elasticsearch-net/issues/2311
187+
// if stream is null call dispose on response instead.
188+
if (builder.Stream == null || builder.Stream == Stream.Null) response.Dispose();
182189
}
183190
catch (WebException e)
184191
{
@@ -197,6 +204,9 @@ private void HandleException<TReturn>(ResponseBuilder<TReturn> builder, WebExcep
197204
{
198205
builder.StatusCode = (int)response.StatusCode;
199206
builder.Stream = response.GetResponseStream();
207+
// https://github.com/elastic/elasticsearch-net/issues/2311
208+
// if stream is null call dispose on response instead.
209+
if (builder.Stream == null || builder.Stream == Stream.Null) response.Dispose();
200210
}
201211
}
202212

src/Elasticsearch.Net/Transport/Pipeline/RequestData.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public class RequestData
2121
public AuditEvent OnFailureAuditEvent => this.MadeItToResponse ? AuditEvent.BadResponse : AuditEvent.BadRequest;
2222
public PipelineFailure OnFailurePipelineFailure => this.MadeItToResponse ? PipelineFailure.BadResponse : PipelineFailure.BadRequest;
2323

24-
public Node Node { get; internal set; }
24+
public Node Node { get; set; }
2525
public TimeSpan RequestTimeout { get; }
2626
public TimeSpan PingTimeout { get; }
2727
public int KeepAliveTime { get; }

src/Elasticsearch.Net/Transport/Pipeline/RequestPipeline.cs

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -121,20 +121,20 @@ public void MarkDead(Node node)
121121
public void FirstPoolUsage(SemaphoreSlim semaphore)
122122
{
123123
if (!this.FirstPoolUsageNeedsSniffing) return;
124-
if (!semaphore.Wait(this._settings.RequestTimeout))
125-
{
126-
if (this.FirstPoolUsageNeedsSniffing)
127-
throw new PipelineException(PipelineFailure.CouldNotStartSniffOnStartup, null);
128-
return;
129-
}
130-
131-
if (!this.FirstPoolUsageNeedsSniffing)
132-
{
133-
semaphore.Release();
134-
return;
135-
}
136-
137-
try
124+
if (!semaphore.Wait(this._settings.RequestTimeout))
125+
{
126+
if (this.FirstPoolUsageNeedsSniffing)
127+
throw new PipelineException(PipelineFailure.CouldNotStartSniffOnStartup, null);
128+
return;
129+
}
130+
131+
if (!this.FirstPoolUsageNeedsSniffing)
132+
{
133+
semaphore.Release();
134+
return;
135+
}
136+
137+
try
138138
{
139139
using (this.Audit(SniffOnStartup))
140140
{
@@ -152,18 +152,18 @@ public async Task FirstPoolUsageAsync(SemaphoreSlim semaphore, CancellationToken
152152
{
153153
if (!this.FirstPoolUsageNeedsSniffing) return;
154154
var success = await semaphore.WaitAsync(this._settings.RequestTimeout, cancellationToken).ConfigureAwait(false);
155-
if (!success)
156-
{
157-
if(this.FirstPoolUsageNeedsSniffing)
158-
throw new PipelineException(PipelineFailure.CouldNotStartSniffOnStartup, null);
159-
return;
160-
}
155+
if (!success)
156+
{
157+
if(this.FirstPoolUsageNeedsSniffing)
158+
throw new PipelineException(PipelineFailure.CouldNotStartSniffOnStartup, null);
159+
return;
160+
}
161161

162162
if (!this.FirstPoolUsageNeedsSniffing)
163-
{
164-
semaphore.Release();
165-
return;
166-
}
163+
{
164+
semaphore.Release();
165+
return;
166+
}
167167
try
168168
{
169169
using (this.Audit(SniffOnStartup))

src/Elasticsearch.Net/Transport/Pipeline/ResponseBuilder.cs

Lines changed: 46 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ private ElasticsearchResponse<TReturn> Initialize(int? statusCode, Exception exc
5656
return response;
5757
}
5858

59+
private static IDisposable EmptyDisposable = new MemoryStream();
60+
5961
private void SetBody(ElasticsearchResponse<TReturn> response, Stream stream)
6062
{
6163
byte[] bytes = null;
@@ -64,23 +66,27 @@ private void SetBody(ElasticsearchResponse<TReturn> response, Stream stream)
6466
var inMemoryStream = this._requestData.MemoryStreamFactory.Create();
6567
stream.CopyTo(inMemoryStream, BufferSize);
6668
bytes = this.SwapStreams(ref stream, ref inMemoryStream);
67-
}
68-
69-
if (response.Success)
70-
{
71-
if (!SetSpecialTypes(stream, response, bytes))
72-
{
73-
if (this._requestData.CustomConverter != null) response.Body = this._requestData.CustomConverter(response, stream) as TReturn;
74-
else response.Body = this._requestData.ConnectionSettings.Serializer.Deserialize<TReturn>(stream);
75-
}
76-
}
77-
else if (response.HttpStatusCode != null)
78-
{
79-
ServerError serverError;
80-
if (ServerError.TryCreate(stream, out serverError))
81-
response.ServerError = serverError;
82-
if (this._requestData.ConnectionSettings.DisableDirectStreaming)
83-
response.ResponseBodyInBytes = bytes;
69+
}
70+
71+
var needsDispose = typeof(TReturn) != typeof(Stream);
72+
using (needsDispose ? stream : EmptyDisposable)
73+
{
74+
if (response.Success)
75+
{
76+
if (!SetSpecialTypes(stream, response, bytes))
77+
{
78+
if (this._requestData.CustomConverter != null) response.Body = this._requestData.CustomConverter(response, stream) as TReturn;
79+
else response.Body = this._requestData.ConnectionSettings.Serializer.Deserialize<TReturn>(stream);
80+
}
81+
}
82+
else if (response.HttpStatusCode != null)
83+
{
84+
ServerError serverError;
85+
if (ServerError.TryCreate(stream, out serverError))
86+
response.ServerError = serverError;
87+
if (this._requestData.ConnectionSettings.DisableDirectStreaming)
88+
response.ResponseBodyInBytes = bytes;
89+
}
8490
}
8591
}
8692

@@ -94,20 +100,24 @@ private async Task SetBodyAsync(ElasticsearchResponse<TReturn> response, Stream
94100
bytes = this.SwapStreams(ref stream, ref inMemoryStream);
95101
}
96102

97-
if (response.Success)
98-
{
99-
if (!SetSpecialTypes(stream, response, bytes))
100-
{
101-
if (this._requestData.CustomConverter != null) response.Body = this._requestData.CustomConverter(response, stream) as TReturn;
102-
else response.Body = await this._requestData.ConnectionSettings.Serializer.DeserializeAsync<TReturn>(stream, this._cancellationToken).ConfigureAwait(false);
103-
}
104-
}
105-
else if (response.HttpStatusCode != null)
106-
{
107-
response.ServerError = await ServerError.TryCreateAsync(stream, this._cancellationToken).ConfigureAwait(false);
108-
if (this._requestData.ConnectionSettings.DisableDirectStreaming)
109-
response.ResponseBodyInBytes = bytes;
110-
}
103+
var needsDispose = typeof(TReturn) != typeof(Stream);
104+
using (needsDispose ? stream : EmptyDisposable)
105+
{
106+
if (response.Success)
107+
{
108+
if (!SetSpecialTypes(stream, response, bytes))
109+
{
110+
if (this._requestData.CustomConverter != null) response.Body = this._requestData.CustomConverter(response, stream) as TReturn;
111+
else response.Body = await this._requestData.ConnectionSettings.Serializer.DeserializeAsync<TReturn>(stream, this._cancellationToken).ConfigureAwait(false);
112+
}
113+
}
114+
else if (response.HttpStatusCode != null)
115+
{
116+
response.ServerError = await ServerError.TryCreateAsync(stream, this._cancellationToken).ConfigureAwait(false);
117+
if (this._requestData.ConnectionSettings.DisableDirectStreaming)
118+
response.ResponseBodyInBytes = bytes;
119+
}
120+
}
111121
}
112122

113123
private void Finalize(ElasticsearchResponse<TReturn> response)
@@ -136,16 +146,16 @@ private bool SetSpecialTypes(Stream responseStream, ElasticsearchResponse<TRetur
136146
{
137147
var setSpecial = true;
138148
if (this._requestData.ConnectionSettings.DisableDirectStreaming)
139-
cs.ResponseBodyInBytes = bytes;
140-
var returnType = typeof (TReturn);
149+
cs.ResponseBodyInBytes = bytes;
150+
var returnType = typeof(TReturn);
141151
if (returnType == typeof(string))
142152
this.SetStringResult(cs as ElasticsearchResponse<string>, bytes);
143153
else if (returnType == typeof(byte[]))
144154
this.SetByteResult(cs as ElasticsearchResponse<byte[]>, bytes);
145155
else if (returnType == typeof(VoidResponse))
146-
this.SetVoidResult(cs as ElasticsearchResponse<VoidResponse>, responseStream);
147-
else if (returnType == typeof(Stream))
148-
this.SetStreamResult(cs as ElasticsearchResponse<Stream>, responseStream);
156+
this.SetVoidResult(cs as ElasticsearchResponse<VoidResponse>, responseStream);
157+
else if (returnType == typeof(Stream))
158+
this.SetStreamResult(cs as ElasticsearchResponse<Stream>, responseStream);
149159
else
150160
setSpecial = false;
151161
return setSpecial;

src/Tests/ClientConcepts/ConnectionPooling/BuildingBlocks/RequestPipelines.doc.cs

Lines changed: 69 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,11 @@ private IRequestPipeline CreatePipeline(
6666
var settings = new ConnectionSettings(pool, connection ?? new InMemoryConnection());
6767
settings = settingsSelector?.Invoke(settings) ?? settings;
6868
return new FixedPipelineFactory(settings, dateTimeProvider ?? DateTimeProvider.Default).Pipeline;
69-
}
70-
69+
}
70+
7171
/**=== Pipeline Behavior
7272
*==== Sniffing on First usage
73-
*/
73+
*/
7474
[U]
7575
public void FirstUsageCheck()
7676
{
@@ -88,73 +88,73 @@ public void FirstUsageCheck()
8888
/** We can see that only the cluster that supports reseeding will opt in to `FirstPoolUsageNeedsSniffing()`;
8989
* You can however disable reseeding/sniffing on ConnectionSettings
9090
*/
91-
sniffingPipeline = CreatePipeline(uris => new SniffingConnectionPool(uris), s => s.SniffOnStartup(false)); //<1> Disable sniffing on startup
91+
sniffingPipeline = CreatePipeline(uris => new SniffingConnectionPool(uris), s => s.SniffOnStartup(false)); //<1> Disable sniffing on startup
9292
sniffingPipeline.FirstPoolUsageNeedsSniffing.Should().BeFalse();
9393
}
9494

95-
/**==== Wait for first Sniff
96-
*
97-
* All threads wait for the sniff on startup to finish, waiting the request timeout period. A
98-
* https://msdn.microsoft.com/en-us/library/system.threading.semaphoreslim(v=vs.110).aspx[`SemaphoreSlim`]
99-
* is used to block threads until the sniff finishes and waiting threads release the `SemaphoreSlim` appropriately.
100-
*/
101-
[U]
102-
public void FirstUsageCheckConcurrentThreads()
103-
{
104-
var response = new
105-
{
106-
cluster_name = "elasticsearch",
107-
nodes = new
108-
{
109-
node1 = new
110-
{
111-
name = "Node Name 1",
112-
transport_address = "127.0.0.1:9300",
113-
host = "127.0.0.1",
114-
ip = "127.0.01",
115-
version = "5.0.0-alpha3",
116-
build = "e455fd0",
117-
http_address = "127.0.0.1:9200",
118-
settings = new JObject
119-
{
120-
{"client.type", "node"},
121-
{"cluster.name", "elasticsearch"},
122-
{"config.ignore_system_properties", "true"},
123-
{"name", "Node Name 1"},
124-
{"path.home", "c:\\elasticsearch\\elasticsearch"},
125-
{"path.logs", "c:/ elasticsearch/logs"}
126-
}
127-
}
128-
}
129-
};
130-
131-
var responseBody = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(response));
132-
133-
var inMemoryConnection = new WaitingInMemoryConnection(
134-
TimeSpan.FromSeconds(1),
135-
responseBody);
136-
137-
var sniffingPipeline = CreatePipeline(
138-
uris => new SniffingConnectionPool(uris),
139-
connection: inMemoryConnection,
140-
settingsSelector: s => s.RequestTimeout(TimeSpan.FromSeconds(2)));
141-
142-
var semaphoreSlim = new SemaphoreSlim(1, 1);
95+
/**==== Wait for first Sniff
96+
*
97+
* All threads wait for the sniff on startup to finish, waiting the request timeout period. A
98+
* https://msdn.microsoft.com/en-us/library/system.threading.semaphoreslim(v=vs.110).aspx[`SemaphoreSlim`]
99+
* is used to block threads until the sniff finishes and waiting threads release the `SemaphoreSlim` appropriately.
100+
*/
101+
[U]
102+
public void FirstUsageCheckConcurrentThreads()
103+
{
104+
var response = new
105+
{
106+
cluster_name = "elasticsearch",
107+
nodes = new
108+
{
109+
node1 = new
110+
{
111+
name = "Node Name 1",
112+
transport_address = "127.0.0.1:9300",
113+
host = "127.0.0.1",
114+
ip = "127.0.01",
115+
version = "5.0.0-alpha3",
116+
build = "e455fd0",
117+
http_address = "127.0.0.1:9200",
118+
settings = new JObject
119+
{
120+
{"client.type", "node"},
121+
{"cluster.name", "elasticsearch"},
122+
{"config.ignore_system_properties", "true"},
123+
{"name", "Node Name 1"},
124+
{"path.home", "c:\\elasticsearch\\elasticsearch"},
125+
{"path.logs", "c:/ elasticsearch/logs"}
126+
}
127+
}
128+
}
129+
};
143130

144-
/**
145-
* start three tasks that will initiate a sniff on startup. The first task will successfully
146-
* sniff on startup with the remaining two waiting tasks exiting without exception and releasing
147-
* the `SemaphoreSlim`.
148-
*/
149-
var task1 = System.Threading.Tasks.Task.Run(() => sniffingPipeline.FirstPoolUsage(semaphoreSlim));
150-
var task2 = System.Threading.Tasks.Task.Run(() => sniffingPipeline.FirstPoolUsage(semaphoreSlim));
151-
var task3 = System.Threading.Tasks.Task.Run(() => sniffingPipeline.FirstPoolUsage(semaphoreSlim));
152-
153-
var exception = Record.Exception(() => System.Threading.Tasks.Task.WaitAll(task1, task2, task3));
154-
exception.Should().BeNull();
155-
}
156-
157-
/**==== Sniffing on Connection Failure */
131+
var responseBody = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(response));
132+
133+
var inMemoryConnection = new WaitingInMemoryConnection(
134+
TimeSpan.FromSeconds(1),
135+
responseBody);
136+
137+
var sniffingPipeline = CreatePipeline(
138+
uris => new SniffingConnectionPool(uris),
139+
connection: inMemoryConnection,
140+
settingsSelector: s => s.RequestTimeout(TimeSpan.FromSeconds(2)));
141+
142+
var semaphoreSlim = new SemaphoreSlim(1, 1);
143+
144+
/**
145+
* start three tasks that will initiate a sniff on startup. The first task will successfully
146+
* sniff on startup with the remaining two waiting tasks exiting without exception and releasing
147+
* the `SemaphoreSlim`.
148+
*/
149+
var task1 = System.Threading.Tasks.Task.Run(() => sniffingPipeline.FirstPoolUsage(semaphoreSlim));
150+
var task2 = System.Threading.Tasks.Task.Run(() => sniffingPipeline.FirstPoolUsage(semaphoreSlim));
151+
var task3 = System.Threading.Tasks.Task.Run(() => sniffingPipeline.FirstPoolUsage(semaphoreSlim));
152+
153+
var exception = Record.Exception(() => System.Threading.Tasks.Task.WaitAll(task1, task2, task3));
154+
exception.Should().BeNull();
155+
}
156+
157+
/**==== Sniffing on Connection Failure */
158158
[U]
159159
public void SniffsOnConnectionFailure()
160160
{
@@ -171,9 +171,9 @@ public void SniffsOnConnectionFailure()
171171
*/
172172
sniffingPipeline = CreatePipeline(uris => new SniffingConnectionPool(uris), s => s.SniffOnConnectionFault(false));
173173
sniffingPipeline.SniffsOnConnectionFailure.Should().BeFalse();
174-
}
175-
176-
/**==== Sniffing on Stale cluster */
174+
}
175+
176+
/**==== Sniffing on Stale cluster */
177177
[U]
178178
public void SniffsOnStaleCluster()
179179
{

0 commit comments

Comments
 (0)