Commit 83f4f2f

remove inflight expire batch judge

1 parent 73afcc9 · commit 83f4f2f
File tree

3 files changed: +273 −41 lines
clients/src/test/java/org/apache/kafka/clients/producer/ProducerIntegrationTest.java

Lines changed: 270 additions & 0 deletions
@@ -0,0 +1,270 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.producer;

import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
import org.apache.kafka.clients.producer.internals.ProducerMetrics;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.common.compress.NoCompression;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;


public class ProducerIntegrationTest {

    @ClusterTest(serverProperties = {
        @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
    })
    public void testInFlightBatchShouldNotBeCorrupted(ClusterInstance cluster) throws InterruptedException,
            ExecutionException {
        String topic = "test-topic";
        cluster.createTopic("test-topic", 1, (short) 1);
        try (var producer = expireProducer(cluster)) {
            producer.send(new ProducerRecord<>(topic, "key".getBytes(), "value".getBytes())).get();
        }
        try (var consumer = cluster.consumer()) {
            consumer.subscribe(List.of(topic));
            TestUtils.waitForCondition(() -> consumer.poll(Duration.ofSeconds(1)).count() == 1, 5000, "failed to poll data");
        }
    }

    // A producer tuned so that batches hit delivery.timeout.ms while their requests are still in flight.
    @SuppressWarnings({"unchecked", "this-escape"})
    private Producer<byte[], byte[]> expireProducer(ClusterInstance cluster) {
        Map<String, Object> config = Map.of(
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName(),
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName(),
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers(),
            ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false,
            ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 2000,
            ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1500
        );
        return new EvilKafkaProducerBuilder().build(config);
    }

    // Hand-wires a KafkaProducer whose Sender stalls after each send pass and
    // whose BufferPool scrubs buffers as they are deallocated.
    static class EvilKafkaProducerBuilder {

        Serializer<byte[]> serializer = new ByteArraySerializer();
        ApiVersions apiVersions = new ApiVersions();
        LogContext logContext = new LogContext("[expire Producer test ]");
        Metrics metrics = new Metrics(Time.SYSTEM);

        String clientId;
        String transactionalId;
        ProducerConfig config;
        ProducerMetadata metadata;
        RecordAccumulator accumulator;
        Partitioner partitioner;
        Sender sender;
        ProducerInterceptors<String, String> interceptors;

        @SuppressWarnings({"unchecked", "this-escape"})
        Producer<byte[], byte[]> build(Map<String, Object> configs) {
            this.config = new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, null, null));
            transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
            clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
            return new KafkaProducer<>(
                config,
                logContext,
                metrics,
                serializer,
                serializer,
                buildMetadata(),
                buildAccumulator(),
                null,
                buildSender(),
                buildInterceptors(),
                buildPartition(),
                Time.SYSTEM,
                ioThread(),
                Optional.empty()
            );
        }

        private ProducerInterceptors buildInterceptors() {
            this.interceptors = new ProducerInterceptors<>(List.of(), metrics);
            return this.interceptors;
        }

        private Partitioner buildPartition() {
            this.partitioner = config.getConfiguredInstance(
                ProducerConfig.PARTITIONER_CLASS_CONFIG,
                Partitioner.class,
                Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
            return this.partitioner;
        }

        private Sender buildSender() {
            int maxInflightRequests = config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
            int requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
            ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
            Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
            KafkaClient client = ClientUtils.createNetworkClient(config,
                this.metrics,
                "producer",
                logContext,
                apiVersions,
                Time.SYSTEM,
                maxInflightRequests,
                metadata,
                throttleTimeSensor,
                null);

            short acks = Short.parseShort(config.getString(ProducerConfig.ACKS_CONFIG));
            this.sender = new Sender(logContext,
                client,
                metadata,
                this.accumulator,
                maxInflightRequests == 1,
                config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                acks,
                config.getInt(ProducerConfig.RETRIES_CONFIG),
                metricsRegistry.senderMetrics,
                Time.SYSTEM,
                requestTimeoutMs,
                config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                null) {
                @Override
                protected long sendProducerData(long now) {
                    long result = super.sendProducerData(now);
                    try {
                        // Ensure the batch expires.
                        Thread.sleep(500);
                        return result;
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            };
            return this.sender;
        }

        private RecordAccumulator buildAccumulator() {
            long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
            long retryBackoffMaxMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
            int batchSize = Math.max(1, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
            Plugin<Partitioner> partitionerPlugin = Plugin.wrapInstance(
                config.getConfiguredInstance(
                    ProducerConfig.PARTITIONER_CLASS_CONFIG,
                    Partitioner.class,
                    Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)),
                metrics,
                ProducerConfig.PARTITIONER_CLASS_CONFIG);
            boolean enableAdaptivePartitioning = partitionerPlugin.get() == null &&
                config.getBoolean(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG);
            this.accumulator = new RecordAccumulator(logContext,
                batchSize,
                NoCompression.NONE,
                (int) Math.min(config.getLong(ProducerConfig.LINGER_MS_CONFIG), Integer.MAX_VALUE),
                retryBackoffMs,
                retryBackoffMaxMs,
                config.getInt(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG),
                new RecordAccumulator.PartitionerConfig(
                    enableAdaptivePartitioning,
                    config.getLong(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG)
                ),
                metrics,
                "producer-metrics",
                Time.SYSTEM,
                null,
                new EvilBufferPool(config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG), batchSize, metrics,
                    Time.SYSTEM, "producer-metrics"));
            return accumulator;
        }

        private ProducerMetadata buildMetadata() {
            long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
            long retryBackoffMaxMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
            List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
            ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(
                List.of(),
                reporters,
                List.of(
                    Plugin.wrapInstance(serializer, metrics, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).get(),
                    Plugin.wrapInstance(serializer, metrics, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).get()));
            this.metadata = new ProducerMetadata(retryBackoffMs,
                retryBackoffMaxMs,
                config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
                config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
                logContext,
                clusterResourceListeners,
                Time.SYSTEM);
            metadata.bootstrap(ClientUtils.parseAndValidateAddresses(config));
            return metadata;
        }

        private KafkaThread ioThread() {
            KafkaThread ioThread = new KafkaThread("test_io_thread", sender, true);
            ioThread.start();
            return ioThread;
        }
    }

    static class EvilBufferPool extends BufferPool {

        public EvilBufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
            super(memory, poolableSize, metrics, time, metricGrpName);
        }

        /**
         * Override deallocate to intentionally corrupt the ByteBuffer being returned to the pool.
         * This is used to simulate a scenario where an in-flight buffer is mistakenly reused
         * and its contents are unexpectedly modified, helping expose buffer reuse bugs.
         */
        @Override
        public void deallocate(ByteBuffer buffer, int size) {
            // Ensure atomicity using reentrant behavior
            lock.lock();
            try {
                Arrays.fill(buffer.array(), (byte) 0);
                super.deallocate(buffer, size);
            } finally {
                lock.unlock();
            }
        }
    }
}
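
The EvilBufferPool above is the crux of the test: by scrubbing a buffer the moment it is returned to the pool, any batch whose request is still in flight on that buffer becomes visibly corrupt. A standalone sketch of the same reuse hazard, assuming nothing from Kafka (the class and method names below are hypothetical, for illustration only):

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Minimal pool illustrating the hazard: a ByteBuffer is handed back (and
// scrubbed, as EvilBufferPool.deallocate does) while an earlier holder
// still expects to read its contents.
public class BufferReuseSketch {
    private static final Deque<ByteBuffer> FREE = new ArrayDeque<>();

    static ByteBuffer allocate(int size) {
        ByteBuffer pooled = FREE.poll();
        return pooled != null ? pooled : ByteBuffer.allocate(size);
    }

    static void deallocate(ByteBuffer buffer) {
        Arrays.fill(buffer.array(), (byte) 0); // scrub on return, like the test's pool
        buffer.clear();
        FREE.add(buffer);
    }

    public static void main(String[] args) {
        ByteBuffer inFlight = allocate(16);
        inFlight.put("payload".getBytes());
        // The delivery timeout fires while the request is still in flight,
        // so the producer deallocates the batch's buffer...
        deallocate(inFlight);
        // ...and the bytes the network layer would transmit are now zeros.
        System.out.println(Arrays.toString(inFlight.array()));
    }
}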

clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java

Lines changed: 1 addition & 1 deletion
@@ -48,7 +48,7 @@ public class BufferPool {

     private final long totalMemory;
     private final int poolableSize;
-    private final ReentrantLock lock;
+    protected final ReentrantLock lock;
     private final Deque<ByteBuffer> free;
     private final Deque<Condition> waiters;
     /** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize. */

clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java

Lines changed: 2 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -172,43 +172,6 @@ private void maybeRemoveAndDeallocateBatch(ProducerBatch batch) {
172172
this.accumulator.deallocate(batch);
173173
}
174174

175-
/**
176-
* Get the in-flight batches that has reached delivery timeout.
177-
*/
178-
private List<ProducerBatch> getExpiredInflightBatches(long now) {
179-
List<ProducerBatch> expiredBatches = new ArrayList<>();
180-
181-
for (Iterator<Map.Entry<TopicPartition, List<ProducerBatch>>> batchIt = inFlightBatches.entrySet().iterator(); batchIt.hasNext();) {
182-
Map.Entry<TopicPartition, List<ProducerBatch>> entry = batchIt.next();
183-
List<ProducerBatch> partitionInFlightBatches = entry.getValue();
184-
if (partitionInFlightBatches != null) {
185-
Iterator<ProducerBatch> iter = partitionInFlightBatches.iterator();
186-
while (iter.hasNext()) {
187-
ProducerBatch batch = iter.next();
188-
if (batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now)) {
189-
iter.remove();
190-
// expireBatches is called in Sender.sendProducerData, before client.poll.
191-
// The !batch.isDone() invariant should always hold. An IllegalStateException
192-
// exception will be thrown if the invariant is violated.
193-
if (!batch.isDone()) {
194-
expiredBatches.add(batch);
195-
} else {
196-
throw new IllegalStateException(batch.topicPartition + " batch created at " +
197-
batch.createdMs + " gets unexpected final state " + batch.finalState());
198-
}
199-
} else {
200-
accumulator.maybeUpdateNextBatchExpiryTime(batch);
201-
break;
202-
}
203-
}
204-
if (partitionInFlightBatches.isEmpty()) {
205-
batchIt.remove();
206-
}
207-
}
208-
}
209-
return expiredBatches;
210-
}
211-
212175
private void addToInflightBatches(List<ProducerBatch> batches) {
213176
for (ProducerBatch batch : batches) {
214177
List<ProducerBatch> inflightBatchList = inFlightBatches.computeIfAbsent(batch.topicPartition,
@@ -355,7 +318,8 @@ private boolean shouldHandleAuthorizationError(RuntimeException exception) {
355318
return false;
356319
}
357320

358-
private long sendProducerData(long now) {
321+
// Visible for testing
322+
protected long sendProducerData(long now) {
359323
MetadataSnapshot metadataSnapshot = metadata.fetchMetadataSnapshot();
360324
// get the list of partitions with data ready to send
361325
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(metadataSnapshot, now);
@@ -404,9 +368,7 @@ private long sendProducerData(long now) {
404368
}
405369

406370
accumulator.resetNextBatchExpiryTime();
407-
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
408371
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
409-
expiredBatches.addAll(expiredInflightBatches);
410372

411373
// Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
412374
// for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
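
With the in-flight scan removed, delivery-timeout expiry is driven solely by this.accumulator.expiredBatches(now). The check a batch is subject to is a plain age comparison; a sketch of that predicate, written as a hypothetical standalone helper mirroring ProducerBatch.hasReachedDeliveryTimeout (which the removed code called):

// Hypothetical helper: a batch reaches its delivery timeout once it has been
// alive for at least delivery.timeout.ms since creation.
static boolean hasReachedDeliveryTimeout(long deliveryTimeoutMs, long createdMs, long now) {
    return deliveryTimeoutMs <= now - createdMs;
}

Under the test's settings (delivery.timeout.ms = 2000, request.timeout.ms = 1500, and a 500 ms sleep injected after every sendProducerData pass), a batch can cross this threshold while its request is still in flight; expiring and deallocating such a batch is exactly what used to hand an in-use buffer back to the pool prematurely.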
