Skip to content

Commit f9330fc

Browse files
committed
fix: use wait/notify pattern instead of Thread.sleep
Signed-off-by: Daniel Arteaga Barba <[email protected]>
1 parent 87fea18 commit f9330fc

File tree

1 file changed

+18
-13
lines changed

1 file changed

+18
-13
lines changed

util/src/main/java/io/kubernetes/client/util/WebSocketStreamHandler.java

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.io.PipedOutputStream;
2323
import java.io.Reader;
2424
import java.nio.charset.StandardCharsets;
25+
import java.time.Instant;
26+
import java.time.temporal.ChronoUnit;
2527
import java.util.HashMap;
2628
import java.util.Map;
2729
import okhttp3.WebSocket;
@@ -212,9 +214,9 @@ private class WebSocketOutputStream extends OutputStream {
212214

213215
private static final long MAX_QUEUE_SIZE = 16L * 1024 * 1024;
214216

215-
private static final int MAX_QUEUE_SIZE_MAX_ATTEMPTS = 15;
217+
private static final int MAX_WAIT_MILLIS = 10000;
216218

217-
private static final int MAX_QUEUE_SIZE_WAIT_MILLISECONDS = 1000;
219+
private static final int WAIT_MILLIS = 10;
218220

219221
private final byte stream;
220222

@@ -276,19 +278,22 @@ public void write(byte[] b, int offset, int length) throws IOException {
276278
System.arraycopy(b, offset + bytesWritten, buffer, 1, bufferSize);
277279
ByteString byteString = ByteString.of(buffer);
278280

279-
int attempts = 0;
280-
while (WebSocketStreamHandler.this.socket.queueSize() + byteString.size() > MAX_QUEUE_SIZE
281-
&& attempts < MAX_QUEUE_SIZE_MAX_ATTEMPTS) {
282-
try {
283-
Thread.sleep(MAX_QUEUE_SIZE_WAIT_MILLISECONDS);
284-
attempts++;
285-
} catch (InterruptedException e) {
286-
throw new IOException("Error waiting web socket queue", e);
281+
final Instant start = Instant.now();
282+
synchronized (WebSocketOutputStream.this) {
283+
while (WebSocketStreamHandler.this.socket.queueSize() + byteString.size() > MAX_QUEUE_SIZE
284+
&& Instant.now().isBefore(start.plus(MAX_WAIT_MILLIS, ChronoUnit.MILLIS))) {
285+
try {
286+
wait(WAIT_MILLIS);
287+
} catch (InterruptedException e) {
288+
throw new IOException("Error waiting web socket queue", e);
289+
}
290+
}
291+
292+
if (!WebSocketStreamHandler.this.socket.send(byteString)) {
293+
throw new IOException("WebSocket has closed.");
287294
}
288-
}
289295

290-
if (!WebSocketStreamHandler.this.socket.send(byteString)) {
291-
throw new IOException("WebSocket has closed.");
296+
notifyAll();
292297
}
293298

294299
bytesWritten += bufferSize;

0 commit comments

Comments
 (0)