-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathIClusterComposeTask.cs
248 lines (214 loc) · 9.41 KB
/
IClusterComposeTask.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Net.Security;
using System.Runtime.InteropServices;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Elasticsearch.Managed;
using Elastic.Elasticsearch.Managed.ConsoleWriters;
using ICSharpCode.SharpZipLib.GZip;
using ICSharpCode.SharpZipLib.Tar;
using ProcNet;
using ProcNet.Std;
namespace Elastic.Elasticsearch.Ephemeral.Tasks
{
/// <summary>
/// A unit of work that is executed against an ephemeral cluster.
/// NOTE(review): judging by the name this is one step of composing (setting up) a cluster before it is
/// used; the invoking code is not visible here, so confirm ordering and timing against the caller.
/// </summary>
public interface IClusterComposeTask
{
/// <summary>
/// Executes this task against <paramref name="cluster"/>.
/// </summary>
/// <param name="cluster">The ephemeral cluster the task operates on</param>
void Run(IEphemeralCluster<EphemeralClusterConfiguration> cluster);
}
/// <summary>
/// A unit of work that is executed while an ephemeral cluster is being disposed, receiving both the
/// cluster and a flag telling whether the node had been started successfully.
/// </summary>
public interface IClusterTeardownTask
{
/// <summary>
/// Called when the cluster disposes, used to clean up after itself.
/// </summary>
/// <param name="cluster">The cluster configuration of the node that was started</param>
/// <param name="nodeStarted">Whether the cluster composer was successful in starting the node</param>
void Run(IEphemeralCluster<EphemeralClusterConfiguration> cluster, bool nodeStarted);
}
public abstract class ClusterComposeTask : IClusterComposeTask
{
/// <summary>
/// <c>true</c> when <see cref="RuntimeInformation.IsOSPlatform"/> reports Windows; evaluated once when
/// the type is initialized.
/// </summary>
protected static bool IsWindows { get; } = RuntimeInformation.IsOSPlatform(OSPlatform.Windows);
/// <summary>
/// <c>.bat</c> on Windows and an empty string everywhere else.
/// NOTE(review): presumably appended to the names of the command line tools that tasks execute; confirm
/// against the derived tasks.
/// </summary>
protected static string BinarySuffix => IsWindows ? ".bat" : string.Empty;
/// <summary>
/// Executes this task against <paramref name="cluster"/>.
/// </summary>
/// <param name="cluster">The ephemeral cluster the task operates on</param>
public abstract void Run(IEphemeralCluster<EphemeralClusterConfiguration> cluster);
/// <summary>
/// Downloads <paramref name="from"/> to the local file <paramref name="to"/>, unless that file already
/// exists in which case nothing is downloaded.
/// </summary>
/// <remarks>
/// The payload is first written to a uniquely named sibling file and only moved to
/// <paramref name="to"/> once it has been received completely. Because an existing
/// <paramref name="to"/> short-circuits every later call, writing to it directly would turn an
/// interrupted transfer into a permanently cached, truncated file.
/// </remarks>
/// <param name="from">Absolute URL of the resource to download</param>
/// <param name="to">Path of the file that should hold the downloaded content</param>
protected static void DownloadFile(string from, string to)
{
    if (File.Exists(to)) return;

    var partial = to + "." + Guid.NewGuid().ToString("N") + ".partial";
    try
    {
        // The client is only needed for this one transfer, dispose it together with the streams.
        using (var http = new HttpClient())
        using (var stream = http.GetStreamAsync(new Uri(from)).GetAwaiter().GetResult())
        using (var fileStream = File.Create(partial))
        {
            stream.CopyTo(fileStream);
            fileStream.Flush();
        }

        // Somebody else may have produced the file while we were downloading; keep theirs.
        if (File.Exists(to))
        {
            File.Delete(partial);
            return;
        }

        File.Move(partial, to);
    }
    catch
    {
        // Best effort clean up of the incomplete download, the original failure is what matters.
        try
        {
            if (File.Exists(partial)) File.Delete(partial);
        }
        catch (IOException)
        {
        }
        catch (UnauthorizedAccessException)
        {
        }

        throw;
    }
}
/// <summary>
/// Builds a single line description of <paramref name="m"/> consisting of its status code, reason
/// phrase and body, suitable for use in an error message.
/// </summary>
/// <param name="m">The response to describe; when <c>null</c> all three parts are rendered empty</param>
/// <returns>A string of the form <c>Code: .. Reason: .. Content: ..</c></returns>
protected string GetResponseException(HttpResponseMessage m)
{
    var status = m?.StatusCode;
    var reason = m?.ReasonPhrase;
    var content = GetResponseString(m);
    return string.Concat("Code: ", status, " Reason: ", reason, " Content: ", content);
}
/// <summary>
/// Reads the body of <paramref name="m"/> as a string, blocking until it is available.
/// </summary>
/// <param name="m">The response to read from, may be <c>null</c></param>
/// <returns>
/// The body text, or an empty string when the response, its content or the text that was read is
/// <c>null</c>
/// </returns>
protected string GetResponseString(HttpResponseMessage m)
{
    var content = m?.Content;
    if (content == null) return string.Empty;

    var text = content.ReadAsStringAsync().ConfigureAwait(false).GetAwaiter().GetResult();
    return text ?? string.Empty;
}
/// <summary>
/// Issues an HTTP GET for <paramref name="path"/> by delegating to <c>Call</c>.
/// </summary>
/// <param name="cluster">The cluster to send the request to</param>
/// <param name="path">The path of the request URL</param>
/// <param name="query">The query string of the request URL, may be <c>null</c> or empty</param>
/// <returns>The response when the node answered <c>200 OK</c>, otherwise <c>null</c></returns>
protected HttpResponseMessage Get(IEphemeralCluster<EphemeralClusterConfiguration> cluster, string path,
    string query)
{
    Func<HttpClient, Uri, CancellationToken, Task<HttpResponseMessage>> send =
        (http, uri, token) => http.GetAsync(uri, token);
    return Call(cluster, path, query, send);
}
/// <summary>
/// Issues an HTTP POST for <paramref name="path"/> by delegating to <c>Call</c>, sending
/// <paramref name="json"/> as a UTF-8 encoded <c>application/json</c> body.
/// </summary>
/// <param name="cluster">The cluster to send the request to</param>
/// <param name="path">The path of the request URL</param>
/// <param name="query">The query string of the request URL, may be <c>null</c> or empty</param>
/// <param name="json">The request body</param>
/// <returns>The response when the node answered <c>200 OK</c>, otherwise <c>null</c></returns>
protected HttpResponseMessage Post(IEphemeralCluster<EphemeralClusterConfiguration> cluster, string path,
    string query, string json)
{
    // The body is created inside the delegate so it is only built once the request is actually sent.
    Func<HttpClient, Uri, CancellationToken, Task<HttpResponseMessage>> send = (http, uri, token) =>
    {
        var payload = new StringContent(json, Encoding.UTF8, "application/json");
        return http.PostAsync(uri, payload, token);
    };
    return Call(cluster, path, query, send);
}
/// <summary>
/// Sends one HTTP request to the first node of <paramref name="cluster"/> and returns the response only
/// when the node answers with <c>200 OK</c>.
/// </summary>
/// <remarks>
/// <list type="bullet">
/// <item><description><c>pretty=true</c> is always appended to the query string.</description></item>
/// <item><description>Both the client and the cancellation token time out after 20 seconds.</description></item>
/// <item><description>When the cluster reports a detected proxy, requests are routed through
/// <c>http://localhost:8080</c>.</description></item>
/// <item><description>When SSL is enabled every server certificate is accepted.</description></item>
/// <item><description>When security is enabled the request carries basic authentication for
/// <c>ClusterAuthentication.Admin</c>.</description></item>
/// </list>
/// Failures never propagate: any other status code and any exception is written to the cluster writer
/// and reported to the caller as <c>null</c>.
/// </remarks>
/// <param name="cluster">The cluster to send the request to</param>
/// <param name="path">The path of the request URL</param>
/// <param name="query">The query string of the request URL, may be <c>null</c> or empty</param>
/// <param name="verb">Performs the actual request given the client, the URL and a cancellation token</param>
/// <returns>The response when its status is <c>200 OK</c>, otherwise <c>null</c></returns>
private HttpResponseMessage Call(
    IEphemeralCluster<EphemeralClusterConfiguration> cluster,
    string path,
    string query,
    Func<HttpClient, Uri, CancellationToken, Task<HttpResponseMessage>> verb)
{
    var q = string.IsNullOrEmpty(query) ? "pretty=true" : query + "&pretty=true";
    var statusUrl = new UriBuilder(cluster.NodesUris().First()) {Path = path, Query = q}.Uri;

    // A token source created with a delay owns a timer, release it as soon as the call is over.
    using var tokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(20));
    var handler = new HttpClientHandler
    {
        AutomaticDecompression = DecompressionMethods.Deflate | DecompressionMethods.GZip,
    };
    if (cluster.DetectedProxy != DetectedProxySoftware.None)
        handler.Proxy = new WebProxy { Address = new Uri("http://localhost:8080") };

