Skip to content

Commit 06346ed

Browse files
Merge pull request #1119 from rjbaucells/virtual-threads
Prevent pinned CarrierThreads on JDK-21 while using Virtual Threads
2 parents fb64c5f + 8e855a3 commit 06346ed

File tree

4 files changed

+111
-30
lines changed

4 files changed

+111
-30
lines changed

src/main/java/com/rabbitmq/client/impl/AMQChannel.java

+60-18
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import java.io.IOException;
3232
import java.util.concurrent.CompletableFuture;
3333
import java.util.concurrent.TimeoutException;
34+
import java.util.concurrent.locks.Condition;
35+
import java.util.concurrent.locks.ReentrantLock;
3436
import java.util.function.Supplier;
3537

3638
/**
@@ -54,7 +56,8 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
5456
* so that clients can themselves use the channel to synchronize
5557
* on.
5658
*/
57-
protected final Object _channelMutex = new Object();
59+
protected final ReentrantLock _channelLock = new ReentrantLock();
60+
protected final Condition _channelLockCondition = _channelLock.newCondition();
5861

5962
/** The connection this channel is associated with. */
6063
private final AMQConnection _connection;
@@ -191,14 +194,17 @@ public void handleCompleteInboundCommand(AMQCommand command) throws IOException
191194
// so it must be a response to an earlier RPC.
192195

