-
Notifications
You must be signed in to change notification settings - Fork 285
fix: Limit default ThreadPoolExecutor
thread count and remove deadlock scenario
#985
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
0c974a9
3afbc3e
59e1516
d2a76d3
c938ac3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ | |
import com.google.common.base.Supplier; | ||
import com.google.common.base.Suppliers; | ||
import com.google.common.collect.ImmutableList; | ||
import com.google.common.util.concurrent.MoreExecutors; | ||
import com.google.firebase.ErrorCode; | ||
import com.google.firebase.FirebaseApp; | ||
import com.google.firebase.ImplFirebaseTrampolines; | ||
|
@@ -159,7 +160,7 @@ protected String execute() throws FirebaseMessagingException { | |
* no failures are only indicated by a {@link BatchResponse}. | ||
*/ | ||
public BatchResponse sendEach(@NonNull List<Message> messages) throws FirebaseMessagingException { | ||
return sendEachOp(messages, false).call(); | ||
return sendEach(messages, false); | ||
} | ||
|
||
|
||
|
@@ -186,7 +187,11 @@ public BatchResponse sendEach(@NonNull List<Message> messages) throws FirebaseMe | |
*/ | ||
public BatchResponse sendEach( | ||
@NonNull List<Message> messages, boolean dryRun) throws FirebaseMessagingException { | ||
return sendEachOp(messages, dryRun).call(); | ||
try { | ||
return sendEachOpAsync(messages, dryRun).get(); | ||
} catch (InterruptedException | ExecutionException e) { | ||
throw new FirebaseMessagingException(ErrorCode.CANCELLED, SERVICE_ID); | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -197,7 +202,7 @@ public BatchResponse sendEach( | |
* the messages have been sent. | ||
*/ | ||
public ApiFuture<BatchResponse> sendEachAsync(@NonNull List<Message> messages) { | ||
return sendEachOp(messages, false).callAsync(app); | ||
return sendEachOpAsync(messages, false); | ||
} | ||
|
||
/** | ||
|
@@ -209,32 +214,37 @@ public ApiFuture<BatchResponse> sendEachAsync(@NonNull List<Message> messages) { | |
* the messages have been sent. | ||
*/ | ||
public ApiFuture<BatchResponse> sendEachAsync(@NonNull List<Message> messages, boolean dryRun) { | ||
return sendEachOp(messages, dryRun).callAsync(app); | ||
return sendEachOpAsync(messages, dryRun); | ||
} | ||
|
||
private CallableOperation<BatchResponse, FirebaseMessagingException> sendEachOp( | ||
// Returns an ApiFuture directly since this function is non-blocking. Individual child send | ||
// requests are still called async and run in background threads. | ||
private ApiFuture<BatchResponse> sendEachOpAsync( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This LGTM, but since this more of the opposite of how the rest of the SDK implements async operations, could we add a comment in code or in this thread briefly explaining this change? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added! |
||
final List<Message> messages, final boolean dryRun) { | ||
final List<Message> immutableMessages = ImmutableList.copyOf(messages); | ||
checkArgument(!immutableMessages.isEmpty(), "messages list must not be empty"); | ||
checkArgument(immutableMessages.size() <= 500, | ||
"messages list must not contain more than 500 elements"); | ||
|
||
return new CallableOperation<BatchResponse, FirebaseMessagingException>() { | ||
@Override | ||
protected BatchResponse execute() throws FirebaseMessagingException { | ||
List<ApiFuture<SendResponse>> list = new ArrayList<>(); | ||
for (Message message : immutableMessages) { | ||
ApiFuture<SendResponse> messageId = sendOpForSendResponse(message, dryRun).callAsync(app); | ||
list.add(messageId); | ||
} | ||
try { | ||
List<SendResponse> responses = ApiFutures.allAsList(list).get(); | ||
return new BatchResponseImpl(responses); | ||
} catch (InterruptedException | ExecutionException e) { | ||
throw new FirebaseMessagingException(ErrorCode.CANCELLED, SERVICE_ID); | ||
} | ||
} | ||
}; | ||
List<ApiFuture<SendResponse>> list = new ArrayList<>(); | ||
for (Message message : immutableMessages) { | ||
// Make async send calls per message | ||
ApiFuture<SendResponse> messageId = sendOpForSendResponse(message, dryRun).callAsync(app); | ||
list.add(messageId); | ||
} | ||
|
||
// Gather all futures and combine into a list | ||
ApiFuture<List<SendResponse>> responsesFuture = ApiFutures.allAsList(list); | ||
|
||
// Chain this future to wrap the eventual responses in a BatchResponse without blocking | ||
// the main thread. This uses the current thread to execute, but since the transformation | ||
// function is non-blocking the transformation itself is also non-blocking. | ||
return ApiFutures.transform( | ||
responsesFuture, | ||
(responses) -> { | ||
return new BatchResponseImpl(responses); | ||
}, | ||
MoreExecutors.directExecutor()); | ||
} | ||
|
||
private CallableOperation<SendResponse, FirebaseMessagingException> sendOpForSendResponse( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since a cached thread pool is preferred for short lived tasks, could this introduce any potential performance issues?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes the downside of the this is that threads once created aren't closed until shutdown.
Made a change to add a keepAliveTime for idle threads so this would no longer be the case. I was missing the
allowCoreThreadTimeOut()
method in testing which allows the timer to be applied.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! This LGTM!