    cluster.Writer.WriteDiagnostic(
        $"{{{nameof(Call)}}} [{statusUrl}] SSL: {cluster.ClusterConfiguration.EnableSsl} Security: {cluster.ClusterConfiguration.EnableSecurity}");

    if (cluster.ClusterConfiguration.EnableSsl)
    {
#if !NETSTANDARD
        // ServicePointManager is static, the callback is unregistered again in the finally block below.
        ServicePointManager.ServerCertificateValidationCallback += ServerCertificateValidationCallback;
#else
        handler.ServerCertificateCustomValidationCallback += (m, c, cn, p) => true;
#endif
    }

    // Disposing the client also disposes the handler it was created with.
    using var client = new HttpClient(handler) {Timeout = TimeSpan.FromSeconds(20)};
    if (cluster.ClusterConfiguration.EnableSecurity)
    {
        var byteArray =
            Encoding.ASCII.GetBytes(
                $"{ClusterAuthentication.Admin.Username}:{ClusterAuthentication.Admin.Password}");
        client.DefaultRequestHeaders.Authorization =
            new AuthenticationHeaderValue("Basic", Convert.ToBase64String(byteArray));
    }

    try
    {
        var response = verb(client, statusUrl, tokenSource.Token).ConfigureAwait(false).GetAwaiter()
            .GetResult();
        if (response.StatusCode == HttpStatusCode.OK) return response;

        // The caller never sees a rejected response, so it has to be disposed here after logging it.
        using (response)
        {
            cluster.Writer.WriteDiagnostic(
                $"{{{nameof(Call)}}} [{statusUrl}] Bad status code: [{(int) response.StatusCode}]");
            var body = response.Content.ReadAsStringAsync().GetAwaiter().GetResult();
            foreach (var l in (body ?? string.Empty).Split('\n', '\r'))
                cluster.Writer.WriteDiagnostic($"{{{nameof(Call)}}} [{statusUrl}] returned [{l}]");
        }
    }
    catch (Exception e)
    {
        // Deliberately not rethrown: the failure is logged and signalled to the caller as null.
        cluster.Writer.WriteError($"{{{nameof(Call)}}} [{statusUrl}] exception: {e}");
    }
    finally
    {
#if !NETSTANDARD
        ServicePointManager.ServerCertificateValidationCallback -= ServerCertificateValidationCallback;
#endif
    }

