Skip to content

Commit cd53c8b

Browse files
committed
implemented pings on first time usage and whenever a dead node is used for the first time, can be disabled in connectionconfiguration
1 parent 1f74fde commit cd53c8b

17 files changed

+98
-42
lines changed

src/Connections/Elasticsearch.Net.Connection.HttpClient/ElasticsearchHttpClient.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -118,12 +118,12 @@ public ElasticsearchResponse DeleteSync(Uri uri, byte[] data)
118118
throw new NotImplementedException();
119119
}
120120

121-
public bool Ping(Uri uri, int connectTimeout)
121+
public bool Ping(Uri uri)
122122
{
123123
throw new NotImplementedException();
124124
}
125125

126-
public IList<Uri> Sniff(Uri uri, int connectTimeout)
126+
public IList<Uri> Sniff(Uri uri)
127127
{
128128
throw new NotImplementedException();
129129
}

src/Connections/Elasticsearch.Net.Connection.Thrift/ThriftConnection.cs

+5-5
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@ public ThriftConnection(IConnectionConfigurationValues connectionSettings)
2828
this._poolSize = Math.Max(1, connectionSettings.MaximumAsyncConnections);
2929

3030
this._resourceLock = new Semaphore(_poolSize, _poolSize);
31-
int seed;
31+
int seed; bool shouldPingHint;
3232
for (var i = 0; i <= connectionSettings.MaximumAsyncConnections; i++)
3333
{
34-
var uri = this._connectionSettings.ConnectionPool.GetNext(null, out seed);
34+
var uri = this._connectionSettings.ConnectionPool.GetNext(null, out seed, out shouldPingHint);
3535
var host = uri.Host;
3636
var port = uri.Port;
3737
var tsocket = new TSocket(host, port);
@@ -195,7 +195,7 @@ public ElasticsearchResponse DeleteSync(Uri uri, byte[] data)
195195
return this.Execute(restRequest);
196196
}
197197

198-
public bool Ping(Uri uri, int connectTimeout)
198+
public bool Ping(Uri uri)
199199
{
200200
var restRequest = new RestRequest();
201201
restRequest.Method = Method.HEAD;
@@ -207,11 +207,11 @@ public bool Ping(Uri uri, int connectTimeout)
207207
return r.Success;
208208
}
209209

210-
public IList<Uri> Sniff(Uri uri, int connectTimeout)
210+
public IList<Uri> Sniff(Uri uri)
211211
{
212212
var restRequest = new RestRequest();
213213
restRequest.Method = Method.GET;
214-
restRequest.Uri = new Uri(uri,"/_nodes/_all/clear?timeout=" + connectTimeout);
214+
restRequest.Uri = new Uri(uri,"/_nodes/_all/clear?timeout=" + this._connectionSettings.PingTimeout.GetValueOrDefault(50));
215215

216216
restRequest.Headers = new Dictionary<string, string>();
217217
restRequest.Headers.Add("Content-Type", "application/json");

src/Elasticsearch.Net.Tests.Unit/Connection/ConcurrencyTests.cs

+6-1
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,12 @@ public ConcurrencyTestConnection(IConnectionConfigurationValues settings)
122122
{
123123
}
124124

125-
public override IList<Uri> Sniff(Uri uri, int connectTimeout)
125+
public override bool Ping(Uri uri)
126+
{
127+
return true;
128+
}
129+
130+
public override IList<Uri> Sniff(Uri uri)
126131
{
127132
return _rnd.Next(1, 11) % 3 == 0 ? _uris : _uris2;
128133
}

src/Elasticsearch.Net.Tests.Unit/Connection/SkipDeadNodesTests.cs

+5
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ public void DeadNodesAreNotVisited()
7777
ElasticsearchResponse.Create(config, 200, "GET", "/", null, null) //info 9 - 9202
7878
);
7979
getCall.Invokes((Uri u) => seenNodes.Add(u));
80+
var pingCall = A.CallTo(() => fake.Resolve<IConnection>().Ping(A<Uri>._));
81+
pingCall.Returns(true);
8082

8183
var client1 = fake.Resolve<ElasticsearchClient>();
8284
client1.Info(); //info call 1
@@ -101,6 +103,9 @@ public void DeadNodesAreNotVisited()
101103
seenNodes[7].Port.Should().Be(9204);
102104
seenNodes[8].Port.Should().Be(9203);
103105

