diff --git a/src/main/java/com/google/firebase/internal/FirebaseThreadManagers.java b/src/main/java/com/google/firebase/internal/FirebaseThreadManagers.java index 877450fcc..c14fbd611 100644 --- a/src/main/java/com/google/firebase/internal/FirebaseThreadManagers.java +++ b/src/main/java/com/google/firebase/internal/FirebaseThreadManagers.java @@ -23,7 +23,10 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,7 +87,11 @@ private static class DefaultThreadManager extends GlobalThreadManager { protected ExecutorService doInit() { ThreadFactory threadFactory = FirebaseScheduledExecutor.getThreadFactoryWithName( getThreadFactory(), "firebase-default-%d"); - return Executors.newCachedThreadPool(threadFactory); + + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(100, 100, 60L, + TimeUnit.SECONDS, new LinkedBlockingQueue(), threadFactory); + threadPoolExecutor.allowCoreThreadTimeOut(true); + return threadPoolExecutor; } @Override diff --git a/src/main/java/com/google/firebase/messaging/FirebaseMessaging.java b/src/main/java/com/google/firebase/messaging/FirebaseMessaging.java index e16b6ac9d..870940f77 100644 --- a/src/main/java/com/google/firebase/messaging/FirebaseMessaging.java +++ b/src/main/java/com/google/firebase/messaging/FirebaseMessaging.java @@ -26,6 +26,7 @@ import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.MoreExecutors; import com.google.firebase.ErrorCode; import com.google.firebase.FirebaseApp; import com.google.firebase.ImplFirebaseTrampolines; @@ -159,7 +160,7 @@ protected String execute() throws FirebaseMessagingException { * no failures are only indicated by a {@link BatchResponse}. */ public BatchResponse sendEach(@NonNull List messages) throws FirebaseMessagingException { - return sendEachOp(messages, false).call(); + return sendEach(messages, false); } @@ -186,7 +187,11 @@ public BatchResponse sendEach(@NonNull List messages) throws FirebaseMe */ public BatchResponse sendEach( @NonNull List messages, boolean dryRun) throws FirebaseMessagingException { - return sendEachOp(messages, dryRun).call(); + try { + return sendEachOpAsync(messages, dryRun).get(); + } catch (InterruptedException | ExecutionException e) { + throw new FirebaseMessagingException(ErrorCode.CANCELLED, SERVICE_ID); + } } /** @@ -197,7 +202,7 @@ public BatchResponse sendEach( * the messages have been sent. */ public ApiFuture sendEachAsync(@NonNull List messages) { - return sendEachOp(messages, false).callAsync(app); + return sendEachOpAsync(messages, false); } /** @@ -209,32 +214,37 @@ public ApiFuture sendEachAsync(@NonNull List messages) { * the messages have been sent. */ public ApiFuture sendEachAsync(@NonNull List messages, boolean dryRun) { - return sendEachOp(messages, dryRun).callAsync(app); + return sendEachOpAsync(messages, dryRun); } - private CallableOperation sendEachOp( + // Returns an ApiFuture directly since this function is non-blocking. Individual child send + // requests are still called async and run in background threads. + private ApiFuture sendEachOpAsync( final List messages, final boolean dryRun) { final List immutableMessages = ImmutableList.copyOf(messages); checkArgument(!immutableMessages.isEmpty(), "messages list must not be empty"); checkArgument(immutableMessages.size() <= 500, "messages list must not contain more than 500 elements"); - return new CallableOperation() { - @Override - protected BatchResponse execute() throws FirebaseMessagingException { - List> list = new ArrayList<>(); - for (Message message : immutableMessages) { - ApiFuture messageId = sendOpForSendResponse(message, dryRun).callAsync(app); - list.add(messageId); - } - try { - List responses = ApiFutures.allAsList(list).get(); - return new BatchResponseImpl(responses); - } catch (InterruptedException | ExecutionException e) { - throw new FirebaseMessagingException(ErrorCode.CANCELLED, SERVICE_ID); - } - } - }; + List> list = new ArrayList<>(); + for (Message message : immutableMessages) { + // Make async send calls per message + ApiFuture messageId = sendOpForSendResponse(message, dryRun).callAsync(app); + list.add(messageId); + } + + // Gather all futures and combine into a list + ApiFuture> responsesFuture = ApiFutures.allAsList(list); + + // Chain this future to wrap the eventual responses in a BatchResponse without blocking + // the main thread. This uses the current thread to execute, but since the transformation + // function is non-blocking the transformation itself is also non-blocking. + return ApiFutures.transform( + responsesFuture, + (responses) -> { + return new BatchResponseImpl(responses); + }, + MoreExecutors.directExecutor()); } private CallableOperation sendOpForSendResponse(