Skip to content

Commit add7bbc

Browse files
author
Steve Powell
committed
Merge default
2 parents 0b7bba7 + ea89b31 commit add7bbc

File tree

6 files changed

+85
-69
lines changed

6 files changed

+85
-69
lines changed

src/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -245,10 +245,10 @@ public void processShutdownSignal(ShutdownSignalException signal,
245245
boolean notifyRpc) {
246246
try {
247247
synchronized (_channelMutex) {
248-
if (!ignoreClosed)
249-
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
250-
if (isOpen())
251-
_shutdownCause = signal;
248+
if (!setShutdownCauseIfOpen(signal)) {
249+
if (!ignoreClosed)
250+
throw new AlreadyClosedException("Attempt to use closed channel", this);
251+
}
252252

253253
_channelMutex.notifyAll();
254254
}

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -317,9 +317,9 @@ public void start()
317317
heartbeat));
318318
// 0.9.1: insist [on not being redirected] is deprecated, but
319319
// still in generated code; just pass a dummy value here
320-
Method res = _channel0.exnWrappingRpc(new AMQImpl.Connection.Open(_virtualHost,
321-
"",
322-
false)).getMethod();
320+
_channel0.exnWrappingRpc(new AMQImpl.Connection.Open(_virtualHost,
321+
"",
322+
false)).getMethod();
323323
return;
324324
}
325325

