1
1
/*
2
- * Copyright 2016-2021 the original author or authors.
2
+ * Copyright 2016-2022 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
@@ -154,6 +154,8 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
154
154
155
155
private long maxAge ;
156
156
157
+ private boolean configureSerializers = true ;
158
+
157
159
private volatile String transactionIdPrefix ;
158
160
159
161
private volatile String clientIdPrefix ;
@@ -183,16 +185,37 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
183
185
@ Nullable Serializer <K > keySerializer ,
184
186
@ Nullable Serializer <V > valueSerializer ) {
185
187
186
- this (configs , () -> keySerializer , () -> valueSerializer );
188
+ this (configs , () -> keySerializer , () -> valueSerializer , true );
187
189
}
188
190
189
191
/**
190
- * Construct a factory with the provided configuration and {@link Serializer} Suppliers.
191
- * Also configures a {@link #transactionIdPrefix} as a value from the
192
- * {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} if provided.
193
- * This config is going to be overridden with a suffix for target {@link Producer} instance.
194
- * When the suppliers are invoked to get an instance, the serializers'
195
- * {@code configure()} methods will be called with the configuration map.
192
+ * Construct a factory with the provided configuration and {@link Serializer}s. Also
193
+ * configures a {@link #transactionIdPrefix} as a value from the
194
+ * {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} if provided. This config is going to
195
+ * be overridden with a suffix for target {@link Producer} instance. The serializers'
196
+ * {@code configure()} methods will be called with the configuration map unless
197
+ * {@code configureSerializers} is false..
198
+ * @param configs the configuration.
199
+ * @param keySerializer the key {@link Serializer}.
200
+ * @param valueSerializer the value {@link Serializer}.
201
+ * @param configureSerializers set to false if serializers are already fully
202
+ * configured.
203
+ * @since 2.8.7
204
+ */
205
+ public DefaultKafkaProducerFactory (Map <String , Object > configs ,
206
+ @ Nullable Serializer <K > keySerializer ,
207
+ @ Nullable Serializer <V > valueSerializer , boolean configureSerializers ) {
208
+
209
+ this (configs , () -> keySerializer , () -> valueSerializer , configureSerializers );
210
+ }
211
+
212
+ /**
213
+ * Construct a factory with the provided configuration and {@link Serializer}
214
+ * Suppliers. Also configures a {@link #transactionIdPrefix} as a value from the
215
+ * {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} if provided. This config is going to
216
+ * be overridden with a suffix for target {@link Producer} instance. When the
217
+ * suppliers are invoked to get an instance, the serializers' {@code configure()}
218
+ * methods will be called with the configuration map.
196
219
* @param configs the configuration.
197
220
* @param keySerializerSupplier the key {@link Serializer} supplier function.
198
221
* @param valueSerializerSupplier the value {@link Serializer} supplier function.
@@ -202,7 +225,30 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
202
225
@ Nullable Supplier <Serializer <K >> keySerializerSupplier ,
203
226
@ Nullable Supplier <Serializer <V >> valueSerializerSupplier ) {
204
227
228
+ this (configs , keySerializerSupplier , valueSerializerSupplier , true );
229
+ }
230
+
231
+ /**
232
+ * Construct a factory with the provided configuration and {@link Serializer}
233
+ * Suppliers. Also configures a {@link #transactionIdPrefix} as a value from the
234
+ * {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} if provided. This config is going to
235
+ * be overridden with a suffix for target {@link Producer} instance. When the
236
+ * suppliers are invoked to get an instance, the serializers' {@code configure()}
237
+ * methods will be called with the configuration map unless
238
+ * {@code configureSerializers} is false.
239
+ * @param configs the configuration.
240
+ * @param keySerializerSupplier the key {@link Serializer} supplier function.
241
+ * @param valueSerializerSupplier the value {@link Serializer} supplier function.
242
+ * @param configureSerializers set to false if serializers are already fully
243
+ * configured.
244
+ * @since 2.8.7
245
+ */
246
+ public DefaultKafkaProducerFactory (Map <String , Object > configs ,
247
+ @ Nullable Supplier <Serializer <K >> keySerializerSupplier ,
248
+ @ Nullable Supplier <Serializer <V >> valueSerializerSupplier , boolean configureSerializers ) {
249
+
205
250
this .configs = new ConcurrentHashMap <>(configs );
251
+ this .configureSerializers = configureSerializers ;
206
252
this .keySerializerSupplier = keySerializerSupplier (keySerializerSupplier );
207
253
this .valueSerializerSupplier = valueSerializerSupplier (valueSerializerSupplier );
208
254
if (this .clientIdPrefix == null && configs .get (ProducerConfig .CLIENT_ID_CONFIG ) instanceof String ) {
@@ -217,6 +263,9 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
217
263
218
264
private Supplier <Serializer <K >> keySerializerSupplier (@ Nullable Supplier <Serializer <K >> keySerializerSupplier ) {
219
265
this .rawKeySerializerSupplier = keySerializerSupplier ;
266
+ if (!this .configureSerializers ) {
267
+ return keySerializerSupplier ;
268
+ }
220
269
return keySerializerSupplier == null
221
270
? () -> null
222
271
: () -> {
@@ -230,6 +279,9 @@ private Supplier<Serializer<K>> keySerializerSupplier(@Nullable Supplier<Seriali
230
279
231
280
private Supplier <Serializer <V >> valueSerializerSupplier (@ Nullable Supplier <Serializer <V >> valueSerializerSupplier ) {
232
281
this .rawValueSerializerSupplier = valueSerializerSupplier ;
282
+ if (!this .configureSerializers ) {
283
+ return valueSerializerSupplier ;
284
+ }
233
285
return valueSerializerSupplier == null
234
286
? () -> null
235
287
: () -> {
@@ -252,37 +304,79 @@ public void setBeanName(String name) {
252
304
}
253
305
254
306
/**
255
- * Set a key serializer.
307
+ * Set a key serializer. The serializer will be configured using the producer
308
+ * configuration, unless {@link #setConfigureSerializers(boolean)
309
+ * configureSerializers} is false.
256
310
* @param keySerializer the key serializer.
311
+ * @see #setConfigureSerializers(boolean)
257
312
*/
258
313
public void setKeySerializer (@ Nullable Serializer <K > keySerializer ) {
259
314
this .keySerializerSupplier = keySerializerSupplier (() -> keySerializer );
260
315
}
261
316
262
317
/**
263
- * Set a value serializer.
318
+ * Set a value serializer. The serializer will be configured using the producer
319
+ * configuration, unless {@link #setConfigureSerializers(boolean)
320
+ * configureSerializers} is false.
264
321
* @param valueSerializer the value serializer.
322
+ * @see #setConfigureSerializers(boolean)
265
323
*/
266
324
public void setValueSerializer (@ Nullable Serializer <V > valueSerializer ) {
267
325
this .valueSerializerSupplier = valueSerializerSupplier (() -> valueSerializer );
268
326
}
269
327
270
328
/**
271
- * Set a supplier to supply instances of the key serializer.
329
+ * Set a supplier to supply instances of the key serializer. The serializer will be
330
+ * configured using the producer configuration, unless
331
+ * {@link #setConfigureSerializers(boolean) configureSerializers} is false.
272
332
* @param keySerializerSupplier the supplier.
273
333
* @since 2.8
334
+ * @see #setConfigureSerializers(boolean)
274
335
*/
275
336
public void setKeySerializerSupplier (Supplier <Serializer <K >> keySerializerSupplier ) {
276
- this .keySerializerSupplier = keySerializerSupplier ;
337
+ this .keySerializerSupplier = keySerializerSupplier ( keySerializerSupplier ) ;
277
338
}
278
339
279
340
/**
280
341
* Set a supplier to supply instances of the value serializer.
281
- * @param valueSerializerSupplier the supplier.
342
+ * @param valueSerializerSupplier the supplier. The serializer will be configured
343
+ * using the producer configuration, unless {@link #setConfigureSerializers(boolean)
344
+ * configureSerializers} is false.
282
345
* @since 2.8
346
+ * @see #setConfigureSerializers(boolean)
283
347
*/
284
348
public void setValueSerializerSupplier (Supplier <Serializer <V >> valueSerializerSupplier ) {
285
- this .valueSerializerSupplier = valueSerializerSupplier ;
349
+ this .valueSerializerSupplier = valueSerializerSupplier (valueSerializerSupplier );
350
+ }
351
+
352
+ /**
353
+ * If true (default), programmatically provided serializers (via constructor or
354
+ * setters) will be configured using the producer configuration. Set to false if the
355
+ * serializers are already fully configured.
356
+ * @return true to configure.
357
+ * @since 2.8.7
358
+ * @see #setKeySerializer(Serializer)
359
+ * @see #setKeySerializerSupplier(Supplier)
360
+ * @see #setValueSerializer(Serializer)
361
+ * @see #setValueSerializerSupplier(Supplier)
362
+ */
363
+ public boolean isConfigureSerializers () {
364
+ return this .configureSerializers ;
365
+ }
366
+
367
+ /**
368
+ * Set to false (default true) to prevent programmatically provided serializers (via
369
+ * constructor or setters) from being configured using the producer configuration,
370
+ * e.g. if the serializers are already fully configured.
371
+ * @param configureSerializers false to not configure.
372
+ * @since 2.8.7
373
+ * @see #setKeySerializer(Serializer)
374
+ * @see #setKeySerializerSupplier(Supplier)
375
+ * @see #setValueSerializer(Serializer)
376
+ * @see #setValueSerializerSupplier(Supplier)
377
+ */
378
+ public void setConfigureSerializers (boolean configureSerializers ) {
379
+ this .configureSerializers = configureSerializers ;
286
380
}
287
381
288
382
/**
@@ -441,10 +535,10 @@ public ProducerFactory<K, V> copyWithConfigurationOverride(Map<String, Object> o
441
535
Map <String , Object > producerProperties = new HashMap <>(getConfigurationProperties ());
442
536
producerProperties .putAll (overrideProperties );
443
537
producerProperties = ensureExistingTransactionIdPrefixInProperties (producerProperties );
444
- DefaultKafkaProducerFactory <K , V > newFactory =
445
- new DefaultKafkaProducerFactory <>(producerProperties ,
538
+ DefaultKafkaProducerFactory <K , V > newFactory = new DefaultKafkaProducerFactory <>(producerProperties ,
446
539
getKeySerializerSupplier (),
447
- getValueSerializerSupplier ());
540
+ getValueSerializerSupplier (),
541
+ isConfigureSerializers ());
448
542
newFactory .setPhysicalCloseTimeout ((int ) getPhysicalCloseTimeout ().getSeconds ());
449
543
newFactory .setProducerPerConsumerPartition (isProducerPerConsumerPartition ());
450
544
newFactory .setProducerPerThread (isProducerPerThread ());
0 commit comments