Skip to content

Commit 6b32491

Browse files
Switch back to the deprecated elasticsearch-rest-high-level-client instead of the new elasticsearch-java waiting for the fix elastic/elasticsearch-java#163
1 parent 73cba42 commit 6b32491

9 files changed

+588
-187
lines changed

pom.xml

+13-4
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
<opentelemetry.version>1.11.0</opentelemetry.version>
2626
<opentelemetry-alpha.version>1.11.0-alpha</opentelemetry-alpha.version>
2727
<useBeta>true</useBeta>
28-
<elasticstack.version>8.0.0</elasticstack.version>
28+
<elasticstack.version>7.17.0</elasticstack.version>
2929
</properties>
3030
<name>OpenTelemetry Plugin</name>
3131
<description>Publish Jenkins metrics to an OpenTelemetry endpoint, including distributed traces of job executions and health metrics of the controller.</description>
@@ -334,11 +334,20 @@
334334
<optional>true</optional>
335335
</dependency>
336336
<dependency>
337-
<groupId>co.elastic.clients</groupId>
338-
<artifactId>elasticsearch-java</artifactId>
337+
<!--
338+
Use the old `org.elasticsearch.client:elasticsearch-rest-high-level-client` waiting for
339+
`co.elastic.clients:elasticsearch-java` to fix https://github.com/elastic/elasticsearch-java/issues/163
340+
-->
341+
<groupId>org.elasticsearch.client</groupId>
342+
<artifactId>elasticsearch-rest-high-level-client</artifactId>
339343
<version>${elasticstack.version}</version>
344+
<exclusions>
345+
<exclusion>
346+
<groupId>org.elasticsearch</groupId>
347+
<artifactId>elasticsearch-cli</artifactId>
348+
</exclusion>
349+
</exclusions>
340350
</dependency>
341-
342351
<dependency>
343352
<groupId>io.opentelemetry</groupId>
344353
<artifactId>opentelemetry-sdk-testing</artifactId>

src/main/java/io/jenkins/plugins/opentelemetry/JenkinsOpenTelemetryPluginConfiguration.java

+8
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,14 @@ public boolean configure(StaplerRequest req, JSONObject json) throws FormExcepti
129129
this.observabilityBackends = req.bindJSONToList(ObservabilityBackend.class, json.get("observabilityBackends"));
130130
this.endpoint = sanitizeOtlpEndpoint(this.endpoint);
131131
initializeOpenTelemetry();
132+
if (logStorageRetriever != null && logStorageRetriever instanceof Closeable) {
133+
LOGGER.log(Level.FINE, () -> "Close " + logStorageRetriever + "...");
134+
try {
135+
((Closeable) logStorageRetriever).close();
136+
} catch (IOException e) {
137+
LOGGER.log(Level.WARNING, "Exception closing currently setup logStorageRetriever: " + logStorageRetriever, e);
138+
}
139+
}
132140
this.logStorageRetriever = resolveLogStorageRetriever();
133141
save();
134142
return true;

src/main/java/io/jenkins/plugins/opentelemetry/backend/elastic/ElasticsearchLogStorageRetriever.java

