File tree 5 files changed +130
-0
lines changed
5 files changed +130
-0
lines changed Original file line number Diff line number Diff line change @@ -60,6 +60,7 @@ buildscript {
60
60
repositories {
61
61
jcenter()
62
62
maven { url " https://kotlin.bintray.com/kotlinx" }
63
+ maven { url " https://repo.spring.io/snapshot/" }
63
64
maven {
64
65
url " https://kotlin.bintray.com/kotlin-dev"
65
66
credentials {
@@ -153,6 +154,7 @@ allprojects {
153
154
jcenter()
154
155
maven {
155
156
url " https://kotlin.bintray.com/kotlin-dev"
157
+ url " https://repo.spring.io/snapshot/"
156
158
credentials {
157
159
username = project. hasProperty(' bintrayUser' ) ? project. property(' bintrayUser' ) : System . getenv(' BINTRAY_USER' ) ?: " "
158
160
password = project. hasProperty(' bintrayApiKey' ) ? project. property(' bintrayApiKey' ) : System . getenv(' BINTRAY_API_KEY' ) ?: " "
Original file line number Diff line number Diff line change @@ -22,6 +22,8 @@ dependencies {
22
22
compileOnly " junit:junit:$junit_version "
23
23
shadowDeps " net.bytebuddy:byte-buddy:$byte_buddy_version "
24
24
shadowDeps " net.bytebuddy:byte-buddy-agent:$byte_buddy_version "
25
+ compile ' io.projectreactor.tools:blockhound:1.0.1.BUILD-SNAPSHOT'
26
+ compile " org.jetbrains.kotlin:kotlin-reflect:$kotlin_version "
25
27
}
26
28
27
29
jar {
Original file line number Diff line number Diff line change
1
+ package kotlinx.coroutines.debug.internal
2
+
3
+ import reactor.blockhound.BlockHound
4
+ import kotlin.reflect.KClass
5
+ import kotlin.reflect.full.*
6
+
7
+ internal object BlockHoundIntegration {
8
+
9
+ init {
10
+ val cls = Class .forName(" kotlinx.coroutines.scheduling.CoroutineScheduler\$ Worker" ).kotlin
11
+ initializerHelper(cls)
12
+ }
13
+
14
+ private fun <T : Any > initializerHelper (cls : KClass <T >) {
15
+ val field = cls.declaredMemberProperties.find { it.name == " state" }!!
16
+ BlockHound .builder()
17
+ .addDynamicThreadPredicate(cls::isInstance)
18
+ .nonBlockingThreadPredicate { p ->
19
+ p.or { thread ->
20
+ val castThread = cls.safeCast(thread)
21
+ if (! enabled || castThread == null ) {
22
+ false
23
+ } else {
24
+ val state = field(castThread) as Enum <* >
25
+ state.name == " CPU_ACQUIRED"
26
+ }
27
+ }
28
+ }
29
+ .install()
30
+ }
31
+
32
+ @Volatile
33
+ private var enabled = false
34
+
35
+ fun install () {
36
+ enabled = true
37
+ }
38
+
39
+ fun uninstall () {
40
+ enabled = false
41
+ }
42
+
43
+ }
Original file line number Diff line number Diff line change @@ -66,6 +66,8 @@ internal object DebugProbesImpl {
66
66
.name(cl.name)
67
67
.make()
68
68
.load(cl.classLoader, ClassReloadingStrategy .fromInstalledAgent())
69
+
70
+ BlockHoundIntegration .install()
69
71
}
70
72
71
73
public fun uninstall (): Unit = coroutineStateLock.write {
@@ -82,6 +84,8 @@ internal object DebugProbesImpl {
82
84
.name(cl.name)
83
85
.make()
84
86
.load(cl.classLoader, ClassReloadingStrategy .fromInstalledAgent())
87
+
88
+ BlockHoundIntegration .uninstall()
85
89
}
86
90
87
91
public fun hierarchyToString (job : Job ): String = coroutineStateLock.write {
Original file line number Diff line number Diff line change
1
+ package kotlinx.coroutines.debug
2
+ import kotlinx.coroutines.*
3
+ import kotlinx.coroutines.debug.internal.BlockHoundIntegration
4
+ import org.junit.*
5
+ import reactor.blockhound.BlockingOperationError
6
+
7
+ class BlockHoundTest : TestBase () {
8
+
9
+ @Before
10
+ fun init () {
11
+ BlockHoundIntegration .install()
12
+ }
13
+
14
+ @After
15
+ fun deinit () {
16
+ BlockHoundIntegration .uninstall()
17
+ }
18
+
19
+ @Test(expected = BlockingOperationError ::class )
20
+ fun shouldDetectBlockingInDefault () = runTest {
21
+ withContext(Dispatchers .Default ) {
22
+ Thread .sleep(1 )
23
+ }
24
+ }
25
+
26
+ @Test
27
+ fun shouldNotDetectBlockingInIO () = runTest {
28
+ withContext(Dispatchers .IO ) {
29
+ Thread .sleep(1 )
30
+ }
31
+ }
32
+
33
+ @Test
34
+ fun shouldNotDetectNonblocking () = runTest {
35
+ withContext(Dispatchers .Default ) {
36
+ val a = 1
37
+ val b = 2
38
+ assert (a + b == 3 )
39
+ }
40
+ }
41
+
42
+ @Test
43
+ fun testReusingThreads () = runTest {
44
+ val n = 100
45
+ repeat(n) {
46
+ async(Dispatchers .IO ) {
47
+ Thread .sleep(1 )
48
+ }
49
+ }
50
+ repeat(n) {
51
+ async(Dispatchers .Default ) {
52
+ }
53
+ }
54
+ repeat(n) {
55
+ async(Dispatchers .IO ) {
56
+ Thread .sleep(1 )
57
+ }
58
+ }
59
+ }
60
+
61
+ @Test(expected = BlockingOperationError ::class )
62
+ fun testReusingThreadsFailure () = runTest {
63
+ val n = 100
64
+ repeat(n) {
65
+ async(Dispatchers .IO ) {
66
+ Thread .sleep(1 )
67
+ }
68
+ }
69
+ async(Dispatchers .Default ) {
70
+ Thread .sleep(1 )
71
+ }
72
+ repeat(n) {
73
+ async(Dispatchers .IO ) {
74
+ Thread .sleep(1 )
75
+ }
76
+ }
77
+ }
78
+
79
+ }
You can’t perform that action at this time.
0 commit comments