14
14
// Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
15
15
//
16
16
17
-
18
17
package com .rabbitmq .client .impl ;
19
18
20
19
import java .io .EOFException ;
@@ -258,10 +257,13 @@ public AMQConnection(String username,
258
257
* Connection.Start/.StartOk, Connection.Tune/.TuneOk, and then
259
258
* calls Connection.Open and waits for the OpenOk. Sets heart-beat
260
259
* and frame max values after tuning has taken place.
261
- * @throws IOException if an error is encountered;
260
+ * @throws IOException if an error is encountered
261
+ * either before, or during, protocol negotiation;
262
262
* sub-classes {@link ProtocolVersionMismatchException} and
263
263
* {@link PossibleAuthenticationFailureException} will be thrown in the
264
- * corresponding circumstances.
264
+ * corresponding circumstances. If an exception is thrown, connection
265
+ * resources allocated can all be garbage collected when the connection
266
+ * object is no longer referenced.
265
267
*/
266
268
public void start ()
267
269
throws IOException
@@ -278,17 +280,22 @@ public void start()
278
280
// initiator) is to wait for a connection.start method to
279
281
// arrive.
280
282
_channel0 .enqueueRpc (connStartBlocker );
281
- // The following two lines are akin to AMQChannel's
282
- // transmit() method for this pseudo-RPC.
283
- _frameHandler .setTimeout (HANDSHAKE_TIMEOUT );
284
- _frameHandler .sendHeader ();
283
+ try {
284
+ // The following two lines are akin to AMQChannel's
285
+ // transmit() method for this pseudo-RPC.
286
+ _frameHandler .setTimeout (HANDSHAKE_TIMEOUT );
287
+ _frameHandler .sendHeader ();
288
+ } catch (IOException ioe ) {
289
+ _frameHandler .close ();
290
+ throw ioe ;
291
+ }
285
292
286
293
// start the main loop going
287
- Thread ml = new MainLoop ();
288
- ml .setName ("AMQP Connection " + getHostAddress () + ":" + getPort ());
289
- ml .start ();
294
+ new MainLoop ("AMQP Connection " + getHostAddress () + ":" + getPort ()).start ();
295
+ // after this point clear-up of MainLoop is triggered by closing the frameHandler.
290
296
291
297
AMQP .Connection .Start connStart = null ;
298
+ AMQP .Connection .Tune connTune = null ;
292
299
try {
293
300
connStart =
294
301
(AMQP .Connection .Start ) connStartBlocker .getReply ().getMethod ();
@@ -300,70 +307,84 @@ public void start()
300
307
connStart .getVersionMinor ());
301
308
302
309
if (!Version .checkVersion (clientVersion , serverVersion )) {
303
- _frameHandler .close (); //this will cause mainLoop to terminate
304
310
throw new ProtocolVersionMismatchException (clientVersion ,
305
311
serverVersion );
306
312
}
313
+
314
+ String [] mechanisms = connStart .getMechanisms ().toString ().split (" " );
315
+ SaslMechanism sm = this .saslConfig .getSaslMechanism (mechanisms );
316
+ if (sm == null ) {
317
+ throw new IOException ("No compatible authentication mechanism found - " +
318
+ "server offered [" + connStart .getMechanisms () + "]" );
319
+ }
320
+
321
+ LongString challenge = null ;
322
+ LongString response = sm .handleChallenge (null , this .username , this .password );
323
+
324
+ do {
325
+ Method method = (challenge == null )
326
+ ? new AMQP .Connection .StartOk .Builder ()
327
+ .clientProperties (_clientProperties )
328
+ .mechanism (sm .getName ())
329
+ .response (response )
330
+ .build ()
331
+ : new AMQP .Connection .SecureOk .Builder ().response (response ).build ();
332
+
333
+ try {
334
+ Method serverResponse = _channel0 .rpc (method ).getMethod ();
335
+ if (serverResponse instanceof AMQP .Connection .Tune ) {
336
+ connTune = (AMQP .Connection .Tune ) serverResponse ;
337
+ } else {
338
+ challenge = ((AMQP .Connection .Secure ) serverResponse ).getChallenge ();
339
+ response = sm .handleChallenge (challenge , this .username , this .password );
340
+ }
341
+ } catch (ShutdownSignalException e ) {
342
+ throw new PossibleAuthenticationFailureException (e );
343
+ }
344
+ } while (connTune == null );
307
345
} catch (ShutdownSignalException sse ) {
346
+ _frameHandler .close ();
308
347
throw AMQChannel .wrap (sse );
348
+ } catch (IOException ioe ) {
349
+ _frameHandler .close ();
350
+ throw ioe ;
309
351
}
310
352
311
- String [] mechanisms = connStart .getMechanisms ().toString ().split (" " );
312
- SaslMechanism sm = this .saslConfig .getSaslMechanism (mechanisms );
313
- if (sm == null ) {
314
- throw new IOException ("No compatible authentication mechanism found - " +
315
- "server offered [" + connStart .getMechanisms () + "]" );
353
+ try {
354
+ int channelMax =
355
+ negotiatedMaxValue (this .requestedChannelMax ,
356
+ connTune .getChannelMax ());
357
+ _channelManager = new ChannelManager (this ._workService , channelMax );
358
+
359
+ int frameMax =
360
+ negotiatedMaxValue (this .requestedFrameMax ,
361
+ connTune .getFrameMax ());
362
+ this ._frameMax = frameMax ;
363
+
364
+ int heartbeat =
365
+ negotiatedMaxValue (this .requestedHeartbeat ,
366
+ connTune .getHeartbeat ());
367
+
368
+ setHeartbeat (heartbeat );
369
+
370
+ _channel0 .transmit (new AMQP .Connection .TuneOk .Builder ()
371
+ .channelMax (channelMax )
372
+ .frameMax (frameMax )
373
+ .heartbeat (heartbeat )
374
+ .build ());
375
+ _channel0 .exnWrappingRpc (new AMQP .Connection .Open .Builder ()
376
+ .virtualHost (_virtualHost )
377
+ .build ());
378
+ } catch (IOException ioe ) {
379
+ _heartbeatSender .shutdown ();
380
+ _frameHandler .close ();
381
+ throw ioe ;
382
+ } catch (ShutdownSignalException sse ) {
383
+ _heartbeatSender .shutdown ();
384
+ _frameHandler .close ();
385
+ throw AMQChannel .wrap (sse );
316
386
}
317
387
318
- LongString challenge = null ;
319
- LongString response = sm .handleChallenge (null , this .username , this .password );
320
-
321
- AMQP .Connection .Tune connTune = null ;
322
- do {
323
- Method method = (challenge == null )
324
- ? new AMQP .Connection .StartOk .Builder ()
325
- .clientProperties (_clientProperties )
326
- .mechanism (sm .getName ())
327
- .response (response )
328
- .build ()
329
- : new AMQP .Connection .SecureOk .Builder ().response (response ).build ();
330
-
331
- try {
332
- Method serverResponse = _channel0 .rpc (method ).getMethod ();
333
- if (serverResponse instanceof AMQP .Connection .Tune ) {
334
- connTune = (AMQP .Connection .Tune ) serverResponse ;
335
- } else {
336
- challenge = ((AMQP .Connection .Secure ) serverResponse ).getChallenge ();
337
- response = sm .handleChallenge (challenge , this .username , this .password );
338
- }
339
- } catch (ShutdownSignalException e ) {
340
- throw new PossibleAuthenticationFailureException (e );
341
- }
342
- } while (connTune == null );
343
-
344
- int channelMax =
345
- negotiatedMaxValue (this .requestedChannelMax ,
346
- connTune .getChannelMax ());
347
- _channelManager = new ChannelManager (this ._workService , channelMax );
348
-
349
- int frameMax =
350
- negotiatedMaxValue (this .requestedFrameMax ,
351
- connTune .getFrameMax ());
352
- this ._frameMax = frameMax ;
353
-
354
- int heartbeat =
355
- negotiatedMaxValue (this .requestedHeartbeat ,
356
- connTune .getHeartbeat ());
357
- setHeartbeat (heartbeat );
358
-
359
- _channel0 .transmit (new AMQP .Connection .TuneOk .Builder ()
360
- .channelMax (channelMax )
361
- .frameMax (frameMax )
362
- .heartbeat (heartbeat )
363
- .build ());
364
- _channel0 .exnWrappingRpc (new AMQP .Connection .Open .Builder ()
365
- .virtualHost (_virtualHost )
366
- .build ());
367
388
return ;
368
389
}
369
390
@@ -452,6 +473,13 @@ private static final int negotiatedMaxValue(int clientValue, int serverValue) {
452
473
453
474
private class MainLoop extends Thread {
454
475
476
+ /**
477
+ * @param name of thread
478
+ */
479
+ MainLoop (String name ) {
480
+ super (name );
481
+ }
482
+
455
483
/**
456
484
* Channel reader thread main loop. Reads a frame, and if it is
457
485
* not a heartbeat frame, dispatches it to the channel it refers to.
@@ -582,7 +610,7 @@ public void handleConnectionClose(Command closeCommand) {
582
610
}
583
611
584
612
private class SocketCloseWait extends Thread {
585
- private ShutdownSignalException cause ;
613
+ private final ShutdownSignalException cause ;
586
614
587
615
public SocketCloseWait (ShutdownSignalException sse ) {
588
616
cause = sse ;
@@ -614,7 +642,7 @@ public ShutdownSignalException shutdown(Object reason,
614
642
reason , this );
615
643
sse .initCause (cause );
616
644
if (!setShutdownCauseIfOpen (sse )) {
617
- if (initiatedByApplication )
645
+ if (initiatedByApplication )
618
646
throw new AlreadyClosedException ("Attempt to use closed connection" , this );
619
647
}
620
648
0 commit comments