1
1
/*
2
- * Copyright 2017-2022 the original author or authors.
2
+ * Copyright 2017-2023 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
20
20
import java .nio .ByteBuffer ;
21
21
import java .nio .charset .StandardCharsets ;
22
22
import java .util .Arrays ;
23
+ import java .util .Collections ;
23
24
import java .util .HashMap ;
24
- import java .util .HashSet ;
25
25
import java .util .LinkedHashSet ;
26
26
import java .util .List ;
27
27
import java .util .Map ;
31
31
import org .apache .kafka .common .header .Headers ;
32
32
import org .apache .kafka .common .header .internals .RecordHeader ;
33
33
34
- import org .springframework .lang .Nullable ;
35
34
import org .springframework .messaging .MessageHeaders ;
36
35
import org .springframework .util .Assert ;
37
36
import org .springframework .util .ClassUtils ;
38
37
import org .springframework .util .MimeType ;
39
38
40
39
import com .fasterxml .jackson .core .JsonProcessingException ;
40
+ import com .fasterxml .jackson .core .type .TypeReference ;
41
41
import com .fasterxml .jackson .databind .DeserializationContext ;
42
42
import com .fasterxml .jackson .databind .JsonNode ;
43
43
import com .fasterxml .jackson .databind .ObjectMapper ;
@@ -63,26 +63,23 @@ public class DefaultKafkaHeaderMapper extends AbstractKafkaHeaderMapper {
63
63
64
64
private static final String JAVA_LANG_STRING = "java.lang.String" ;
65
65
66
- private static final Set <String > TRUSTED_ARRAY_TYPES =
67
- new HashSet <>(Arrays .asList (
66
+ private static final Set <String > TRUSTED_ARRAY_TYPES = Set .of (
68
67
"[B" ,
69
68
"[I" ,
70
69
"[J" ,
71
70
"[F" ,
72
71
"[D" ,
73
72
"[C"
74
- )) ;
73
+ );
75
74
76
- private static final List <String > DEFAULT_TRUSTED_PACKAGES =
77
- Arrays .asList (
75
+ private static final List <String > DEFAULT_TRUSTED_PACKAGES = List .of (
78
76
"java.lang" ,
79
77
"java.net" ,
80
78
"java.util" ,
81
79
"org.springframework.util"
82
80
);
83
81
84
- private static final List <String > DEFAULT_TO_STRING_CLASSES =
85
- Arrays .asList (
82
+ private static final List <String > DEFAULT_TO_STRING_CLASSES = List .of (
86
83
"org.springframework.util.MimeType" ,
87
84
"org.springframework.http.MediaType"
88
85
);
@@ -142,7 +139,7 @@ public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) {
142
139
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
143
140
*/
144
141
public DefaultKafkaHeaderMapper (String ... patterns ) {
145
- this (new ObjectMapper (), patterns );
142
+ this (JacksonUtils . enhancedObjectMapper (), patterns );
146
143
}
147
144
148
145
/**
@@ -222,7 +219,7 @@ protected boolean isEncodeStrings() {
222
219
}
223
220
224
221
/**
225
- * Set to true to encode String-valued headers as JSON ("..."), by default just the
222
+ * Set to true to encode String-valued headers as JSON string ("..."), by default just the
226
223
* raw String value is converted to a byte array using the configured charset. Set to
227
224
* true if a consumer of the outbound record is using Spring for Apache Kafka version
228
225
* less than 2.3
@@ -234,8 +231,15 @@ public void setEncodeStrings(boolean encodeStrings) {
234
231
}
235
232
236
233
/**
237
- * Add packages to the trusted packages list (default {@code java.util, java.lang}) used
234
+ * Add packages to the trusted packages list used
238
235
* when constructing objects from JSON.
236
+ * By default, the following packages are trusted:
237
+ * <ul>
238
+ * <li>java.lang</li>
239
+ * <li>java.net</li>
240
+ * <li>java.util</li>
241
+ * <li>org.springframework.util</li>
242
+ * </ul>
239
243
* If any of the supplied packages is {@code "*"}, all packages are trusted.
240
244
* If a class for a non-trusted package is encountered, the header is returned to the
241
245
* application with value of type {@link NonTrustedHeaderType}.
@@ -286,20 +290,19 @@ public void fromHeaders(MessageHeaders headers, Headers target) {
286
290
}
287
291
if (!encodeToJson && valueToAdd instanceof String ) {
288
292
target .add (new RecordHeader (key , ((String ) valueToAdd ).getBytes (getCharset ())));
289
- className = JAVA_LANG_STRING ;
290
293
}
291
294
else {
292
295
target .add (new RecordHeader (key , headerObjectMapper .writeValueAsBytes (valueToAdd )));
293
296
}
294
297
jsonHeaders .put (key , className );
295
298
}
296
299
catch (Exception e ) {
297
- logger .debug (e , () -> "Could not map " + key + " with type " + rawValue .getClass ().getName ());
300
+ logger .error (e , () -> "Could not map " + key + " with type " + rawValue .getClass ().getName ());
298
301
}
299
302
}
300
303
}
301
304
});
302
- if (jsonHeaders .size () > 0 ) {
305
+ if (! jsonHeaders .isEmpty () ) {
303
306
try {
304
307
target .add (new RecordHeader (JSON_TYPES , headerObjectMapper .writeValueAsBytes (jsonHeaders )));
305
308
}
@@ -321,7 +324,7 @@ else if (headerName.equals(KafkaHeaders.LISTENER_INFO) && matchesForInbound(head
321
324
headers .put (headerName , new String (header .value (), getCharset ()));
322
325
}
323
326
else if (!(headerName .equals (JSON_TYPES )) && matchesForInbound (headerName )) {
324
- if (jsonTypes != null && jsonTypes .containsKey (headerName )) {
327
+ if (jsonTypes .containsKey (headerName )) {
325
328
String requestedType = jsonTypes .get (headerName );
326
329
populateJsonValueHeader (header , requestedType , headers );
327
330
}
@@ -355,8 +358,7 @@ private void populateJsonValueHeader(Header header, String requestedType, Map<St
355
358
}
356
359
catch (IOException e ) {
357
360
logger .error (e , () ->
358
- "Could not decode json type: " + new String (header .value ()) + " for key: "
359
- + header .key ());
361
+ "Could not decode json type: " + requestedType + " for key: " + header .key ());
360
362
headers .put (header .key (), header .value ());
361
363
}
362
364
}
@@ -385,18 +387,16 @@ private Object decodeValue(Header h, Class<?> type) throws IOException, LinkageE
385
387
return value ;
386
388
}
387
389
388
- @ SuppressWarnings ("unchecked" )
389
- @ Nullable
390
390
private Map <String , String > decodeJsonTypes (Headers source ) {
391
- Map <String , String > types = null ;
391
+ Map <String , String > types = Collections . emptyMap () ;
392
392
Header jsonTypes = source .lastHeader (JSON_TYPES );
393
393
if (jsonTypes != null ) {
394
394
ObjectMapper headerObjectMapper = getObjectMapper ();
395
395
try {
396
- types = headerObjectMapper .readValue (jsonTypes .value (), Map . class );
396
+ types = headerObjectMapper .readValue (jsonTypes .value (), new TypeReference <>() { } );
397
397
}
398
398
catch (IOException e ) {
399
- logger .error (e , () -> "Could not decode json types: " + new String (jsonTypes .value ()));
399
+ logger .error (e , () -> "Could not decode json types: " + new String (jsonTypes .value (), StandardCharsets . UTF_8 ));
400
400
}
401
401
}
402
402
return types ;
0 commit comments