Skip to content

Commit 25788b3

Browse files
Addressed Jun's round 4 review comments
1 parent f94f7ca commit 25788b3

File tree

1 file changed

+9
-18
lines changed

1 file changed

+9
-18
lines changed

core/src/main/java/kafka/server/share/DelayedShareFetch.java

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@
6262
import java.util.concurrent.CompletableFuture;
6363
import java.util.concurrent.ExecutionException;
6464
import java.util.concurrent.Future;
65-
import java.util.concurrent.RejectedExecutionException;
6665
import java.util.concurrent.TimeUnit;
6766
import java.util.concurrent.locks.Lock;
6867
import java.util.function.BiConsumer;
@@ -304,7 +303,7 @@ public boolean tryComplete() {
304303
localPartitionsAlreadyFetched = replicaManagerReadResponse;
305304
boolean completedByMe = forceComplete();
306305
// If invocation of forceComplete is not successful, then that means the request is already completed
307-
// hence release the acquired locks.
306+
// hence the acquired locks are already released.
308307
if (!completedByMe) {
309308
releasePartitionLocks(partitionsAcquired.keySet());
310309
}
@@ -335,7 +334,7 @@ public boolean tryComplete() {
335334
} else {
336335
boolean completedByMe = forceComplete();
337336
// If invocation of forceComplete is not successful, then that means the request is already completed
338-
// hence release the acquired locks. This can occur in case of remote storage fetch if there is a thread that
337+
// hence the acquired locks are already released. This can occur in case of remote storage fetch if there is a thread that
339338
// completes the operation (due to expiration) right before a different thread is about to enter tryComplete.
340339
if (!completedByMe) {
341340
releasePartitionLocks(partitionsAcquired.keySet());
@@ -626,7 +625,7 @@ private Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchIn
626625
private boolean maybeProcessRemoteFetch(
627626
LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
628627
TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
629-
) throws Exception {
628+
) {
630629
Set<TopicIdPartition> nonRemoteFetchTopicPartitions = new LinkedHashSet<>();
631630
topicPartitionData.keySet().forEach(topicIdPartition -> {
632631
// topic partitions for which fetch would not be happening in this share fetch request.
@@ -637,19 +636,16 @@ private boolean maybeProcessRemoteFetch(
637636
// Release fetch lock for the topic partitions that were acquired but were not a part of remote fetch and add
638637
// them to the delayed actions queue.
639638
releasePartitionLocksAndAddToActionQueue(nonRemoteFetchTopicPartitions);
640-
Optional<Exception> exceptionOpt = processRemoteFetchOrException(topicPartitionRemoteFetchInfo);
641-
if (exceptionOpt.isPresent()) {
642-
throw exceptionOpt.get();
643-
}
639+
processRemoteFetchOrException(topicPartitionRemoteFetchInfo);
644640
// Check if remote fetch can be completed.
645641
return maybeCompletePendingRemoteFetch();
646642
}
647643

648644
/**
649-
* Returns an option containing an exception if a task for RemoteStorageFetchInfo could not be scheduled successfully else returns empty optional.
650-
* @param topicPartitionRemoteFetchInfo - The remote storage fetch topic partition information.
645+
* Throws an exception if a task for remote storage fetch could not be scheduled successfully else updates remoteFetchOpt.
646+
* @param topicPartitionRemoteFetchInfo - The remote storage fetch information.
651647
*/
652-
private Optional<Exception> processRemoteFetchOrException(
648+
private void processRemoteFetchOrException(
653649
TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
654650
) {
655651
TopicIdPartition remoteFetchTopicIdPartition = topicPartitionRemoteFetchInfo.topicIdPartition();
@@ -665,17 +661,12 @@ private Optional<Exception> processRemoteFetchOrException(
665661
replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), remoteFetchTopicIdPartition.topicId(), remoteFetchTopicIdPartition.partition()));
666662
}
667663
);
668-
} catch (RejectedExecutionException e) {
669-
// Return the error if any in scheduling the remote fetch task.
670-
log.warn("Unable to fetch data from remote storage", e);
671-
remoteStorageFetchException = Optional.of(e);
672-
return Optional.of(e);
673664
} catch (Exception e) {
665+
// Throw the error if any in scheduling the remote fetch task.
674666
remoteStorageFetchException = Optional.of(e);
675-
return Optional.of(e);
667+
throw e;
676668
}
677669
remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, topicPartitionRemoteFetchInfo.logReadResult(), remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo));
678-
return Optional.empty();
679670
}
680671

681672
/**

0 commit comments

Comments
 (0)