Skip to content

Commit e407078

Browse files
committed
Fix a file descriptor leak with wrong user:pass
Set SO_LINGER to 0 in tests that produce many outgoing connections, because so many sockets in the TIME_WAIT state can lead to 'java.net.NoRouteToHostException: Cannot assign requested address (Address not available)' exceptions on Travis-CI. Fixes #132.
1 parent d2dfa4c commit e407078

File tree

6 files changed

+111
-37
lines changed

6 files changed

+111
-37
lines changed

src/main/java/org/tarantool/TarantoolBase.java

+22-14
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,13 @@ public TarantoolBase() {
4040
public TarantoolBase(String username, String password, Socket socket) {
4141
super();
4242
try {
43-
this.is = new DataInputStream(cis = new CountInputStreamImpl(socket.getInputStream()));
43+
cis = new CountInputStreamImpl(socket.getInputStream());
44+
is = new DataInputStream(cis);
4445
byte[] bytes = new byte[64];
4546
is.readFully(bytes);
4647
String firstLine = new String(bytes);
4748
if (!firstLine.startsWith(WELCOME)) {
49+
closeStreams();
4850
close();
4951
throw new CommunicationException("Welcome message should starts with tarantool but starts with '" + firstLine + "'", new IllegalStateException("Invalid welcome packet"));
5052
}
@@ -56,23 +58,15 @@ public TarantoolBase(String username, String password, Socket socket) {
5658
OutputStream os = socket.getOutputStream();
5759
os.write(authPacket.array(), 0, authPacket.remaining());
5860
os.flush();
59-
readPacket(is);
61+
readPacket();
6062
Long code = (Long) headers.get(Key.CODE.getId());
6163
if (code != 0) {
64+
closeStreams();
6265
throw serverError(code, body.get(Key.ERROR.getId()));
6366
}
6467
}
6568
} catch (IOException e) {
66-
try {
67-
is.close();
68-
} catch (IOException ignored) {
69-
70-
}
71-
try {
72-
cis.close();
73-
} catch (IOException ignored) {
74-
75-
}
69+
closeStreams();
7670
throw new CommunicationException("Couldn't connect to tarantool", e);
7771
}
7872
}
@@ -130,7 +124,7 @@ protected ByteBuffer createPacket(Code code, Long syncId, Long schemaId, Object.
130124
return buffer;
131125
}
132126

133-
protected void readPacket(DataInputStream is) throws IOException {
127+
protected void readPacket() throws IOException {
134128
int size = ((Number) msgPackLite.unpack(is)).intValue();
135129
long mark = cis.getBytesRead();
136130
headers = (Map<Integer, Object>) msgPackLite.unpack(is);
@@ -185,7 +179,6 @@ protected List<Map<String, Object>> readSqlResult(List<List<?>> data) {
185179
return values;
186180
}
187181

188-
189182
protected Long getSqlRowCount() {
190183
Map<Key, Object> info = (Map<Key, Object>) body.get(Key.SQL_INFO.getId());
191184
Number rowCount;
@@ -220,6 +213,21 @@ protected void closeChannel(SocketChannel channel) {
220213
}
221214
}
222215

216+
protected void closeStreams() {
217+
if (is != null) {
218+
try {
219+
is.close();
220+
} catch (IOException ignored) {
221+
}
222+
}
223+
if (cis != null) {
224+
try {
225+
cis.close();
226+
} catch (IOException ignored) {
227+
}
228+
}
229+
}
230+
223231
protected void validateArgs(Object[] args) {
224232
if (args != null) {
225233
for (int i = 0; i < args.length; i += 2) {

src/main/java/org/tarantool/TarantoolClientImpl.java

+8-14
Original file line numberDiff line numberDiff line change
@@ -132,40 +132,34 @@ protected void reconnect(int retry, Throwable lastError) {
132132
}
133133

134134
protected void connect(final SocketChannel channel) throws Exception {
135+
closeStreams();
135136
try {
136-
DataInputStream is = new DataInputStream(cis = new ByteBufferInputStream(channel));
137+
cis = new ByteBufferInputStream(channel);
138+
is = new DataInputStream(cis);
137139
byte[] bytes = new byte[64];
138140
is.readFully(bytes);
139141
String firstLine = new String(bytes);
140142
if (!firstLine.startsWith("Tarantool")) {
141143
CommunicationException e = new CommunicationException("Welcome message should starts with tarantool " +
142144
"but starts with '" + firstLine + "'", new IllegalStateException("Invalid welcome packet"));
143145

146+
closeStreams();
144147
close(e);
145148
throw e;
146149
}
147150
is.readFully(bytes);
148151
this.salt = new String(bytes);
149152
if (config.username != null && config.password != null) {
150153
writeFully(channel, createAuthPacket(config.username, config.password));
151-
readPacket(is);
154+
readPacket();
152155
Long code = (Long) headers.get(Key.CODE.getId());
153156
if (code != 0) {
157+
closeStreams();
154158
throw serverError(code, body.get(Key.ERROR.getId()));
155159
}
156160
}
157-
this.is = is;
158161
} catch (IOException e) {
159-
try {
160-
is.close();
161-
} catch (IOException ignored) {
162-
163-
}
164-
try {
165-
cis.close();
166-
} catch (IOException ignored) {
167-
168-
}
162+
closeStreams();
169163
throw new CommunicationException("Couldn't connect to tarantool", e);
170164
}
171165
channel.configureBlocking(false);
@@ -358,7 +352,7 @@ protected void readThread() {
358352
while (!Thread.currentThread().isInterrupted()) {
359353
try {
360354
long code;
361-
readPacket(is);
355+
readPacket();
362356
code = (Long) headers.get(Key.CODE.getId());
363357
Long syncId = (Long) headers.get(Key.SYNC.getId());
364358
CompletableFuture<?> future = futures.remove(syncId);

src/main/java/org/tarantool/TarantoolConnection.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ protected List<?> exec(Code code, Object... args) {
2828
ByteBuffer packet = createPacket(code, syncId.incrementAndGet(), null, args);
2929
out.write(packet.array(), 0, packet.remaining());
3030
out.flush();
31-
readPacket(is);
31+
readPacket();
3232
Long c = (Long) headers.get(Key.CODE.getId());
3333
if (c == 0) {
3434
return (List) body.get(Key.DATA.getId());

src/test/java/org/tarantool/AbstractTarantoolConnectorIT.java

+4
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,10 @@ protected TarantoolClient makeClient() {
135135
return new TarantoolClientImpl(socketChannelProvider, makeClientConfig());
136136
}
137137

138+
protected TarantoolClient makeClient(SocketChannelProvider provider) {
139+
return new TarantoolClientImpl(provider, makeClientConfig());
140+
}
141+
138142
protected static TarantoolClientConfig makeClientConfig() {
139143
return fillClientConfig(new TarantoolClientConfig());
140144
}

src/test/java/org/tarantool/ClientReconnectIT.java

+56-3
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import static org.junit.jupiter.api.Assertions.assertEquals;
2222
import static org.junit.jupiter.api.Assertions.assertFalse;
23+
import static org.junit.jupiter.api.Assertions.assertNull;
2324
import static org.junit.jupiter.api.Assertions.assertNotNull;
2425
import static org.junit.jupiter.api.Assertions.assertThrows;
2526
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -215,19 +216,27 @@ public void run() {
215216

216217
/**
217218
* Test concurrent operations, reconnects and close.
219+
*
218220
* Expected situation is nothing gets stuck.
221+
*
222+
* The test sets SO_LINGER to 0 for outgoing connections to avoid producing
223+
* many TIME_WAIT sockets, because an available port range can be
224+
* exhausted.
219225
*/
220226
@Test
221227
public void testLongParallelCloseReconnects() {
222228
int numThreads = 4;
223229
int numClients = 4;
224230
int timeBudget = 30*1000;
225231

232+
SocketChannelProvider provider = new TestSocketChannelProvider(host,
233+
port, RESTART_TIMEOUT).setSoLinger(0);
234+
226235
final AtomicReferenceArray<TarantoolClient> clients =
227236
new AtomicReferenceArray<TarantoolClient>(numClients);
228237

229238
for (int idx = 0; idx < clients.length(); idx++) {
230-
clients.set(idx, makeClient());
239+
clients.set(idx, makeClient(provider));
231240
}
232241

233242
final Random rnd = new Random();
@@ -256,7 +265,7 @@ public void run() {
256265

257266
cli.close();
258267

259-
TarantoolClient next = makeClient();
268+
TarantoolClient next = makeClient(provider);
260269
if (!clients.compareAndSet(idx, cli, next)) {
261270
next.close();
262271
}
@@ -284,7 +293,9 @@ public void run() {
284293
fail(e);
285294
}
286295
if (deadline > System.currentTimeMillis()) {
287-
System.out.println("" + (deadline - System.currentTimeMillis())/1000 + "s remains.");
296+
System.out.println("testLongParallelCloseReconnects: " +
297+
(deadline - System.currentTimeMillis()) / 1000 +
298+
"s remain");
288299
}
289300
}
290301

@@ -302,4 +313,46 @@ public void run() {
302313

303314
assertTrue(cnt.get() > threads.length);
304315
}
316+
317+
/**
318+
* Verify that we don't exceed a file descriptor limit (and so likely don't
319+
* leak file descriptors) when trying to connect to an existing node with
320+
* wrong authentification credentials.
321+
*
322+
* The test sets SO_LINGER to 0 for outgoing connections to avoid producing
323+
* many TIME_WAIT sockets, because an available port range can be
324+
* exhausted.
325+
*/
326+
@Test
327+
public void testReconnectWrongAuth() throws Exception {
328+
SocketChannelProvider provider = new TestSocketChannelProvider(host,
329+
port, RESTART_TIMEOUT).setSoLinger(0);
330+
TarantoolClientConfig config = makeClientConfig();
331+
config.initTimeoutMillis = 100;
332+
config.password = config.password + 'x';
333+
for (int i = 0; i < 100; ++i) {
334+
if (i % 10 == 0)
335+
System.out.println("testReconnectWrongAuth: " + (100 - i) +
336+
" iterations remain");
337+
CommunicationException e = assertThrows(CommunicationException.class,
338+
new Executable() {
339+
@Override
340+
public void execute() throws Throwable {
341+
client = new TarantoolClientImpl(provider, config);
342+
}
343+
}
344+
);
345+
assertEquals(e.getMessage(), "100ms is exceeded when waiting " +
346+
"for client initialization. You could configure init " +
347+
"timeout in TarantoolConfig");
348+
}
349+
350+
/*
351+
* Verify we don't exceed a file descriptor limit. If we exceed it, a
352+
* client will not able to connect to tarantool.
353+
*/
354+
TarantoolClient client = makeClient();
355+
client.syncOps().ping();
356+
client.close();
357+
}
305358
}

src/test/java/org/tarantool/TestSocketChannelProvider.java

+20-5
Original file line numberDiff line numberDiff line change
@@ -2,27 +2,42 @@
22

33
import java.net.InetSocketAddress;
44
import java.nio.channels.SocketChannel;
5+
import static java.net.StandardSocketOptions.SO_LINGER;
56

67
/**
78
* Socket channel provider to be used throughout the tests.
89
*/
910
public class TestSocketChannelProvider implements SocketChannelProvider {
1011
String host;
1112
int port;
12-
int restart_timeout;
13+
int restartTimeout;
14+
int soLinger;
1315

14-
public TestSocketChannelProvider(String host, int port, int restart_timeout) {
16+
public TestSocketChannelProvider(String host, int port, int restartTimeout) {
1517
this.host = host;
1618
this.port = port;
17-
this.restart_timeout = restart_timeout;
19+
this.restartTimeout = restartTimeout;
20+
this.soLinger = -1;
21+
}
22+
23+
public TestSocketChannelProvider setSoLinger(int soLinger) {
24+
this.soLinger = soLinger;
25+
return this;
1826
}
1927

2028
@Override
2129
public SocketChannel get(int retryNumber, Throwable lastError) {
22-
long budget = System.currentTimeMillis() + restart_timeout;
30+
long budget = System.currentTimeMillis() + restartTimeout;
2331
while (!Thread.currentThread().isInterrupted()) {
2432
try {
25-
return SocketChannel.open(new InetSocketAddress(host, port));
33+
SocketChannel channel = SocketChannel.open();
34+
/*
35+
* A value less then zero means disable lingering (it is a
36+
* default behaviour).
37+
*/
38+
channel.setOption(SO_LINGER, soLinger);
39+
channel.connect(new InetSocketAddress(host, port));
40+
return channel;
2641
} catch (Exception e) {
2742
if (budget < System.currentTimeMillis())
2843
throw new RuntimeException(e);

0 commit comments

Comments
 (0)