Skip to content

Commit ac4cd21

Browse files
authored
test: fix BulkWriter flaky tests. (#1510)
1 parent 111b4e4 commit ac4cd21

File tree

1 file changed

+33
-9
lines changed

1 file changed

+33
-9
lines changed

google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java

+33-9
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import java.util.concurrent.ScheduledThreadPoolExecutor;
5858
import java.util.concurrent.TimeUnit;
5959
import javax.annotation.Nonnull;
60+
import org.junit.After;
6061
import org.junit.Assert;
6162
import org.junit.Before;
6263
import org.junit.Rule;
@@ -121,6 +122,8 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
121122
private DocumentReference doc1;
122123
private DocumentReference doc2;
123124

125+
private ScheduledExecutorService timeoutExecutor;
126+
124127
public static ApiFuture<BatchWriteResponse> successResponse(int updateTimeSeconds) {
125128
BatchWriteResponse.Builder response = BatchWriteResponse.newBuilder();
126129
response.addWriteResultsBuilder().getUpdateTimeBuilder().setSeconds(updateTimeSeconds).build();
@@ -155,7 +158,7 @@ public void before() {
155158
lenient().doReturn(immediateExecutor).when(firestoreRpc).getExecutor();
156159
testExecutor = Executors.newSingleThreadScheduledExecutor();
157160

158-
final ScheduledExecutorService timeoutExecutor =
161+
timeoutExecutor =
159162
new ScheduledThreadPoolExecutor(1) {
160163
@Override
161164
@Nonnull
@@ -170,6 +173,21 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
170173
doc2 = firestoreMock.document("coll/doc2");
171174
}
172175

176+
@After
177+
public void after() throws InterruptedException {
178+
shutdownScheduledExecutorService(timeoutExecutor);
179+
}
180+
181+
void shutdownScheduledExecutorService(ScheduledExecutorService executorService)
182+
throws InterruptedException {
183+
// Wait for the executor to finish after each test.
184+
//
185+
// This ensures the executor service is shut down properly within the given timeout, and thereby
186+
// avoids potential hangs caused by lingering threads. Note that if a given thread is terminated
187+
// because of the timeout, the associated test will fail, which is what we want.
188+
executorService.awaitTermination(100, TimeUnit.MILLISECONDS);
189+
}
190+
173191
@Test
174192
public void hasSetMethod() throws Exception {
175193
ResponseStubber responseStubber =
@@ -968,7 +986,7 @@ public void doesNotSendBatchesIfDoingSoExceedsRateLimit() throws Exception {
968986
// future at the end of the test to ensure that the timeout was called.
969987
final SettableApiFuture<Void> timeoutCalledFuture = SettableApiFuture.create();
970988

971-
final ScheduledExecutorService timeoutExecutor =
989+
final ScheduledExecutorService customExecutor =
972990
new ScheduledThreadPoolExecutor(1) {
973991
@Override
974992
@Nonnull
@@ -994,14 +1012,15 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
9941012
firestoreMock.bulkWriter(
9951013
BulkWriterOptions.builder()
9961014
.setInitialOpsPerSecond(5)
997-
.setExecutor(timeoutExecutor)
1015+
.setExecutor(customExecutor)
9981016
.build());
9991017

10001018
for (int i = 0; i < 600; ++i) {
10011019
bulkWriter.set(firestoreMock.document("coll/doc"), LocalFirestoreHelper.SINGLE_FIELD_MAP);
10021020
}
10031021
bulkWriter.flush();
10041022
timeoutCalledFuture.get();
1023+
shutdownScheduledExecutorService(customExecutor);
10051024
}
10061025

10071026
@Test
@@ -1097,7 +1116,7 @@ public void retriesWritesWhenBatchWriteFailsWithRetryableError() throws Exceptio
10971116
public void failsWritesAfterAllRetryAttemptsFail() throws Exception {
10981117
final int[] retryAttempts = {0};
10991118
final int[] scheduleWithDelayCount = {0};
1100-
final ScheduledExecutorService timeoutExecutor =
1119+
final ScheduledExecutorService customExecutor =
11011120
new ScheduledThreadPoolExecutor(1) {
11021121
@Override
11031122
@Nonnull
@@ -1127,7 +1146,7 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
11271146
ArgumentMatchers.<UnaryCallable<BatchWriteRequest, BatchWriteResponse>>any());
11281147

11291148
bulkWriter =
1130-
firestoreMock.bulkWriter(BulkWriterOptions.builder().setExecutor(timeoutExecutor).build());
1149+
firestoreMock.bulkWriter(BulkWriterOptions.builder().setExecutor(customExecutor).build());
11311150
ApiFuture<WriteResult> result = bulkWriter.set(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP);
11321151
bulkWriter.flush().get();
11331152

@@ -1139,14 +1158,16 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
11391158
assertEquals(BulkWriter.MAX_RETRY_ATTEMPTS + 1, retryAttempts[0]);
11401159
// The first attempt should not have a delay.
11411160
assertEquals(BulkWriter.MAX_RETRY_ATTEMPTS, scheduleWithDelayCount[0]);
1161+
} finally {
1162+
shutdownScheduledExecutorService(customExecutor);
11421163
}
11431164
}
11441165

11451166
@Test
11461167
public void appliesMaxBackoffOnRetriesForResourceExhausted() throws Exception {
11471168
final int[] retryAttempts = {0};
11481169
final int[] scheduleWithDelayCount = {0};
1149-
final ScheduledExecutorService timeoutExecutor =
1170+
final ScheduledExecutorService customExecutor =
11501171
new ScheduledThreadPoolExecutor(1) {
11511172
@Override
11521173
@Nonnull
@@ -1177,7 +1198,7 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
11771198
ArgumentMatchers.<UnaryCallable<BatchWriteRequest, BatchWriteResponse>>any());
11781199

11791200
bulkWriter =
1180-
firestoreMock.bulkWriter(BulkWriterOptions.builder().setExecutor(timeoutExecutor).build());
1201+
firestoreMock.bulkWriter(BulkWriterOptions.builder().setExecutor(customExecutor).build());
11811202
bulkWriter.addWriteErrorListener(error -> error.getFailedAttempts() < 5);
11821203

11831204
ApiFuture<WriteResult> result = bulkWriter.create(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP);
@@ -1191,6 +1212,8 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
11911212
assertEquals(5, retryAttempts[0]);
11921213
// The first attempt should not have a delay.
11931214
assertEquals(4, scheduleWithDelayCount[0]);
1215+
} finally {
1216+
shutdownScheduledExecutorService(customExecutor);
11941217
}
11951218
}
11961219

@@ -1203,7 +1226,7 @@ public void usesHighestBackoffFoundInBatch() throws Exception {
12031226
* BulkWriterOperation.DEFAULT_BACKOFF_FACTOR)
12041227
};
12051228
final int[] retryAttempts = {0};
1206-
final ScheduledExecutorService timeoutExecutor =
1229+
final ScheduledExecutorService customExecutor =
12071230
new ScheduledThreadPoolExecutor(1) {
12081231
@Override
12091232
@Nonnull
@@ -1244,14 +1267,15 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
12441267
responseStubber.initializeStub(batchWriteCapture, firestoreMock);
12451268

12461269
bulkWriter =
1247-
firestoreMock.bulkWriter(BulkWriterOptions.builder().setExecutor(timeoutExecutor).build());
1270+
firestoreMock.bulkWriter(BulkWriterOptions.builder().setExecutor(customExecutor).build());
12481271
bulkWriter.addWriteErrorListener(error -> error.getFailedAttempts() < 5);
12491272

12501273
bulkWriter.create(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP);
12511274
bulkWriter.set(doc2, LocalFirestoreHelper.SINGLE_FIELD_MAP);
12521275
bulkWriter.close();
12531276
responseStubber.verifyAllRequestsSent();
12541277
assertEquals(2, retryAttempts[0]);
1278+
shutdownScheduledExecutorService(customExecutor);
12551279
}
12561280

12571281
@Test

0 commit comments

Comments
 (0)