Skip to content

Commit 7043ab5

Browse files
authored
Merge pull request #2388 from dani8art/wait-before-adding-data-to-queue
fix: WebSocketOutputStream wait to write until WebSocket queue is released
2 parents 1b904b0 + f9330fc commit 7043ab5

File tree

1 file changed

+29
-2
lines changed

1 file changed

+29
-2
lines changed

util/src/main/java/io/kubernetes/client/util/WebSocketStreamHandler.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.io.PipedOutputStream;
2323
import java.io.Reader;
2424
import java.nio.charset.StandardCharsets;
25+
import java.time.Instant;
26+
import java.time.temporal.ChronoUnit;
2527
import java.util.HashMap;
2628
import java.util.Map;
2729
import okhttp3.WebSocket;
@@ -210,6 +212,13 @@ private synchronized OutputStream getSocketInputOutputStream(int stream) {
210212
}
211213

212214
private class WebSocketOutputStream extends OutputStream {
215+
216+
private static final long MAX_QUEUE_SIZE = 16L * 1024 * 1024;
217+
218+
private static final int MAX_WAIT_MILLIS = 10000;
219+
220+
private static final int WAIT_MILLIS = 10;
221+
213222
private final byte stream;
214223

215224
public WebSocketOutputStream(int stream) {
@@ -266,10 +275,28 @@ public void write(byte[] b, int offset, int length) throws IOException {
266275
int bufferSize = Math.min(remaining, 15 * 1024 * 1024);
267276
byte[] buffer = new byte[bufferSize + 1];
268277
buffer[0] = stream;
278+
269279
System.arraycopy(b, offset + bytesWritten, buffer, 1, bufferSize);
270-
if (!WebSocketStreamHandler.this.socket.send(ByteString.of(buffer))) {
271-
throw new IOException("WebSocket has closed.");
280+
ByteString byteString = ByteString.of(buffer);
281+
282+
final Instant start = Instant.now();
283+
synchronized (WebSocketOutputStream.this) {
284+
while (WebSocketStreamHandler.this.socket.queueSize() + byteString.size() > MAX_QUEUE_SIZE
285+
&& Instant.now().isBefore(start.plus(MAX_WAIT_MILLIS, ChronoUnit.MILLIS))) {
286+
try {
287+
wait(WAIT_MILLIS);
288+
} catch (InterruptedException e) {
289+
throw new IOException("Error waiting web socket queue", e);
290+
}
291+
}
292+
293+
if (!WebSocketStreamHandler.this.socket.send(byteString)) {
294+
throw new IOException("WebSocket has closed.");
295+
}
296+
297+
notifyAll();
272298
}
299+
273300
bytesWritten += bufferSize;
274301
remaining -= bufferSize;
275302
}

0 commit comments

Comments
 (0)