106+
//4 nodes first time usage + 1 time after the first time 9203 came back to live
107+
pingCall.MustHaveHappened(Repeated.Exactly.Times(5));
108+
104109
//var nowCall = A.CallTo(() => fake.Resolve<IDateTimeProvider>().Sniff(A<Uri>._, A<int>._));
105110
}
106111
}

src/Elasticsearch.Net.Tests.Unit/Connection/SniffingConnectionPoolTests.cs

+5-5
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public void SniffOnStartupCallsSniffOnlyOnce()
3333
var param = new TypedParameter(typeof(IDateTimeProvider), null);
3434
fake.Provide<ITransport, Transport>(param);
3535
var connection = fake.Resolve<IConnection>();
36-
var sniffCall = A.CallTo(() => connection.Sniff(A<Uri>._, A<int>._));
36+
var sniffCall = A.CallTo(() => connection.Sniff(A<Uri>._));
3737
var client1 = fake.Resolve<ElasticsearchClient>();
3838
var client2 = fake.Resolve<ElasticsearchClient>();
3939
var client3 = fake.Resolve<ElasticsearchClient>();
@@ -66,7 +66,7 @@ public void SniffIsCalledAfterItHasGoneOutOfDate()
6666
fake.Provide<IConnectionConfigurationValues>(config);
6767
fake.Provide<ITransport>(fake.Resolve<Transport>());
6868
var connection = fake.Resolve<IConnection>();
69-
var sniffCall = A.CallTo(() => connection.Sniff(A<Uri>._, A<int>._));
69+
var sniffCall = A.CallTo(() => connection.Sniff(A<Uri>._));
7070
var getCall = A.CallTo(() => connection.GetSync(A<Uri>._));
7171
getCall.Returns(ElasticsearchResponse.Create(config, 200, "GET", "/", null, null));
7272

@@ -106,7 +106,7 @@ public void SniffIsCalledAfterItHasGoneOutOfDate_NotWhenItSeesA503()
106106
fake.Provide<IConnectionConfigurationValues>(config);
107107
fake.Provide<ITransport>(fake.Resolve<Transport>());
108108
var connection = fake.Resolve<IConnection>();
109-
var sniffCall = A.CallTo(() => connection.Sniff(A<Uri>._, A<int>._));
109+
var sniffCall = A.CallTo(() => connection.Sniff(A<Uri>._));
110110
var getCall = A.CallTo(() => connection.GetSync(A<Uri>._));
111111
getCall.ReturnsNextFromSequence(
112112
ElasticsearchResponse.Create(config, 200, "GET", "/", null, null), //info 1
@@ -146,7 +146,7 @@ public void SniffOnConnectionFaultCausesSniffOn503()
146146
fake.Provide<IConnectionConfigurationValues>(config);
147147
fake.Provide<ITransport>(fake.Resolve<Transport>());
148148
var connection = fake.Resolve<IConnection>();
149-
var sniffCall = A.CallTo(() => connection.Sniff(A<Uri>._, A<int>._));
149+
var sniffCall = A.CallTo(() => connection.Sniff(A<Uri>._));
150150
var getCall = A.CallTo(() => connection.GetSync(A<Uri>._));
151151
getCall.ReturnsNextFromSequence(
152152
ElasticsearchResponse.Create(config, 200, "GET", "/", null, null), //info 1
@@ -188,7 +188,7 @@ public void HostsReturnedBySniffAreVisited()
188188
fake.Provide<IConnectionConfigurationValues>(config);
189189
fake.Provide<ITransport>(fake.Resolve<Transport>());
190190
var connection = fake.Resolve<IConnection>();
191-
var sniffCall = A.CallTo(() => connection.Sniff(A<Uri>._, A<int>._));
191+
var sniffCall = A.CallTo(() => connection.Sniff(A<Uri>._));
192192
sniffCall.Returns(new List<Uri>()
193193
{
194194
new Uri("http://localhost:9204"),

src/Elasticsearch.Net.Tests.Unit/Connection/StaticConnectionPoolRetryTests.cs

+14-2
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ public void ThrowsOutOfNodesException_AndRetriesTheSpecifiedTimes()
6060
//an exception
6161
var getCall = A.CallTo(() => fake.Resolve<IConnection>().GetSync(A<Uri>._));
6262
getCall.Throws<Exception>();
63+
var pingCall = A.CallTo(() => fake.Resolve<IConnection>().Ping(A<Uri>._));
64+
pingCall.Returns(true);
6365

6466
//create a real ElasticsearchClient with it unspecified dependencies
6567
//as fakes
@@ -124,6 +126,8 @@ public void HardRetryLimitTakesPrecedenceOverNumberOfNodes()
124126
);
125127
var getCall = A.CallTo(() => fake.Resolve<IConnection>().GetSync(A<Uri>._));
126128
getCall.Throws<Exception>();
129+
var pingCall = A.CallTo(() => fake.Resolve<IConnection>().Ping(A<Uri>._));
130+
pingCall.Returns(true);
127131

128132
this.ProvideTransport(fake);
129133

@@ -159,7 +163,8 @@ public void AConnectionMustBeMadeEvenIfAllNodesAreDead()
159163
ElasticsearchResponse.Create(_config, 503, "GET", "/", null, null),
160164
ElasticsearchResponse.Create(_config, 200, "GET", "/", null, null)
161165
);
162-
166+
var pingCall = A.CallTo(() => fake.Resolve<IConnection>().Ping(A<Uri>._));
167+
pingCall.Returns(true);
163168
//setup client
164169
this.ProvideTransport(fake);
165170
var client = fake.Resolve<ElasticsearchClient>();
@@ -170,6 +175,8 @@ public void AConnectionMustBeMadeEvenIfAllNodesAreDead()
170175

171176
//original call + 4 retries == 5
172177
getCall.MustHaveHappened(Repeated.Exactly.Times(5));
178+
//ping must have been send out 4 times to the 4 nodes being used for the first time
179+
pingCall.MustHaveHappened(Repeated.Exactly.Times(4));
173180

174181
}
175182
}
@@ -209,8 +216,9 @@ public void AllNodesWillBeMarkedDead()
209216
getCall.Returns(
210217
ElasticsearchResponse.Create(_config, 503, "GET", "/", null, null)
211218
);
212-
213219

