Skip to content

Commit c20478a

Browse files
author
Nitesh Kant
authored
RoundRobinLoadBalancer should close connections gracefully (#1028)
__Motivation__ When a host used by `RoundRobinLoadBalancer` is unregistered from service discovery, we force close the connection. This means any outstanding requests on the connection will get interrupted. Since service discovery status is a hint we should err on the side of caution and let the outstanding requests complete gracefully. A fallout of this decision is that a connection may actually be dead and an absence of writes may mean that the we will never detect closure hence graceful closure will never complete. This is a trade-off between interrupting requests which may otherwise be successfully completed vs not force closing requests that may never complete. As requests could be infinitely latent in general, we assume that users have configured timeouts on the requests and hence all in-flight requests will eventually complete. __Modification__ Use `closeAsyncGracefully()` instead of `closeAsync()` when a host turns inactive. __Result__ Host inactivation does not interrupt in-flight requests.
1 parent 0f7770e commit c20478a

File tree

2 files changed

+19
-11
lines changed

2 files changed

+19
-11
lines changed

servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ void markInactive() {
378378
@SuppressWarnings("unchecked")
379379
List<C> toRemove = connectionsUpdater.getAndSet(this, INACTIVE);
380380
for (C conn : toRemove) {
381-
conn.closeAsync().subscribe();
381+
conn.closeAsyncGracefully().subscribe();
382382
}
383383
}
384384

servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import io.servicetalk.client.api.LoadBalancerReadyEvent;
2323
import io.servicetalk.client.api.NoAvailableHostException;
2424
import io.servicetalk.client.api.ServiceDiscovererEvent;
25-
import io.servicetalk.concurrent.CompletableSource.Processor;
2625
import io.servicetalk.concurrent.PublisherSource.Subscriber;
2726
import io.servicetalk.concurrent.PublisherSource.Subscription;
2827
import io.servicetalk.concurrent.api.Completable;
@@ -62,11 +61,10 @@
6261
import java.util.function.Function;
6362
import java.util.function.Predicate;
6463

64+
import static io.servicetalk.concurrent.api.AsyncCloseables.emptyAsyncCloseable;
6565
import static io.servicetalk.concurrent.api.BlockingTestUtils.awaitIndefinitely;
66-
import static io.servicetalk.concurrent.api.Processors.newCompletableProcessor;
6766
import static io.servicetalk.concurrent.api.Single.failed;
6867
import static io.servicetalk.concurrent.api.Single.succeeded;
69-
import static io.servicetalk.concurrent.api.SourceAdapters.fromSource;
7068
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
7169
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
7270
import static io.servicetalk.concurrent.internal.ServiceTalkTestTimeout.DEFAULT_TIMEOUT_SECONDS;
@@ -89,6 +87,8 @@
8987
import static org.junit.Assert.assertTrue;
9088
import static org.junit.Assert.fail;
9189
import static org.mockito.Mockito.mock;
90+
import static org.mockito.Mockito.times;
91+
import static org.mockito.Mockito.verify;
9292
import static org.mockito.Mockito.when;
9393

9494
public class RoundRobinLoadBalancerTest {
@@ -105,7 +105,7 @@ public class RoundRobinLoadBalancerTest {
105105
private final List<TestLoadBalancedConnection> connectionsCreated = new CopyOnWriteArrayList<>();
106106
private final Queue<Runnable> connectionRealizers = new ConcurrentLinkedQueue<>();
107107

108-
private TestPublisher<ServiceDiscovererEvent<String>> serviceDiscoveryPublisher = new TestPublisher<>();
108+
private final TestPublisher<ServiceDiscovererEvent<String>> serviceDiscoveryPublisher = new TestPublisher<>();
109109
private RoundRobinLoadBalancer<String, TestLoadBalancedConnection> lb;
110110
private DelegatingConnectionFactory connectionFactory;
111111

@@ -406,6 +406,16 @@ public void newConnectionIsClosedWhenSelectorRejects() throws Exception {
406406
awaitIndefinitely(connection.onClose());
407407
}
408408

409+
@Test
410+
public void hostDownGracefulCloseConnection() throws Exception {
411+
sendServiceDiscoveryEvents(upEvent("address-1"));
412+
TestLoadBalancedConnection conn = lb.selectConnection(any()).toFuture().get();
413+
sendServiceDiscoveryEvents(downEvent("address-1"));
414+
conn.onClose().toFuture().get();
415+
verify(conn).closeAsyncGracefully();
416+
verify(conn, times(0)).closeAsync();
417+
}
418+
409419
@SuppressWarnings("unchecked")
410420
private void sendServiceDiscoveryEvents(final ServiceDiscovererEvent... events) {
411421
serviceDiscoveryPublisher.onNext((ServiceDiscovererEvent<String>[]) events);
@@ -437,12 +447,10 @@ private Single<TestLoadBalancedConnection> newRealizedConnectionSingle(final Str
437447
@SuppressWarnings("unchecked")
438448
private TestLoadBalancedConnection newConnection(final String address) {
439449
final TestLoadBalancedConnection cnx = mock(TestLoadBalancedConnection.class);
440-
final Processor closeCompletable = newCompletableProcessor();
441-
when(cnx.closeAsync()).thenAnswer(__ -> {
442-
closeCompletable.onComplete();
443-
return closeCompletable;
444-
});
445-
when(cnx.onClose()).thenReturn(fromSource(closeCompletable));
450+
final ListenableAsyncCloseable closeable = emptyAsyncCloseable();
451+
when(cnx.closeAsync()).thenReturn(closeable.closeAsync());
452+
when(cnx.closeAsyncGracefully()).thenReturn(closeable.closeAsyncGracefully());
453+
when(cnx.onClose()).thenReturn(closeable.onClose());
446454
when(cnx.address()).thenReturn(address);
447455
when(cnx.toString()).thenReturn(address + '@' + cnx.hashCode());
448456

0 commit comments

Comments
 (0)