Skip to content

Commit cfe642e

Browse files
authored
KAFKA-12519: Remove built-in Streams metrics for versions 0.10.0-2.4 (apache#10765)
As specified in KIP-743, this PR removes the built-in metrics in Streams that are superseded by the refactoring proposed in KIP-444. Reviewers: Guozhang Wang <[email protected]>, Luke Chen <[email protected]>
1 parent 054d5f9 commit cfe642e

File tree

73 files changed

+788
-2947
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

73 files changed

+788
-2947
lines changed

streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java

-6
Original file line numberDiff line numberDiff line change
@@ -324,11 +324,6 @@ public class StreamsConfig extends AbstractConfig {
324324
@SuppressWarnings("WeakerAccess")
325325
public static final String EXACTLY_ONCE_V2 = "exactly_once_v2";
326326

327-
/**
328-
* Config value for parameter {@link #BUILT_IN_METRICS_VERSION_CONFIG "built.in.metrics.version"} for built-in metrics from version 0.10.0. to 2.4
329-
*/
330-
public static final String METRICS_0100_TO_24 = "0.10.0-2.4";
331-
332327
/**
333328
* Config value for parameter {@link #BUILT_IN_METRICS_VERSION_CONFIG "built.in.metrics.version"} for the latest built-in metrics version.
334329
*/
@@ -753,7 +748,6 @@ public class StreamsConfig extends AbstractConfig {
753748
Type.STRING,
754749
METRICS_LATEST,
755750
in(
756-
METRICS_0100_TO_24,
757751
METRICS_LATEST
758752
),
759753
Importance.LOW,

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import org.slf4j.Logger;
2929
import org.slf4j.LoggerFactory;
3030

31-
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
31+
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
3232
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
3333

3434
public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, K, V, T> {
@@ -66,7 +66,7 @@ private class KStreamAggregateProcessor extends AbstractProcessor<K, V> {
6666
@Override
6767
public void init(final ProcessorContext context) {
6868
super.init(context);
69-
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(
69+
droppedRecordsSensor = droppedRecordsSensor(
7070
Thread.currentThread().getName(),
7171
context.taskId().toString(),
7272
(StreamsMetricsImpl) context.metrics());

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
import java.util.Optional;
3939

4040
import static org.apache.kafka.streams.StreamsConfig.InternalConfig.ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
41-
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
41+
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
4242

4343
class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
4444
private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class);
@@ -91,7 +91,7 @@ private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
9191
public void init(final ProcessorContext context) {
9292
super.init(context);
9393
metrics = (StreamsMetricsImpl) context.metrics();
94-
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
94+
droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
9595
otherWindowStore = context.getStateStore(otherWindowName);
9696

9797
if (StreamsConfig.InternalConfig.getBoolean(context().appConfigs(), ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, true)) {

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.slf4j.Logger;
2626
import org.slf4j.LoggerFactory;
2727

28-
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
28+
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
2929
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
3030

3131
class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1, V1> {
@@ -52,7 +52,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1
5252
public void init(final ProcessorContext context) {
5353
super.init(context);
5454
metrics = (StreamsMetricsImpl) context.metrics();
55-
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
55+
droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
5656
valueGetter.init(context);
5757
}
5858

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.slf4j.Logger;
2828
import org.slf4j.LoggerFactory;
2929

30-
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
30+
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
3131
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
3232

3333
public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V, V> {
@@ -62,7 +62,7 @@ private class KStreamReduceProcessor extends AbstractProcessor<K, V> {
6262
@Override
6363
public void init(final ProcessorContext context) {
6464
super.init(context);
65-
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(
65+
droppedRecordsSensor = droppedRecordsSensor(
6666
Thread.currentThread().getName(),
6767
context.taskId().toString(),
6868
(StreamsMetricsImpl) context.metrics()

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java

+3-6
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.kafka.streams.processor.AbstractProcessor;
2828
import org.apache.kafka.streams.processor.Processor;
2929
import org.apache.kafka.streams.processor.ProcessorContext;
30-
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
3130
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
3231
import org.apache.kafka.streams.state.KeyValueIterator;
3332
import org.apache.kafka.streams.state.SessionStore;
@@ -38,8 +37,7 @@
3837
import java.util.ArrayList;
3938
import java.util.List;
4039

41-
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
42-
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
40+
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
4341

4442
public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
4543
private static final Logger LOG = LoggerFactory.getLogger(KStreamSessionWindowAggregate.class);
@@ -91,13 +89,12 @@ public void init(final ProcessorContext context) {
9189
super.init(context);
9290
final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
9391
final String threadId = Thread.currentThread().getName();
94-
lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
92+
lateRecordDropSensor = droppedRecordsSensor(
9593
threadId,
9694
context.taskId().toString(),
97-
((InternalProcessorContext) context).currentNode().name(),
9895
metrics
9996
);
100-
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics);
97+
droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), metrics);
10198
store = context.getStateStore(storeName);
10299
tupleForwarder = new SessionTupleForwarder<>(store, context, new SessionCacheFlushListener<>(context), sendOldValues);
103100
}

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,7 @@
3737
import java.util.HashSet;
3838
import java.util.Set;
3939

40-
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
41-
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
40+
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
4241
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
4342

4443
public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
@@ -89,13 +88,12 @@ public void init(final ProcessorContext context) {
8988
final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;
9089
final StreamsMetricsImpl metrics = internalProcessorContext.metrics();
9190
final String threadId = Thread.currentThread().getName();
92-
lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
91+
lateRecordDropSensor = droppedRecordsSensor(
9392
threadId,
9493
context.taskId().toString(),
95-
internalProcessorContext.currentNode().name(),
9694
metrics
9795
);
98-
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(
96+
droppedRecordsSensor = droppedRecordsSensor(
9997
threadId,
10098
context.taskId().toString(),
10199
metrics

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,7 @@
3535

3636
import java.util.Map;
3737

38-
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
39-
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
38+
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
4039
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
4140

4241
public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
@@ -87,13 +86,12 @@ public void init(final ProcessorContext context) {
8786
final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;
8887
final StreamsMetricsImpl metrics = internalProcessorContext.metrics();
8988
final String threadId = Thread.currentThread().getName();
90-
lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
89+
lateRecordDropSensor = droppedRecordsSensor(
9190
threadId,
9291
context.taskId().toString(),
93-
internalProcessorContext.currentNode().name(),
9492
metrics
9593
);
96-
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(
94+
droppedRecordsSensor = droppedRecordsSensor(
9795
threadId,
9896
context.taskId().toString(),
9997
metrics

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import org.slf4j.Logger;
2929
import org.slf4j.LoggerFactory;
3030

31-
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
31+
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
3232
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
3333

3434
class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
@@ -76,7 +76,7 @@ private class KTableKTableJoinProcessor extends AbstractProcessor<K, Change<V1>>
7676
@Override
7777
public void init(final ProcessorContext context) {
7878
super.init(context);
79-
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(
79+
droppedRecordsSensor = droppedRecordsSensor(
8080
Thread.currentThread().getName(),
8181
context.taskId().toString(),
8282
(StreamsMetricsImpl) context.metrics()

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.slf4j.Logger;
2828
import org.slf4j.LoggerFactory;
2929

30-
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
30+
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
3131
import static org.apache.kafka.streams.processor.internals.RecordQueue.UNKNOWN;
3232
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
3333

@@ -75,7 +75,7 @@ private class KTableKTableLeftJoinProcessor extends AbstractProcessor<K, Change<
7575
@Override
7676
public void init(final ProcessorContext context) {
7777
super.init(context);
78-
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(
78+
droppedRecordsSensor = droppedRecordsSensor(
7979
Thread.currentThread().getName(),
8080
context.taskId().toString(),
8181
(StreamsMetricsImpl) context.metrics()

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.slf4j.Logger;
2828
import org.slf4j.LoggerFactory;
2929

30-
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
30+
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
3131
import static org.apache.kafka.streams.processor.internals.RecordQueue.UNKNOWN;
3232
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
3333

@@ -74,7 +74,7 @@ private class KTableKTableOuterJoinProcessor extends AbstractProcessor<K, Change
7474
@Override
7575
public void init(final ProcessorContext context) {
7676
super.init(context);
77-
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(
77+
droppedRecordsSensor = droppedRecordsSensor(
7878
Thread.currentThread().getName(),
7979
context.taskId().toString(),
8080
(StreamsMetricsImpl) context.metrics()

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.slf4j.Logger;
2828
import org.slf4j.LoggerFactory;
2929

30-
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
30+
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
3131
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
3232

3333
class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
@@ -73,7 +73,7 @@ private class KTableKTableRightJoinProcessor extends AbstractProcessor<K, Change
7373
@Override
7474
public void init(final ProcessorContext context) {
7575
super.init(context);
76-
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(
76+
droppedRecordsSensor = droppedRecordsSensor(
7777
Thread.currentThread().getName(),
7878
context.taskId().toString(),
7979
(StreamsMetricsImpl) context.metrics()

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929

3030
import java.util.Objects;
3131

32-
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
32+
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
3333

3434
public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
3535
private static final Logger LOG = LoggerFactory.getLogger(KTableSource.class);
@@ -84,7 +84,7 @@ private class KTableSourceProcessor extends AbstractProcessor<K, V> {
8484
public void init(final ProcessorContext context) {
8585
super.init(context);
8686
metrics = (StreamsMetricsImpl) context.metrics();
87-
droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
87+
droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
8888
if (queryableName != null) {
8989
store = (TimestampedKeyValueStore<K, V>) context.getStateStore(queryableName);
9090
tupleForwarder = new TimestampedTupleForwarder<>(

streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ private final class KTableKTableJoinProcessor extends AbstractProcessor<KO, Chan
6464
public void init(final ProcessorContext context) {
6565
super.init(context);
6666
final InternalProcessorContext<?, ?> internalProcessorContext = (InternalProcessorContext<?, ?>) context;
67-
droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(
67+
droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(
6868
Thread.currentThread().getName(),
6969
internalProcessorContext.taskId().toString(),
7070
internalProcessorContext.metrics()

streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public void init(final ProcessorContext context) {
8888
if (valueSerializer == null) {
8989
valueSerializer = (Serializer<V>) context.valueSerde().serializer();
9090
}
91-
droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(
91+
droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(
9292
Thread.currentThread().getName(),
9393
context.taskId().toString(),
9494
(StreamsMetricsImpl) context.metrics()

streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public void init(final ProcessorContext context) {
6262
super.init(context);
6363
final InternalProcessorContext<?, ?> internalProcessorContext = (InternalProcessorContext<?, ?>) context;
6464

65-
droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(
65+
droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(
6666
Thread.currentThread().getName(),
6767
internalProcessorContext.taskId().toString(),
6868
internalProcessorContext.metrics()

streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import java.util.Map;
3131
import java.util.Set;
3232

33-
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
33+
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
3434

3535
/**
3636
* Updates the state for all Global State Stores.
@@ -76,7 +76,7 @@ public Map<TopicPartition, Long> initialize() {
7676
source,
7777
deserializationExceptionHandler,
7878
logContext,
79-
droppedRecordsSensorOrSkippedRecordsSensor(
79+
droppedRecordsSensor(
8080
Thread.currentThread().getName(),
8181
processorContext.taskId().toString(),
8282
processorContext.metrics()

0 commit comments

Comments
 (0)