220+
var pingCall = A.CallTo(() => fake.Resolve<IConnection>().Ping(A<Uri>._));
221+
pingCall.Returns(true);
214222
this.ProvideTransport(fake);
215223
var client = fake.Resolve<ElasticsearchClient>();
216224

@@ -259,6 +267,8 @@ public void IfAConnectionComesBackToLifeOnItsOwnItShouldBeMarked()
259267
ElasticsearchResponse.Create(_config, 503, "GET", "/", null, null),
260268
ElasticsearchResponse.Create(_config, 200, "GET", "/", null, null)
261269
);
270+
var pingCall = A.CallTo(() => fake.Resolve<IConnection>().Ping(A<Uri>._));
271+
pingCall.Returns(true);
262272

263273

264274
//provide a transport with all the dependencies resolved
@@ -327,6 +337,8 @@ public void IfAllButOneConnectionDiesSubsequentRequestsMustUseTheOneAliveConnect
327337
ElasticsearchResponse.Create(_config, 200, "GET", "/", null, null),
328338
ElasticsearchResponse.Create(_config, 200, "GET", "/", null, null)
329339
);
340+
var pingCall = A.CallTo(() => fake.Resolve<IConnection>().Ping(A<Uri>._));
341+
pingCall.Returns(true);
330342

331343
//provide a transport with all the dependencies resolved
332344
this.ProvideTransport(fake);

src/Elasticsearch.Net.Tests.Unit/Stubs/NoopConnection.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -118,12 +118,12 @@ public virtual ElasticsearchResponse DeleteSync(Uri uri, byte[] data)
118118
return _responseGenerator.Create();
119119
}
120120

121-
public bool Ping(Uri uri, int connectTimeout)
121+
public bool Ping(Uri uri)
122122
{
123-
throw new NotImplementedException();
123+
return true;
124124
}
125125

126-
public IList<Uri> Sniff(Uri uri, int connectTimeout)
126+
public IList<Uri> Sniff(Uri uri)
127127
{
128128
throw new NotImplementedException();
129129
}

src/Elasticsearch.Net/Connection/ConnectionConfiguration.cs

