Skip to content

Commit 3ab49db

Browse files
author
Emile Joubert
committed
Merged bug23722 into default
2 parents 2a4cb87 + 84e678f commit 3ab49db

File tree

6 files changed

+89
-75
lines changed

6 files changed

+89
-75
lines changed

src/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -245,10 +245,10 @@ public void processShutdownSignal(ShutdownSignalException signal,
245245
boolean notifyRpc) {
246246
try {
247247
synchronized (_channelMutex) {
248-
if (!ignoreClosed)
249-
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
250-
if (isOpen())
251-
_shutdownCause = signal;
248+
if (!setShutdownCauseIfOpen(signal)) {
249+
if (!ignoreClosed)
250+
throw new AlreadyClosedException("Attempt to use closed channel", this);
251+
}
252252

253253
_channelMutex.notifyAll();
254254
}

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -317,9 +317,9 @@ public void start()
317317
heartbeat));
318318
// 0.9.1: insist [on not being redirected] is deprecated, but
319319
// still in generated code; just pass a dummy value here
320-
Method res = _channel0.exnWrappingRpc(new AMQImpl.Connection.Open(_virtualHost,
321-
"",
322-
false)).getMethod();
320+
_channel0.exnWrappingRpc(new AMQImpl.Connection.Open(_virtualHost,
321+
"",
322+
false)).getMethod();
323323
return;
324324
}
325325

