Skip to content

Commit 2874d16

Browse files
author
Sergey Mashkov
committed
IO: optimize delegation (part 2)
1 parent 25bc365 commit 2874d16

File tree

1 file changed

+23
-22
lines changed

1 file changed

+23
-22
lines changed

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt

+23-22
Original file line numberDiff line numberDiff line change
@@ -754,14 +754,14 @@ internal class ByteBufferChannel(
754754
resumeWriteOp()
755755
}
756756

757-
private tailrec fun resolveDelegation(current: ByteBufferChannel, joining: JoiningState): ByteBufferChannel {
757+
private tailrec fun resolveDelegation(current: ByteBufferChannel, joining: JoiningState): ByteBufferChannel? {
758758
if (current.state === ReadWriteBufferState.Terminated) {
759759
val joinedTo = joining.delegatedTo
760760
val nextJoining = joinedTo.joining ?: return joinedTo
761761
return resolveDelegation(joinedTo, nextJoining)
762762
}
763763

764-
return current
764+
return null
765765
}
766766

767767
private suspend fun delegateByte(b: Byte) {
@@ -778,7 +778,7 @@ internal class ByteBufferChannel(
778778
}
779779

780780
suspend override fun writeByte(b: Byte) {
781-
joining?.let { return resolveDelegation(this, it).writeByte(b) }
781+
joining?.let { resolveDelegation(this, it)?.let { return it.writeByte(b) } }
782782

783783
val buffer = setupStateForWrite() ?: return delegateByte(b)
784784
val c = state.capacity
@@ -826,7 +826,7 @@ internal class ByteBufferChannel(
826826
}
827827

828828
suspend override fun writeShort(s: Short) {
829-
joining?.let { return resolveDelegation(this, it).writeShort(s) }
829+
joining?.let { resolveDelegation(this, it)?.let { return it.writeShort(s) } }
830830

831831
val buffer = setupStateForWrite() ?: return delegateShort(s)
832832
val c = state.capacity
@@ -907,7 +907,8 @@ internal class ByteBufferChannel(
907907
val buffer = setupStateForWrite()
908908
if (buffer == null) {
909909
val delegation = resolveDelegation(this, joining!!)
910-
if (delegation !== this) return delegation.writeInt(i)
910+
@Suppress("SuspiciousEqualsCombination")
911+
if (delegation != null && delegation !== this) return delegation.writeInt(i)
911912
else return delegateSuspend(joining!!, { writeInt(i) })
912913
}
913914
val c = state.capacity
@@ -965,7 +966,7 @@ internal class ByteBufferChannel(
965966
}
966967

967968
suspend override fun writeLong(l: Long) {
968-
joining?.let { return resolveDelegation(this, it).writeLong(l) }
969+
joining?.let { resolveDelegation(this, it)?.let { return it.writeLong(l) } }
969970

970971
val buffer = setupStateForWrite() ?: return delegateLong(l)
971972
val c = state.capacity
@@ -1004,43 +1005,43 @@ internal class ByteBufferChannel(
10041005
}
10051006

10061007
suspend override fun writeAvailable(src: ByteBuffer): Int {
1007-
joining?.let { return resolveDelegation(this, it).writeAvailable(src) }
1008+
joining?.let { resolveDelegation(this, it)?.let { return it.writeAvailable(src) } }
10081009

10091010
val copied = writeAsMuchAsPossible(src)
10101011
if (copied > 0) return copied
10111012

1012-
joining?.let { return resolveDelegation(this, it).writeAvailableSuspend(src) }
1013+
joining?.let { resolveDelegation(this, it)?.let { return it.writeAvailableSuspend(src) } }
10131014
return writeAvailableSuspend(src)
10141015
}
10151016

10161017
suspend override fun writeAvailable(src: BufferView): Int {
1017-
joining?.let { return resolveDelegation(this, it).writeAvailable(src) }
1018+
joining?.let { resolveDelegation(this, it)?.let { return it.writeAvailable(src) } }
10181019

10191020
val copied = writeAsMuchAsPossible(src)
10201021
if (copied > 0) return copied
10211022

1022-
joining?.let { return resolveDelegation(this, it).writeAvailableSuspend(src) }
1023+
joining?.let { resolveDelegation(this, it)?.let { return it.writeAvailableSuspend(src) } }
10231024
return writeAvailableSuspend(src)
10241025
}
10251026

10261027
private suspend fun writeAvailableSuspend(src: ByteBuffer): Int {
10271028
writeSuspend(1) // here we don't need to restoreStateAfterWrite as write copy loop doesn't hold state
10281029

1029-
joining?.let { return resolveDelegation(this, it).writeAvailableSuspend(src) }
1030+
joining?.let { resolveDelegation(this, it)?.let { return it .writeAvailableSuspend(src) } }
10301031

10311032
return writeAvailable(src)
10321033
}
10331034

10341035
private suspend fun writeAvailableSuspend(src: BufferView): Int {
10351036
writeSuspend(1)
10361037

1037-
joining?.let { return resolveDelegation(this, it).writeAvailableSuspend(src) }
1038+
joining?.let { resolveDelegation(this, it)?.let { return it.writeAvailableSuspend(src) } }
10381039

10391040
return writeAvailable(src)
10401041
}
10411042

10421043
suspend override fun writeFully(src: ByteBuffer) {
1043-
joining?.let { return resolveDelegation(this, it).writeFully(src) }
1044+
joining?.let { resolveDelegation(this, it)?.let { return .writeFully(src) } }
10441045

10451046
writeAsMuchAsPossible(src)
10461047
if (!src.hasRemaining()) return
@@ -1059,7 +1060,7 @@ internal class ByteBufferChannel(
10591060
while (src.hasRemaining()) {
10601061
writeSuspend(1)
10611062

1062-
joining?.let { return resolveDelegation(this, it).writeFully(src) }
1063+
joining?.let { resolveDelegation(this, it)?.let { return it.writeFully(src) } }
10631064

10641065
writeAsMuchAsPossible(src)
10651066
}
@@ -1069,7 +1070,7 @@ internal class ByteBufferChannel(
10691070
while (src.canRead()) {
10701071
writeSuspend(1)
10711072

1072-
joining?.let { return resolveDelegation(this, it).writeFully(src) }
1073+
joining?.let { resolveDelegation(this, it)?.let { return it.writeFully(src) } }
10731074

10741075
writeAsMuchAsPossible(src)
10751076
}
@@ -1313,7 +1314,7 @@ internal class ByteBufferChannel(
13131314
}
13141315

13151316
suspend override fun writeFully(src: ByteArray, offset: Int, length: Int) {
1316-
joining?.let { return resolveDelegation(this, it).writeFully(src, offset, length) }
1317+
joining?.let { resolveDelegation(this, it)?.let { return it .writeFully(src, offset, length) } }
13171318

13181319
var rem = length
13191320
var off = offset
@@ -1338,7 +1339,7 @@ internal class ByteBufferChannel(
13381339
}
13391340

13401341
suspend override fun writeAvailable(src: ByteArray, offset: Int, length: Int): Int {
1341-
joining?.let { return resolveDelegation(this, it).writeAvailable(src, offset, length) }
1342+
joining?.let { resolveDelegation(this, it)?.let { return it.writeAvailable(src, offset, length) } }
13421343

13431344
val size = writeAsMuchAsPossible(src, offset, length)
13441345
if (size > 0) return size
@@ -1349,7 +1350,7 @@ internal class ByteBufferChannel(
13491350
while (true) {
13501351
writeSuspend(1)
13511352

1352-
joining?.let { return resolveDelegation(this, it).writeSuspend(src, offset, length) }
1353+
joining?.let { resolveDelegation(this, it)?.let { return it.writeSuspend(src, offset, length) } }
13531354

13541355
val size = writeAsMuchAsPossible(src, offset, length)
13551356
if (size > 0) return size
@@ -1383,7 +1384,7 @@ internal class ByteBufferChannel(
13831384

13841385
private suspend fun writeBlockSuspend(min: Int, block: (ByteBuffer) -> Unit) {
13851386
writeSuspend(min)
1386-
joining?.let { return resolveDelegation(this, it).write(min, block) }
1387+
joining?.let { resolveDelegation(this, it)?.let { return it.write(min, block) } }
13871388
return write(min, block)
13881389
}
13891390

@@ -1417,7 +1418,7 @@ internal class ByteBufferChannel(
14171418
}
14181419

14191420
suspend override fun writePacket(packet: ByteReadPacket) {
1420-
joining?.let { return resolveDelegation(this, it).writePacket(packet) }
1421+
joining?.let { resolveDelegation(this, it)?.let { return it.writePacket(packet) } }
14211422

14221423
try {
14231424
while (!packet.isEmpty) {
@@ -1429,7 +1430,7 @@ internal class ByteBufferChannel(
14291430
}
14301431

14311432
if (packet.remaining > 0) {
1432-
joining?.let { return resolveDelegation(this, it).writePacket(packet) }
1433+
joining?.let { resolveDelegation(this, it)?.let { return it.writePacket(packet) } }
14331434
return writePacketSuspend(packet)
14341435
}
14351436
}
@@ -1439,7 +1440,7 @@ internal class ByteBufferChannel(
14391440
while (!packet.isEmpty) {
14401441
writeSuspend(1)
14411442

1442-
joining?.let { return resolveDelegation(this, it).writePacket(packet) }
1443+
joining?.let { resolveDelegation(this, it)?.let { return it.writePacket(packet) } }
14431444
tryWritePacketPart(packet)
14441445
}
14451446
} finally {

0 commit comments

Comments
 (0)