+11
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class ConnectionConfiguration<T> : IConnectionConfigurationValues
3737
public int? MaxDeadTimeout { get; private set; }
3838
public string ProxyUsername { get; private set; }
3939
public string ProxyPassword { get; private set; }
40+
public bool DisablePings { get; private set; }
4041
public string ProxyAddress { get; private set; }
4142
public int MaximumAsyncConnections { get; private set; }
4243
public int? MaxRetries { get; private set; }
@@ -98,6 +99,16 @@ public T EnableTrace(bool enabled = true)
9899
return (T) this;
99100
}
100101

102+
/// <summary>
103+
/// When a node is used for the very first time or when it's used for the first time after it has been marked dead
104+
/// a ping with a very low timeout is send to the node to make sure that when it's still dead it reports it as fast as possible.
105+
/// You can disable these pings globally here if you rather have it fail on the possible slower original request
106+
/// </summary>
107+
public T DisablePing(bool disable = true)
108+
{
109+
this.DisablePings = disable;
110+
return (T) this;
111+
}
101112
/// <summary>
102113
/// This NameValueCollection will be appended to every url NEST calls, great if you need to pass i.e an API key.
103114
/// </summary>

src/Elasticsearch.Net/Connection/HttpConnection.cs

+7-7
Original file line numberDiff line numberDiff line change
@@ -67,23 +67,23 @@ public virtual ElasticsearchResponse DeleteSync(Uri uri, byte[] data)
6767
return this.DoSynchronousRequest(connection, data);
6868
}
6969

70-
public virtual bool Ping(Uri uri, int connectTimeout)
70+
public virtual bool Ping(Uri uri)
7171
{
7272
var request = this.CreateHttpWebRequest(uri, "HEAD");
73-
request.Timeout = connectTimeout;
74-
request.ReadWriteTimeout = connectTimeout;
73+
request.Timeout = this._ConnectionSettings.PingTimeout.GetValueOrDefault(50);
74+
request.ReadWriteTimeout = this._ConnectionSettings.PingTimeout.GetValueOrDefault(50);
7575
using (var response = (HttpWebResponse)request.GetResponse())
7676
{
7777
return response.StatusCode == HttpStatusCode.OK;
7878
}
7979
}
8080

81-
public virtual IList<Uri> Sniff(Uri uri, int connectTimeout)
81+
public virtual IList<Uri> Sniff(Uri uri)
8282
{
83-
uri = new Uri(uri, "_nodes/_all/clear?timeout=" + connectTimeout);
83+
uri = new Uri(uri, "_nodes/_all/clear?timeout=" + this._ConnectionSettings.PingTimeout.GetValueOrDefault(50));
8484
var request = this.CreateHttpWebRequest(uri, "GET");
85-
request.Timeout = connectTimeout;
86-
request.ReadWriteTimeout = connectTimeout;
85+
request.Timeout = this._ConnectionSettings.Timeout;
86+
request.ReadWriteTimeout = this._ConnectionSettings.Timeout;
8787
using (var response = (HttpWebResponse)request.GetResponse())
8888
using (var responseStream = response.GetResponseStream())
8989
{

src/Elasticsearch.Net/Connection/IConnection.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ public interface IConnection
2626
Task<ElasticsearchResponse> Delete(Uri uri, byte[] data);
2727
ElasticsearchResponse DeleteSync(Uri uri, byte[] data);
2828

29-
bool Ping(Uri uri, int connectTimeout);
30-
IList<Uri> Sniff(Uri uri, int connectTimeout);
29+
bool Ping(Uri uri);
30+
IList<Uri> Sniff(Uri uri);
3131

3232
}
3333
}

src/Elasticsearch.Net/Connection/IConnectionConfigurationValues.cs

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public interface IConnectionConfigurationValues
1717
int? DeadTimeout { get; }
1818
int? MaxDeadTimeout { get; }
1919
int? MaxRetries { get; }
20+
bool DisablePings { get; }
2021
string ProxyAddress { get; }
2122
string ProxyUsername { get; }
2223
string ProxyPassword { get; }

src/Elasticsearch.Net/Connection/Transport.cs

