Skip to content

Commit e08ea2f

Browse files
fix: Change proxy service to call child start methods on executors (#1334)
* fix: Change proxy service to call child start methods on executors Also change service clients to use an isolated executor Also change channel pools to not use a fixed pool size, and use an auto-scaling channel pool. * feat: Convert internal interfaces to use protos * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 0150fb7 commit e08ea2f

File tree

5 files changed

+62
-39
lines changed

5 files changed

+62
-39
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,13 @@ If you are using Maven, add this to your pom.xml file:
3232
If you are using Gradle without BOM, add this to your dependencies:
3333

3434
```Groovy
35-
implementation 'com.google.cloud:google-cloud-pubsublite:1.9.4'
35+
implementation 'com.google.cloud:google-cloud-pubsublite:1.10.0'
3636
```
3737

3838
If you are using SBT, add this to your dependencies:
3939

4040
```Scala
41-
libraryDependencies += "com.google.cloud" % "google-cloud-pubsublite" % "1.9.4"
41+
libraryDependencies += "com.google.cloud" % "google-cloud-pubsublite" % "1.10.0"
4242
```
4343

4444
## Authentication

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/ProxyService.java

Lines changed: 40 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
public abstract class ProxyService extends AbstractApiService {
4141
private static final GoogleLogger LOGGER = GoogleLogger.forEnclosingClass();
4242
private final List<ApiService> services = new ArrayList<>();
43-
private final AtomicBoolean stoppedOrFailed = new AtomicBoolean(false);
43+
private final AtomicBoolean failed = new AtomicBoolean(false);
4444

4545
protected <T extends ApiService> ProxyService(Collection<T> services) {
4646
addServices(services);
@@ -55,8 +55,16 @@ protected ProxyService(ApiService... services) throws ApiException {
5555
protected final <T extends ApiService> void addServices(Collection<T> services)
5656
throws ApiException {
5757
checkState(state() == State.NEW);
58+
Listener onServiceError =
59+
new Listener() {
60+
@Override
61+
public void failed(State state, Throwable throwable) {
62+
onPermanentError(toCanonical(throwable));
63+
}
64+
};
5865
for (ApiService service : services) {
5966
checkArgument(service.state() == State.NEW, "All services must not be started.");
67+
service.addListener(onServiceError, SystemExecutors.getFuturesExecutor());
6068
this.services.add(service);
6169
}
6270
}
@@ -76,7 +84,7 @@ protected void handlePermanentError(CheckedApiException error) {}
7684

7785
// Tries to stop all dependent services and sets this service into the FAILED state.
7886
protected final void onPermanentError(CheckedApiException error) {
79-
if (stoppedOrFailed.getAndSet(true)) return;
87+
if (failed.getAndSet(true)) return;
8088
try {
8189
ApiServiceUtils.stopAsync(services);
8290
} catch (Throwable t) {
@@ -87,13 +95,21 @@ protected final void onPermanentError(CheckedApiException error) {
8795
} catch (Throwable t) {
8896
LOGGER.atFine().withCause(t).log("Exception in handlePermanentError.");
8997
}
90-
// Failures are sent to the client and should always be ApiExceptions.
91-
notifyFailed(error.underlying);
98+
try {
99+
// Failures are sent to the client and should always be ApiExceptions.
100+
notifyFailed(error.underlying);
101+
} catch (IllegalStateException e) {
102+
LOGGER.atFine().withCause(e).log("Exception in notifyFailed.");
103+
}
92104
}
93105

94106
// AbstractApiService implementation.
95107
@Override
96108
protected final void doStart() {
109+
SystemExecutors.getFuturesExecutor().execute(this::startImpl);
110+
}
111+
112+
private void startImpl() {
97113
Listener listener =
98114
new Listener() {
99115
private final AtomicInteger leftToStart = new AtomicInteger(services.size());
@@ -103,54 +119,53 @@ public void running() {
103119
if (leftToStart.decrementAndGet() == 0) {
104120
try {
105121
start();
122+
notifyStarted();
106123
} catch (CheckedApiException e) {
107124
onPermanentError(e);
108-
return;
109125
}
110-
notifyStarted();
111126
}
112127
}
113-
114-
@Override
115-
public void failed(State state, Throwable throwable) {
116-
onPermanentError(toCanonical(throwable));
117-
}
118128
};
119-
for (ApiService service : services) {
120-
service.addListener(listener, SystemExecutors.getFuturesExecutor());
121-
service.startAsync();
129+
try {
130+
for (ApiService service : services) {
131+
service.addListener(listener, SystemExecutors.getFuturesExecutor());
132+
service.startAsync();
133+
}
134+
} catch (Throwable t) {
135+
onPermanentError(toCanonical(t));
122136
}
123137
}
124138

125139
@Override
126140
protected final void doStop() {
141+
SystemExecutors.getFuturesExecutor().execute(this::stopImpl);
142+
}
143+
144+
private void stopImpl() {
127145
Listener listener =
128146
new Listener() {
129147
private final AtomicInteger leftToStop = new AtomicInteger(services.size());
130148

131149
@Override
132150
public void terminated(State state) {
133151
if (leftToStop.decrementAndGet() == 0) {
134-
if (!stoppedOrFailed.getAndSet(true)) {
135-
notifyStopped();
136-
}
152+
notifyStopped();
137153
}
138154
}
139-
140-
@Override
141-
public void failed(State state, Throwable throwable) {
142-
onPermanentError(toCanonical(throwable));
143-
}
144155
};
145156
try {
146157
stop();
147158
} catch (CheckedApiException e) {
148159
onPermanentError(e);
149160
return;
150161
}
151-
for (ApiService service : services) {
152-
service.addListener(listener, SystemExecutors.getFuturesExecutor());
153-
service.stopAsync();
162+
try {
163+
for (ApiService service : services) {
164+
service.addListener(listener, SystemExecutors.getFuturesExecutor());
165+
service.stopAsync();
166+
}
167+
} catch (Throwable t) {
168+
onPermanentError(toCanonical(t));
154169
}
155170
}
156171
}

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ServiceClients.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
2020

2121
import com.google.api.gax.core.FixedExecutorProvider;
22+
import com.google.api.gax.grpc.ChannelPoolSettings;
2223
import com.google.api.gax.grpc.GrpcCallContext;
2324
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
2425
import com.google.api.gax.rpc.ApiCallContext;
@@ -27,15 +28,15 @@
2728
import com.google.api.gax.rpc.TransportChannelProvider;
2829
import com.google.cloud.pubsublite.CloudRegion;
2930
import com.google.cloud.pubsublite.Endpoints;
31+
import com.google.cloud.pubsublite.internal.Lazy;
3032
import com.google.common.collect.ImmutableListMultimap;
3133
import com.google.common.collect.Multimaps;
34+
import java.util.concurrent.ScheduledExecutorService;
3235
import org.threeten.bp.Duration;
3336

3437
public final class ServiceClients {
35-
// Default to 10 channels per client to avoid server limitations on streams and requests
36-
// per-channel.
37-
private static final int CLIENT_POOL_SIZE =
38-
Integer.parseInt(System.getProperty("PUBSUB_LITE_CHANNELS_PER_CLIENT", "10"));
38+
private static final Lazy<ScheduledExecutorService> GRPC_EXECUTOR =
39+
new Lazy<>(() -> SystemExecutors.newDaemonExecutor("pubsub-lite-grpc"));
3940

4041
private ServiceClients() {}
4142

@@ -45,7 +46,11 @@ private static TransportChannelProvider getTransportChannelProvider() {
4546
.setKeepAliveTime(Duration.ofMinutes(1))
4647
.setKeepAliveWithoutCalls(true)
4748
.setKeepAliveTimeout(Duration.ofMinutes(1))
48-
.setPoolSize(CLIENT_POOL_SIZE)
49+
.setChannelPoolSettings(
50+
ChannelPoolSettings.builder()
51+
.setInitialChannelCount(25)
52+
.setMaxRpcsPerChannel(50)
53+
.build())
4954
.setExecutor(SystemExecutors.getFuturesExecutor())
5055
.build();
5156
}
@@ -57,8 +62,7 @@ Settings addDefaultSettings(CloudRegion target, Builder builder) throws ApiExcep
5762
try {
5863
return builder
5964
.setEndpoint(Endpoints.regionalEndpoint(target))
60-
.setBackgroundExecutorProvider(
61-
FixedExecutorProvider.create(SystemExecutors.getAlarmExecutor()))
65+
.setBackgroundExecutorProvider(FixedExecutorProvider.create(GRPC_EXECUTOR.get()))
6266
.setTransportChannelProvider(getTransportChannelProvider())
6367
.build();
6468
} catch (Throwable t) {

google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/internal/AssigningSubscriberTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ private Subscriber initSub1() throws CheckedApiException {
196196
public void stopStopsSubs() throws CheckedApiException {
197197
Subscriber sub1 = initSub1();
198198

199-
assigningSubscriber.stopAsync();
199+
assigningSubscriber.stopAsync().awaitTerminated();
200200
verify(sub1).stopAsync();
201201
verify(sub1).awaitTerminated();
202202
}

google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherImplTest.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.google.cloud.pubsublite.internal.wire;
1818

19+
import static com.google.cloud.pubsublite.internal.ApiExceptionMatcher.assertFutureThrowsCode;
1920
import static com.google.cloud.pubsublite.internal.ApiExceptionMatcher.assertThrowableMatches;
2021
import static com.google.cloud.pubsublite.internal.testing.RetryingConnectionHelpers.whenFailed;
2122
import static com.google.common.truth.Truth.assertThat;
@@ -67,7 +68,9 @@
6768
import java.util.stream.Collectors;
6869
import java.util.stream.IntStream;
6970
import org.junit.Before;
71+
import org.junit.Rule;
7072
import org.junit.Test;
73+
import org.junit.rules.Timeout;
7174
import org.junit.runner.RunWith;
7275
import org.junit.runners.JUnit4;
7376
import org.mockito.InOrder;
@@ -99,6 +102,8 @@ private static MessagePublishResponse messageResponse(Offset startOffset, int me
99102
.build();
100103
}
101104

105+
@Rule public Timeout globalTimeout = Timeout.seconds(30);
106+
102107
@Mock private PublishStreamFactory unusedStreamFactory;
103108
@Mock private BatchPublisher mockBatchPublisher;
104109
@Mock private BatchPublisherFactory mockPublisherFactory;
@@ -203,11 +208,10 @@ public void construct_closeSendsBatched() throws Exception {
203208
}
204209

205210
@Test
206-
public void publishBeforeStart_isPermanentError() throws Exception {
211+
public void publishBeforeStart_FailsFuture() {
207212
Message message = Message.builder().build();
208-
assertThrows(
209-
IllegalStateException.class, () -> publisher.publish(message, PublishSequenceNumber.of(0)));
210-
assertThrows(IllegalStateException.class, () -> publisher.startAsync().awaitRunning());
213+
assertFutureThrowsCode(
214+
publisher.publish(message, PublishSequenceNumber.of(0)), Code.FAILED_PRECONDITION);
211215
verifyNoInteractions(mockPublisherFactory);
212216
verifyNoInteractions(mockBatchPublisher);
213217
}

0 commit comments

Comments
 (0)