Skip to content

Commit c1fc59f

Browse files
KAFKA-18918: Correcting releasing of locks on exception (apache#19091)
The PR corrects the way the locks are released on exception. As `partitionsAcquired` can be a reference to `topicPartitionData`, hence the locks should released prior clearing `partitionsAcquired`. Reviewers: Abhinav Dixit <[email protected]>, Andrew Schofield <[email protected]>
1 parent c2014c0 commit c1fc59f

File tree

2 files changed

+41
-1
lines changed

2 files changed

+41
-1
lines changed

core/src/main/java/kafka/server/share/DelayedShareFetch.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -277,9 +277,9 @@ public boolean tryComplete() {
277277
return false;
278278
} catch (Exception e) {
279279
log.error("Error processing delayed share fetch request", e);
280+
releasePartitionLocks(topicPartitionData.keySet());
280281
partitionsAcquired.clear();
281282
partitionsAlreadyFetched.clear();
282-
releasePartitionLocks(topicPartitionData.keySet());
283283
return forceComplete();
284284
}
285285
}

core/src/test/java/kafka/server/share/DelayedShareFetchTest.java

+40
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
import static org.mockito.ArgumentMatchers.anyString;
8787
import static org.mockito.Mockito.doAnswer;
8888
import static org.mockito.Mockito.doReturn;
89+
import static org.mockito.Mockito.doThrow;
8990
import static org.mockito.Mockito.mock;
9091
import static org.mockito.Mockito.spy;
9192
import static org.mockito.Mockito.times;
@@ -725,6 +726,7 @@ public void testExceptionInMinBytesCalculation() {
725726
Mockito.verify(replicaManager, times(1)).readFromLog(
726727
any(), any(), any(ReplicaQuota.class), anyBoolean());
727728
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any());
729+
Mockito.verify(sp0, times(1)).releaseFetchLock();
728730

729731
// Force complete the request as it's still pending. Return false from the share partition lock acquire.
730732
when(sp0.maybeAcquireFetchLock()).thenReturn(false);
@@ -747,6 +749,44 @@ public void testExceptionInMinBytesCalculation() {
747749
Mockito.verify(exceptionHandler, times(1)).accept(any(), any());
748750
}
749751

752+
@Test
753+
public void testTryCompleteLocksReleasedOnCompleteException() {
754+
ReplicaManager replicaManager = mock(ReplicaManager.class);
755+
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
756+
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0);
757+
758+
SharePartition sp0 = mock(SharePartition.class);
759+
when(sp0.maybeAcquireFetchLock()).thenReturn(true);
760+
when(sp0.canAcquireRecords()).thenReturn(true);
761+
when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
762+
763+
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
764+
sharePartitions.put(tp0, sp0);
765+
766+
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
767+
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS,
768+
BROKER_TOPIC_STATS);
769+
770+
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
771+
mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp0, 1);
772+
773+
PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Collections.singleton(tp0));
774+
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
775+
.withShareFetchData(shareFetch)
776+
.withSharePartitions(sharePartitions)
777+
.withReplicaManager(replicaManager)
778+
.withPartitionMaxBytesStrategy(partitionMaxBytesStrategy)
779+
.build());
780+
assertFalse(delayedShareFetch.isCompleted());
781+
// Throw exception for onComplete.
782+
doThrow(new RuntimeException()).when(delayedShareFetch).onComplete();
783+
// Try to complete the request.
784+
assertFalse(delayedShareFetch.tryComplete());
785+
786+
Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any());
787+
Mockito.verify(sp0, times(1)).releaseFetchLock();
788+
}
789+
750790
@Test
751791
public void testLocksReleasedForCompletedFetch() {
752792
String groupId = "grp";

0 commit comments

Comments
 (0)