Skip to content

Commit b60f4e2

Browse files
authored
Add context argument to LogRecordProcessor#onEmit (#4889)
* Add context argument to LogRecordProcessor#onEmit * Change argument order
1 parent 2e9deb4 commit b60f4e2

14 files changed

+99
-33
lines changed

sdk/logs/src/main/java/io/opentelemetry/sdk/logs/LogRecordProcessor.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import io.opentelemetry.api.logs.LogRecordBuilder;
99
import io.opentelemetry.api.logs.Logger;
10+
import io.opentelemetry.context.Context;
1011
import io.opentelemetry.sdk.common.CompletableResultCode;
1112
import java.io.Closeable;
1213
import java.util.ArrayList;
@@ -51,9 +52,11 @@ static LogRecordProcessor composite(Iterable<LogRecordProcessor> processors) {
5152
/**
5253
* Called when a {@link Logger} {@link LogRecordBuilder#emit()}s a log record.
5354
*
55+
* @param context the context set via {@link LogRecordBuilder#setContext(Context)}, or {@link
56+
* Context#current()} if not explicitly set
5457
* @param logRecord the log record
5558
*/
56-
void onEmit(ReadWriteLogRecord logRecord);
59+
void onEmit(Context context, ReadWriteLogRecord logRecord);
5760

5861
/**
5962
* Shutdown the log processor.

sdk/logs/src/main/java/io/opentelemetry/sdk/logs/MultiLogRecordProcessor.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package io.opentelemetry.sdk.logs;
77

8+
import io.opentelemetry.context.Context;
89
import io.opentelemetry.sdk.common.CompletableResultCode;
910
import java.util.ArrayList;
1011
import java.util.List;
@@ -33,9 +34,9 @@ static LogRecordProcessor create(List<LogRecordProcessor> logRecordProcessorsLis
3334
}
3435

3536
@Override
36-
public void onEmit(ReadWriteLogRecord logRecord) {
37+
public void onEmit(Context context, ReadWriteLogRecord logRecord) {
3738
for (LogRecordProcessor logRecordProcessor : logRecordProcessors) {
38-
logRecordProcessor.onEmit(logRecord);
39+
logRecordProcessor.onEmit(context, logRecord);
3940
}
4041
}
4142

sdk/logs/src/main/java/io/opentelemetry/sdk/logs/NoopLogRecordProcessor.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
package io.opentelemetry.sdk.logs;
77

8+
import io.opentelemetry.context.Context;
9+
810
final class NoopLogRecordProcessor implements LogRecordProcessor {
911
private static final NoopLogRecordProcessor INSTANCE = new NoopLogRecordProcessor();
1012

@@ -15,5 +17,5 @@ static LogRecordProcessor getInstance() {
1517
private NoopLogRecordProcessor() {}
1618

1719
@Override
18-
public void onEmit(ReadWriteLogRecord logRecord) {}
20+
public void onEmit(Context context, ReadWriteLogRecord logRecord) {}
1921
}

sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLogRecordBuilder.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import io.opentelemetry.api.logs.LogRecordBuilder;
1111
import io.opentelemetry.api.logs.Severity;
1212
import io.opentelemetry.api.trace.Span;
13-
import io.opentelemetry.api.trace.SpanContext;
1413
import io.opentelemetry.context.Context;
1514
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
1615
import io.opentelemetry.sdk.internal.AttributesMap;
@@ -27,7 +26,7 @@ final class SdkLogRecordBuilder implements EventBuilder {
2726

2827
private final InstrumentationScopeInfo instrumentationScopeInfo;
2928
private long epochNanos;
30-
private SpanContext spanContext = SpanContext.getInvalid();
29+
@Nullable private Context context;
3130
private Severity severity = Severity.UNDEFINED_SEVERITY_NUMBER;
3231
@Nullable private String severityText;
3332
private Body body = Body.empty();
@@ -54,7 +53,7 @@ public SdkLogRecordBuilder setEpoch(Instant instant) {
5453

5554
@Override
5655
public SdkLogRecordBuilder setContext(Context context) {
57-
this.spanContext = Span.fromContext(context).getSpanContext();
56+
this.context = context;
5857
return this;
5958
}
6059

@@ -95,15 +94,17 @@ public void emit() {
9594
if (loggerSharedState.hasBeenShutdown()) {
9695
return;
9796
}
97+
Context context = this.context == null ? Context.current() : this.context;
9898
loggerSharedState
9999
.getLogRecordProcessor()
100100
.onEmit(
101+
context,
101102
SdkReadWriteLogRecord.create(
102103
loggerSharedState.getLogLimits(),
103104
loggerSharedState.getResource(),
104105
instrumentationScopeInfo,
105106
this.epochNanos == 0 ? this.loggerSharedState.getClock().now() : this.epochNanos,
106-
spanContext,
107+
Span.fromContext(context).getSpanContext(),
107108
severity,
108109
severityText,
109110
body,

sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLoggerProviderBuilder.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import io.opentelemetry.api.logs.LogRecordBuilder;
1111
import io.opentelemetry.api.logs.Logger;
12+
import io.opentelemetry.context.Context;
1213
import io.opentelemetry.sdk.common.Clock;
1314
import io.opentelemetry.sdk.logs.data.LogRecordData;
1415
import io.opentelemetry.sdk.resources.Resource;
@@ -57,8 +58,9 @@ public SdkLoggerProviderBuilder setLogLimits(Supplier<LogLimits> logLimitsSuppli
5758
}
5859

5960
/**
60-
* Add a log processor. {@link LogRecordProcessor#onEmit(ReadWriteLogRecord)} will be called each
61-
* time a log is emitted by {@link Logger} instances obtained from the {@link SdkLoggerProvider}.
61+
* Add a log processor. {@link LogRecordProcessor#onEmit(Context, ReadWriteLogRecord)} will be
62+
* called each time a log is emitted by {@link Logger} instances obtained from the {@link
63+
* SdkLoggerProvider}.
6264
*
6365
* @param processor the log processor
6466
* @return this

sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessor.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.opentelemetry.api.metrics.LongCounter;
1111
import io.opentelemetry.api.metrics.Meter;
1212
import io.opentelemetry.api.metrics.MeterProvider;
13+
import io.opentelemetry.context.Context;
1314
import io.opentelemetry.sdk.common.CompletableResultCode;
1415
import io.opentelemetry.sdk.internal.DaemonThreadFactory;
1516
import io.opentelemetry.sdk.logs.LogRecordProcessor;
@@ -81,7 +82,7 @@ public static BatchLogRecordProcessorBuilder builder(LogRecordExporter logRecord
8182
}
8283

8384
@Override
84-
public void onEmit(ReadWriteLogRecord logRecord) {
85+
public void onEmit(Context context, ReadWriteLogRecord logRecord) {
8586
if (logRecord == null) {
8687
return;
8788
}

sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessor.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import static java.util.Objects.requireNonNull;
99

10+
import io.opentelemetry.context.Context;
1011
import io.opentelemetry.sdk.common.CompletableResultCode;
1112
import io.opentelemetry.sdk.logs.LogRecordProcessor;
1213
import io.opentelemetry.sdk.logs.ReadWriteLogRecord;
@@ -58,7 +59,7 @@ public static LogRecordProcessor create(LogRecordExporter exporter) {
5859
}
5960

6061
@Override
61-
public void onEmit(ReadWriteLogRecord logRecord) {
62+
public void onEmit(Context context, ReadWriteLogRecord logRecord) {
6263
try {
6364
List<LogRecordData> logs = Collections.singletonList(logRecord.toLogRecordData());
6465
CompletableResultCode result = logRecordExporter.export(logs);

sdk/logs/src/test/java/io/opentelemetry/sdk/logs/MultiLogRecordProcessorTest.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import static org.mockito.Mockito.verify;
1111
import static org.mockito.Mockito.when;
1212

13+
import io.opentelemetry.context.Context;
1314
import io.opentelemetry.sdk.common.CompletableResultCode;
1415
import org.junit.jupiter.api.BeforeEach;
1516
import org.junit.jupiter.api.Test;
@@ -39,7 +40,7 @@ void setup() {
3940
void empty() {
4041
LogRecordProcessor multiLogRecordProcessor = LogRecordProcessor.composite();
4142
assertThat(multiLogRecordProcessor).isInstanceOf(NoopLogRecordProcessor.class);
42-
multiLogRecordProcessor.onEmit(logRecord);
43+
multiLogRecordProcessor.onEmit(Context.current(), logRecord);
4344
multiLogRecordProcessor.shutdown();
4445
}
4546

@@ -53,9 +54,10 @@ void oneLogRecordProcessor() {
5354
void twoLogRecordProcessor() {
5455
LogRecordProcessor multiLogRecordProcessor =
5556
LogRecordProcessor.composite(logRecordProcessor1, logRecordProcessor2);
56-
multiLogRecordProcessor.onEmit(logRecord);
57-
verify(logRecordProcessor1).onEmit(same(logRecord));
58-
verify(logRecordProcessor2).onEmit(same(logRecord));
57+
Context context = Context.current();
58+
multiLogRecordProcessor.onEmit(context, logRecord);
59+
verify(logRecordProcessor1).onEmit(same(context), same(logRecord));
60+
verify(logRecordProcessor2).onEmit(same(context), same(logRecord));
5961

6062
multiLogRecordProcessor.forceFlush();
6163
verify(logRecordProcessor1).forceFlush();

sdk/logs/src/test/java/io/opentelemetry/sdk/logs/NoopLogRecordProcessorTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
99

10+
import io.opentelemetry.context.Context;
1011
import org.junit.jupiter.api.Test;
1112
import org.junit.jupiter.api.extension.ExtendWith;
1213
import org.mockito.Mock;
@@ -20,7 +21,7 @@ class NoopLogRecordProcessorTest {
2021
@Test
2122
void noCrash() {
2223
LogRecordProcessor logRecordProcessor = NoopLogRecordProcessor.getInstance();
23-
logRecordProcessor.onEmit(logRecord);
24+
logRecordProcessor.onEmit(Context.current(), logRecord);
2425
assertThat(logRecordProcessor.forceFlush().isSuccess()).isEqualTo(true);
2526
assertThat(logRecordProcessor.shutdown().isSuccess()).isEqualTo(true);
2627
}

sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLogRecordBuilderTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ class SdkLogRecordBuilderTest {
4747
@BeforeEach
4848
void setup() {
4949
when(loggerSharedState.getLogLimits()).thenReturn(LogLimits.getDefault());
50-
when(loggerSharedState.getLogRecordProcessor()).thenReturn(emittedLog::set);
50+
when(loggerSharedState.getLogRecordProcessor())
51+
.thenReturn((context, logRecord) -> emittedLog.set(logRecord));
5152
when(loggerSharedState.getResource()).thenReturn(RESOURCE);
5253
when(loggerSharedState.getClock()).thenReturn(Clock.getDefault());
5354

sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLoggerProviderTest.java

+50-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import static io.opentelemetry.sdk.testing.assertj.LogAssertions.assertThat;
99
import static org.assertj.core.api.Assertions.as;
10+
import static org.assertj.core.api.Assertions.entry;
1011
import static org.mockito.Mockito.mock;
1112
import static org.mockito.Mockito.times;
1213
import static org.mockito.Mockito.verify;
@@ -21,6 +22,8 @@
2122
import io.opentelemetry.api.trace.TraceFlags;
2223
import io.opentelemetry.api.trace.TraceState;
2324
import io.opentelemetry.context.Context;
25+
import io.opentelemetry.context.ContextKey;
26+
import io.opentelemetry.context.Scope;
2427
import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
2528
import io.opentelemetry.sdk.common.Clock;
2629
import io.opentelemetry.sdk.common.CompletableResultCode;
@@ -29,6 +32,7 @@
2932
import io.opentelemetry.sdk.resources.Resource;
3033
import java.util.ArrayList;
3134
import java.util.List;
35+
import java.util.Optional;
3236
import java.util.concurrent.TimeUnit;
3337
import java.util.concurrent.atomic.AtomicReference;
3438
import org.assertj.core.api.InstanceOfAssertFactories;
@@ -222,7 +226,7 @@ void loggerBuilder_WithLogRecordProcessor() {
222226
SdkLoggerProvider.builder()
223227
.setResource(resource)
224228
.addLogRecordProcessor(
225-
logRecord -> {
229+
(unused, logRecord) -> {
226230
logRecord.setAttribute(null, null);
227231
// Overwrite k1
228232
logRecord.setAttribute(AttributeKey.stringKey("k1"), "new-v1");
@@ -262,6 +266,50 @@ void loggerBuilder_WithLogRecordProcessor() {
262266
Attributes.builder().put("k1", "new-v1").put("k2", "v2").put("k3", "v3").build());
263267
}
264268

269+
@Test
270+
void loggerBuilder_ProcessorWithContext() {
271+
ContextKey<String> contextKey = ContextKey.named("my-context-key");
272+
AtomicReference<LogRecordData> logRecordData = new AtomicReference<>();
273+
274+
sdkLoggerProvider =
275+
SdkLoggerProvider.builder()
276+
.addLogRecordProcessor(
277+
(context, logRecord) ->
278+
logRecord.setAttribute(
279+
AttributeKey.stringKey("my-context-key"),
280+
Optional.ofNullable(context.get(contextKey)).orElse("")))
281+
.addLogRecordProcessor(
282+
(unused, logRecord) -> logRecordData.set(logRecord.toLogRecordData()))
283+
.build();
284+
285+
// With implicit context
286+
try (Scope unused = Context.current().with(contextKey, "context-value1").makeCurrent()) {
287+
sdkLoggerProvider
288+
.loggerBuilder("test")
289+
.build()
290+
.logRecordBuilder()
291+
.setBody("log message1")
292+
.emit();
293+
}
294+
assertThat(logRecordData.get())
295+
.hasBody("log message1")
296+
.hasAttributes(entry(AttributeKey.stringKey("my-context-key"), "context-value1"));
297+
298+
// With explicit context
299+
try (Scope unused = Context.current().with(contextKey, "context-value2").makeCurrent()) {
300+
sdkLoggerProvider
301+
.loggerBuilder("test")
302+
.build()
303+
.logRecordBuilder()
304+
.setContext(Context.current())
305+
.setBody("log message2")
306+
.emit();
307+
}
308+
assertThat(logRecordData.get())
309+
.hasBody("log message2")
310+
.hasAttributes(entry(AttributeKey.stringKey("my-context-key"), "context-value2"));
311+
}
312+
265313
@Test
266314
void forceFlush() {
267315
sdkLoggerProvider.forceFlush();
@@ -288,7 +336,7 @@ void canSetClock() {
288336
Clock clock = mock(Clock.class);
289337
when(clock.now()).thenReturn(now);
290338
List<ReadWriteLogRecord> seenLogs = new ArrayList<>();
291-
logRecordProcessor = seenLogs::add;
339+
logRecordProcessor = (context, logRecord) -> seenLogs.add(logRecord);
292340
sdkLoggerProvider =
293341
SdkLoggerProvider.builder()
294342
.setClock(clock)

sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLoggerTest.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ void logRecordBuilder() {
4646
LoggerSharedState state = mock(LoggerSharedState.class);
4747
InstrumentationScopeInfo info = InstrumentationScopeInfo.create("foo");
4848
AtomicReference<ReadWriteLogRecord> seenLog = new AtomicReference<>();
49-
LogRecordProcessor logRecordProcessor = seenLog::set;
49+
LogRecordProcessor logRecordProcessor = (context, logRecord) -> seenLog.set(logRecord);
5050
Clock clock = mock(Clock.class);
5151
when(clock.now()).thenReturn(5L);
5252

@@ -69,7 +69,7 @@ void logRecordBuilder_maxAttributeLength() {
6969
AtomicReference<ReadWriteLogRecord> seenLog = new AtomicReference<>();
7070
SdkLoggerProvider loggerProvider =
7171
SdkLoggerProvider.builder()
72-
.addLogRecordProcessor(seenLog::set)
72+
.addLogRecordProcessor((context, logRecord) -> seenLog.set(logRecord))
7373
.setLogLimits(() -> LogLimits.builder().setMaxAttributeValueLength(maxLength).build())
7474
.build();
7575
LogRecordBuilder logRecordBuilder = loggerProvider.get("test").logRecordBuilder();
@@ -109,7 +109,7 @@ void logRecordBuilder_maxAttributes() {
109109
AtomicReference<ReadWriteLogRecord> seenLog = new AtomicReference<>();
110110
SdkLoggerProvider loggerProvider =
111111
SdkLoggerProvider.builder()
112-
.addLogRecordProcessor(seenLog::set)
112+
.addLogRecordProcessor((context, logRecord) -> seenLog.set(logRecord))
113113
.setLogLimits(
114114
() -> LogLimits.builder().setMaxNumberOfAttributes(maxNumberOfAttrs).build())
115115
.build();
@@ -140,15 +140,17 @@ void logRecordBuilder_AfterShutdown() {
140140
loggerProvider.shutdown().join(10, TimeUnit.SECONDS);
141141
loggerProvider.get("test").logRecordBuilder().emit();
142142

143-
verify(logRecordProcessor, never()).onEmit(any());
143+
verify(logRecordProcessor, never()).onEmit(any(), any());
144144
}
145145

146146
@Test
147147
@SuppressLogger(loggerName = API_USAGE_LOGGER_NAME)
148148
void eventBuilder() {
149149
AtomicReference<ReadWriteLogRecord> seenLog = new AtomicReference<>();
150150
SdkLoggerProvider loggerProvider =
151-
SdkLoggerProvider.builder().addLogRecordProcessor(seenLog::set).build();
151+
SdkLoggerProvider.builder()
152+
.addLogRecordProcessor((context, logRecord) -> seenLog.set(logRecord))
153+
.build();
152154

153155
// Emit event from logger with name and add event domain
154156
loggerProvider

sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ void ignoresNullLogs() {
291291
BatchLogRecordProcessor processor =
292292
BatchLogRecordProcessor.builder(mockLogRecordExporter).build();
293293
try {
294-
assertThatCode(() -> processor.onEmit(null)).doesNotThrowAnyException();
294+
assertThatCode(() -> processor.onEmit(null, null)).doesNotThrowAnyException();
295295
} finally {
296296
processor.shutdown();
297297
}

0 commit comments

Comments
 (0)