Skip to content

Commit 0f70efe

Browse files
committed
Merge bug18743 into default
2 parents 28de277 + b5b5778 commit 0f70efe

18 files changed

+247
-72
lines changed

src/com/rabbitmq/client/Connection.java

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,10 +121,44 @@ public interface Connection extends ShutdownNotifier { // rename to AMQPConnecti
121121
Channel createChannel(int channelNumber) throws IOException;
122122

123123
/**
124-
* Close this connection with the given code and message.
125-
* @param closeCode code indicating the reason for closing the connection - see AMQP spec for a list of codes
126-
* @param closeMessage optional message describing the reason for closing the connection
124+
* Close this connection and all its channels.
125+
*
126+
* This method will wait infinitely for all the close operations to
127+
* complete.
128+
*
127129
* @throws IOException if an I/O problem is encountered
128130
*/
129-
void close(int closeCode, String closeMessage) throws IOException;
131+
void close() throws IOException;
132+
133+
/**
134+
* Close this connection and all its channels
135+
*
136+
* This method will wait with the given timeout for all the close
137+
* operations to complete. If timeout is reached then socket is forced
138+
* to close
139+
* @param timeout timeout (in milioseconds) for completing all the close-related
140+
* operations, use -1 for infinity
141+
* @throws IOException if an I/O problem is encountered
142+
*/
143+
void close(int timeout) throws IOException;
144+
145+
/**
146+
* Abort this connection and all its channels.
147+
*
148+
* This method will force the connection to close. It will silently discard
149+
* any exceptions enountered in close operations
150+
*/
151+
void abort();
152+
153+
/**
154+
* Abort this connection and all its channels.
155+
*
156+
* This method behaves in a similar way as abort(), with the only difference
157+
* that it will wait with a provided timeout for all the close operations to
158+
* complete. If timeout is reached socket is forced to close.
159+
*
160+
* @param timeout timeout (in miliseconds) for completing all the close-related
161+
* operations, use -1 for infinity
162+
*/
163+
void abort(int timeout);
130164
}

src/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 10 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@
2626
package com.rabbitmq.client.impl;
2727

2828
import java.io.IOException;
29+
import java.util.concurrent.TimeoutException;
2930

3031
import com.rabbitmq.client.AlreadyClosedException;
3132
import com.rabbitmq.client.Command;
3233
import com.rabbitmq.client.Connection;
3334
import com.rabbitmq.client.ShutdownSignalException;
3435
import com.rabbitmq.utility.BlockingValueOrException;
35-
import com.rabbitmq.utility.SingleShotLinearTimer;
3636

3737
/**
3838
* Base class modelling an AMQ channel. Subclasses implement close()
@@ -210,34 +210,11 @@ public synchronized void rpc(Method m, RpcContinuation k)
210210
public AMQCommand quiescingRpc(Method m,
211211
int timeoutMillisec,
212212
final AMQCommand timeoutReply)
213-
throws IOException, ShutdownSignalException
213+
throws IOException, ShutdownSignalException, TimeoutException
214214
{
215-
SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation();
215+
SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation();
216216
transmitAndEnqueue(m, k);
217-
if (timeoutMillisec != 0) {
218-
SingleShotLinearTimer timer = new SingleShotLinearTimer();
219-
220-
Runnable task = new Runnable() {
221-
public void run() {
222-
// Timed out waiting for reply.
223-
// Simulate a reply.
224-
// TODO: Warn the user somehow??
225-
try {
226-
handleCompleteInboundCommand(timeoutReply);
227-
} catch (IOException ioe) {
228-
// Ignore.
229-
}
230-
}
231-
};
232-
timer.schedule(task, timeoutMillisec);
233-
try {
234-
return k.getReply();
235-
} finally {
236-
timer.cancel();
237-
}
238-
} else {
239-
return k.getReply();
240-
}
217+
return k.getReply(timeoutMillisec);
241218
}
242219

243220
/**
@@ -297,6 +274,12 @@ public T getReply() throws ShutdownSignalException
297274
{
298275
return _blocker.uninterruptibleGetValue();
299276
}
277+
278+
public T getReply(int timeout)
279+
throws ShutdownSignalException, TimeoutException
280+
{
281+
return _blocker.uninterruptibleGetValue(timeout);
282+
}
300283

301284
public abstract T transformReply(AMQCommand command);
302285
}

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 107 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.io.IOException;
3030
import java.net.SocketException;
3131
import java.util.Map;
32+
import java.util.concurrent.TimeoutException;
3233

3334
import com.rabbitmq.client.AMQP;
3435
import com.rabbitmq.client.Address;
@@ -40,6 +41,7 @@
4041
import com.rabbitmq.client.MissedHeartbeatException;
4142
import com.rabbitmq.client.RedirectException;
4243
import com.rabbitmq.client.ShutdownSignalException;
44+
import com.rabbitmq.utility.BlockingCell;
4345
import com.rabbitmq.utility.Utility;
4446

4547
/**
@@ -95,6 +97,8 @@ public class AMQConnection extends ShutdownNotifierComponent implements Connecti
9597
/** Handler for (otherwise-unhandled) exceptions that crop up in the mainloop. */
9698
public final ExceptionHandler _exceptionHandler;
9799

