Skip to content

Commit b8e6d81

Browse files
committed
Check null for exception cause
References #630
1 parent 55b8c57 commit b8e6d81

File tree

2 files changed

+155
-29
lines changed

2 files changed

+155
-29
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
5+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
6+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
package com.rabbitmq.stream.impl;
16+
17+
import java.lang.reflect.InvocationTargetException;
18+
import java.lang.reflect.Method;
19+
import java.util.Arrays;
20+
import java.util.concurrent.ExecutorService;
21+
import java.util.concurrent.Executors;
22+
import java.util.concurrent.ThreadFactory;
23+
import java.util.concurrent.atomic.AtomicLong;
24+
import java.util.function.Function;
25+
import java.util.function.Predicate;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
final class ThreadUtils {
30+
31+
private static final Logger LOGGER = LoggerFactory.getLogger(ThreadUtils.class);
32+
33+
private static final ThreadFactory THREAD_FACTORY;
34+
private static final Function<String, ExecutorService> EXECUTOR_SERVICE_FACTORY;
35+
private static final Predicate<Thread> IS_VIRTUAL;
36+
37+
static {
38+
if (isJava21OrMore()) {
39+
LOGGER.debug("Running Java 21 or more, using virtual threads");
40+
Class<?> builderClass =
41+
Arrays.stream(Thread.class.getDeclaredClasses())
42+
.filter(c -> "Builder".equals(c.getSimpleName()))
43+
.findFirst()
44+
.get();
45+
// Reflection code is the same as:
46+
// Thread.ofVirtual().factory();
47+
try {
48+
Object builder = Thread.class.getDeclaredMethod("ofVirtual").invoke(null);
49+
THREAD_FACTORY = (ThreadFactory) builderClass.getDeclaredMethod("factory").invoke(builder);
50+
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
51+
throw new RuntimeException(e);
52+
}
53+
EXECUTOR_SERVICE_FACTORY =
54+
prefix -> {
55+
try {
56+
// Reflection code is the same as the 2 following lines:
57+
// ThreadFactory factory = Thread.ofVirtual().name(prefix, 0).factory();
58+
// Executors.newThreadPerTaskExecutor(factory);
59+
Object builder = Thread.class.getDeclaredMethod("ofVirtual").invoke(null);
60+
if (prefix != null) {
61+
builder =
62+
builderClass
63+
.getDeclaredMethod("name", String.class, Long.TYPE)
64+
.invoke(builder, prefix, 0L);
65+
}
66+
ThreadFactory factory =
67+
(ThreadFactory) builderClass.getDeclaredMethod("factory").invoke(builder);
68+
return (ExecutorService)
69+
Executors.class
70+
.getDeclaredMethod("newThreadPerTaskExecutor", ThreadFactory.class)
71+
.invoke(null, factory);
72+
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
73+
throw new RuntimeException(e);
74+
}
75+
};
76+
IS_VIRTUAL =
77+
thread -> {
78+
Method method = null;
79+
try {
80+
method = Thread.class.getDeclaredMethod("isVirtual");
81+
return (boolean) method.invoke(thread);
82+
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
83+
LOGGER.info("Error while checking if a thread is virtual: {}", e.getMessage());
84+
return false;
85+
}
86+
};
87+
} else {
88+
THREAD_FACTORY = Executors.defaultThreadFactory();
89+
EXECUTOR_SERVICE_FACTORY = prefix -> Executors.newCachedThreadPool(threadFactory(prefix));
90+
IS_VIRTUAL = ignored -> false;
91+
}
92+
}
93+
94+
private ThreadUtils() {}
95+
96+
static ThreadFactory threadFactory(String prefix) {
97+
if (prefix == null) {
98+
return Executors.defaultThreadFactory();
99+
} else {
100+
return new NamedThreadFactory(prefix);
101+
}
102+
}
103+
104+
static ThreadFactory internalThreadFactory(String prefix) {
105+
return new NamedThreadFactory(THREAD_FACTORY, prefix);
106+
}
107+
108+
static boolean isVirtual(Thread thread) {
109+
return IS_VIRTUAL.test(thread);
110+
}
111+
112+
private static boolean isJava21OrMore() {
113+
return Runtime.version().compareTo(Runtime.Version.parse("21")) >= 0;
114+
}
115+
116+
private static class NamedThreadFactory implements ThreadFactory {
117+
118+
private final ThreadFactory backingThreadFactory;
119+
120+
private final String prefix;
121+
122+
private final AtomicLong count = new AtomicLong(0);
123+
124+
private NamedThreadFactory(String prefix) {
125+
this(Executors.defaultThreadFactory(), prefix);
126+
}
127+
128+
private NamedThreadFactory(ThreadFactory backingThreadFactory, String prefix) {
129+
this.backingThreadFactory = backingThreadFactory;
130+
this.prefix = prefix;
131+
}
132+
133+
@Override
134+
public Thread newThread(Runnable r) {
135+
Thread thread = this.backingThreadFactory.newThread(r);
136+
thread.setName(prefix + count.getAndIncrement());
137+
return thread;
138+
}
139+
}
140+
}