    return null;
}
/// <summary>
/// Certificate validation callback that accepts every server certificate, whatever policy errors were
/// reported for it.
/// </summary>
/// <returns>Always <c>true</c></returns>
private static bool ServerCertificateValidationCallback(object sender, X509Certificate certificate,
    X509Chain chain, SslPolicyErrors sslpolicyerrors)
{
    // None of the arguments are inspected.
    return true;
}
/// <summary>
/// Writes <paramref name="contents"/> to <paramref name="fileLocation"/> unless a file already exists
/// at that location, in which case the existing file is left untouched.
/// </summary>
/// <param name="fileLocation">Path of the file to create</param>
/// <param name="contents">Text to write to the new file</param>
protected static void WriteFileIfNotExist(string fileLocation, string contents)
{
    if (File.Exists(fileLocation)) return;

    File.WriteAllText(fileLocation, contents);
}
/// <summary>
/// Runs <paramref name="binary"/> with <paramref name="arguments"/>; a thin entry point that forwards
/// all of its arguments unchanged to <c>ExecuteBinaryInternal</c>.
/// </summary>
/// <param name="config">The cluster configuration, forwarded as is</param>
/// <param name="writer">Receives diagnostic output, forwarded as is</param>
/// <param name="binary">The executable to start</param>
/// <param name="description">Human readable name of the step, forwarded as is</param>
/// <param name="arguments">Command line arguments for <paramref name="binary"/></param>
protected static void ExecuteBinary(EphemeralClusterConfiguration config, IConsoleLineHandler writer,
    string binary, string description, params string[] arguments)
{
    ExecuteBinaryInternal(config, writer, binary, description, arguments);
}
/// <summary>
/// Starts <paramref name="binary"/> with <paramref name="arguments"/>, waits up to 420 seconds for it
/// to finish and throws when the run is not considered successful.
/// </summary>
/// <remarks>
/// The process is started with two environment variables: the one named by
/// <c>config.FileSystem.ConfigEnvironmentVariableName</c> set to <c>config.FileSystem.ConfigPath</c>,
/// and <c>ES_HOME</c> set to <c>config.FileSystem.ElasticsearchHome</c>.
/// Console lines flagged as errors fail the run unless they are blank, are one of the known harmless
/// messages filtered below, or <paramref name="binary"/> contains <c>plugin</c> or <c>cert</c>.
/// </remarks>
/// <param name="config">Supplies the environment for the process and the version used for filtering</param>
/// <param name="writer">Receives diagnostic messages, may be <c>null</c></param>
/// <param name="binary">The executable to start</param>
/// <param name="description">Human readable name of the step, used in diagnostics and exceptions</param>
/// <param name="arguments">Command line arguments for <paramref name="binary"/></param>
/// <exception cref="Exception">
/// The process did not complete within the timeout, exited with a non zero exit code, or wrote error
/// output that is not filtered out.
/// </exception>
private static void ExecuteBinaryInternal(EphemeralClusterConfiguration config, IConsoleLineHandler writer,
    string binary, string description, params string[] arguments)
{
    var command = $"{{{binary}}} {{{string.Join(" ", arguments)}}}";
    writer?.WriteDiagnostic($"{{{nameof(ExecuteBinary)}}} starting process [{description}] {command}");

    var timeout = TimeSpan.FromSeconds(420);
    var processStartArguments = new StartArguments(binary, arguments)
    {
        Environment = new Dictionary<string, string>
        {
            {config.FileSystem.ConfigEnvironmentVariableName, config.FileSystem.ConfigPath},
            {"ES_HOME", config.FileSystem.ElasticsearchHome}
        },
        Timeout = timeout,
        ConsoleOutWriter = new ConsoleOutColorWriter(),
    };

    // The writer is optional everywhere else in this method, so it must not be dereferenced blindly here.
    writer?.WriteDiagnostic($"{binary} {string.Join(" ", arguments)}");

    var result = Proc.Start(processStartArguments);
    if (!result.Completed)
        throw new Exception($"Timeout while executing {description} exceeded {timeout}");

    if (result.ExitCode != 0)
        throw new Exception(
            $"Expected exit code 0 but received ({result.ExitCode}) while executing {description}: {command}");

    var errorOut = result.ConsoleOut.Where(c => c.Error).ToList();

    // this manifested when calling certgen on versions smaller then 5.2.0
    if (errorOut.Any() && config.Version < "5.2.0")
        errorOut = errorOut.Where(e => !e.Line.Contains("No log4j2 configuration file found")).ToList();

    // Drop blank lines, JAVA_HOME deprecation notices and plain warnings: none of them are failures.
    errorOut = errorOut
        .Where(e => !string.IsNullOrWhiteSpace(e.Line))
        .Where(e => !e.Line.Contains("usage of JAVA_HOME is deprecated"))
        .Where(e => !e.Line.Contains("using ES_JAVA_HOME"))
        .Where(e => !e.Line.Trim().StartsWith("warning:", StringComparison.Ordinal))
        .ToList();

    if (errorOut.Any() && !binary.Contains("plugin") && !binary.Contains("cert"))
        throw new Exception(
            $"Received error out with exitCode ({result.ExitCode}) while executing {description}: {command}");

    writer?.WriteDiagnostic(
        $"{{{nameof(ExecuteBinary)}}} finished process [{description}] {{{result.ExitCode}}}");
}
/// <summary>
/// Recursively copies the directory tree below <paramref name="source"/> into
/// <paramref name="target"/>, creating <paramref name="target"/> and any sub directories as needed.
/// </summary>
/// <param name="source">The directory to copy from</param>
/// <param name="target">The directory to copy to</param>
/// <param name="overwrite">
/// When <c>true</c> files that already exist in the target are overwritten, when <c>false</c> they are
/// skipped
/// </param>
protected static void CopyFolder(string source, string target, bool overwrite = true)
{
    // Without this a source that only contains files would fail to copy into a target that is missing.
    Directory.CreateDirectory(target);

    foreach (var sourceDir in Directory.GetDirectories(source, "*", SearchOption.AllDirectories))
        Directory.CreateDirectory(MapToTarget(sourceDir));

    foreach (var sourcePath in Directory.GetFiles(source, "*.*", SearchOption.AllDirectories))
    {
        var targetPath = MapToTarget(sourcePath);
        if (!overwrite && File.Exists(targetPath)) continue;
        File.Copy(sourcePath, targetPath, overwrite);
    }

    // Swaps only the leading root of the path. A plain string.Replace would also rewrite any later
    // occurrence of the source text inside directory or file names and corrupt the target path.
    string MapToTarget(string path) =>
        path.StartsWith(source, StringComparison.Ordinal)
            ? target + path.Substring(source.Length)
            : path.Replace(source, target);
}
/// <summary>
/// Extracts the archive <paramref name="file"/> into <paramref name="toFolder"/>, choosing the
/// extraction routine from the file extension: <c>.zip</c>, <c>.tar.gz</c> or <c>.tar</c>.
/// </summary>
/// <remarks>
/// Extensions are compared ordinally and without regard to case, so <c>.ZIP</c> is handled like
/// <c>.zip</c> and the outcome does not depend on the current culture.
/// </remarks>
/// <param name="file">Path of the archive to extract</param>
/// <param name="toFolder">Directory to extract into</param>
/// <exception cref="Exception"><paramref name="file"/> has none of the supported extensions</exception>
protected static void Extract(string file, string toFolder)
{
    // .tar.gz has to be tested before .tar is, both are listed explicitly to keep the order obvious.
    if (HasExtension(".zip")) ExtractZip(file, toFolder);
    else if (HasExtension(".tar.gz")) ExtractTarGz(file, toFolder);
    else if (HasExtension(".tar")) ExtractTar(file, toFolder);
    else throw new Exception("Can not extract:" + file);

    bool HasExtension(string extension) => file.EndsWith(extension, StringComparison.OrdinalIgnoreCase);
}
/// <summary>
/// Extracts the uncompressed tar archive <paramref name="file"/> into <paramref name="toFolder"/>
/// using SharpZipLib.
/// </summary>
/// <param name="file">Path of the tar archive</param>
/// <param name="toFolder">Directory to extract into</param>
private static void ExtractTar(string file, string toFolder)
{
    using var input = File.OpenRead(file);
    using var archive = TarArchive.CreateInputTarArchive(input);
    archive.ExtractContents(toFolder);
}
/// <summary>
/// Extracts the gzip compressed tar archive <paramref name="file"/> into
/// <paramref name="toFolder"/>. On Windows SharpZipLib is used, on every other platform the
/// <c>tar</c> command line tool is executed instead.
/// </summary>
/// <param name="file">Path of the <c>.tar.gz</c> archive</param>
/// <param name="toFolder">Directory to extract into</param>
private static void ExtractTarGz(string file, string toFolder)
{
    if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
    {
        //SharpZipLib loses permissions when untarring
        Proc.Exec("tar", "-xvf", file, "-C", toFolder);
        return;
    }

    using var input = File.OpenRead(file);
    using var unzipped = new GZipInputStream(input);
    using var archive = TarArchive.CreateInputTarArchive(unzipped);
    archive.ExtractContents(toFolder);
}
/// <summary>
/// Extracts the zip archive <paramref name="file"/> into <paramref name="toFolder"/> using
/// <see cref="ZipFile.ExtractToDirectory(string, string)"/>.
/// </summary>
/// <param name="file">Path of the zip archive</param>
/// <param name="toFolder">Directory to extract into</param>
private static void ExtractZip(string file, string toFolder)
{
    ZipFile.ExtractToDirectory(file, toFolder);
}
}
}