100+
public BlockingCell<Object> appContinuation = new BlockingCell<Object>();
101+
98102
/**
99103
* Protected API - respond, in the driver thread, to a ShutdownSignal.
100104
* @param channelNumber the number of the channel to disconnect
@@ -437,7 +441,8 @@ public MainLoop() {
437441
// channel zero that aren't Connection.CloseOk) must
438442
// be discarded.
439443
ChannelN channel = _channelManager.getChannel(frame.channel);
440-
channel.handleFrame(frame); // Should check for null here?
444+
// FIXME: catch NullPointerException and throw more informative one?
445+
channel.handleFrame(frame);
441446
}
442447
}
443448
}
@@ -462,6 +467,10 @@ public MainLoop() {
462467

463468
// Finally, shut down our underlying data connection.
464469
_frameHandler.close();
470+
471+
synchronized(this) {
472+
appContinuation.set(null);
473+
}
465474
}
466475
}
467476

@@ -543,12 +552,24 @@ public boolean processControlCommand(Command c)
543552
}
544553

545554
public void handleConnectionClose(Command closeCommand) {
555+
shutdown(closeCommand, false, null);
546556
try {
547557
_channel0.transmit(new AMQImpl.Connection.CloseOk());
548558
} catch (IOException ioe) {
549559
Utility.emptyStatement();
550560
}
551-
shutdown(closeCommand, false, null);
561+
562+
try {
563+
synchronized(this) {
564+
appContinuation.uninterruptibleGet(CONNECTION_CLOSING_TIMEOUT);
565+
}
566+
} catch (TimeoutException ise) {
567+
// Broker didn't close socket on time, force socket close
568+
// FIXME: notify about timeout exception?
569+
_frameHandler.close();
570+
} finally {
571+
_running = false;
572+
}
552573
notifyListeners();
553574
}
554575

@@ -561,27 +582,86 @@ public void shutdown(Object reason,
561582
boolean initiatedByApplication,
562583
Throwable cause)
563584
{
564-
synchronized (this) {
565-
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
566-
_shutdownCause = new ShutdownSignalException(true,
567-
initiatedByApplication,
568-
reason, this);
569-
}
570-
if (cause != null) {
571-
_shutdownCause.initCause(cause);
572-
}
573-
_channel0.processShutdownSignal(_shutdownCause);
585+
try {
586+
synchronized (this) {
587+
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
588+
_shutdownCause = new ShutdownSignalException(true,
589+
initiatedByApplication,
590+
reason, this);
591+
}
592+
593+
if (cause != null) {
594+
_shutdownCause.initCause(cause);
595+
}
596+
_channel0.processShutdownSignal(_shutdownCause);
597+
} catch (AlreadyClosedException ace) {
598+
if (initiatedByApplication)
599+
throw ace;
600+
}
574601
_channelManager.handleSignal(_shutdownCause);
575602
}
576603

577604
/**
578-
* Public API - Close this connection with the given code and message.
605+
* Public API - Close this connection and all its channels
606+
*/
607+
public void close()
608+
throws IOException
609+
{
610+
close(-1);
611+
}
612+
613+
/**
614+
* Public API - Close this connection and all its channels
615+
* with a given timeout
616+
*/
617+
public void close(int timeout)
618+
throws IOException
619+
{
620+
close(200, "Goodbye", timeout);
621+
}
622+
623+
/**
624+
* Public API - Abort this connection and all its channels
625+
*/
626+
public void abort()
627+
{
628+
abort(-1);
629+
}
630+
631+
public void abort(int timeout)
632+
{
633+
634+
try {
635+
close(200, "Goodbye", true, null, timeout, true);
636+
} catch (IOException e) {
637+
Utility.emptyStatement();
638+
}
639+
}
640+
641+
/**
642+
* Protected API - Close this connection with the given code and message.
579643
* See the comments in ChannelN.close() - we're very similar.
580644
*/
581645
public void close(int closeCode, String closeMessage)
582646
throws IOException
583647
{
584-
close(closeCode, closeMessage, true, null);
648+
close (closeCode, closeMessage, 0);
649+
}
650+
651+
public void close(int closeCode, String closeMessage, int timeout)
652+
throws IOException
653+
{
654+
close(closeCode, closeMessage, true, null, timeout, false);
655+
}
656+
657+
public void close(int closeCode,
658+
String closeMessage,
659+
boolean initiatedByApplication,
660+
Throwable cause)
661+
throws IOException
662+
{
663+
close(closeCode, closeMessage, initiatedByApplication, cause, 0, false);
664+
585665
}
586666

