File tree Expand file tree Collapse file tree 1 file changed +10
-1
lines changed
core/src/main/java/kafka/server/share Expand file tree Collapse file tree 1 file changed +10
-1
lines changed Original file line number Diff line number Diff line change @@ -331,8 +331,17 @@ public boolean tryComplete() {
331
331
releasePartitionLocks (topicPartitionData .keySet ());
332
332
partitionsAcquired .clear ();
333
333
localPartitionsAlreadyFetched .clear ();
334
+ return forceComplete ();
335
+ } else {
336
+ boolean completedByMe = forceComplete ();
337
+ // If invocation of forceComplete is not successful, then that means the request is already completed
338
+ // hence release the acquired locks. This can occur in case of remote storage fetch if there is a thread that
339
+ // completes the operation (due to expiration) right before a different thread is about to enter tryComplete.
340
+ if (!completedByMe ) {
341
+ releasePartitionLocks (partitionsAcquired .keySet ());
342
+ }
343
+ return completedByMe ;
334
344
}
335
- return forceComplete ();
336
345
}
337
346
}
338
347
You can’t perform that action at this time.
0 commit comments