src/main/java/com/rabbitmq/stream/impl/Utils.java

+15-29
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,7 @@
2828
import java.util.concurrent.ConcurrentHashMap;
2929
import java.util.concurrent.CopyOnWriteArrayList;
3030
import java.util.concurrent.ExecutorService;
31-
import java.util.concurrent.Executors;
3231
import java.util.concurrent.Future;
33-
import java.util.concurrent.ThreadFactory;
3432
import java.util.concurrent.TimeUnit;
3533
import java.util.concurrent.atomic.AtomicBoolean;
3634
import java.util.concurrent.atomic.AtomicLong;
@@ -160,8 +158,12 @@ static ClientFactory coordinatorClientFactory(
160158
return Utils.connectToAdvertisedNodeClientFactory(delegate, retryInterval)
161159
.client(clientFactoryContext);
162160
} catch (TimeoutStreamException e) {
163-
throw new TimeoutStreamException(
164-
format(messageFormat, e.getMessage(), e.getCause().getMessage(), e.getCause()));
161+
if (e.getCause() == null) {
162+
throw new TimeoutStreamException(format(messageFormat, e.getMessage(), "No root cause"));
163+
} else {
164+
throw new TimeoutStreamException(
165+
format(messageFormat, e.getMessage(), e.getCause().getMessage()), e.getCause());
166+
}
165167
} catch (StreamException e) {
166168
if (e.getCause() != null
167169
&& (e.getCause() instanceof UnknownHostException
@@ -552,31 +554,6 @@ static String jsonField(String name, String value) {
552554
return quote(name) + " : " + quote(value);
553555
}
554556

555-
static class NamedThreadFactory implements ThreadFactory {
556-
557-
private final ThreadFactory backingThreaFactory;
558-
559-
private final String prefix;
560-
561-
private final AtomicLong count = new AtomicLong(0);
562-
563-
public NamedThreadFactory(String prefix) {
564-
this(Executors.defaultThreadFactory(), prefix);
565-
}
566-
567-
public NamedThreadFactory(ThreadFactory backingThreadFactory, String prefix) {
568-
this.backingThreaFactory = backingThreadFactory;
569-
this.prefix = prefix;
570-
}
571-
572-
@Override
573-
public Thread newThread(Runnable r) {
574-
Thread thread = this.backingThreaFactory.newThread(r);
575-
thread.setName(prefix + count.getAndIncrement());
576-
return thread;
577-
}
578-
}
579-
580557
static final ExecutorServiceFactory NO_OP_EXECUTOR_SERVICE_FACTORY =
581558
new NoOpExecutorServiceFactory();
582559

@@ -678,6 +655,15 @@ boolean get() {
678655
}
679656
}
680657

658+
static void lock(Lock lock, Runnable action) {
659+
lock(
660+
lock,
661+
() -> {
662+
action.run();
663+
return null;
664+
});
665+
}
666+
681667
static <T> T lock(Lock lock, Supplier<T> action) {
682668
lock.lock();
683669
try {

0 commit comments

Comments
 (0)