@@ -594,11 +594,9 @@ public ShutdownSignalException shutdown(Object reason,
594594
ShutdownSignalException sse = new ShutdownSignalException(true,initiatedByApplication,
595595
reason, this);
596596
sse.initCause(cause);
597-
synchronized (this) {
598-
if (initiatedByApplication)
599-
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
600-
if (isOpen())
601-
_shutdownCause = sse;
597+
if (!setShutdownCauseIfOpen(sse)) {
598+
if (initiatedByApplication)
599+
throw new AlreadyClosedException("Attempt to use closed connection", this);
602600
}
603601

604602
// stop any heartbeating

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import java.util.Collections;
4343
import java.util.HashMap;
4444
import java.util.Map;
45-
import java.util.concurrent.atomic.AtomicLong;
4645
import java.util.concurrent.TimeoutException;
4746

4847

@@ -231,27 +230,16 @@ public void releaseChannelNumber() {
231230
// incoming commands except for a close and close-ok.
232231

233232
Method method = command.getMethod();
233+
// we deal with channel.close in the same way, regardless
234+
if (method instanceof Channel.Close) {
235+
asyncShutdown(command);
236+
return true;
237+
}
234238

235239
if (isOpen()) {
236240
// We're in normal running mode.
237241

238-
if (method instanceof Channel.Close) {
239-
releaseChannelNumber();
240-
ShutdownSignalException signal = new ShutdownSignalException(false,
241-
false,
242-
command,
243-
this);
244-
synchronized (_channelMutex) {
245-
try {
246-
processShutdownSignal(signal, true, false);
247-
quiescingTransmit(new Channel.CloseOk());
248-
} finally {
249-
notifyOutstandingRpc(signal);
250-
}
251-
}
252-
notifyListeners();
253-
return true;
254-
} else if (method instanceof Basic.Deliver) {
242+
if (method instanceof Basic.Deliver) {
255243
Basic.Deliver m = (Basic.Deliver) method;
256244

257245
Consumer callback = _consumers.get(m.consumerTag);
@@ -355,15 +343,9 @@ public void releaseChannelNumber() {
355343
return false;
356344
}
357345
} else {
358-
// We're in quiescing mode.
346+
// We're in quiescing mode == !isOpen()
359347

360-
if (method instanceof Channel.Close) {
361-
// We're already shutting down, so just send back an ok.
362-
synchronized (_channelMutex) {
363-
quiescingTransmit(new Channel.CloseOk());
364-
}
365-
return true;
366-
} else if (method instanceof Channel.CloseOk) {
348+
if (method instanceof Channel.CloseOk) {
367349
// We're quiescing, and we see a channel.close-ok:
368350
// this is our signal to leave quiescing mode and
369351
// finally shut down for good. Let it be handled as an
@@ -378,6 +360,23 @@ public void releaseChannelNumber() {
378360
}
379361
}
380362

363+
private void asyncShutdown(Command command) throws IOException {
364+
releaseChannelNumber();
365+
ShutdownSignalException signal = new ShutdownSignalException(false,
366+
false,
367+
command,
368+
this);
369+
synchronized (_channelMutex) {
370+
try {
371+
processShutdownSignal(signal, true, false);
372+
quiescingTransmit(new Channel.CloseOk());
373+
} finally {
374+
notifyOutstandingRpc(signal);
375+
}
376+
}
377+
notifyListeners();
378+
}
379+
381380
/** Public API - {@inheritDoc} */
382381
public void close()
383382
throws IOException
@@ -419,7 +418,7 @@ public void close(int closeCode,
419418
throws IOException
420419
{
421420
// First, notify all our dependents that we are shutting down.
422-
// This clears _isOpen, so no further work from the
421+
// This clears isOpen(), so no further work from the
423422
// application side will be accepted, and any inbound commands
424423
// will be discarded (unless they're channel.close-oks).
425424
Channel.Close reason = new Channel.Close(closeCode, closeMessage, 0, 0);
@@ -442,8 +441,8 @@ public void close(int closeCode,
442441
}
443442

444443
// Now that we're in quiescing state, channel.close was sent and
445-
// we wait for the reply. We ignore the result. (It's always
446-
// close-ok.)
444+
// we wait for the reply. We ignore the result.
445+
// (It's NOT always close-ok.)
447446
notify = true;
448447
k.getReply(-1);
449448
} catch (TimeoutException ise) {
@@ -891,4 +890,5 @@ public Channel.FlowOk getFlow() {
891890
public long getNextPublishSeqNo() {
892891
return nextPublishSeqNo;
893892
}
894-
}
893+
894+
}

src/com/rabbitmq/client/impl/Frame.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -241,13 +241,12 @@ public DataOutputStream getOutputStream() {
241241

242242
@Override public String toString() {
243243
StringBuffer sb = new StringBuffer();
244-
sb.append("Frame(" + type + ", " + channel + ", ");
244+
sb.append("Frame(type=").append(type).append(", channel=").append(channel).append(", ");
245245
if (accumulator == null) {
246-
sb.append(payload.length + " bytes of payload");
246+
sb.append(payload.length).append(" bytes of payload)");
247247
} else {
248-
sb.append(accumulator.size() + " bytes of accumulator");
248+
sb.append(accumulator.size()).append(" bytes of accumulator)");
249249
}
250-
sb.append(")");
251250
return sb.toString();
252251
}
253252

@@ -325,7 +324,7 @@ else if(value instanceof byte[]) {
325324
acc += 4 + ((byte[])value).length;
326325
}
327326
else if(value instanceof List) {
328-
acc += 4 + arraySize((List)value);
327+
acc += 4 + arraySize((List<?>)value);
329328
}
330329
else if(value == null) {
331330
}
@@ -336,7 +335,7 @@ else if(value == null) {
336335
}
337336

338337
/** Computes the AMQP wire-protocol length of an encoded field-array */
339-
public static long arraySize(List values)
338+
public static long arraySize(List<?> values)
340339
throws UnsupportedEncodingException
341340
{
342341
long acc = 0;

src/com/rabbitmq/client/impl/ShutdownNotifierComponent.java

Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,53 +26,71 @@
2626

2727
public class ShutdownNotifierComponent implements ShutdownNotifier {
2828

29+
/** Monitor for listeners and shutdownCause */
30+
private final Object monitor = new Object();
31+
2932
/** List of all shutdown listeners associated with the component */
30-
public List<ShutdownListener> listeners
31-
= new ArrayList<ShutdownListener>();
33+
private final List<ShutdownListener> shutdownListeners
34+
= new ArrayList<ShutdownListener>();
3235

3336
/**
3437
* When this value is null, the component is in an "open"
3538
* state. When non-null, the component is in "closed" state, and
3639
* this value indicates the circumstances of the shutdown.
3740
*/
38-
public volatile ShutdownSignalException _shutdownCause = null;
41+
private volatile ShutdownSignalException shutdownCause = null;
3942

4043
public void addShutdownListener(ShutdownListener listener)
4144
{
42-
boolean closed = false;
43-
synchronized(listeners) {
44-
closed = !isOpen();
45-
listeners.add(listener);
45+
ShutdownSignalException sse = null;
46+
synchronized(this.monitor) {
47+
sse = this.shutdownCause;
48+
this.shutdownListeners.add(listener);
4649
}
47-
if (closed)
48-
listener.shutdownCompleted(getCloseReason());
50+
if (sse != null) // closed
51+
listener.shutdownCompleted(sse);
4952
}
5053

5154
public ShutdownSignalException getCloseReason() {
52-
return _shutdownCause;
55+
return this.shutdownCause;
5356
}
5457

5558
public void notifyListeners()
5659
{
57-
synchronized(listeners) {
58-
for (ShutdownListener l: listeners)
59-
try {
60-
l.shutdownCompleted(getCloseReason());
61-
} catch (Exception e) {
62-
// FIXME: proper logging
63-
}
60+
ShutdownSignalException sse = null;
61+
ShutdownListener[] sdls = null;
62+
synchronized(this.monitor) {
63+
sdls = this.shutdownListeners
64+
.toArray(new ShutdownListener[this.shutdownListeners.size()]);
65+
sse = this.shutdownCause;
66+
}
67+
for (ShutdownListener l: sdls) {
68+
try {
69+
l.shutdownCompleted(sse);
70+
} catch (Exception e) {
71+
// FIXME: proper logging
72+
}
6473
}
6574
}
6675

6776
public void removeShutdownListener(ShutdownListener listener)
6877
{
69-
synchronized(listeners) {
70-
listeners.remove(listener);
78+
synchronized(this.monitor) {
79+
this.shutdownListeners.remove(listener);
7180
}
7281
}
7382

7483
public boolean isOpen() {
75-
return _shutdownCause == null;
84+
return this.shutdownCause == null;
7685
}
7786

87+
public boolean setShutdownCauseIfOpen(ShutdownSignalException sse) {
88+
synchronized (this.monitor) {
89+
if (isOpen()) {
90+
this.shutdownCause = sse;
91+
return true;
92+
}
93+
return false;
94+
}
95+
}
7896
}

src/com/rabbitmq/utility/BlockingCell.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,13 @@ public synchronized T get() throws InterruptedException {
5151
}
5252
return _value;
5353
}
54-
54+
5555
/**
5656
* Wait for a value, and when one arrives, return it (without clearing it). If there's
5757
* already a value present, there's no need to wait - the existing value is returned.
58-
* If timeout is reached and value hasn't arrived, TimeoutException is thrown
58+
* If timeout is reached and value hasn't arrived, TimeoutException is thrown.
5959
*
60-
* @param timeout timeout in miliseconds. -1 effectively means infinity
60+
* @param timeout timeout in milliseconds. -1 effectively means infinity
6161
* @return the waited-for value
6262
* @throws InterruptedException if this thread is interrupted
6363
*/
@@ -98,8 +98,8 @@ public synchronized T uninterruptibleGet() {
9898
* a value appears or until specified timeout is reached. If timeout is reached,
9999
* TimeoutException it thrown.
100100
* We also use System.nanoTime() to behave correctly when system clock jumps around.
101-
*
102-
* @param timeout timeout in miliseconds. -1 effectively means infinity
101+
*
102+
* @param timeout timeout in milliseconds. -1 effectively means infinity
103103
* @return the waited-for value
104104
*/
105105
public synchronized T uninterruptibleGet(int timeout) throws TimeoutException {
@@ -127,7 +127,7 @@ public synchronized void set(T newValue) {
127127
}
128128
_value = newValue;
129129
_filled = true;
130-
notify();
130+
notifyAll();
131131
}
132132

133133
/**
@@ -140,7 +140,6 @@ public synchronized boolean setIfUnset(T newValue) {
140140
return false;
141141
}
142142
set(newValue);
143-
_filled = true;
144143
return true;
145144
}
146145
}

0 commit comments

Comments
 (0)