34
34
import org .springframework .messaging .MessageHeaders ;
35
35
import org .springframework .util .Assert ;
36
36
import org .springframework .util .ClassUtils ;
37
+ import org .springframework .util .MimeType ;
37
38
38
39
import com .fasterxml .jackson .core .JsonProcessingException ;
40
+ import com .fasterxml .jackson .databind .DeserializationContext ;
41
+ import com .fasterxml .jackson .databind .JsonNode ;
39
42
import com .fasterxml .jackson .databind .ObjectMapper ;
43
+ import com .fasterxml .jackson .databind .deser .std .StdNodeBasedDeserializer ;
44
+ import com .fasterxml .jackson .databind .module .SimpleModule ;
45
+ import com .fasterxml .jackson .databind .type .TypeFactory ;
40
46
41
47
/**
42
48
* Default header mapper for Apache Kafka.
46
52
* Header types are added to a special header {@link #JSON_TYPES}.
47
53
*
48
54
* @author Gary Russell
55
+ * @author Artem Bilan
56
+ *
49
57
* @since 1.3
50
58
*
51
59
*/
@@ -54,7 +62,8 @@ public class DefaultKafkaHeaderMapper extends AbstractKafkaHeaderMapper {
54
62
private static final List <String > DEFAULT_TRUSTED_PACKAGES =
55
63
Arrays .asList (
56
64
"java.util" ,
57
- "java.lang"
65
+ "java.lang" ,
66
+ "org.springframework.util"
58
67
);
59
68
60
69
private static final List <String > DEFAULT_TO_STRING_CLASSES =
@@ -136,6 +145,8 @@ public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) {
136
145
Assert .notNull (objectMapper , "'objectMapper' must not be null" );
137
146
Assert .noNullElements (patterns , "'patterns' must not have null elements" );
138
147
this .objectMapper = objectMapper ;
148
+ this .objectMapper
149
+ .registerModule (new SimpleModule ().addDeserializer (MimeType .class , new MimeTypeJsonDeserializer ()));
139
150
}
140
151
141
152
/**
@@ -233,7 +244,6 @@ public void fromHeaders(MessageHeaders headers, Headers target) {
233
244
}
234
245
}
235
246
236
- @ SuppressWarnings ("unchecked" )
237
247
@ Override
238
248
public void toHeaders (Headers source , final Map <String , Object > headers ) {
239
249
final Map <String , String > jsonTypes = decodeJsonTypes (source );
@@ -257,7 +267,8 @@ public void toHeaders(Headers source, final Map<String, Object> headers) {
257
267
headers .put (h .key (), getObjectMapper ().readValue (h .value (), type ));
258
268
}
259
269
catch (IOException e ) {
260
- logger .error ("Could not decode json type: " + new String (h .value ()) + " for key: " + h .key (),
270
+ logger .error ("Could not decode json type: " + new String (h .value ()) + " for key: " + h
271
+ .key (),
261
272
e );
262
273
headers .put (h .key (), h .value ());
263
274
}
@@ -310,6 +321,34 @@ protected boolean trusted(String requestedType) {
310
321
return true ;
311
322
}
312
323
324
+
325
+ /**
326
+ * The {@link StdNodeBasedDeserializer} extension for {@link MimeType} deserialization.
327
+ * It is presented here for backward compatibility when older producers send {@link MimeType}
328
+ * headers as serialization version.
329
+ */
330
+ private class MimeTypeJsonDeserializer extends StdNodeBasedDeserializer <MimeType > {
331
+
332
+ private static final long serialVersionUID = 1L ;
333
+
334
+ MimeTypeJsonDeserializer () {
335
+ super (MimeType .class );
336
+ }
337
+
338
+ @ Override
339
+ public MimeType convert (JsonNode root , DeserializationContext ctxt ) throws IOException {
340
+ JsonNode type = root .get ("type" );
341
+ JsonNode subType = root .get ("subtype" );
342
+ JsonNode parameters = root .get ("parameters" );
343
+ Map <String , String > params =
344
+ DefaultKafkaHeaderMapper .this .objectMapper .readValue (parameters .traverse (),
345
+ TypeFactory .defaultInstance ()
346
+ .constructMapType (HashMap .class , String .class , String .class ));
347
+ return new MimeType (type .asText (), subType .asText (), params );
348
+ }
349
+
350
+ }
351
+
313
352
/**
314
353
* Represents a header that could not be decoded due to an untrusted type.
315
354
*/
0 commit comments