19
19
import java .io .InputStream ;
20
20
import java .io .InputStreamReader ;
21
21
import java .io .OutputStreamWriter ;
22
+ import java .util .Arrays ;
22
23
import java .util .Map ;
23
24
import java .util .Optional ;
24
25
import java .util .Random ;
26
+ import java .util .function .Consumer ;
27
+ import java .util .function .Predicate ;
25
28
29
+ import com .fasterxml .jackson .core .JsonPointer ;
26
30
import com .fasterxml .jackson .core .JsonProcessingException ;
31
+ import com .fasterxml .jackson .databind .JsonNode ;
27
32
import org .apache .logging .log4j .Level ;
28
33
import org .apache .logging .log4j .LogManager ;
29
34
import org .apache .logging .log4j .Logger ;
34
39
import org .aspectj .lang .annotation .Around ;
35
40
import org .aspectj .lang .annotation .Aspect ;
36
41
import org .aspectj .lang .annotation .Pointcut ;
42
+ import software .amazon .lambda .powertools .logging .CorrelationIdPath ;
37
43
import software .amazon .lambda .powertools .logging .Logging ;
44
+ import software .amazon .lambda .powertools .logging .LoggingUtils ;
38
45
39
46
import static java .nio .charset .StandardCharsets .UTF_8 ;
40
47
import static java .util .Optional .empty ;
47
54
import static software .amazon .lambda .powertools .core .internal .LambdaHandlerProcessor .placedOnRequestHandler ;
48
55
import static software .amazon .lambda .powertools .core .internal .LambdaHandlerProcessor .placedOnStreamHandler ;
49
56
import static software .amazon .lambda .powertools .core .internal .LambdaHandlerProcessor .serviceName ;
57
+ import static software .amazon .lambda .powertools .logging .CorrelationIdPath .AUTO_DETECT ;
58
+ import static software .amazon .lambda .powertools .logging .CorrelationIdPath .DISABLED ;
50
59
import static software .amazon .lambda .powertools .logging .LoggingUtils .appendKey ;
51
60
import static software .amazon .lambda .powertools .logging .LoggingUtils .appendKeys ;
52
61
import static software .amazon .lambda .powertools .logging .LoggingUtils .objectMapper ;
@@ -94,6 +103,10 @@ public Object around(ProceedingJoinPoint pjp,
94
103
proceedArgs = logEvent (pjp );
95
104
}
96
105
106
+ if (logging .correlationIdPath () != DISABLED ) {
107
+ proceedArgs = captureCorrelationId (logging .correlationIdPath (), pjp );
108
+ }
109
+
97
110
Object proceed = pjp .proceed (proceedArgs );
98
111
99
112
coldStartDone ();
@@ -160,18 +173,69 @@ private Object[] logEvent(final ProceedingJoinPoint pjp) {
160
173
return args ;
161
174
}
162
175
163
- private Object [] logFromInputStream (final ProceedingJoinPoint pjp ) {
176
+ private Object [] captureCorrelationId (final CorrelationIdPath correlationIdPath ,
177
+ final ProceedingJoinPoint pjp ) {
164
178
Object [] args = pjp .getArgs ();
179
+ if (isHandlerMethod (pjp )) {
180
+ if (placedOnRequestHandler (pjp )) {
181
+ Object arg = pjp .getArgs ()[0 ];
182
+ JsonNode jsonNode = objectMapper ().valueToTree (arg );
165
183
166
- try (ByteArrayOutputStream out = new ByteArrayOutputStream ();
167
- OutputStreamWriter writer = new OutputStreamWriter (out , UTF_8 );
168
- InputStreamReader reader = new InputStreamReader ((InputStream ) pjp .getArgs ()[0 ], UTF_8 )) {
184
+ setCorrelationIdFromNode (correlationIdPath , pjp , jsonNode );
169
185
170
- IOUtils .copy (reader , writer );
171
- writer .flush ();
172
- byte [] bytes = out .toByteArray ();
173
- args [0 ] = new ByteArrayInputStream (bytes );
186
+ return args ;
187
+ }
188
+
189
+ if (placedOnStreamHandler (pjp )) {
190
+ try {
191
+ byte [] bytes = bytesFromInputStreamSafely ((InputStream ) pjp .getArgs ()[0 ]);
192
+ JsonNode jsonNode = objectMapper ().readTree (bytes );
193
+ args [0 ] = new ByteArrayInputStream (bytes );
194
+
195
+ setCorrelationIdFromNode (correlationIdPath , pjp , jsonNode );
196
+
197
+ return args ;
198
+ } catch (IOException e ) {
199
+ Logger log = logger (pjp );
200
+ log .warn ("Failed to capture correlation id on event from supplied input stream." , e );
201
+ }
202
+ }
203
+ }
204
+
205
+ return args ;
206
+ }
207
+
208
+ private void setCorrelationIdFromNode (CorrelationIdPath correlationIdPath , ProceedingJoinPoint pjp , JsonNode jsonNode ) {
209
+ if (correlationIdPath == AUTO_DETECT ) {
210
+ autoDetect (pjp , jsonNode );
211
+ } else {
212
+ JsonNode node = jsonNode .at (JsonPointer .compile (correlationIdPath .getPath ()));
213
+ LoggingUtils .setCorrelationId (node .asText ());
214
+ }
215
+ }
216
+
217
+ private void autoDetect (ProceedingJoinPoint pjp ,
218
+ JsonNode jsonNode ) {
219
+ Arrays .stream (CorrelationIdPath .values ())
220
+ .filter (path -> path != AUTO_DETECT && path != DISABLED )
221
+ .forEach (correlationIdPath1 -> {
222
+ JsonNode node = jsonNode .at (JsonPointer .compile (correlationIdPath1 .getPath ()));
223
+ String asText = node .asText ();
224
+
225
+ if (null != asText && !asText .isEmpty ()) {
226
+ logger (pjp ).debug ("Auto detected correlation id from event type: {}" , correlationIdPath1 );
227
+ LoggingUtils .setCorrelationId (asText );
228
+ }
229
+ });
230
+ }
231
+
232
+
233
+ private Object [] logFromInputStream (final ProceedingJoinPoint pjp ) {
234
+ Object [] args = pjp .getArgs ();
174
235
236
+ try {
237
+ byte [] bytes = bytesFromInputStreamSafely ((InputStream ) pjp .getArgs ()[0 ]);
238
+ args [0 ] = new ByteArrayInputStream (bytes );
175
239
Logger log = logger (pjp );
176
240
177
241
asJson (pjp , objectMapper ().readValue (bytes , Map .class ))
@@ -185,6 +249,17 @@ private Object[] logFromInputStream(final ProceedingJoinPoint pjp) {
185
249
return args ;
186
250
}
187
251
252
+ private byte [] bytesFromInputStreamSafely (final InputStream inputStream ) throws IOException {
253
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream ();
254
+ InputStreamReader reader = new InputStreamReader (inputStream )) {
255
+ OutputStreamWriter writer = new OutputStreamWriter (out , UTF_8 );
256
+
257
+ IOUtils .copy (reader , writer );
258
+ writer .flush ();
259
+ return out .toByteArray ();
260
+ }
261
+ }
262
+
188
263
private Optional <String > asJson (final ProceedingJoinPoint pjp ,
189
264
final Object target ) {
190
265
try {
0 commit comments