+59-116
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,295 @@
1+
/*
2+
* Copyright The Original Author or Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
package io.jenkins.plugins.opentelemetry.backend.elastic;
6+
7+
import co.elastic.clients.elasticsearch.ElasticsearchClient;
8+
import co.elastic.clients.elasticsearch._types.FieldValue;
9+
import co.elastic.clients.elasticsearch._types.SortOrder;
10+
import co.elastic.clients.elasticsearch._types.Time;
11+
import co.elastic.clients.elasticsearch.core.SearchRequest;
12+
import co.elastic.clients.elasticsearch.core.SearchResponse;
13+
import co.elastic.clients.elasticsearch.core.search.Hit;
14+
import co.elastic.clients.elasticsearch.indices.ElasticsearchIndicesClient;
15+
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
16+
import co.elastic.clients.transport.rest_client.RestClientTransport;
17+
import com.cloudbees.plugins.credentials.SystemCredentialsProvider;
18+
import com.cloudbees.plugins.credentials.common.IdCredentials;
19+
import com.cloudbees.plugins.credentials.common.UsernamePasswordCredentials;
20+
import com.fasterxml.jackson.databind.JsonNode;
21+
import com.fasterxml.jackson.databind.node.ObjectNode;
22+
import groovy.text.Template;
23+
import io.jenkins.plugins.opentelemetry.TemplateBindingsProvider;
24+
import io.jenkins.plugins.opentelemetry.job.log.ConsoleNotes;
25+
import io.jenkins.plugins.opentelemetry.job.log.LogStorageRetriever;
26+
import io.jenkins.plugins.opentelemetry.job.log.LogsQueryResult;
27+
import io.jenkins.plugins.opentelemetry.semconv.JenkinsOtelSemanticAttributes;
28+
import io.opentelemetry.api.trace.Span;
29+
import io.opentelemetry.api.trace.SpanContext;
30+
import io.opentelemetry.api.trace.Tracer;
31+
import io.opentelemetry.context.Scope;
32+
import net.sf.json.JSONArray;
33+
import org.apache.commons.lang.StringUtils;
34+
import org.apache.http.HttpHost;
35+
import org.apache.http.auth.AuthScope;
36+
import org.apache.http.auth.BasicUserPrincipal;
37+
import org.apache.http.auth.Credentials;
38+
import org.apache.http.impl.client.BasicCredentialsProvider;
39+
import org.elasticsearch.client.RestClient;
40+
import org.kohsuke.stapler.framework.io.ByteBuffer;
41+
42+
import javax.annotation.Nonnull;
43+
import javax.annotation.Nullable;
44+
import javax.annotation.PreDestroy;
45+
import java.io.Closeable;
46+
import java.io.IOException;
47+
import java.io.OutputStreamWriter;
48+
import java.io.Writer;
49+
import java.nio.charset.Charset;
50+
import java.nio.charset.StandardCharsets;
51+
import java.security.Principal;
52+
import java.util.Collections;
53+
import java.util.List;
54+
import java.util.Map;
55+
import java.util.NoSuchElementException;
56+
import java.util.logging.Level;
57+
import java.util.logging.Logger;
58+
59+
60+
/**
61+
* Retrieve the logs from Elasticsearch.
62+
* FIXME graceful shutdown
63+
*/
64+
public class ElasticsearchLogStorageRetriever implements LogStorageRetriever<ElasticsearchLogsQueryContext>, Closeable {
65+
/**
66+
* Field used by the Elastic-Otel mapping to store the {@link io.opentelemetry.sdk.logs.LogBuilder#setBody(String)}
67+
*/
68+
public static final String FIELD_MESSAGE = "message";
69+
/**
70+
* Mapping for {@link SpanContext#getTraceId()}
71+
*/
72+
public static final String FIELD_TRACE_ID = "trace.id";
73+
public static final String FIELD_TIMESTAMP = "@timestamp";
74+
75+
public static final Time POINT_IN_TIME_KEEP_ALIVE = Time.of(builder -> builder.time("30s"));
76+
public static final int PAGE_SIZE = 100; // FIXME
77+
public static final String INDEX_TEMPLATE_PATTERNS = "logs-apm.app-*";
78+
public static final String INDEX_TEMPLATE_NAME = "logs-apm.app";
79+
80+
private final static Logger logger = Logger.getLogger(ElasticsearchLogStorageRetriever.class.getName());
81+
82+
@Nonnull
83+
private final Template buildLogsVisualizationUrlTemplate;
84+
85+
private final TemplateBindingsProvider templateBindingsProvider;
86+
87+
@Nonnull
88+
private final ElasticsearchClient esClient;
89+
90+
private final Tracer tracer;
91+
92+
/**
93+
* TODO verify unsername:password auth vs apiKey auth
94+
*/
95+
public ElasticsearchLogStorageRetriever(
96+
String elasticsearchUrl, Credentials elasticsearchCredentials,
97+
Template buildLogsVisualizationUrlTemplate, TemplateBindingsProvider templateBindingsProvider ,
98+
Tracer tracer) {
99+
100+
if (StringUtils.isBlank(elasticsearchUrl)) {
101+
throw new IllegalArgumentException("Elasticsearch url cannot be blank");
102+
}
103+
104+
BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
105+
credentialsProvider.setCredentials(AuthScope.ANY, elasticsearchCredentials);
106+
107+
RestClient restClient = RestClient.builder(HttpHost.create(elasticsearchUrl))
108+
.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider))
109+
.build();
110+
RestClientTransport elasticsearchTransport = new RestClientTransport(restClient, new JacksonJsonpMapper());
111+
this.esClient = new ElasticsearchClient(elasticsearchTransport);
112+
this.tracer = tracer;
113+
114+
this.buildLogsVisualizationUrlTemplate = buildLogsVisualizationUrlTemplate;
115+
this.templateBindingsProvider = templateBindingsProvider;
116+
}
117+
118+
@Nonnull
119+
@Override
120+
public LogsQueryResult overallLog(@Nonnull String traceId, @Nonnull String spanId, boolean complete, @Nullable ElasticsearchLogsQueryContext context) throws IOException {
121+
// https://www.elastic.co/guide/en/elasticsearch/reference/7.17/point-in-time-api.html
122+
Span span = tracer.spanBuilder("elasticsearch.search")
123+
.startSpan();
124+
try (Scope scope = span.makeCurrent()) {
125+
126+
final Charset charset = StandardCharsets.UTF_8;
127+
final boolean completed;
128+
final List<Hit<ObjectNode>> hits;
129+
130+
final String pitId;
131+
final int pageNo;
132+
133+
if (context == null) {
134+
// Initial request: open a point in time to have consistent pagination results
135+
pitId = esClient.openPointInTime(pit -> pit.index(INDEX_TEMPLATE_PATTERNS).keepAlive(POINT_IN_TIME_KEEP_ALIVE)).id();
136+
pageNo = 0;
137+
} else if (context.pitId == null) { // FIXME verify this behaviour
138+
logger.log(Level.FINE, () -> "Reset Elasticsearch query for unexpected closed Point In Time");
139+
span.setAttribute("info", "Reset Elasticsearch query for unexpected closed Point In Time");
140+
pitId = esClient.openPointInTime(pit -> pit.index(INDEX_TEMPLATE_PATTERNS).keepAlive(POINT_IN_TIME_KEEP_ALIVE)).id();
141+
pageNo = 0;
142+
} else if (complete) {
143+
// FIXME check algorithm
144+
// Get PIT id from context but reset the page number because complete=true
145+
pitId = context.pitId;
146+
pageNo = 0;
147+
} else {
148+
// Get PIT id and page number from context
149+
pitId = context.pitId;
150+
pageNo = context.pageNo;
151+
}
152+
153+
span.setAttribute("query.index", INDEX_TEMPLATE_PATTERNS)
154+
.setAttribute("query.size", PAGE_SIZE)
155+
.setAttribute("pitId", pitId)
156+
.setAttribute("ci.pipeline.run.traceId", traceId)
157+
.setAttribute("ci.pipeline.run.spanId", spanId)
158+
.setAttribute("query.from", pageNo * PAGE_SIZE);
159+
160+
SearchRequest searchRequest = new SearchRequest.Builder()
161+
.pit(pit -> pit.id(pitId).keepAlive(POINT_IN_TIME_KEEP_ALIVE))
162+
.from(pageNo * PAGE_SIZE)
163+
.size(PAGE_SIZE)
164+
.sort(s -> s.field(f -> f.field(FIELD_TIMESTAMP).order(SortOrder.Asc)))
165+
.query(q -> q.match(m -> m.field(FIELD_TRACE_ID).query(FieldValue.of(traceId))))
166+
// .fields() TODO narrow down the list fields to retrieve - we probably have to look at a source filter
167+
.build();
168+
169+
logger.log(Level.FINE, "Retrieve logs for traceId: " + traceId);
170+
SearchResponse<ObjectNode> searchResponse = this.esClient.search(searchRequest, ObjectNode.class);
171+
hits = searchResponse.hits().hits();
172+
span.setAttribute("results", hits.size());
173+
completed = hits.size() != PAGE_SIZE; // TODO is there smarter?
174+
175+
if (completed) {
176+
logger.log(Level.FINE, () -> "Clear scrollId: " + pitId + " for trace: " + traceId + ", span: " + spanId);
177+
esClient.closePointInTime(p -> p.id(pitId));
178+
}
179+
180+
ByteBuffer byteBuffer = new ByteBuffer();
181+
182+
try (Writer w = new OutputStreamWriter(byteBuffer, charset)) {
183+
Map<String, String> bindings = TemplateBindingsProvider.compose(this.templateBindingsProvider, Collections.singletonMap("traceId", traceId)).getBindings();
184+
String logsVisualizationUrl = this.buildLogsVisualizationUrlTemplate.make(bindings).toString();
185+
if (pageNo == 0) {
186+
w.write("View logs in Kibana: " + logsVisualizationUrl + "\n\n");
187+
}
188+
writeOutput(w, hits);
189+
if (completed) {
190+
w.write("\n\nView logs on Kibana " + logsVisualizationUrl);
191+
}else {
192+
w.write("...\n\nView the rest of logs on Kibana " + logsVisualizationUrl);
193+
}
194+
}
195+
196+
String newPitId = completed ? null : pitId;
197+
logger.log(Level.FINE, () -> "overallLog(completed: " + completed + ", page: " + pageNo + ", written.length: " + byteBuffer.length() + ", pit.hash: " + pitId.hashCode() + ")");
198+
return new LogsQueryResult(
199+
byteBuffer, charset, completed,
200+
new ElasticsearchLogsQueryContext(newPitId, pageNo + 1)
201+
);
202+
} catch (IOException | RuntimeException e) {
203+
span.recordException(e);
204+
throw e;
205+
} finally {
206+
span.end();
207+
}
208+
}
209+
210+
/**
211+
* FIXME implement
212+
*/
213+
@Nonnull
214+
@Override
215+
public LogsQueryResult stepLog(@Nonnull String traceId, @Nonnull String spanId, @Nullable ElasticsearchLogsQueryContext logsQueryContext) throws IOException {
216+
throw new UnsupportedOperationException("Not yet implemented");
217+
}
218+
219+
private void writeOutput(Writer writer, List<Hit<ObjectNode>> hits) throws IOException {
220+
for (Hit<ObjectNode> hit : hits) {
221+
ObjectNode source = hit.source();
222+
ObjectNode labels = (ObjectNode) source.findValue("labels");
223+
//Retrieve the label message and annotations to show the formatted message in Jenkins.
224+
JsonNode messageAsJsonNode = source.findValue(FIELD_MESSAGE);
225+
if (messageAsJsonNode == null) {
226+
logger.log(Level.FINE, () -> "Skip log with no message (document id: " + hit.id() + ")");
227+
continue;
228+
}
229+
String message = messageAsJsonNode.asText();
230+
231+
JSONArray annotations;
232+
if (labels == null) {
233+
annotations = null;
234+
} else {
235+
JsonNode annotationsAsText = labels.get(JenkinsOtelSemanticAttributes.JENKINS_ANSI_ANNOTATIONS.getKey());
236+
if (annotationsAsText == null) {
237+
annotations = null;
238+
} else {
239+
annotations = JSONArray.fromObject(annotationsAsText.asText());
240+
}
241+
}
242+
logger.log(Level.FINER, () -> "Write: " + message + ", id: " + hit.id());
243+
ConsoleNotes.write(writer, message, annotations);
244+
}
245+
}
246+
247+
/**
248+
* FIXME returns false when true is expected. How to test
249+
* check if the configured index template exists.
250+
*
251+
* @return true if the index template exists.
252+
*/
253+
public boolean indexTemplateExists() throws IOException {
254+
ElasticsearchIndicesClient indicesClient = this.esClient.indices();
255+
return indicesClient.existsIndexTemplate(b -> b.name(INDEX_TEMPLATE_NAME)).value();
256+
}
257+
258+
@Override
259+
public void close() throws IOException {
260+
logger.log(Level.INFO, ()-> "Shutdown Elasticsearch client...");
261+
this.esClient.shutdown();
262+
}
263+
264+
@Override
265+
public String toString() {
266+
return "ElasticsearchLogStorageRetriever{" +
267+
"buildLogsVisualizationUrlTemplate=" + buildLogsVisualizationUrlTemplate +
268+
", templateBindingsProvider=" + templateBindingsProvider +
269+
'}';
270+
}
271+
272+
/**
273+
* FIXME optimize search
274+
*/
275+
public static Credentials getCredentials(String jenkinsCredentialsId) throws NoSuchElementException {
276+
final UsernamePasswordCredentials usernamePasswordCredentials = (UsernamePasswordCredentials) SystemCredentialsProvider.getInstance().getCredentials().stream()
277+
.filter(credentials ->
278+
(credentials instanceof UsernamePasswordCredentials)
279+
&& ((IdCredentials) credentials)
280+
.getId().equals(jenkinsCredentialsId))
281+
.findAny().get();
282+
283+
return new Credentials() {
284+
@Override
285+
public Principal getUserPrincipal() {
286+
return new BasicUserPrincipal(usernamePasswordCredentials.getUsername());
287+
}
288+
289+
@Override
290+
public String getPassword() {
291+
return usernamePasswordCredentials.getPassword().getPlainText();
292+
}
293+
};
294+
}
295+
}

