1
1
/*
2
- * Copyright 2019-2021 the original author or authors.
2
+ * Copyright 2019-2022 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
35
35
import org .apache .kafka .common .serialization .BytesSerializer ;
36
36
import org .apache .kafka .common .serialization .IntegerDeserializer ;
37
37
import org .apache .kafka .common .serialization .IntegerSerializer ;
38
+ import org .apache .kafka .common .serialization .Serializer ;
38
39
import org .apache .kafka .common .serialization .StringDeserializer ;
39
40
import org .apache .kafka .common .serialization .StringSerializer ;
40
41
import org .apache .kafka .common .utils .Bytes ;
45
46
46
47
/**
47
48
* @author Gary Russell
49
+ * @author Artem Bilan
50
+ *
48
51
* @since 2.3
49
52
*
50
53
*/
@@ -121,16 +124,16 @@ void testWithPropertyConfigKeys() {
121
124
private void doTest (DelegatingSerializer serializer , DelegatingDeserializer deserializer ) {
122
125
Headers headers = new RecordHeaders ();
123
126
headers .add (new RecordHeader (DelegatingSerializer .VALUE_SERIALIZATION_SELECTOR , "bytes" .getBytes ()));
124
- byte [] bytes = new byte [] { 1 , 2 , 3 , 4 };
127
+ byte [] bytes = new byte []{ 1 , 2 , 3 , 4 };
125
128
byte [] serialized = serializer .serialize ("foo" , headers , new Bytes (bytes ));
126
129
assertThat (serialized ).isSameAs (bytes );
127
130
headers .add (new RecordHeader (DelegatingSerializer .VALUE_SERIALIZATION_SELECTOR , "int" .getBytes ()));
128
131
serialized = serializer .serialize ("foo" , headers , 42 );
129
- assertThat (serialized ).isEqualTo (new byte [] { 0 , 0 , 0 , 42 });
132
+ assertThat (serialized ).isEqualTo (new byte []{ 0 , 0 , 0 , 42 });
130
133
assertThat (deserializer .deserialize ("foo" , headers , serialized )).isEqualTo (42 );
131
134
headers .add (new RecordHeader (DelegatingSerializer .VALUE_SERIALIZATION_SELECTOR , "string" .getBytes ()));
132
135
serialized = serializer .serialize ("foo" , headers , "bar" );
133
- assertThat (serialized ).isEqualTo (new byte [] { 'b' , 'a' , 'r' });
136
+ assertThat (serialized ).isEqualTo (new byte []{ 'b' , 'a' , 'r' });
134
137
assertThat (deserializer .deserialize ("foo" , headers , serialized )).isEqualTo ("bar" );
135
138
136
139
// implicit Serdes
@@ -151,25 +154,25 @@ private void doTest(DelegatingSerializer serializer, DelegatingDeserializer dese
151
154
Collections .singletonMap (DelegatingSerializer .VALUE_SERIALIZATION_SELECTOR , "string" ));
152
155
new DefaultKafkaHeaderMapper ().fromHeaders (messageHeaders , headers );
153
156
assertThat (headers .lastHeader (DelegatingSerializer .VALUE_SERIALIZATION_SELECTOR ).value ())
154
- .isEqualTo (new byte [] { 's' , 't' , 'r' , 'i' , 'n' , 'g' });
157
+ .isEqualTo (new byte []{ 's' , 't' , 'r' , 'i' , 'n' , 'g' });
155
158
serialized = serializer .serialize ("foo" , headers , "bar" );
156
- assertThat (serialized ).isEqualTo (new byte [] { 'b' , 'a' , 'r' });
159
+ assertThat (serialized ).isEqualTo (new byte []{ 'b' , 'a' , 'r' });
157
160
assertThat (deserializer .deserialize ("foo" , headers , serialized )).isEqualTo ("bar" );
158
161
}
159
162
160
163
private void doTestKeys (DelegatingSerializer serializer , DelegatingDeserializer deserializer ) {
161
164
Headers headers = new RecordHeaders ();
162
165
headers .add (new RecordHeader (DelegatingSerializer .KEY_SERIALIZATION_SELECTOR , "bytes" .getBytes ()));
163
- byte [] bytes = new byte [] { 1 , 2 , 3 , 4 };
166
+ byte [] bytes = new byte []{ 1 , 2 , 3 , 4 };
164
167
byte [] serialized = serializer .serialize ("foo" , headers , new Bytes (bytes ));
165
168
assertThat (serialized ).isSameAs (bytes );
166
169
headers .add (new RecordHeader (DelegatingSerializer .KEY_SERIALIZATION_SELECTOR , "int" .getBytes ()));
167
170
serialized = serializer .serialize ("foo" , headers , 42 );
168
- assertThat (serialized ).isEqualTo (new byte [] { 0 , 0 , 0 , 42 });
171
+ assertThat (serialized ).isEqualTo (new byte []{ 0 , 0 , 0 , 42 });
169
172
assertThat (deserializer .deserialize ("foo" , headers , serialized )).isEqualTo (42 );
170
173
headers .add (new RecordHeader (DelegatingSerializer .KEY_SERIALIZATION_SELECTOR , "string" .getBytes ()));
171
174
serialized = serializer .serialize ("foo" , headers , "bar" );
172
- assertThat (serialized ).isEqualTo (new byte [] { 'b' , 'a' , 'r' });
175
+ assertThat (serialized ).isEqualTo (new byte []{ 'b' , 'a' , 'r' });
173
176
assertThat (deserializer .deserialize ("foo" , headers , serialized )).isEqualTo ("bar" );
174
177
175
178
// implicit Serdes
@@ -190,9 +193,9 @@ private void doTestKeys(DelegatingSerializer serializer, DelegatingDeserializer
190
193
Collections .singletonMap (DelegatingSerializer .KEY_SERIALIZATION_SELECTOR , "string" ));
191
194
new DefaultKafkaHeaderMapper ().fromHeaders (messageHeaders , headers );
192
195
assertThat (headers .lastHeader (DelegatingSerializer .KEY_SERIALIZATION_SELECTOR ).value ())
193
- .isEqualTo (new byte [] { 's' , 't' , 'r' , 'i' , 'n' , 'g' });
196
+ .isEqualTo (new byte []{ 's' , 't' , 'r' , 'i' , 'n' , 'g' });
194
197
serialized = serializer .serialize ("foo" , headers , "bar" );
195
- assertThat (serialized ).isEqualTo (new byte [] { 'b' , 'a' , 'r' });
198
+ assertThat (serialized ).isEqualTo (new byte []{ 'b' , 'a' , 'r' });
196
199
assertThat (deserializer .deserialize ("foo" , headers , serialized )).isEqualTo ("bar" );
197
200
}
198
201
@@ -218,21 +221,24 @@ void byTypeBadType() {
218
221
assertThatExceptionOfType (SerializationException .class ).isThrownBy (
219
222
() -> serializer .serialize ("foo" , new Bytes (foo )))
220
223
.withMessageMatching ("No matching delegate for type: " + Bytes .class .getName ()
221
- + "; supported types: \\ [(java.lang.String, \\ [B|\\ [B, java.lang.String)\\ ]" );
224
+ + "; supported types: \\ [(java.lang.String, \\ [B|\\ [B, java.lang.String)]" );
222
225
}
223
226
224
227
@ Test
225
228
void assignable () {
226
- DelegatingByTypeSerializer serializer = new DelegatingByTypeSerializer (Map .of (Number .class ,
227
- new IntegerSerializer (), byte [].class , new ByteArraySerializer ()), true );
229
+ var delegates = new HashMap <Class <?>, Serializer <?>>();
230
+ delegates .put (Number .class , new IntegerSerializer ());
231
+ delegates .put (byte [].class , new ByteArraySerializer ());
232
+ DelegatingByTypeSerializer serializer = new DelegatingByTypeSerializer (delegates , true );
233
+
228
234
Integer i = 42 ;
229
- assertThat (serializer .serialize ("foo" , i )).isEqualTo (new byte [] { 0 , 0 , 0 , 42 });
235
+ assertThat (serializer .serialize ("foo" , i )).isEqualTo (new byte []{ 0 , 0 , 0 , 42 });
230
236
byte [] foo = "foo" .getBytes ();
231
237
assertThat (serializer .serialize ("foo" , foo )).isSameAs (foo );
232
238
assertThatExceptionOfType (SerializationException .class ).isThrownBy (
233
- () -> serializer .serialize ("foo" , new Bytes (foo )))
234
- .withMessageMatching ("No matching delegate for type: " + Bytes .class .getName ()
235
- + "; supported types: \\ [(java.lang.Number, \\ [B|\\ [B, java.lang.Number)\\ ]" );
239
+ () -> serializer .serialize ("foo" , new Bytes (foo )))
240
+ .withMessageMatching ("No matching delegate for type: " + Bytes .class .getName ()
241
+ + "; supported types: \\ [(java.lang.Number, \\ [B|\\ [B, java.lang.Number)]" );
236
242
}
237
243
238
244
}
0 commit comments