@@ -209,6 +209,13 @@ private synchronized OutputStream getSocketInputOutputStream(int stream) {
209
209
}
210
210
211
211
private class WebSocketOutputStream extends OutputStream {
212
+
213
+ private static final long WEB_SOCKET_MAX_QUEUE_SIZE = 16L * 1024 * 1024 ;
214
+
215
+ private static final int WEB_SOCKET_MAX_QUEUE_SIZE_MAX_ATTEMPTS = 15 ;
216
+
217
+ private static final int WEB_SOCKET_MAX_QUEUE_SIZE_WAIT_MILLISECONDS = 1000 ;
218
+
212
219
private final byte stream ;
213
220
214
221
public WebSocketOutputStream (int stream ) {
@@ -265,10 +272,25 @@ public void write(byte[] b, int offset, int length) throws IOException {
265
272
int bufferSize = Math .min (remaining , 15 * 1024 * 1024 );
266
273
byte [] buffer = new byte [bufferSize + 1 ];
267
274
buffer [0 ] = stream ;
275
+
268
276
System .arraycopy (b , offset + bytesWritten , buffer , 1 , bufferSize );
269
- if (!WebSocketStreamHandler .this .socket .send (ByteString .of (buffer ))) {
277
+ ByteString byteString = ByteString .of (buffer );
278
+
279
+ int attempts = 0 ;
280
+ while (WebSocketStreamHandler .this .socket .queueSize () + byteString .size () > WEB_SOCKET_MAX_QUEUE_SIZE
281
+ && attempts < WEB_SOCKET_MAX_QUEUE_SIZE_MAX_ATTEMPTS ) {
282
+ try {
283
+ Thread .sleep (WEB_SOCKET_MAX_QUEUE_SIZE_WAIT_MILLISECONDS );
284
+ attempts ++;
285
+ } catch (InterruptedException e ) {
286
+ throw new IOException ("Error waiting web socket queue" , e );
287
+ }
288
+ }
289
+
290
+ if (!WebSocketStreamHandler .this .socket .send (byteString )) {
270
291
throw new IOException ("WebSocket has closed." );
271
292
}
293
+
272
294
bytesWritten += bufferSize ;
273
295
remaining -= bufferSize ;
274
296
}
0 commit comments