src/main/java/io/jenkins/plugins/opentelemetry/backend/elastic/ElasticsearchLogsQueryContext.java

+2-24
Original file line numberDiff line numberDiff line change
@@ -7,27 +7,5 @@
77

88
import io.jenkins.plugins.opentelemetry.job.log.LogsQueryContext;
99

10-
import java.util.Objects;
11-
12-
public class ElasticsearchLogsQueryContext implements LogsQueryContext {
13-
final String pitId;
14-
final int pageNo;
15-
16-
public ElasticsearchLogsQueryContext(String pitId, int pageNo) {
17-
this.pitId = pitId;
18-
this.pageNo = pageNo;
19-
}
20-
21-
@Override
22-
public boolean equals(Object o) {
23-
if (this == o) return true;
24-
if (o == null || getClass() != o.getClass()) return false;
25-
ElasticsearchLogsQueryContext that = (ElasticsearchLogsQueryContext) o;
26-
return Objects.equals(pitId, that.pitId) && pageNo == that.pageNo;
27-
}
28-
29-
@Override
30-
public int hashCode() {
31-
return Objects.hash(pitId, pageNo);
32-
}
33-
}
10+
public class ElasticsearchLogsQueryContext implements LogsQueryContext {
11+
}

0 commit comments

Comments
 (0)