+19-5
Original file line numberDiff line numberDiff line change
@@ -83,13 +83,16 @@ public ElasticsearchResponse DoRequest(
8383

8484
var postData = PostData(data);
8585
ElasticsearchResponse response = null;
86-
87-
int initialSeed;
88-
var baseUri = this._connectionPool.GetNext(seed, out initialSeed);
86+
87+
int initialSeed; bool shouldPingHint;
88+
var baseUri = this._connectionPool.GetNext(seed, out initialSeed, out shouldPingHint);
8989
bool seenError = false;
9090

9191
try
9292
{
93+
if (shouldPingHint && !this._configurationValues.DisablePings)
94+
this._connection.Ping(CreateUriToPath(baseUri, ""));
95+
9396
var uri = CreateUriToPath(baseUri, path);
9497
response = _doRequest(method, uri, postData);
9598
if (response != null && response.SuccessOrKnownError)
@@ -160,8 +163,19 @@ public Task<ElasticsearchResponse> DoRequestAsync(
160163
if (queryString != null) path += queryString.ToQueryString();
161164

162165
var postData = PostData(data);
163-
int initialSeed;
164-
var baseUri = this._connectionPool.GetNext(seed, out initialSeed);
166+
int initialSeed; bool shouldPingHint;
167+
var baseUri = this._connectionPool.GetNext(seed, out initialSeed, out shouldPingHint);
168+
if (shouldPingHint && !this._configurationValues.DisablePings)
169+
{
170+
try
171+
{
172+
this._connection.Ping(CreateUriToPath(baseUri, ""));
173+
}
174+
catch (Exception e)
175+
{
176+
return this.RetryRequestAsync(method, path, data, retried, baseUri, initialSeed, e);
177+
}
178+
}
165179
var uri = CreateUriToPath(baseUri, path);
166180
return _doRequestAsync(method, uri, postData).ContinueWith(t=>
167181
{

src/Elasticsearch.Net/ConnectionPool/EndpointState.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ namespace Elasticsearch.Net.ConnectionPool
77
{
88
public class EndpointState
99
{
10-
public int _attempts = 0;
10+
public int _attempts = -1;
1111
public DateTime date = new DateTime();
1212
}
1313
}

src/Elasticsearch.Net/ConnectionPool/IConnectionPool.cs

+5-2
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,12 @@ public interface IConnectionPool
1313
int MaxRetries { get; }
1414

1515
/// <summary>
16-
/// Get the next available Uri for a live node
16+
/// Gets the next live Uri to perform the request on
1717
/// </summary>
18-
Uri GetNext(int? initialSeed, out int seed);
18+
/// <param name="initialSeed">pass the original seed when retrying, this guarantees that the nodes are walked in a predictable manner when multithreading</param>
19+
/// <param name="seed">The seed this call started on</param>
20+
/// <returns></returns>
21+
Uri GetNext(int? initialSeed, out int seed, out bool shouldPingHint);
1922

2023
/// <summary>
2124
/// Mark the specified Uri as dead

src/Elasticsearch.Net/ConnectionPool/SingleNodeConnectionPool.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@ public SingleNodeConnectionPool(Uri uri)
1818
_uri = uri;
1919
}
2020

21-
public Uri GetNext(int? initialSeed, out int seed)
21+
public Uri GetNext(int? initialSeed, out int seed, out bool shouldPingHint)
2222
{
2323
seed = 0;
24+
shouldPingHint = false;
2425
return _uri;
2526
}
2627

src/Elasticsearch.Net/ConnectionPool/SniffingConnectionPool.cs

+5-5
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,11 @@ public override void Sniff(IConnection connection, bool fromStartupHint = false)
2828

2929
try
3030
{
31-
int seed;
32-
var uri = this.GetNext(null, out seed);
31+
int seed; bool shouldPingHint;
32+
var uri = this.GetNext(null, out seed, out shouldPingHint);
3333

3434
this._readerWriter.EnterWriteLock();
35-
var nodes = connection.Sniff(uri, 50);
35+
var nodes = connection.Sniff(uri);
3636
if (!nodes.HasAny())
3737
return;
3838

@@ -48,12 +48,12 @@ public override void Sniff(IConnection connection, bool fromStartupHint = false)
4848
}
4949
}
5050

51-
public override Uri GetNext(int? initialSeed, out int seed)
51+
public override Uri GetNext(int? initialSeed, out int seed, out bool shouldPingHint)
5252
{
5353
try
5454
{
5555
this._readerWriter.EnterReadLock();
56-
return base.GetNext(initialSeed, out seed);
56+
return base.GetNext(initialSeed, out seed, out shouldPingHint);
5757
}
5858
finally
5959
{

0 commit comments

Comments
 (0)