193196
if (_checkRpcResponseType) {
194-
synchronized (_channelMutex) {
197+
_channelLock.lock();
198+
try {
195199
// check if this reply command is intended for the current waiting request before calling nextOutstandingRpc()
196200
if (_activeRpc != null && !_activeRpc.canHandleReply(command)) {
197201
// this reply command is not intended for the current waiting request
198202
// most likely a previous request timed out and this command is the reply for that.
199203
// Throw this reply command away so we don't stop the current request from waiting for its reply
200204
return;
201205
}
206+
} finally {
207+
_channelLock.unlock();
202208
}
203209
}
204210
final RpcWrapper nextOutstandingRpc = nextOutstandingRpc();
@@ -220,11 +226,12 @@ public void enqueueAsyncRpc(Method method, CompletableFuture<Command> future) {
220226
}
221227

222228
private void doEnqueueRpc(Supplier<RpcWrapper> rpcWrapperSupplier) {
223-
synchronized (_channelMutex) {
229+
_channelLock.lock();
230+
try {
224231
boolean waitClearedInterruptStatus = false;
225232
while (_activeRpc != null) {
226233
try {
227-
_channelMutex.wait();
234+
_channelLockCondition.await();
228235
} catch (InterruptedException e) { //NOSONAR
229236
waitClearedInterruptStatus = true;
230237
// No Sonar: we re-interrupt the thread later
@@ -234,23 +241,31 @@ private void doEnqueueRpc(Supplier<RpcWrapper> rpcWrapperSupplier) {
234241
Thread.currentThread().interrupt();
235242
}
236243
_activeRpc = rpcWrapperSupplier.get();
244+
} finally {
245+
_channelLock.unlock();
237246
}
238247
}
239248

240249
public boolean isOutstandingRpc()
241250
{
242-
synchronized (_channelMutex) {
251+
_channelLock.lock();
252+
try {
243253
return (_activeRpc != null);
254+
} finally {
255+
_channelLock.unlock();
244256
}
245257
}
246258

247259
public RpcWrapper nextOutstandingRpc()
248260
{
249-
synchronized (_channelMutex) {
261+
_channelLock.lock();
262+
try {
250263
RpcWrapper result = _activeRpc;
251264
_activeRpc = null;
252-
_channelMutex.notifyAll();
265+
_channelLockCondition.signalAll();
253266
return result;
267+
} finally {
268+
_channelLock.unlock();
254269
}
255270
}
256271

@@ -344,36 +359,48 @@ private AMQCommand privateRpc(Method m, int timeout)
344359
public void rpc(Method m, RpcContinuation k)
345360
throws IOException
346361
{
347-
synchronized (_channelMutex) {
362+
_channelLock.lock();
363+
try {
348364
ensureIsOpen();
349365
quiescingRpc(m, k);
366+
} finally {
367+
_channelLock.unlock();
350368
}
351369
}
352370

353371
public void quiescingRpc(Method m, RpcContinuation k)
354372
throws IOException
355373
{
356-
synchronized (_channelMutex) {
374+
_channelLock.lock();
375+
try {
357376
enqueueRpc(k);
358377
quiescingTransmit(m);
378+
} finally {
379+
_channelLock.unlock();
359380
}
360381
}
361382

362383
public void asyncRpc(Method m, CompletableFuture<Command> future)
363384
throws IOException
364385
{
365-
synchronized (_channelMutex) {
386+
_channelLock.lock();
387+
try {
366388
ensureIsOpen();
367389
quiescingAsyncRpc(m, future);
390+
} finally {
391+
_channelLock.unlock();
368392
}
369393
}
370394

371395
public void quiescingAsyncRpc(Method m, CompletableFuture<Command> future)
372396
throws IOException
373397
{
374-
synchronized (_channelMutex) {
398+
_channelLock.lock();
399+
try {
375400
enqueueAsyncRpc(m, future);
376401
quiescingTransmit(m);
402+
} finally {
403+
_channelLock.unlock();
377404
}
378405
}
379406

@@ -402,13 +429,16 @@ public void processShutdownSignal(ShutdownSignalException signal,
402429
boolean ignoreClosed,
403430
boolean notifyRpc) {
404431
try {
405-
synchronized (_channelMutex) {
432+
_channelLock.lock();
433+
try {
406434
if (!setShutdownCauseIfOpen(signal)) {
407435
if (!ignoreClosed)
408436
throw new AlreadyClosedException(getCloseReason());
409437
}
410438

411-
_channelMutex.notifyAll();
439+
_channelLockCondition.signalAll();
440+
} finally {
441+
_channelLock.unlock();
412442
}
413443
} finally {
414444
if (notifyRpc)
@@ -424,30 +454,40 @@ public void notifyOutstandingRpc(ShutdownSignalException signal) {
424454
}
425455

426456
public void transmit(Method m) throws IOException {
427-
synchronized (_channelMutex) {
457+
_channelLock.lock();
458+
try {
428459
transmit(new AMQCommand(m));
460+
} finally {
461+
_channelLock.unlock();
429462
}
430463
}
431464

432465
public void transmit(AMQCommand c) throws IOException {
433-
synchronized (_channelMutex) {
466+
_channelLock.lock();
467+
try {
434468
ensureIsOpen();
435469
quiescingTransmit(c);
470+
} finally {
471+
_channelLock.unlock();
436472
}
437473
}
438474

439475
public void quiescingTransmit(Method m) throws IOException {
440-
synchronized (_channelMutex) {
476+
_channelLock.lock();
477+
try {
441478
quiescingTransmit(new AMQCommand(m));
479+
} finally {
480+
_channelLock.unlock();
442481
}
443482
}
444483

445484
public void quiescingTransmit(AMQCommand c) throws IOException {
446-
synchronized (_channelMutex) {
485+
_channelLock.lock();
486+
try {
447487
if (c.getMethod().hasContent()) {
448488
while (_blockContent) {
449489
try {
450-
_channelMutex.wait();
490+
_channelLockCondition.await();
451491
} catch (InterruptedException ignored) {
452492
Thread.currentThread().interrupt();
453493
}
@@ -460,6 +500,8 @@ public void quiescingTransmit(AMQCommand c) throws IOException {
460500
}
461501
this._trafficListener.write(c);
462502
c.transmit(this);
503+
} finally {
504+
_channelLock.unlock();
463505
}
464506
}
465507

src/main/java/com/rabbitmq/client/impl/AMQCommand.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.io.ByteArrayOutputStream;
1919
import java.io.DataOutputStream;
2020
import java.io.IOException;
21+
import java.util.concurrent.locks.ReentrantLock;
2122

2223
import com.rabbitmq.client.AMQP;
2324
import com.rabbitmq.client.Command;
@@ -43,6 +44,7 @@ public class AMQCommand implements Command {
4344

4445
/** The assembler for this command - synchronised on - contains all the state */
4546
private final CommandAssembler assembler;
47+
private final ReentrantLock assemblerLock = new ReentrantLock();
4648

4749
AMQCommand(int maxBodyLength) {
4850
this(null, null, null, maxBodyLength);
@@ -115,7 +117,8 @@ public void transmit(AMQChannel channel) throws IOException {
115117
int channelNumber = channel.getChannelNumber();
116118
AMQConnection connection = channel.getConnection();
117119

118-
synchronized (assembler) {
120+
assemblerLock.lock();
121+
try {
119122
Method m = this.assembler.getMethod();
120123
if (m.hasContent()) {
121124
byte[] body = this.assembler.getContentBody();
@@ -145,6 +148,8 @@ public void transmit(AMQChannel channel) throws IOException {
145148
} else {
146149
connection.writeFrame(m.toFrame(channelNumber));
147150
}
151+
} finally {
152+
assemblerLock.unlock();
148153
}
149154

150155
connection.flush();
@@ -155,7 +160,8 @@ public void transmit(AMQChannel channel) throws IOException {
155160
}
156161

157162
public String toString(boolean suppressBody){
158-
synchronized (assembler) {
163+
assemblerLock.lock();
164+
try {
159165
return new StringBuilder()
160166
.append('{')
161167
.append(this.assembler.getMethod())
@@ -165,6 +171,8 @@ public String toString(boolean suppressBody){
165171
.append(contentBodyStringBuilder(
166172
this.assembler.getContentBody(), suppressBody))
167173
.append('}').toString();
174+
} finally {
175+
assemblerLock.unlock();
168176
}
169177
}
170178

src/main/java/com/rabbitmq/client/impl/ChannelN.java

+22-6
Original file line numberDiff line numberDiff line change
@@ -361,10 +361,13 @@ private void releaseChannel() {
361361
return true;
362362
} else if (method instanceof Channel.Flow) {
363363
Channel.Flow channelFlow = (Channel.Flow) method;
364-
synchronized (_channelMutex) {
364+
_channelLock.lock();
365+
try {
365366
_blockContent = !channelFlow.getActive();
366367
transmit(new Channel.FlowOk(!_blockContent));
367-
_channelMutex.notifyAll();
368+
_channelLockCondition.signalAll();
369+
} finally {
370+
_channelLock.unlock();
368371
}
369372
return true;
370373
} else if (method instanceof Basic.Ack) {
@@ -524,7 +527,8 @@ private void asyncShutdown(Command command) throws IOException {
524527
false,
525528
command.getMethod(),
526529
this);
527-
synchronized (_channelMutex) {
530+
_channelLock.lock();
531+
try {
528532
try {
529533
processShutdownSignal(signal, true, false);
530534
quiescingTransmit(new Channel.CloseOk());
@@ -533,6 +537,9 @@ private void asyncShutdown(Command command) throws IOException {
533537
notifyOutstandingRpc(signal);
534538
}
535539
}
540+
finally {
541+
_channelLock.unlock();
542+
}
536543
notifyListeners();
537544
}
538545

@@ -612,9 +619,12 @@ public AMQCommand transformReply(AMQCommand command) {
612619
try {
613620
// Synchronize the block below to avoid race conditions in case
614621
// connection wants to send Connection-CloseOK
615-
synchronized (_channelMutex) {
622+
_channelLock.lock();
623+
try {
616624
startProcessShutdownSignal(signal, !initiatedByApplication, true);
617625
quiescingRpc(reason, k);
626+
} finally {
627+
_channelLock.unlock();
618628
}
619629

620630
// Now that we're in quiescing state, channel.close was sent and
@@ -1602,16 +1612,22 @@ public CompletableFuture<Command> asyncCompletableRpc(Method method) throws IOEx
16021612

16031613
@Override
16041614
public void enqueueRpc(RpcContinuation k) {
1605-
synchronized (_channelMutex) {
1615+
_channelLock.lock();
1616+
try {
16061617
super.enqueueRpc(k);
16071618
dispatcher.setUnlimited(true);
1619+
} finally {
1620+
_channelLock.unlock();
16081621
}
16091622
}
16101623

16111624
@Override
16121625
protected void markRpcFinished() {
1613-
synchronized (_channelMutex) {
1626+
_channelLock.lock();
1627+
try {
16141628
dispatcher.setUnlimited(false);
1629+
} finally {
1630+
_channelLock.unlock();
16151631
}
16161632
}
16171633

0 commit comments

Comments
 (0)