@@ -594,11 +594,9 @@ public ShutdownSignalException shutdown(Object reason,
594594
ShutdownSignalException sse = new ShutdownSignalException(true,initiatedByApplication,
595595
reason, this);
596596
sse.initCause(cause);
597-
synchronized (this) {
598-
if (initiatedByApplication)
599-
ensureIsOpen(); // invariant: we should never be shut down more than once per instance
600-
if (isOpen())
601-
_shutdownCause = sse;
597+
if (!setShutdownCauseIfOpen(sse)) {
598+
if (initiatedByApplication)
599+
throw new AlreadyClosedException("Attempt to use closed connection", this);
602600
}
603601

604602
// stop any heartbeating

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import java.util.Collections;
4343
import java.util.HashMap;
4444
import java.util.Map;
45-
import java.util.concurrent.atomic.AtomicLong;
4645
import java.util.concurrent.TimeoutException;
4746

4847

@@ -236,20 +235,7 @@ public void releaseChannelNumber() {
236235
// We're in normal running mode.
237236

238237
if (method instanceof Channel.Close) {
239-
releaseChannelNumber();
240-
ShutdownSignalException signal = new ShutdownSignalException(false,
241-
false,
242-
command,
243-
this);
244-
synchronized (_channelMutex) {
245-
try {
246-
processShutdownSignal(signal, true, false);
247-
quiescingTransmit(new Channel.CloseOk());
248-
} finally {
249-
notifyOutstandingRpc(signal);
250-
}
251-
}
252-
notifyListeners();
238+
asyncShutdown(command);
253239
return true;
254240
} else if (method instanceof Basic.Deliver) {
255241
Basic.Deliver m = (Basic.Deliver) method;
@@ -355,13 +341,11 @@ public void releaseChannelNumber() {
355341
return false;
356342
}
357343
} else {
358-
// We're in quiescing mode.
344+
// We're in quiescing mode == !isOpen()
359345

360346
if (method instanceof Channel.Close) {
361-
// We're already shutting down, so just send back an ok.
362-
synchronized (_channelMutex) {
363-
quiescingTransmit(new Channel.CloseOk());
364-
}
347+
// We are already shutting down, but we cannot assume no Rpc is waiting.
348+
asyncShutdown(command);
365349
return true;
366350
} else if (method instanceof Channel.CloseOk) {
367351
// We're quiescing, and we see a channel.close-ok:
@@ -378,6 +362,23 @@ public void releaseChannelNumber() {
378362
}
379363
}
380364

365+
private void asyncShutdown(Command command) throws IOException {
366+
releaseChannelNumber();
367+
ShutdownSignalException signal = new ShutdownSignalException(false,
368+
false,
369+
command,
370+
this);
371+
synchronized (_channelMutex) {
372+
try {
373+
processShutdownSignal(signal, true, false);
374+
quiescingTransmit(new Channel.CloseOk());
375+
} finally {
376+
notifyOutstandingRpc(signal);
377+
}
378+
}
379+
notifyListeners();
380+
}
381+
381382
/** Public API - {@inheritDoc} */
382383
public void close()
383384
throws IOException
@@ -419,7 +420,7 @@ public void close(int closeCode,
419420
throws IOException
420421
{
421422
// First, notify all our dependents that we are shutting down.
422-
// This clears _isOpen, so no further work from the
423+
// This clears isOpen(), so no further work from the
423424
// application side will be accepted, and any inbound commands
424425
// will be discarded (unless they're channel.close-oks).
425426
Channel.Close reason = new Channel.Close(closeCode, closeMessage, 0, 0);
@@ -442,8 +443,8 @@ public void close(int closeCode,
442443
}
443444

444445
// Now that we're in quiescing state, channel.close was sent and
445-
// we wait for the reply. We ignore the result. (It's always
446-
// close-ok.)
446+
// we wait for the reply. We ignore the result.
447+
// (It's NOT always close-ok.)
447448
notify = true;
448449
k.getReply(-1);
449450
} catch (TimeoutException ise) {
@@ -891,4 +892,5 @@ public Channel.FlowOk getFlow() {
891892
public long getNextPublishSeqNo() {
892893
return nextPublishSeqNo;
893894
}
894-
}
895+
896+
}

src/com/rabbitmq/client/impl/Frame.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -241,13 +241,12 @@ public DataOutputStream getOutputStream() {
241241

242242
@Override public String toString() {
243243
StringBuffer sb = new StringBuffer();
244-
sb.append("Frame(" + type + ", " + channel + ", ");
244+
sb.append("Frame(type=").append(type).append(", channel=").append(channel).append(", ");
245245
if (accumulator == null) {
246-
sb.append(payload.length + " bytes of payload");
246+
sb.append(payload.length).append(" bytes of payload)");
247247
} else {
248-
sb.append(accumulator.size() + " bytes of accumulator");
248+
sb.append(accumulator.size()).append(" bytes of accumulator)");
249249
}
250-
sb.append(")");
251250
return sb.toString();
252251
}
253252

@@ -325,7 +324,7 @@ else if(value instanceof byte[]) {
325324
acc += 4 + ((byte[])value).length;
326325
}
327326
else if(value instanceof List) {
328-
acc += 4 + arraySize((List)value);
327+
acc += 4 + arraySize((List<?>)value);
329328
}
330329
else if(value == null) {
331330
}
@@ -336,7 +335,7 @@ else if(value == null) {
336335
}
337336

338337
/** Computes the AMQP wire-protocol length of an encoded field-array */
339-
public static long arraySize(List values)
338+
public static long arraySize(List<?> values)
340339
throws UnsupportedEncodingException
341340
{
342341
long acc = 0;

src/com/rabbitmq/client/impl/ShutdownNotifierComponent.java

Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,53 +26,71 @@
2626

2727
public class ShutdownNotifierComponent implements ShutdownNotifier {
2828

29+
/** Monitor for listeners and shutdownCause */
30+
private final Object monitor = new Object();
31+
2932
/** List of all shutdown listeners associated with the component */
30-
public List<ShutdownListener> listeners
31-
= new ArrayList<ShutdownListener>();
33+
private final List<ShutdownListener> shutdownListeners
34+
= new ArrayList<ShutdownListener>();
3235

3336
/**
3437
* When this value is null, the component is in an "open"
3538
* state. When non-null, the component is in "closed" state, and
3639
* this value indicates the circumstances of the shutdown.
3740
*/
38-
public volatile ShutdownSignalException _shutdownCause = null;
41+
private volatile ShutdownSignalException shutdownCause = null;
3942

4043
public void addShutdownListener(ShutdownListener listener)
4144
{
42-
boolean closed = false;
43-
synchronized(listeners) {
44-
closed = !isOpen();
45-
listeners.add(listener);
45+
ShutdownSignalException sse = null;
46+
synchronized(this.monitor) {
47+
sse = this.shutdownCause;
48+
this.shutdownListeners.add(listener);
4649
}
47-
if (closed)
48-
listener.shutdownCompleted(getCloseReason());
50+
if (sse != null) // closed
51+
listener.shutdownCompleted(sse);
4952
}
5053

5154
public ShutdownSignalException getCloseReason() {
52-
return _shutdownCause;
55+
return this.shutdownCause;
5356
}
5457

5558
public void notifyListeners()
5659
{
57-
synchronized(listeners) {
58-
for (ShutdownListener l: listeners)
59-
try {
60-
l.shutdownCompleted(getCloseReason());
61-
} catch (Exception e) {
62-
// FIXME: proper logging
63-
}
60+
ShutdownSignalException sse = null;
61+
ShutdownListener[] sdls = null;
62+
synchronized(this.monitor) {
63+
sdls = this.shutdownListeners
64+
.toArray(new ShutdownListener[this.shutdownListeners.size()]);
65+
sse = this.shutdownCause;
66+
}
67+
for (ShutdownListener l: sdls) {
68+
try {
69+
l.shutdownCompleted(sse);
70+
} catch (Exception e) {
71+
// FIXME: proper logging
72+
}
6473
}
6574
}
6675

6776
public void removeShutdownListener(ShutdownListener listener)
6877
{
69-
synchronized(listeners) {
70-
listeners.remove(listener);
78+
synchronized(this.monitor) {
79+
this.shutdownListeners.remove(listener);
7180
}
7281
}
7382

7483
public boolean isOpen() {
75-
return _shutdownCause == null;
84+
return this.shutdownCause == null;
7685
}
7786

87+
public boolean setShutdownCauseIfOpen(ShutdownSignalException sse) {
88+
synchronized (this.monitor) {
89+
if (isOpen()) {
90+
this.shutdownCause = sse;
91+
return true;
92+
}
93+
return false;
94+
}
95+
}
7896
}

src/com/rabbitmq/utility/BlockingCell.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,13 @@ public synchronized T get() throws InterruptedException {
5151
}
5252
return _value;
5353
}
54-
54+
5555
/**
5656
* Wait for a value, and when one arrives, return it (without clearing it). If there's
5757
* already a value present, there's no need to wait - the existing value is returned.
58-
* If timeout is reached and value hasn't arrived, TimeoutException is thrown
58+
* If timeout is reached and value hasn't arrived, TimeoutException is thrown.
5959
*
60-
* @param timeout timeout in miliseconds. -1 effectively means infinity
60+
* @param timeout timeout in milliseconds. -1 effectively means infinity
6161
* @return the waited-for value
6262
* @throws InterruptedException if this thread is interrupted
6363
*/
@@ -98,8 +98,8 @@ public synchronized T uninterruptibleGet() {
9898
* a value appears or until specified timeout is reached. If timeout is reached,
9999
* TimeoutException it thrown.
100100
* We also use System.nanoTime() to behave correctly when system clock jumps around.
101-
*
102-
* @param timeout timeout in miliseconds. -1 effectively means infinity
101+
*
102+
* @param timeout timeout in milliseconds. -1 effectively means infinity
103103
* @return the waited-for value
104104
*/
105105
public synchronized T uninterruptibleGet(int timeout) throws TimeoutException {
@@ -127,7 +127,7 @@ public synchronized void set(T newValue) {
127127
}
128128
_value = newValue;
129129
_filled = true;
130-
notify();
130+
notifyAll();
131131
}
132132

133133
/**
@@ -140,7 +140,6 @@ public synchronized boolean setIfUnset(T newValue) {
140140
return false;
141141
}
142142
set(newValue);
143-
_filled = true;
144143
return true;
145144
}
146145
}

0 commit comments

Comments
 (0)