Skip to content

Commit 295760d

Browse files
authored
KAFKA-18917: TransformValues throws NPE (apache#19089)
When `transformValues` is used with a `Materialized` instance, but without a queryable name, a `NullPointerException` is thrown. To preserve the semantics present in 3.9, we need to avoid materialization when a queryable name is not present. Reviewers: Bruno Cadonna <[email protected]>
1 parent 8533c43 commit 295760d

File tree

2 files changed

+32
-2
lines changed

2 files changed

+32
-2
lines changed

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -448,8 +448,12 @@ private <VR> KTable<K, VR> doTransformValues(final ValueTransformerWithKeySuppli
448448
valueSerde = materializedInternal.valueSerde();
449449
queryableStoreName = materializedInternal.queryableStoreName();
450450
// only materialize if materialized is specified and it has queryable name
451-
final StoreFactory storeFactory = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)) : null;
452-
storeBuilder = Collections.singleton(new FactoryWrappingStoreBuilder<>(storeFactory));
451+
if (queryableStoreName != null) {
452+
final StoreFactory storeFactory = new KeyValueStoreMaterializer<>(materializedInternal);
453+
storeBuilder = Collections.singleton(new FactoryWrappingStoreBuilder<>(storeFactory));
454+
} else {
455+
storeBuilder = null;
456+
}
453457
} else {
454458
keySerde = this.keySerde;
455459
valueSerde = null;

streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,32 @@ public void shouldCalculateCorrectOldValuesIfNotStatefulEvenIfNotMaterialized()
436436
new KeyValueTimestamp<>("A", "3", 15))));
437437
}
438438

439+
@Test
440+
public void shouldCalculateCorrectOldValuesIfNotStatefulEvenNotMaterializedNoQueryableName() {
441+
builder
442+
.table(INPUT_TOPIC, CONSUMED)
443+
.transformValues(new StatelessTransformerSupplier(),
444+
Materialized.with(Serdes.String(), Serdes.Integer())
445+
)
446+
.groupBy(toForceSendingOfOldValues(), Grouped.with(Serdes.String(), Serdes.Integer()))
447+
.reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR)
448+
.mapValues(mapBackToStrings())
449+
.toStream()
450+
.process(capture);
451+
452+
driver = new TopologyTestDriver(builder.build(), props());
453+
final TestInputTopic<String, String> inputTopic =
454+
driver.createInputTopic(INPUT_TOPIC, new StringSerializer(), new StringSerializer());
455+
456+
inputTopic.pipeInput("A", "a", 5L);
457+
inputTopic.pipeInput("A", "aa", 15L);
458+
inputTopic.pipeInput("A", "aaa", 10);
459+
460+
assertThat(output(), equalTo(Arrays.asList(new KeyValueTimestamp<>("A", "1", 5),
461+
new KeyValueTimestamp<>("A", "2", 15),
462+
new KeyValueTimestamp<>("A", "3", 15))));
463+
}
464+
439465
private ArrayList<KeyValueTimestamp<String, String>> output() {
440466
return capture.capturedProcessors(1).get(0).processed();
441467
}

0 commit comments

Comments
 (0)