Skip to content

Commit 1101a2f

Browse files
authored
Reseed sorting and StickySniffingConnectionPool variant (#2789)
* Add SniffingSortedStickyConnectionPool * SniffingSortedStickyConnectionPool inherits from SniffingConnectionPool, uses scoring function for node priority * Extract common actions in StaticConnectionPool to share logic with inheritors * Make StickyConnectionPool an override of StaticConnectionPool, so all the pools can share common code * Continuation of #2657 - rebased and cherry-picked relevant commits - Make the sorting a property of the base classes, this now also allows the regular sniffing connection to sort the responses - Renamed SniffingSortedStickyCP to StickySniffingCP because of that - Added documentation and tests for the StickySniffingCP - Fixed a bug with the virtual clustering Failing with a 200 response after .Succeeds(times). * fix documentation PR feedback
1 parent e23ca0e commit 1101a2f

File tree

14 files changed

+468
-148
lines changed

14 files changed

+468
-148
lines changed

src/Elasticsearch.Net/ConnectionPool/SniffingConnectionPool.cs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,20 @@ public SniffingConnectionPool(IEnumerable<Uri> uris, bool randomize = true, IDat
2020
{ }
2121

2222
public SniffingConnectionPool(IEnumerable<Node> nodes, bool randomize = true, IDateTimeProvider dateTimeProvider = null)
23-
: base(nodes, randomize, dateTimeProvider)
24-
{ }
23+
: base(nodes, randomize, dateTimeProvider) { }
2524

25+
public SniffingConnectionPool(IEnumerable<Node> nodes, Func<Node, float> nodeScorer, IDateTimeProvider dateTimeProvider = null)
26+
: base(nodes, nodeScorer, dateTimeProvider) { }
27+
28+
/// <summary>
29+
/// Obsolete overload
30+
/// </summary>
31+
/// <param name="predicate">UNUSED</param>
32+
[Obsolete("this constructor has an unused parameter: predicate and will be removed in 6.0")]
2633
public SniffingConnectionPool(IEnumerable<Node> nodes, Func<Node, bool> predicate, bool randomize = true, IDateTimeProvider dateTimeProvider = null)
2734
: base(nodes, randomize, dateTimeProvider)
2835
{ }
2936

30-
private static bool DefaultPredicate(Node node) => !node.MasterOnlyNode;
31-
3237
/// <inheritdoc/>
3338
public override IReadOnlyCollection<Node> Nodes
3439
{
@@ -56,8 +61,7 @@ public override void Reseed(IEnumerable<Node> nodes)
5661
try
5762
{
5863
this._readerWriter.EnterWriteLock();
59-
var sortedNodes = nodes
60-
.OrderBy(item => this.Randomize ? this.Random.Next() : 1)
64+
var sortedNodes = this.SortNodes(nodes)
6165
.DistinctBy(n => n.Uri)
6266
.ToList();
6367

src/Elasticsearch.Net/ConnectionPool/StaticConnectionPool.cs

Lines changed: 54 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,31 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Globalization;
34
using System.Linq;
45
using System.Threading;
56

67
namespace Elasticsearch.Net
78
{
89
public class StaticConnectionPool : IConnectionPool
910
{
11+
private readonly Func<Node, float> _nodeScorer;
1012
protected IDateTimeProvider DateTimeProvider { get; }
1113
protected Random Random { get; } = new Random();
1214
protected bool Randomize { get; }
1315

1416
protected List<Node> InternalNodes { get; set; }
1517

18+
protected List<Node> AliveNodes
19+
{
20+
get
21+
{
22+
var now = DateTimeProvider.Now();
23+
return this.InternalNodes
24+
.Where(n => n.IsAlive || n.DeadUntil <= now)
25+
.ToList();
26+
}
27+
}
28+
1629
/// <inheritdoc/>
1730
public virtual IReadOnlyCollection<Node> Nodes => this.InternalNodes;
1831

@@ -41,10 +54,16 @@ public StaticConnectionPool(IEnumerable<Uri> uris, bool randomize = true, IDateT
4154
{ }
4255

4356
public StaticConnectionPool(IEnumerable<Node> nodes, bool randomize = true, IDateTimeProvider dateTimeProvider = null)
57+
: this(nodes, null, dateTimeProvider)
4458
{
45-
nodes.ThrowIfEmpty(nameof(nodes));
46-
4759
this.Randomize = randomize;
60+
}
61+
62+
//this constructor is protected because nodeScorer only makes sense on subclasses that support reseeding
63+
//otherwise just manually sort `nodes` before instantiating.
64+
protected StaticConnectionPool(IEnumerable<Node> nodes, Func<Node, float> nodeScorer, IDateTimeProvider dateTimeProvider = null)
65+
{
66+
nodes.ThrowIfEmpty(nameof(nodes));
4867
this.DateTimeProvider = dateTimeProvider ?? Elasticsearch.Net.DateTimeProvider.Default;
4968

5069
var nn = nodes.ToList();
@@ -54,8 +73,8 @@ public StaticConnectionPool(IEnumerable<Node> nodes, bool randomize = true, IDat
5473

5574
this.UsingSsl = uris.Any(uri => uri.Scheme == "https");
5675

57-
this.InternalNodes = nn
58-
.OrderBy(item => randomize ? this.Random.Next() : 1)
76+
this._nodeScorer = nodeScorer;
77+
this.InternalNodes = this.SortNodes(nn)
5978
.DistinctBy(n => n.Uri)
6079
.ToList();
6180
this.LastUpdate = this.DateTimeProvider.Now();
@@ -69,42 +88,57 @@ public StaticConnectionPool(IEnumerable<Node> nodes, bool randomize = true, IDat
6988
/// </summary>
7089
public virtual IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = null)
7190
{
72-
//var count = this.InternalNodes.Count;
91+
var nodes = this.AliveNodes;
7392

74-
var now = this.DateTimeProvider.Now();
75-
var nodes = this.InternalNodes.Where(n => n.IsAlive || n.DeadUntil <= now)
76-
.ToList();
77-
var count = nodes.Count;
78-
Node node;
7993
var globalCursor = Interlocked.Increment(ref GlobalCursor);
8094

81-
if (count == 0)
95+
if (nodes.Count == 0)
8296
{
8397
//could not find a suitable node retrying on first node off globalCursor
84-
audit?.Invoke(AuditEvent.AllNodesDead, null);
85-
node = this.InternalNodes[globalCursor % this.InternalNodes.Count];
86-
node.IsResurrected = true;
87-
audit?.Invoke(AuditEvent.Resurrection, node);
88-
yield return node;
98+
yield return this.RetryInternalNodes(globalCursor, audit);
8999
yield break;
90100
}
91101

92-
var localCursor = globalCursor % count;
102+
var localCursor = globalCursor % nodes.Count;
103+
foreach (var aliveNode in SelectAliveNodes(localCursor, nodes, audit))
104+
{
105+
yield return aliveNode;
106+
}
107+
}
108+
109+
protected virtual Node RetryInternalNodes(int globalCursor, Action<AuditEvent, Node> audit = null)
110+
{
111+
audit?.Invoke(AuditEvent.AllNodesDead, null);
112+
var node = this.InternalNodes[globalCursor % this.InternalNodes.Count];
113+
node.IsResurrected = true;
114+
audit?.Invoke(AuditEvent.Resurrection, node);
115+
116+
return node;
117+
}
93118

94-
for (var attempts = 0; attempts < count; attempts++)
119+
protected virtual IEnumerable<Node> SelectAliveNodes(int cursor, List<Node> aliveNodes, Action<AuditEvent, Node> audit = null)
120+
{
121+
for (var attempts = 0; attempts < aliveNodes.Count; attempts++)
95122
{
96-
node = nodes[localCursor];
97-
localCursor = (localCursor + 1) % count;
123+
var node = aliveNodes[cursor];
124+
cursor = (cursor + 1) % aliveNodes.Count;
98125
//if this node is not alive or no longer dead mark it as resurrected
99126
if (!node.IsAlive)
100127
{
101128
audit?.Invoke(AuditEvent.Resurrection, node);
102129
node.IsResurrected = true;
103130
}
131+
104132
yield return node;
105133
}
106134
}
107135

136+
protected IOrderedEnumerable<Node> SortNodes(IEnumerable<Node> nodes) =>
137+
this._nodeScorer != null
138+
? nodes.OrderByDescending(_nodeScorer)
139+
: nodes.OrderBy(n => this.Randomize ? this.Random.Next() : 1);
140+
141+
108142
void IDisposable.Dispose() => this.DisposeManagedResources();
109143

110144
protected virtual void DisposeManagedResources() { }
Lines changed: 15 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,102 +1,42 @@
11
using System;
22
using System.Collections.Generic;
3-
using System.Linq;
43
using System.Threading;
54

65
namespace Elasticsearch.Net
76
{
8-
public class StickyConnectionPool : IConnectionPool
7+
public class StickyConnectionPool : StaticConnectionPool
98
{
10-
protected IDateTimeProvider DateTimeProvider { get; }
11-
12-
protected List<Node> InternalNodes { get; set; }
13-
14-
public bool UsingSsl { get; }
15-
public bool SniffedOnStartup { get; set; }
16-
17-
public IReadOnlyCollection<Node> Nodes => this.InternalNodes;
18-
19-
public int MaxRetries => this.InternalNodes.Count - 1;
20-
21-
public bool SupportsReseeding => false;
22-
23-
public bool SupportsPinging => true;
24-
25-
public DateTime LastUpdate { get; protected set; }
26-
279
public StickyConnectionPool(IEnumerable<Uri> uris, IDateTimeProvider dateTimeProvider = null)
28-
: this(uris.Select(uri => new Node(uri)), dateTimeProvider)
10+
: base(uris, false, dateTimeProvider)
2911
{ }
3012

3113
public StickyConnectionPool(IEnumerable<Node> nodes, IDateTimeProvider dateTimeProvider = null)
32-
{
33-
nodes.ThrowIfEmpty(nameof(nodes));
34-
35-
this.DateTimeProvider = dateTimeProvider ?? Elasticsearch.Net.DateTimeProvider.Default;
36-
37-
var nn = nodes.ToList();
38-
var uris = nn.Select(n => n.Uri).ToList();
39-
if (uris.Select(u => u.Scheme).Distinct().Count() > 1)
40-
throw new ArgumentException("Trying to instantiate a connection pool with mixed URI Schemes");
41-
42-
this.UsingSsl = uris.Any(uri => uri.Scheme == "https");
43-
44-
this.InternalNodes = nn
45-
.DistinctBy(n => n.Uri)
46-
.ToList();
47-
48-
this.LastUpdate = this.DateTimeProvider.Now();
49-
}
50-
51-
protected int GlobalCursor = -1;
14+
: base(nodes, false, dateTimeProvider)
15+
{ }
5216

53-
public IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = null)
17+
public override IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = null)
5418
{
55-
var now = this.DateTimeProvider.Now();
56-
var nodes = this.InternalNodes.Where(n => n.IsAlive || n.DeadUntil <= now)
57-
.ToList();
58-
var count = nodes.Count;
59-
Node node;
19+
var nodes = this.AliveNodes;
6020

61-
if (count == 0)
21+
if (nodes.Count == 0)
6222
{
63-
var globalCursor = Interlocked.Increment(ref GlobalCursor);
23+
var globalCursor = Interlocked.Increment(ref this.GlobalCursor);
24+
6425
//could not find a suitable node retrying on first node off globalCursor
65-
audit?.Invoke(AuditEvent.AllNodesDead, null);
66-
node = this.InternalNodes[globalCursor % this.InternalNodes.Count];
67-
node.IsResurrected = true;
68-
audit?.Invoke(AuditEvent.Resurrection, node);
69-
yield return node;
26+
yield return this.RetryInternalNodes(globalCursor, audit);
7027
yield break;
7128
}
7229

7330
// If the cursor is greater than the default then it's been
7431
// set already but we now have a live node so we should reset it
75-
if (GlobalCursor > -1)
76-
{
77-
Interlocked.Exchange(ref GlobalCursor, -1);
78-
}
32+
if (this.GlobalCursor > -1)
33+
Interlocked.Exchange(ref this.GlobalCursor, -1);
7934

8035
var localCursor = 0;
81-
82-
for (var attempts = 0; attempts < count; attempts++)
83-
{
84-
node = nodes[localCursor];
85-
localCursor = (localCursor + 1) % count;
86-
//if this node is not alive or no longer dead mark it as resurrected
87-
if (!node.IsAlive)
88-
{
89-
audit?.Invoke(AuditEvent.Resurrection, node);
90-
node.IsResurrected = true;
91-
}
92-
yield return node;
93-
}
36+
foreach (var aliveNode in this.SelectAliveNodes(localCursor, nodes, audit))
37+
yield return aliveNode;
9438
}
9539

96-
public void Reseed(IEnumerable<Node> nodes) { }
97-
98-
void IDisposable.Dispose() => this.DisposeManagedResources();
99-
100-
protected virtual void DisposeManagedResources() { }
40+
public override void Reseed(IEnumerable<Node> nodes) { }
10141
}
10242
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading;
5+
6+
namespace Elasticsearch.Net
7+
{
8+
public class StickySniffingConnectionPool : SniffingConnectionPool
9+
{
10+
public override bool SupportsPinging => true;
11+
public override bool SupportsReseeding => true;
12+
13+
public StickySniffingConnectionPool(IEnumerable<Uri> uris, Func<Node, float> nodeScorer, IDateTimeProvider dateTimeProvider = null)
14+
: base(uris.Select(uri => new Node(uri)), nodeScorer ?? DefaultNodeScore, dateTimeProvider) { }
15+
16+
public StickySniffingConnectionPool(IEnumerable<Node> nodes, Func<Node, float> nodeScorer, IDateTimeProvider dateTimeProvider = null)
17+
: base(nodes, nodeScorer ?? DefaultNodeScore, dateTimeProvider) { }
18+
19+
public override IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = null)
20+
{
21+
var nodes = this.AliveNodes;
22+
23+
if (nodes.Count == 0)
24+
{
25+
var globalCursor = Interlocked.Increment(ref this.GlobalCursor);
26+
27+
//could not find a suitable node retrying on first node off globalCursor
28+
yield return this.RetryInternalNodes(globalCursor, audit);
29+
yield break;
30+
}
31+
32+
// If the cursor is greater than the default then it's been
33+
// set already but we now have a live node so we should reset it
34+
if (this.GlobalCursor > -1)
35+
Interlocked.Exchange(ref this.GlobalCursor, -1);
36+
37+
var localCursor = 0;
38+
foreach (var aliveNode in this.SelectAliveNodes(localCursor, nodes, audit))
39+
yield return aliveNode;
40+
}
41+
42+
private static float DefaultNodeScore(Node node) => 0f;
43+
}
44+
}

src/Tests/ClientConcepts/ConnectionPooling/BuildingBlocks/ConnectionPooling.doc.cs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,5 +180,38 @@ [U] public void Sticky()
180180
client.ConnectionSettings.ConnectionPool
181181
.Should().BeOfType<StickyConnectionPool>();
182182
}
183+
184+
/**[[sticky-sniffing-connection-pool]]
185+
* ==== Sticky Sniffing Connection Pool
186+
*
187+
* A type of connection pool that returns the first live node to issue a request against, such that the node is _sticky_ between requests.
188+
* This implementation supports sniffing and sorting so that each instance of your application can favor a node in the same rack based
189+
* on node attributes for instance.
190+
*/
191+
[U] public void SniffingSortedSticky()
192+
{
193+
var uris = Enumerable.Range(9200, 5)
194+
.Select(port => new Uri($"http://localhost:{port}"));
195+
196+
/** a sniffing sorted sticky pool takes a second parameter `Func` takes a Node and returns a weight.
197+
* Nodes will be sorted descending by weight. In the following example we score nodes that are client nodes
198+
* AND in rack_id `rack_one` the highest
199+
*/
200+
201+
var pool = new StickySniffingConnectionPool(uris, n =>
202+
(n.ClientNode ? 10 : 0)
203+
+ (n.Settings.TryGetValue("node.attr.rack_id", out string rackId)
204+
&& rackId == "rack_one" ? 10 : 0));
205+
206+
pool.SupportsReseeding.Should().BeTrue();
207+
pool.SupportsPinging.Should().BeTrue();
208+
209+
/** To create a client using the sticky sniffing connection pool pass
210+
* the connection pool to the `ConnectionSettings` you pass to `ElasticClient`
211+
*/
212+
var client = new ElasticClient(new ConnectionSettings(pool));
213+
client.ConnectionSettings.ConnectionPool
214+
.Should().BeOfType<StickySniffingConnectionPool>();
215+
}
183216
}
184217
}

0 commit comments

Comments
 (0)