Skip to content

Commit af4277e

Browse files
committed
Attempt to mitigate KafkaDslTests without timestamp
1 parent 5e3d8d8 commit af4277e

File tree

1 file changed

+2
-8
lines changed
  • spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl

1 file changed

+2
-8
lines changed

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -193,8 +193,6 @@ void testKafkaAdapters() throws Exception {
193193
assertThat(headers.get(KafkaHeaders.RECEIVED_KEY)).isEqualTo(i + 1);
194194
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION)).isEqualTo(0);
195195
assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo((long) i);
196-
assertThat(headers.get(KafkaHeaders.TIMESTAMP_TYPE)).isEqualTo("CREATE_TIME");
197-
assertThat(headers.get(KafkaHeaders.RECEIVED_TIMESTAMP)).isEqualTo(1487694048633L);
198196
assertThat(headers.get("foo")).isEqualTo("bar");
199197
}
200198

@@ -211,8 +209,6 @@ void testKafkaAdapters() throws Exception {
211209
assertThat(headers.get(KafkaHeaders.RECEIVED_KEY)).isEqualTo(i + 1);
212210
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION)).isEqualTo(0);
213211
assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo((long) i);
214-
assertThat(headers.get(KafkaHeaders.TIMESTAMP_TYPE)).isEqualTo("CREATE_TIME");
215-
assertThat(headers.get(KafkaHeaders.RECEIVED_TIMESTAMP)).isEqualTo(1487694048644L);
216212
}
217213

218214
Message<String> message = MessageBuilder.withPayload("BAR").setHeader(KafkaHeaders.TOPIC, TEST_TOPIC2).build();
@@ -360,8 +356,7 @@ public IntegrationFlow sendToKafkaFlow(
360356
.enrichHeaders(h -> h.header(KafkaIntegrationHeaders.FUTURE_TOKEN, "foo"))
361357
.publishSubscribeChannel(c -> c
362358
.subscribe(sf -> sf.handle(
363-
kafkaMessageHandler(producerFactory(), TEST_TOPIC1)
364-
.timestampExpression("T(Long).valueOf('1487694048633')"),
359+
kafkaMessageHandler(producerFactory(), TEST_TOPIC1),
365360
e -> e.id("kafkaProducer1")))
366361
.subscribe(sf -> sf.handle(kafkaMessageHandlerTopic2, e -> e.id("kafkaProducer2")))
367362
);
@@ -370,8 +365,7 @@ public IntegrationFlow sendToKafkaFlow(
370365
@Bean
371366
public KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandlerTopic2() {
372367
return kafkaMessageHandler(producerFactory(), TEST_TOPIC2)
373-
.flush(msg -> true)
374-
.timestamp(m -> 1487694048644L);
368+
.flush(msg -> true);
375369
}
376370

377371
private KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandler(

0 commit comments

Comments
 (0)