37
37
import java .util .Map ;
38
38
import java .util .HashMap ;
39
39
import java .util .concurrent .TimeoutException ;
40
+ import java .util .concurrent .ScheduledExecutorService ;
41
+ import java .util .concurrent .Executors ;
42
+ import java .util .concurrent .TimeUnit ;
40
43
41
44
import com .rabbitmq .client .AMQP ;
42
45
import com .rabbitmq .client .Address ;
@@ -89,7 +92,7 @@ public static Map<String, Object> defaultClientProperties() {
89
92
new Version (AMQP .PROTOCOL .MAJOR , AMQP .PROTOCOL .MINOR );
90
93
91
94
/** Initialization parameters */
92
- private final ConnectionFactory factory ;
95
+ private final ConnectionFactory _factory ;
93
96
94
97
/** The special channel 0 */
95
98
private final AMQChannel _channel0 = new AMQChannel (this , 0 ) {
@@ -122,6 +125,9 @@ public static Map<String, Object> defaultClientProperties() {
122
125
/** Flag indicating whether the client received Connection.Close message from the broker */
123
126
private boolean _brokerInitiatedShutdown = false ;
124
127
128
+ /** Manages heartbeat sending for this connection */
129
+ private final HeartbeatSender _heartbeatSender ;
130
+
125
131
/**
126
132
* Protected API - respond, in the driver thread, to a ShutdownSignal.
127
133
* @param channel the channel to disconnect
@@ -138,12 +144,6 @@ public void ensureIsOpen()
138
144
}
139
145
}
140
146
141
- /**
142
- * Timestamp of last time we wrote a frame - used for deciding when to
143
- * send a heartbeat
144
- */
145
- private volatile long _lastActivityTime = Long .MAX_VALUE ;
146
-
147
147
/**
148
148
* Count of socket-timeouts that have happened without any incoming frames
149
149
*/
@@ -210,7 +210,8 @@ public AMQConnection(ConnectionFactory factory,
210
210
_requestedHeartbeat = factory .getRequestedHeartbeat ();
211
211
_clientProperties = new HashMap <String , Object >(factory .getClientProperties ());
212
212
213
- this .factory = factory ;
213
+ _factory = factory ;
214
+ _heartbeatSender = new HeartbeatSender (frameHandler );
214
215
_frameHandler = frameHandler ;
215
216
_running = true ;
216
217
_frameMax = 0 ;
@@ -288,17 +289,17 @@ public void start()
288
289
}
289
290
290
291
int channelMax =
291
- negotiatedMaxValue (factory .getRequestedChannelMax (),
292
+ negotiatedMaxValue (_factory .getRequestedChannelMax (),
292
293
connTune .getChannelMax ());
293
294
_channelManager = new ChannelManager (channelMax );
294
295
295
296
int frameMax =
296
- negotiatedMaxValue (factory .getRequestedFrameMax (),
297
+ negotiatedMaxValue (_factory .getRequestedFrameMax (),
297
298
connTune .getFrameMax ());
298
299
setFrameMax (frameMax );
299
300
300
301
int heartbeat =
301
- negotiatedMaxValue (factory .getRequestedHeartbeat (),
302
+ negotiatedMaxValue (_factory .getRequestedHeartbeat (),
302
303
connTune .getHeartbeat ());
303
304
setHeartbeat (heartbeat );
304
305
@@ -349,10 +350,12 @@ public int getHeartbeat() {
349
350
*/
350
351
public void setHeartbeat (int heartbeat ) {
351
352
try {
353
+ _heartbeatSender .setHeartbeat (heartbeat );
354
+ _heartbeat = heartbeat ;
355
+
352
356
// Divide by four to make the maximum unwanted delay in
353
357
// sending a timeout be less than a quarter of the
354
358
// timeout setting.
355
- _heartbeat = heartbeat ;
356
359
_frameHandler .setTimeout (heartbeat * 1000 / 4 );
357
360
} catch (SocketException se ) {
358
361
// should do more here?
@@ -395,7 +398,7 @@ public Frame readFrame() throws IOException {
395
398
*/
396
399
public void writeFrame (Frame f ) throws IOException {
397
400
_frameHandler .writeFrame (f );
398
- _lastActivityTime = System . nanoTime ();
401
+ _heartbeatSender . signalActivity ();
399
402
}
400
403
401
404
private static int negotiatedMaxValue (int clientValue , int serverValue ) {
@@ -416,7 +419,7 @@ private class MainLoop extends Thread {
416
419
try {
417
420
while (_running ) {
418
421
Frame frame = readFrame ();
419
- maybeSendHeartbeat ();
422
+
420
423
if (frame != null ) {
421
424
_missedHeartbeats = 0 ;
422
425
if (frame .type == AMQP .FRAME_HEARTBEAT ) {
@@ -459,25 +462,6 @@ private class MainLoop extends Thread {
459
462
}
460
463
}
461
464
462
- private static final long NANOS_IN_SECOND = 1000 * 1000 * 1000 ;
463
-
464
- /**
465
- * Private API - Checks lastActivityTime and heartbeat, sending a
466
- * heartbeat frame if conditions are right.
467
- */
468
- public void maybeSendHeartbeat () throws IOException {
469
- if (_heartbeat == 0 ) {
470
- // No heartbeating.
471
- return ;
472
- }
473
-
474
- long now = System .nanoTime ();
475
- if (now > (_lastActivityTime + (_heartbeat * NANOS_IN_SECOND ))) {
476
- _lastActivityTime = now ;
477
- writeFrame (new Frame (AMQP .FRAME_HEARTBEAT , 0 ));
478
- }
479
- }
480
-
481
465
/**
482
466
* Private API - Called when a frame-read operation times out. Checks to
483
467
* see if too many heartbeats have been missed, and if so, throws
@@ -557,7 +541,7 @@ public void handleConnectionClose(Command closeCommand) {
557
541
} catch (IOException ioe ) {
558
542
Utility .emptyStatement ();
559
543
}
560
- _heartbeat = 0 ; // Do not try to send heartbeats after CloseOk
544
+ _heartbeatSender . shutdown () ; // Do not try to send heartbeats after CloseOk
561
545
_brokerInitiatedShutdown = true ;
562
546
Thread scw = new SocketCloseWait (sse );
563
547
scw .setName ("AMQP Connection Closing Monitor " +
@@ -607,6 +591,10 @@ public ShutdownSignalException shutdown(Object reason,
607
591
if (isOpen ())
608
592
_shutdownCause = sse ;
609
593
}
594
+
595
+ // stop any heartbeating
596
+ _heartbeatSender .shutdown ();
597
+
610
598
_channel0 .processShutdownSignal (sse , !initiatedByApplication , notifyRpc );
611
599
_channelManager .handleSignal (sse );
612
600
return sse ;
0 commit comments