587667
/**
@@ -590,25 +670,34 @@ public void close(int closeCode, String closeMessage)
590670
public void close(int closeCode,
591671
String closeMessage,
592672
boolean initiatedByApplication,
593-
Throwable cause)
673+
Throwable cause,
674+
int timeout,
675+
boolean abort)
594676
throws IOException
595677
{
596678
try {
597679
AMQImpl.Connection.Close reason =
598680
new AMQImpl.Connection.Close(closeCode, closeMessage, 0, 0);
599681
shutdown(reason, initiatedByApplication, cause);
600682
_channel0.quiescingRpc(reason,
601-
CONNECTION_CLOSING_TIMEOUT,
683+
timeout,
602684
new AMQCommand(new AMQImpl.Connection.CloseOk()));
685+
} catch (TimeoutException ise) {
686+
// FIXME: notify about timeout exception ?
603687
} catch (ShutdownSignalException sse) {
604-
// Ignore.
688+
if (!abort)
689+
throw sse;
690+
} catch (IOException ioe) {
691+
if (!abort)
692+
throw ioe;
605693
} finally {
606694
_running = false;
695+
_frameHandler.close();
607696
}
608697
notifyListeners();
609698
}
610699

611700
@Override public String toString() {
612701
return "amqp://" + _params.getUserName() + "@" + getHost() + ":" + getPort() + _params.getVirtualHost();
613702
}
614-
}
703+
}

src/com/rabbitmq/client/impl/ChannelManager.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,11 @@ public void handleSignal(ShutdownSignalException signal) {
6868
}
6969
for (AMQChannel channel : channels) {
7070
disconnectChannel(channel.getChannelNumber());
71-
channel.processShutdownSignal(signal);
71+
try {
72+
channel.processShutdownSignal(signal);
73+
} catch (ShutdownSignalException sse) {
74+
// Ignore already closed channels
75+
}
7276
}
7377
}
7478

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.Collections;
3030
import java.util.HashMap;
3131
import java.util.Map;
32+
import java.util.concurrent.TimeoutException;
3233

3334
import com.rabbitmq.client.Command;
3435
import com.rabbitmq.client.Connection;
@@ -124,7 +125,6 @@ public void setReturnListener(ReturnListener listener) {
124125
* @param signal an exception signalling channel shutdown
125126
*/
126127
public void broadcastShutdownSignal(ShutdownSignalException signal) {
127-
128128
Map<String, Consumer> snapshotConsumers;
129129
synchronized (_consumers) {
130130
snapshotConsumers = new HashMap<String, Consumer>(_consumers);
@@ -218,13 +218,13 @@ public void releaseChannelNumber() {
218218
}
219219
return true;
220220
} else if (method instanceof Channel.Close) {
221-
transmit(new Channel.CloseOk());
222221
releaseChannelNumber();
223222
ShutdownSignalException signal = new ShutdownSignalException(false,
224223
false,
225224
command,
226225
this);
227226
processShutdownSignal(signal);
227+
transmit(new Channel.CloseOk());
228228
notifyListeners();
229229
return true;
230230
} else {
@@ -291,6 +291,8 @@ public void close(int closeCode,
291291
quiescingRpc(reason,
292292
CLOSING_TIMEOUT,
293293
new AMQCommand(new Channel.CloseOk()));
294+
} catch (TimeoutException ise) {
295+
// FIXME: propagate it to the user
294296
} catch (ShutdownSignalException sse) {
295297
// Ignore.
296298
}

0 commit comments

Comments
 (0)