1
+ package dotty .tools .dotc .profile
2
+
3
+ import java .util .concurrent .ThreadPoolExecutor .AbortPolicy
4
+ import java .util .concurrent ._
5
+ import java .util .concurrent .atomic .AtomicInteger
6
+
7
+ import dotty .tools .dotc .core .Phases .Phase
8
+ import dotty .tools .dotc .core .Contexts .Context
9
+
10
+ sealed trait AsyncHelper {
11
+
12
+ def newUnboundedQueueFixedThreadPool
13
+ (nThreads : Int ,
14
+ shortId : String , priority : Int = Thread .NORM_PRIORITY ) : ThreadPoolExecutor
15
+ def newBoundedQueueFixedThreadPool
16
+ (nThreads : Int , maxQueueSize : Int , rejectHandler : RejectedExecutionHandler ,
17
+ shortId : String , priority : Int = Thread .NORM_PRIORITY ) : ThreadPoolExecutor
18
+
19
+ }
20
+
21
+ object AsyncHelper {
22
+ def apply (phase : Phase )(implicit ctx : Context ): AsyncHelper = ctx.profiler match {
23
+ case NoOpProfiler => new BasicAsyncHelper (phase)
24
+ case r : RealProfiler => new ProfilingAsyncHelper (phase, r)
25
+ }
26
+
27
+ private abstract class BaseAsyncHelper (phase : Phase )(implicit ctx : Context ) extends AsyncHelper {
28
+ val baseGroup = new ThreadGroup (s " scalac- ${phase.phaseName}" )
29
+ private def childGroup (name : String ) = new ThreadGroup (baseGroup, name)
30
+
31
+ protected def wrapRunnable (r : Runnable ): Runnable
32
+
33
+ protected class CommonThreadFactory (shortId : String ,
34
+ daemon : Boolean = true ,
35
+ priority : Int ) extends ThreadFactory {
36
+ private val group : ThreadGroup = childGroup(shortId)
37
+ private val threadNumber : AtomicInteger = new AtomicInteger (1 )
38
+ private val namePrefix = s " ${baseGroup.getName}- $shortId- "
39
+
40
+ override def newThread (r : Runnable ): Thread = {
41
+ val wrapped = wrapRunnable(r)
42
+ val t : Thread = new Thread (group, wrapped, namePrefix + threadNumber.getAndIncrement, 0 )
43
+ if (t.isDaemon != daemon) t.setDaemon(daemon)
44
+ if (t.getPriority != priority) t.setPriority(priority)
45
+ t
46
+ }
47
+ }
48
+ }
49
+
50
+ private final class BasicAsyncHelper (phase : Phase )(implicit ctx : Context ) extends BaseAsyncHelper (phase) {
51
+
52
+ override def newUnboundedQueueFixedThreadPool (nThreads : Int , shortId : String , priority : Int ): ThreadPoolExecutor = {
53
+ val threadFactory = new CommonThreadFactory (shortId, priority = priority)
54
+ // like Executors.newFixedThreadPool
55
+ new ThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit .MILLISECONDS , new LinkedBlockingQueue [Runnable ], threadFactory)
56
+ }
57
+
58
+ override def newBoundedQueueFixedThreadPool (nThreads : Int , maxQueueSize : Int , rejectHandler : RejectedExecutionHandler , shortId : String , priority : Int ): ThreadPoolExecutor = {
59
+ val threadFactory = new CommonThreadFactory (shortId, priority = priority)
60
+ // like Executors.newFixedThreadPool
61
+ new ThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit .MILLISECONDS , new ArrayBlockingQueue [Runnable ](maxQueueSize), threadFactory, rejectHandler)
62
+ }
63
+
64
+ override protected def wrapRunnable (r : Runnable ): Runnable = r
65
+ }
66
+
67
+ private class ProfilingAsyncHelper (phase : Phase , private val profiler : RealProfiler )(implicit ctx : Context ) extends BaseAsyncHelper (phase) {
68
+
69
+ override def newUnboundedQueueFixedThreadPool (nThreads : Int , shortId : String , priority : Int ): ThreadPoolExecutor = {
70
+ val threadFactory = new CommonThreadFactory (shortId, priority = priority)
71
+ // like Executors.newFixedThreadPool
72
+ new SinglePhaseInstrumentedThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit .MILLISECONDS , new LinkedBlockingQueue [Runnable ], threadFactory, new AbortPolicy )
73
+ }
74
+
75
+ override def newBoundedQueueFixedThreadPool (nThreads : Int , maxQueueSize : Int , rejectHandler : RejectedExecutionHandler , shortId : String , priority : Int ): ThreadPoolExecutor = {
76
+ val threadFactory = new CommonThreadFactory (shortId, priority = priority)
77
+ // like Executors.newFixedThreadPool
78
+ new SinglePhaseInstrumentedThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit .MILLISECONDS , new ArrayBlockingQueue [Runnable ](maxQueueSize), threadFactory, rejectHandler)
79
+ }
80
+
81
+ override protected def wrapRunnable (r : Runnable ): Runnable = () => {
82
+ val data = new ThreadProfileData
83
+ localData.set(data)
84
+
85
+ val profileStart = Profiler .emptySnap
86
+ try r.run finally {
87
+ val snap = profiler.snapThread()
88
+ val threadRange = ProfileRange (profileStart, snap, phase, 0 , " " , Thread .currentThread())
89
+ profiler.completeBackground(threadRange)
90
+ }
91
+ }
92
+
93
+ /**
94
+ * data for thread run. Not threadsafe, only written from a single thread
95
+ */
96
+ final class ThreadProfileData {
97
+ var firstStartNs = 0L
98
+ var taskCount = 0
99
+
100
+ var idleNs = 0L
101
+ var runningNs = 0L
102
+
103
+ var lastStartNs = 0L
104
+ var lastEndNs = 0L
105
+ }
106
+
107
+ val localData = new ThreadLocal [ThreadProfileData ]
108
+
109
+ private class SinglePhaseInstrumentedThreadPoolExecutor
110
+ ( corePoolSize : Int , maximumPoolSize : Int , keepAliveTime : Long , unit : TimeUnit ,
111
+ workQueue : BlockingQueue [Runnable ], threadFactory : ThreadFactory , handler : RejectedExecutionHandler
112
+ ) extends ThreadPoolExecutor (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler) {
113
+
114
+ override def beforeExecute (t : Thread , r : Runnable ): Unit = {
115
+ val data = localData.get
116
+ data.taskCount += 1
117
+ val now = System .nanoTime()
118
+
119
+ if (data.firstStartNs == 0 ) data.firstStartNs = now
120
+ else data.idleNs += now - data.lastEndNs
121
+
122
+ data.lastStartNs = now
123
+
124
+ super .beforeExecute(t, r)
125
+ }
126
+
127
+ override def afterExecute (r : Runnable , t : Throwable ): Unit = {
128
+ val now = System .nanoTime()
129
+ val data = localData.get
130
+
131
+ data.lastEndNs = now
132
+ data.runningNs += now - data.lastStartNs
133
+
134
+ super .afterExecute(r, t)
135
+ }
136
+
137
+ }
138
+ }
139
+ }
0 commit comments