21
21
import java .util .function .Consumer ;
22
22
import java .util .function .Function ;
23
23
24
- import org .apache .kafka .clients .producer .ProducerRecord ;
25
-
26
24
import org .springframework .expression .Expression ;
27
25
import org .springframework .expression .common .LiteralExpression ;
28
26
import org .springframework .integration .dsl .ComponentsRegistration ;
@@ -239,8 +237,8 @@ public S timestampExpression(Expression timestampExpression) {
239
237
}
240
238
241
239
/**
242
- * Configure a SpEL expression to determine whether or not to flush the producer after
243
- * a send. By default the producer is flushed if a header {@code kafka_flush} has a
240
+ * Configure a SpEL expression to determine whether to flush the producer after
241
+ * a send. By default, the producer is flushed if a header {@code kafka_flush} has a
244
242
* value {@link Boolean#TRUE}.
245
243
* @param flushExpression the timestamp expression to use.
246
244
* @return the spec.
@@ -251,8 +249,8 @@ public S flushExpression(String flushExpression) {
251
249
252
250
/**
253
251
* Configure a {@link Function} that will be invoked at runtime to determine whether
254
- * or not to flush the producer after a send. By default the producer is flushed if a
255
- * header {@code kafka_flush} has a value {@link Boolean#TRUE}. Typically used with a
252
+ * to flush the producer after send. By default, the producer is flushed if a
253
+ * header {@code kafka_flush} has a value {@link Boolean#TRUE}. Typically, used with a
256
254
* Java 8 Lambda expression:
257
255
* <pre class="code">
258
256
* {@code
@@ -268,8 +266,8 @@ public <P> S flush(Function<Message<P>, Boolean> flushFunction) {
268
266
}
269
267
270
268
/**
271
- * Configure an {@link Expression} to determine whether or not to flush the producer
272
- * after a send. By default the producer is flushed if a header {@code kafka_flush}
269
+ * Configure an {@link Expression} to determine whether to flush the producer
270
+ * after a send. By default, the producer is flushed if a header {@code kafka_flush}
273
271
* has a value {@link Boolean#TRUE}.
274
272
* @param flushExpression the timestamp expression to use.
275
273
* @return the spec.
@@ -293,7 +291,7 @@ public S sync(boolean sync) {
293
291
294
292
/**
295
293
* Specify a timeout in milliseconds how long {@link KafkaProducerMessageHandler}
296
- * should wait wait for send operation results. Defaults to 10 seconds.
294
+ * should wait for send operation results. Defaults to 10 seconds.
297
295
* @param sendTimeout the timeout to wait for result fo send operation.
298
296
* @return the spec.
299
297
*/
@@ -379,8 +377,8 @@ public S futuresChannel(String futuresChannel) {
379
377
}
380
378
381
379
/**
382
- * Set a {@link ProducerRecordCreator} to create the {@link ProducerRecord}. Ignored
383
- * if {@link #useTemplateConverter(boolean) useTemplateConverter} is true.
380
+ * Set a {@link ProducerRecordCreator} to create the {@link org.apache.kafka.clients.producer. ProducerRecord}.
381
+ * Ignored if {@link #useTemplateConverter(boolean) useTemplateConverter} is true.
384
382
* @param creator the creator.
385
383
* @return the spec.
386
384
* @since 5.5.5
@@ -392,7 +390,7 @@ public S producerRecordCreator(ProducerRecordCreator<K, V> creator) {
392
390
393
391
/**
394
392
* Set to true to use the template's message converter to create the
395
- * {@link ProducerRecord} instead of the
393
+ * {@link org.apache.kafka.clients.producer. ProducerRecord} instead of the
396
394
* {@link #producerRecordCreator(ProducerRecordCreator) producerRecordCreator}.
397
395
* @param use true to use the converter.
398
396
* @return the spec.
@@ -409,7 +407,8 @@ public S useTemplateConverter(boolean use) {
409
407
* @param <K> the key type.
410
408
* @param <V> the value type.
411
409
*/
412
- public static class KafkaProducerMessageHandlerTemplateSpec <K , V > extends KafkaProducerMessageHandlerSpec <K , V , KafkaProducerMessageHandlerTemplateSpec <K , V >>
410
+ public static class KafkaProducerMessageHandlerTemplateSpec <K , V >
411
+ extends KafkaProducerMessageHandlerSpec <K , V , KafkaProducerMessageHandlerTemplateSpec <K , V >>
413
412
implements ComponentsRegistration {
414
413
415
414
private final KafkaTemplateSpec <K , V > kafkaTemplateSpec ;
@@ -428,6 +427,7 @@ public static class KafkaProducerMessageHandlerTemplateSpec<K, V> extends KafkaP
428
427
*/
429
428
public KafkaProducerMessageHandlerTemplateSpec <K , V > configureKafkaTemplate (
430
429
Consumer <KafkaTemplateSpec <K , V >> configurer ) {
430
+
431
431
Assert .notNull (configurer , "The 'configurer' cannot be null" );
432
432
configurer .accept (this .kafkaTemplateSpec );
433
433
return _this ();
0 commit comments