Skip to content

Commit a7a25e4

Browse files
committed
Use internal thread factory
1 parent 72db70c commit a7a25e4

File tree

3 files changed

+29
-3
lines changed

3 files changed

+29
-3
lines changed

src/main/java/com/rabbitmq/stream/impl/ScheduledExecutorServiceWrapper.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
1414
package com.rabbitmq.stream.impl;
1515

16-
import io.micrometer.core.instrument.util.NamedThreadFactory;
16+
import com.rabbitmq.stream.impl.Utils.NamedThreadFactory;
1717
import java.time.Duration;
1818
import java.util.ArrayList;
1919
import java.util.Collection;

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -212,8 +212,7 @@ class StreamEnvironment implements Environment {
212212
executorService = scheduledExecutorService;
213213
this.privateScheduleExecutorService = false;
214214
}
215-
// TODO remove executor wrapper (it's here just for debugging)
216-
this.scheduledExecutorService = new ScheduledExecutorServiceWrapper(executorService);
215+
this.scheduledExecutorService = executorService;
217216

218217
this.producersCoordinator =
219218
new ProducersCoordinator(

src/main/java/com/rabbitmq/stream/impl/Utils.java

+27
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import java.util.Map;
3535
import java.util.concurrent.ConcurrentHashMap;
3636
import java.util.concurrent.CopyOnWriteArrayList;
37+
import java.util.concurrent.Executors;
38+
import java.util.concurrent.ThreadFactory;
3739
import java.util.concurrent.atomic.AtomicBoolean;
3840
import java.util.concurrent.atomic.AtomicLong;
3941
import java.util.function.Consumer;
@@ -482,4 +484,29 @@ static String jsonField(String name, Number value) {
482484
static String jsonField(String name, String value) {
483485
return quote(name) + " : " + quote(value);
484486
}
487+
488+
static class NamedThreadFactory implements ThreadFactory {
489+
490+
private final ThreadFactory backingThreaFactory;
491+
492+
private final String prefix;
493+
494+
private final AtomicLong count = new AtomicLong(0);
495+
496+
public NamedThreadFactory(String prefix) {
497+
this(Executors.defaultThreadFactory(), prefix);
498+
}
499+
500+
public NamedThreadFactory(ThreadFactory backingThreadFactory, String prefix) {
501+
this.backingThreaFactory = backingThreadFactory;
502+
this.prefix = prefix;
503+
}
504+
505+
@Override
506+
public Thread newThread(Runnable r) {
507+
Thread thread = this.backingThreaFactory.newThread(r);
508+
thread.setName(prefix + count.getAndIncrement());
509+
return thread;
510+
}
511+
}
485512
}

0 commit comments

Comments
 (0)