Skip to content

Commit 628a4c5

Browse files
author
Simon MacMullen
committed
Backport eb09ae778764 (Merge of bug24030; Remove flushes after every frame to boost performance with SSL)
1 parent d95228d commit 628a4c5

File tree

7 files changed

+31
-2
lines changed

7 files changed

+31
-2
lines changed

src/com/rabbitmq/client/impl/AMQCommand.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ public void transmit(AMQChannel channel) throws IOException {
121121
}
122122
}
123123
}
124+
125+
connection.flush();
124126
}
125127

126128
@Override public String toString() {

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,13 @@ public void writeFrame(Frame f) throws IOException {
474474
_heartbeatSender.signalActivity();
475475
}
476476

477+
/**
478+
* Public API - flush the output buffers
479+
*/
480+
public void flush() throws IOException {
481+
_frameHandler.flush();
482+
}
483+
477484
private static final int negotiatedMaxValue(int clientValue, int serverValue) {
478485
return (clientValue == 0 || serverValue == 0) ?
479486
Math.max(clientValue, serverValue) :

src/com/rabbitmq/client/impl/FrameHandler.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,13 @@ public interface FrameHandler {
7171
*/
7272
void writeFrame(Frame frame) throws IOException;
7373

74+
/**
75+
* Flush the underlying data connection.
76+
*
77+
* @throws IOException if there is a problem accessing the connection
78+
*/
79+
void flush() throws IOException;
80+
7481
/** Close the underlying data connection (complaint not permitted). */
7582
void close();
7683
}

src/com/rabbitmq/client/impl/HeartbeatSender.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ public void run() {
131131

132132
if (now > (lastActivityTime + this.heartbeatNanos)) {
133133
frameHandler.writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0));
134+
frameHandler.flush();
134135
}
135136
} catch (IOException e) {
136137
// ignore

src/com/rabbitmq/client/impl/SocketFrameHandler.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,12 +135,15 @@ public Frame readFrame() throws IOException {
135135
public void writeFrame(Frame frame) throws IOException {
136136
synchronized (_outputStream) {
137137
frame.writeTo(_outputStream);
138-
_outputStream.flush();
139138
}
140139
}
141140

141+
public void flush() throws IOException {
142+
_outputStream.flush();
143+
}
144+
142145
public void close() {
143146
try { _socket.setSoLinger(true, SOCKET_CLOSING_TIMEOUT); } catch (Exception _) {}
144-
try { _socket.close(); } catch (Exception _) {}
147+
try { flush(); _socket.close(); } catch (Exception _) {}
145148
}
146149
}

test/src/com/rabbitmq/client/test/AMQConnectionTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,10 @@ public InetAddress getAddress() {
216216
public int getPort() {
217217
return -1;
218218
}
219+
220+
public void flush() throws IOException {
221+
// no need to implement this: don't bother writing the frame
222+
}
219223
}
220224

221225
/** Exception handler to facilitate testing. */

test/src/com/rabbitmq/client/test/BrokenFramesTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,11 @@ public InetAddress getAddress() {
173173
public int getPort() {
174174
return -1;
175175
}
176+
177+
@Override
178+
public void flush() throws IOException {
179+
// no need to implement this: don't bother writing the frame
180+
}
176181
}
177182

178183
}

0 commit comments

Comments
 (0)