File tree 5 files changed +130
-0
lines changed
5 files changed +130
-0
lines changed Original file line number Diff line number Diff line change @@ -51,6 +51,7 @@ buildscript {
51
51
repositories {
52
52
jcenter()
53
53
maven { url " https://kotlin.bintray.com/kotlinx" }
54
+ maven { url " https://repo.spring.io/snapshot/" }
54
55
maven {
55
56
url " https://kotlin.bintray.com/kotlin-dev"
56
57
credentials {
@@ -133,6 +134,7 @@ allprojects {
133
134
jcenter()
134
135
maven {
135
136
url " https://kotlin.bintray.com/kotlin-dev"
137
+ url " https://repo.spring.io/snapshot/"
136
138
credentials {
137
139
username = project. hasProperty(' bintrayUser' ) ? project. property(' bintrayUser' ) : System . getenv(' BINTRAY_USER' ) ?: " "
138
140
password = project. hasProperty(' bintrayApiKey' ) ? project. property(' bintrayApiKey' ) : System . getenv(' BINTRAY_API_KEY' ) ?: " "
Original file line number Diff line number Diff line change @@ -22,6 +22,8 @@ dependencies {
22
22
compileOnly " junit:junit:$junit_version "
23
23
shadowDeps " net.bytebuddy:byte-buddy:$byte_buddy_version "
24
24
shadowDeps " net.bytebuddy:byte-buddy-agent:$byte_buddy_version "
25
+ compile ' io.projectreactor.tools:blockhound:1.0.1.BUILD-SNAPSHOT'
26
+ compile " org.jetbrains.kotlin:kotlin-reflect:$kotlin_version "
25
27
}
26
28
27
29
jar {
Original file line number Diff line number Diff line change
1
+ package kotlinx.coroutines.debug.internal
2
+
3
+ import reactor.blockhound.BlockHound
4
+ import kotlin.reflect.KClass
5
+ import kotlin.reflect.full.*
6
+
7
+ internal object BlockHoundIntegration {
8
+
9
+ init {
10
+ val cls = Class .forName(" kotlinx.coroutines.scheduling.CoroutineScheduler\$ Worker" ).kotlin
11
+ initializerHelper(cls)
12
+ }
13
+
14
+ private fun <T : Any > initializerHelper (cls : KClass <T >) {
15
+ val field = cls.declaredMemberProperties.find { it.name == " state" }!!
16
+ BlockHound .builder()
17
+ .addDynamicThreadPredicate(cls::isInstance)
18
+ .nonBlockingThreadPredicate { p ->
19
+ p.or { thread ->
20
+ val castThread = cls.safeCast(thread)
21
+ if (! enabled || castThread == null ) {
22
+ false
23
+ } else {
24
+ val state = field(castThread) as Enum <* >
25
+ state.name == " CPU_ACQUIRED"
26
+ }
27
+ }
28
+ }
29
+ .install()
30
+ }
31
+
32
+ @Volatile
33
+ private var enabled = false
34
+
35
+ fun install () {
36
+ enabled = true
37
+ }
38
+
39
+ fun uninstall () {
40
+ enabled = false
41
+ }
42
+
43
+ }
Original file line number Diff line number Diff line change @@ -56,6 +56,8 @@ internal object DebugProbesImpl {
56
56
.name(cl.name)
57
57
.make()
58
58
.load(cl.classLoader, ClassReloadingStrategy .fromInstalledAgent())
59
+
60
+ BlockHoundIntegration .install()
59
61
}
60
62
61
63
@Synchronized
@@ -73,6 +75,8 @@ internal object DebugProbesImpl {
73
75
.name(cl.name)
74
76
.make()
75
77
.load(cl.classLoader, ClassReloadingStrategy .fromInstalledAgent())
78
+
79
+ BlockHoundIntegration .uninstall()
76
80
}
77
81
78
82
@Synchronized
Original file line number Diff line number Diff line change
1
+ package kotlinx.coroutines.debug
2
+ import kotlinx.coroutines.*
3
+ import kotlinx.coroutines.debug.internal.BlockHoundIntegration
4
+ import org.junit.*
5
+ import reactor.blockhound.BlockingOperationError
6
+
7
+ class BlockHoundTest : TestBase () {
8
+
9
+ @Before
10
+ fun init () {
11
+ BlockHoundIntegration .install()
12
+ }
13
+
14
+ @After
15
+ fun deinit () {
16
+ BlockHoundIntegration .uninstall()
17
+ }
18
+
19
+ @Test(expected = BlockingOperationError ::class )
20
+ fun shouldDetectBlockingInDefault () = runTest {
21
+ withContext(Dispatchers .Default ) {
22
+ Thread .sleep(1 )
23
+ }
24
+ }
25
+
26
+ @Test
27
+ fun shouldNotDetectBlockingInIO () = runTest {
28
+ withContext(Dispatchers .IO ) {
29
+ Thread .sleep(1 )
30
+ }
31
+ }
32
+
33
+ @Test
34
+ fun shouldNotDetectNonblocking () = runTest {
35
+ withContext(Dispatchers .Default ) {
36
+ val a = 1
37
+ val b = 2
38
+ assert (a + b == 3 )
39
+ }
40
+ }
41
+
42
+ @Test
43
+ fun testReusingThreads () = runTest {
44
+ val n = 100
45
+ repeat(n) {
46
+ async(Dispatchers .IO ) {
47
+ Thread .sleep(1 )
48
+ }
49
+ }
50
+ repeat(n) {
51
+ async(Dispatchers .Default ) {
52
+ }
53
+ }
54
+ repeat(n) {
55
+ async(Dispatchers .IO ) {
56
+ Thread .sleep(1 )
57
+ }
58
+ }
59
+ }
60
+
61
+ @Test(expected = BlockingOperationError ::class )
62
+ fun testReusingThreadsFailure () = runTest {
63
+ val n = 100
64
+ repeat(n) {
65
+ async(Dispatchers .IO ) {
66
+ Thread .sleep(1 )
67
+ }
68
+ }
69
+ async(Dispatchers .Default ) {
70
+ Thread .sleep(1 )
71
+ }
72
+ repeat(n) {
73
+ async(Dispatchers .IO ) {
74
+ Thread .sleep(1 )
75
+ }
76
+ }
77
+ }
78
+
79
+ }
You can’t perform that action at this time.
0 commit comments