Skip to content

Commit 98379b5

Browse files
committed
Add retry logic in NIO read
References #11
1 parent 1bcc2bf commit 98379b5

File tree

4 files changed

+27
-16
lines changed

4 files changed

+27
-16
lines changed

src/main/java/com/rabbitmq/client/impl/nio/ByteBufferInputStream.java

+1-13
Original file line numberDiff line numberDiff line change
@@ -49,19 +49,7 @@ private static void readFromNetworkIfNecessary(ReadableByteChannel channel, Byte
4949
buffer.clear();
5050
int read = NioHelper.read(channel, buffer);
5151
if(read <= 0) {
52-
int attempt = 0;
53-
while(attempt < 3) {
54-
try {
55-
Thread.sleep(100L);
56-
} catch (InterruptedException e) {
57-
// ignore
58-
}
59-
read = NioHelper.read(channel, buffer);
60-
if(read > 0) {
61-
break;
62-
}
63-
attempt++;
64-
}
52+
NioHelper.retryRead(channel, buffer);
6553
}
6654
buffer.flip();
6755
}

src/main/java/com/rabbitmq/client/impl/nio/NioHelper.java

+18-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import java.io.IOException;
1919
import java.nio.ByteBuffer;
2020
import java.nio.channels.ReadableByteChannel;
21-
import java.nio.channels.SocketChannel;
2221

2322
public class NioHelper {
2423

@@ -30,4 +29,22 @@ static int read(ReadableByteChannel channel, ByteBuffer buffer) throws IOExcepti
3029
return read;
3130
}
3231

32+
static int retryRead(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
33+
int attempt = 0;
34+
int read = 0;
35+
while(attempt < 3) {
36+
try {
37+
Thread.sleep(100L);
38+
} catch (InterruptedException e) {
39+
// ignore
40+
}
41+
read = read(channel, buffer);
42+
if(read > 0) {
43+
break;
44+
}
45+
attempt++;
46+
}
47+
return read;
48+
}
49+
3350
}

src/main/java/com/rabbitmq/client/impl/nio/SslEngineByteBufferInputStream.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,10 @@ public int read() throws IOException {
6969
if (bytesRead > 0) {
7070
cipherIn.flip();
7171
} else {
72-
throw new IllegalStateException("Should be reading something from the network");
72+
bytesRead = NioHelper.retryRead(channel, cipherIn);
73+
if(bytesRead <= 0) {
74+
throw new IllegalStateException("Should be reading something from the network");
75+
}
7376
}
7477
plainIn.clear();
7578
result = sslEngine.unwrap(cipherIn, plainIn);

src/main/java/com/rabbitmq/client/impl/nio/SslEngineHelper.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,10 @@ private static SSLEngineResult.HandshakeStatus unwrap(ByteBuffer cipherIn, ByteB
8787
throw new SSLException("Buffer overflow during handshake");
8888
case BUFFER_UNDERFLOW:
8989
cipherIn.compact();
90-
NioHelper.read(channel, cipherIn);
90+
int read = NioHelper.read(channel, cipherIn);
91+
if(read <= 0) {
92+
NioHelper.retryRead(channel, cipherIn);
93+
}
9194
cipherIn.flip();
9295
break;
9396
case CLOSED:

0 commit comments

Comments
 (0)