-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathElasticsearchNode.cs
233 lines (197 loc) · 8.13 KB
/
ElasticsearchNode.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using Elastic.Elasticsearch.Managed.Configuration;
using Elastic.Elasticsearch.Managed.ConsoleWriters;
using Elastic.Elasticsearch.Managed.FileSystem;
using Elastic.Stack.ArtifactsApi;
using ProcNet;
using ProcNet.Std;
namespace Elastic.Elasticsearch.Managed
{
public class ElasticsearchNode : ObservableProcess
{
private readonly ManualResetEvent _startedHandle = new ManualResetEvent(false);
public ElasticsearchNode(ElasticVersion version, string elasticsearchHome = null)
: this(new NodeConfiguration(new ClusterConfiguration(version,
(v, s) => new NodeFileSystem(v, elasticsearchHome))))
{
}
public ElasticsearchNode(NodeConfiguration config) : base(StartArgs(config)) => NodeConfiguration = config;
public string Version { get; private set; }
public int? Port { get; private set; }
public bool NodeStarted { get; private set; }
public NodeConfiguration NodeConfiguration { get; }
private int? JavaProcessId { get; set; }
public override int? ProcessId => JavaProcessId ?? base.ProcessId;
public int? HostProcessId => base.ProcessId;
/// <summary>
/// Set this true if you want the node to go into assumed started state as soon as its waiting for more nodes to start
/// doing the election.
/// <para>Useful to speed up starting multi node clusters</para>
/// </summary>
public bool AssumeStartedOnNotEnoughMasterPing { get; set; }
internal IConsoleLineHandler Writer { get; private set; }
public Exception LastSeenException { get; set; }
public WaitHandle StartedHandle => _startedHandle;
private static StartArguments StartArgs(NodeConfiguration config)
{
//var args = new[] {config.FileSystem.Binary}.Concat(config.CommandLineArguments);
var startArguments = new StartArguments(config.FileSystem.Binary, config.CommandLineArguments)
{
SendControlCFirst = true,
Environment = EnvVars(config),
WaitForExit = config.WaitForShutdown,
WaitForStreamReadersTimeout = config.WaitForShutdown
};
config.ModifyStartArguments(startArguments);
return startArguments;
}
private static Dictionary<string, string> EnvVars(NodeConfiguration config)
{
var environmentVariables = new Dictionary<string, string> {{"ES_JAVA_OPTS", "-Xms1g -Xmx1g"}};
if (!string.IsNullOrWhiteSpace(config.FileSystem.ConfigPath))
environmentVariables.Add(config.FileSystem.ConfigEnvironmentVariableName, config.FileSystem.ConfigPath);
if (!string.IsNullOrWhiteSpace(config.FileSystem.ElasticsearchHome))
environmentVariables.Add("ES_HOME", config.FileSystem.ElasticsearchHome);
return environmentVariables;
}
private bool AssumedStartedStateChecker(string section, string message)
{
if (AssumeStartedOnNotEnoughMasterPing
&& section.Contains("ZenDiscovery")
&& message.Contains("not enough master nodes discovered during pinging"))
return true;
return false;
}
public IDisposable Start() => Start(TimeSpan.FromMinutes(2));
public IDisposable Start(TimeSpan waitForStarted) => Start(new LineHighlightWriter(), waitForStarted);
public IDisposable Start(IConsoleLineHandler writer, TimeSpan waitForStarted)
{
var node = NodeConfiguration.DesiredNodeName;
var subscription = SubscribeLines(writer);
if (WaitForStarted(waitForStarted)) return subscription;
subscription.Dispose();
throw new ElasticsearchCleanExitException(
$"Failed to start node: {node} before the configured timeout of: {waitForStarted}");
}
public IDisposable SubscribeLines() => SubscribeLines(new LineHighlightWriter());
public IDisposable SubscribeLines(IConsoleLineHandler writer) =>
SubscribeLines(writer, delegate { }, delegate { }, delegate { });
public IDisposable SubscribeLines(IConsoleLineHandler writer, Action<LineOut> onNext) =>
SubscribeLines(writer, onNext, delegate { }, delegate { });
public IDisposable SubscribeLines(IConsoleLineHandler writer, Action<LineOut> onNext,
Action<Exception> onError) =>
SubscribeLines(writer, onNext, onError, delegate { });
public IDisposable SubscribeLines(IConsoleLineHandler writer, Action<LineOut> onNext, Action<Exception> onError,
Action onCompleted)
{
Writer = writer;
var node = NodeConfiguration.DesiredNodeName;
writer?.WriteDiagnostic($"Elasticsearch location: [{Binary}]", node);
writer?.WriteDiagnostic($"Settings: {{{string.Join(" ", NodeConfiguration.CommandLineArguments)}}}", node);
var envVarName = NodeConfiguration.Version.InRange("<7.12.0") ? "JAVA_HOME" : "ES_JAVA_HOME";
var javaHome = Environment.GetEnvironmentVariable(envVarName);
writer?.WriteDiagnostic($"{envVarName}: {{{javaHome}}}", node);
Process.StartInfo.Environment[envVarName] = javaHome;
return SubscribeLines(
l =>
{
writer?.Handle(l);
onNext?.Invoke(l);
},
e =>
{
LastSeenException = e;
writer?.Handle(e);
onError?.Invoke(e);
_startedHandle.Set();
},
() =>
{
onCompleted?.Invoke();
_startedHandle.Set();
});
}
public bool WaitForStarted(TimeSpan timeout) => _startedHandle.WaitOne(timeout);
protected override void OnBeforeSetCompletedHandle()
{
_startedHandle.Set();
base.OnBeforeSetCompletedHandle();
}
protected override void OnBeforeWaitForEndOfStreamsError(TimeSpan waited)
{
// The wait for streams finished before streams were fully read.
// this usually indicates the process is still running.
// Proc will successfully kill the host but will leave the JavaProcess the bat file starts running
// The elasticsearch jar is closing down so won't leak but might prevent EphemeralClusterComposer to do its clean up.
// We do a hard kill on both here to make sure both processes are gone.
HardKill(HostProcessId);
HardKill(JavaProcessId);
}
private static void HardKill(int? processId)
{
if (!processId.HasValue) return;
try
{
var p = Process.GetProcessById(processId.Value);
p.Kill();
}
catch (Exception)
{
}
}
protected override bool ContinueReadingFromProcessReaders()
{
if (!NodeStarted) return true;
return true;
// some how if we return false here it leads to Task starvation in Proc and tests in e.g will Elastic.Elasticsearch.Xunit will start
// to timeout. This makes little sense to me now, so leaving this performance optimization out for now. Hopefully another fresh look will yield
// to (not so) obvious.
//return this.NodeConfiguration.ShowElasticsearchOutputAfterStarted;
}
protected override bool KeepBufferingLines(LineOut c)
{
//if the node is already started only keep buffering lines while we have a writer and the nodeconfiguration wants output after started
if (NodeStarted)
{
var keepBuffering = Writer != null && NodeConfiguration.ShowElasticsearchOutputAfterStarted;
if (!keepBuffering) CancelAsyncReads();
return keepBuffering;
}
var parsed = LineOutParser.TryParse(c?.Line, out _, out _, out var section, out _, out var message,
out var started);
if (!parsed) return Writer != null;
if (JavaProcessId == null && LineOutParser.TryParseNodeInfo(section, message, out var version, out var pid))
{
JavaProcessId = pid;
Version = version;
}
else if (LineOutParser.TryGetPortNumber(section, message, out var port))
{
Port = port;
var dp = NodeConfiguration.DesiredPort;
if (dp.HasValue && Port != dp.Value)
throw new ElasticsearchCleanExitException(
$"Node started on port {port} but {dp.Value} was requested");
}
if (!started) started = AssumedStartedStateChecker(section, message);
if (started)
{
if (!Port.HasValue)
throw new ElasticsearchCleanExitException(
$"Node started but ElasticsearchNode did not grab its port number");
NodeStarted = true;
_startedHandle.Set();
}
// if we have dont a writer always return true
if (Writer != null) return true;
//otherwise only keep buffering if we are not started
return !started;
}
}
}