Skip to content

Commit 797a8af

Browse files
authored
fix: Limit default ThreadPoolExecutor thread count and remove deadlock scenario (#985)
* fix: Remove possible deadlock with nested `callAsync()` * Set default `ThreadPoolExecutor` to a fixed thread pool * fix: add keepAliveTime to close idle threads * Added comments explainging the sendEachAsync change
1 parent bbfa43a commit 797a8af

File tree

2 files changed

+39
-22
lines changed

2 files changed

+39
-22
lines changed

src/main/java/com/google/firebase/internal/FirebaseThreadManagers.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@
2323
import java.util.Set;
2424
import java.util.concurrent.ExecutorService;
2525
import java.util.concurrent.Executors;
26+
import java.util.concurrent.LinkedBlockingQueue;
2627
import java.util.concurrent.ThreadFactory;
28+
import java.util.concurrent.ThreadPoolExecutor;
29+
import java.util.concurrent.TimeUnit;
2730

2831
import org.slf4j.Logger;
2932
import org.slf4j.LoggerFactory;
@@ -84,7 +87,11 @@ private static class DefaultThreadManager extends GlobalThreadManager {
8487
protected ExecutorService doInit() {
8588
ThreadFactory threadFactory = FirebaseScheduledExecutor.getThreadFactoryWithName(
8689
getThreadFactory(), "firebase-default-%d");
87-
return Executors.newCachedThreadPool(threadFactory);
90+
91+
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(100, 100, 60L,
92+
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
93+
threadPoolExecutor.allowCoreThreadTimeOut(true);
94+
return threadPoolExecutor;
8895
}
8996

9097
@Override

src/main/java/com/google/firebase/messaging/FirebaseMessaging.java

+31-21
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.google.common.base.Supplier;
2727
import com.google.common.base.Suppliers;
2828
import com.google.common.collect.ImmutableList;
29+
import com.google.common.util.concurrent.MoreExecutors;
2930
import com.google.firebase.ErrorCode;
3031
import com.google.firebase.FirebaseApp;
3132
import com.google.firebase.ImplFirebaseTrampolines;
@@ -159,7 +160,7 @@ protected String execute() throws FirebaseMessagingException {
159160
* no failures are only indicated by a {@link BatchResponse}.
160161
*/
161162
public BatchResponse sendEach(@NonNull List<Message> messages) throws FirebaseMessagingException {
162-
return sendEachOp(messages, false).call();
163+
return sendEach(messages, false);
163164
}
164165

165166

@@ -186,7 +187,11 @@ public BatchResponse sendEach(@NonNull List<Message> messages) throws FirebaseMe
186187
*/
187188
public BatchResponse sendEach(
188189
@NonNull List<Message> messages, boolean dryRun) throws FirebaseMessagingException {
189-
return sendEachOp(messages, dryRun).call();
190+
try {
191+
return sendEachOpAsync(messages, dryRun).get();
192+
} catch (InterruptedException | ExecutionException e) {
193+
throw new FirebaseMessagingException(ErrorCode.CANCELLED, SERVICE_ID);
194+
}
190195
}
191196

192197
/**
@@ -197,7 +202,7 @@ public BatchResponse sendEach(
197202
* the messages have been sent.
198203
*/
199204
public ApiFuture<BatchResponse> sendEachAsync(@NonNull List<Message> messages) {
200-
return sendEachOp(messages, false).callAsync(app);
205+
return sendEachOpAsync(messages, false);
201206
}
202207

203208
/**
@@ -209,32 +214,37 @@ public ApiFuture<BatchResponse> sendEachAsync(@NonNull List<Message> messages) {
209214
* the messages have been sent.
210215
*/
211216
public ApiFuture<BatchResponse> sendEachAsync(@NonNull List<Message> messages, boolean dryRun) {
212-
return sendEachOp(messages, dryRun).callAsync(app);
217+
return sendEachOpAsync(messages, dryRun);
213218
}
214219

215-
private CallableOperation<BatchResponse, FirebaseMessagingException> sendEachOp(
220+
// Returns an ApiFuture directly since this function is non-blocking. Individual child send
221+
// requests are still called async and run in background threads.
222+
private ApiFuture<BatchResponse> sendEachOpAsync(
216223
final List<Message> messages, final boolean dryRun) {
217224
final List<Message> immutableMessages = ImmutableList.copyOf(messages);
218225
checkArgument(!immutableMessages.isEmpty(), "messages list must not be empty");
219226
checkArgument(immutableMessages.size() <= 500,
220227
"messages list must not contain more than 500 elements");
221228

222-
return new CallableOperation<BatchResponse, FirebaseMessagingException>() {
223-
@Override
224-
protected BatchResponse execute() throws FirebaseMessagingException {
225-
List<ApiFuture<SendResponse>> list = new ArrayList<>();
226-
for (Message message : immutableMessages) {
227-
ApiFuture<SendResponse> messageId = sendOpForSendResponse(message, dryRun).callAsync(app);
228-
list.add(messageId);
229-
}
230-
try {
231-
List<SendResponse> responses = ApiFutures.allAsList(list).get();
232-
return new BatchResponseImpl(responses);
233-
} catch (InterruptedException | ExecutionException e) {
234-
throw new FirebaseMessagingException(ErrorCode.CANCELLED, SERVICE_ID);
235-
}
236-
}
237-
};
229+
List<ApiFuture<SendResponse>> list = new ArrayList<>();
230+
for (Message message : immutableMessages) {
231+
// Make async send calls per message
232+
ApiFuture<SendResponse> messageId = sendOpForSendResponse(message, dryRun).callAsync(app);
233+
list.add(messageId);
234+
}
235+
236+
// Gather all futures and combine into a list
237+
ApiFuture<List<SendResponse>> responsesFuture = ApiFutures.allAsList(list);
238+
239+
// Chain this future to wrap the eventual responses in a BatchResponse without blocking
240+
// the main thread. This uses the current thread to execute, but since the transformation
241+
// function is non-blocking the transformation itself is also non-blocking.
242+
return ApiFutures.transform(
243+
responsesFuture,
244+
(responses) -> {
245+
return new BatchResponseImpl(responses);
246+
},
247+
MoreExecutors.directExecutor());
238248
}
239249

240250
private CallableOperation<SendResponse, FirebaseMessagingException> sendOpForSendResponse(

0 commit comments

Comments
 (0)