Skip to content

Commit 4fe8eec

Browse files
author
Sergey Mashkov
committed
IO: optimize delegation
1 parent cc4d12a commit 4fe8eec

File tree

1 file changed

+34
-37
lines changed

1 file changed

+34
-37
lines changed

core/kotlinx-coroutines-io/src/main/kotlin/kotlinx/coroutines/experimental/io/ByteBufferChannel.kt

+34-37
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ internal class ByteBufferChannel(
294294
private fun ByteBuffer.carryIndex(idx: Int) = if (idx >= capacity() - reservedSize) idx - (capacity() - reservedSize) else idx
295295

296296
private inline fun writing(block: ByteBufferChannel.(ByteBuffer, RingBufferCapacity) -> Unit) {
297-
val current = resolveDelegation(this)
297+
val current = joining?.let { resolveDelegation(this, it) } ?: this
298298
val buffer = current.setupStateForWrite() ?: return
299299
val capacity = current.state.capacity
300300

@@ -754,9 +754,13 @@ internal class ByteBufferChannel(
754754
resumeWriteOp()
755755
}
756756

757-
private tailrec fun resolveDelegation(current: ByteBufferChannel): ByteBufferChannel {
758-
val joined = current.joining ?: return current
759-
if (current.state === ReadWriteBufferState.Terminated) return resolveDelegation(joined.delegatedTo)
757+
private tailrec fun resolveDelegation(current: ByteBufferChannel, joining: JoiningState): ByteBufferChannel {
758+
if (current.state === ReadWriteBufferState.Terminated) {
759+
val joinedTo = joining.delegatedTo
760+
val nextJoining = joinedTo.joining ?: return joinedTo
761+
return resolveDelegation(joinedTo, nextJoining)
762+
}
763+
760764
return current
761765
}
762766

@@ -774,8 +778,7 @@ internal class ByteBufferChannel(
774778
}
775779

776780
suspend override fun writeByte(b: Byte) {
777-
val delegated = resolveDelegation(this)
778-
if (delegated !== this) return delegated.writeByte(b)
781+
joining?.let { return resolveDelegation(this, it).writeByte(b) }
779782

780783
val buffer = setupStateForWrite() ?: return delegateByte(b)
781784
val c = state.capacity
@@ -823,8 +826,7 @@ internal class ByteBufferChannel(
823826
}
824827

825828
suspend override fun writeShort(s: Short) {
826-
val delegated = resolveDelegation(this)
827-
if (delegated !== this) return delegated.writeShort(s)
829+
joining?.let { return resolveDelegation(this, it).writeShort(s) }
828830

829831
val buffer = setupStateForWrite() ?: return delegateShort(s)
830832
val c = state.capacity
@@ -904,7 +906,7 @@ internal class ByteBufferChannel(
904906
suspend override fun writeInt(i: Int) {
905907
val buffer = setupStateForWrite()
906908
if (buffer == null) {
907-
val delegation = resolveDelegation(this)
909+
val delegation = resolveDelegation(this, joining!!)
908910
if (delegation !== this) return delegation.writeInt(i)
909911
else return delegateSuspend(joining!!, { writeInt(i) })
910912
}
@@ -963,8 +965,7 @@ internal class ByteBufferChannel(
963965
}
964966

965967
suspend override fun writeLong(l: Long) {
966-
val delegated = resolveDelegation(this)
967-
if (delegated !== this) return delegated.writeLong(l)
968+
joining?.let { return resolveDelegation(this, it).writeLong(l) }
968969

969970
val buffer = setupStateForWrite() ?: return delegateLong(l)
970971
val c = state.capacity
@@ -1003,43 +1004,43 @@ internal class ByteBufferChannel(
10031004
}
10041005

10051006
suspend override fun writeAvailable(src: ByteBuffer): Int {
1006-
val delegated = resolveDelegation(this)
1007-
if (delegated !== this) return delegated.writeAvailable(src)
1007+
joining?.let { return resolveDelegation(this, it).writeAvailable(src) }
10081008

10091009
val copied = writeAsMuchAsPossible(src)
10101010
if (copied > 0) return copied
10111011

1012-
return resolveDelegation(this).writeAvailableSuspend(src)
1012+
joining?.let { return resolveDelegation(this, it).writeAvailableSuspend(src) }
1013+
return writeAvailableSuspend(src)
10131014
}
10141015

10151016
suspend override fun writeAvailable(src: BufferView): Int {
1017+
joining?.let { return resolveDelegation(this, it).writeAvailable(src) }
1018+
10161019
val copied = writeAsMuchAsPossible(src)
10171020
if (copied > 0) return copied
10181021

1019-
return resolveDelegation(this).writeAvailableSuspend(src)
1022+
joining?.let { return resolveDelegation(this, it).writeAvailableSuspend(src) }
1023+
return writeAvailableSuspend(src)
10201024
}
10211025

10221026
private suspend fun writeAvailableSuspend(src: ByteBuffer): Int {
10231027
writeSuspend(1) // here we don't need to restoreStateAfterWrite as write copy loop doesn't hold state
10241028

1025-
val delegated = resolveDelegation(this)
1026-
if (delegated !== this) return delegated.writeAvailableSuspend(src)
1029+
joining?.let { return resolveDelegation(this, it).writeAvailableSuspend(src) }
10271030

10281031
return writeAvailable(src)
10291032
}
10301033

10311034
private suspend fun writeAvailableSuspend(src: BufferView): Int {
10321035
writeSuspend(1)
10331036

1034-
val delegated = resolveDelegation(this)
1035-
if (delegated !== this) return delegated.writeAvailableSuspend(src)
1037+
joining?.let { return resolveDelegation(this, it).writeAvailableSuspend(src) }
10361038

10371039
return writeAvailable(src)
10381040
}
10391041

10401042
suspend override fun writeFully(src: ByteBuffer) {
1041-
val delegated = resolveDelegation(this)
1042-
if (delegated !== this) return delegated.writeFully(src)
1043+
joining?.let { return resolveDelegation(this, it).writeFully(src) }
10431044

10441045
writeAsMuchAsPossible(src)
10451046
if (!src.hasRemaining()) return
@@ -1058,8 +1059,7 @@ internal class ByteBufferChannel(
10581059
while (src.hasRemaining()) {
10591060
writeSuspend(1)
10601061

1061-
val delegated = resolveDelegation(this)
1062-
if (delegated !== this) return delegated.writeFullySuspend(src)
1062+
joining?.let { return resolveDelegation(this, it).writeFully(src) }
10631063

10641064
writeAsMuchAsPossible(src)
10651065
}
@@ -1069,8 +1069,7 @@ internal class ByteBufferChannel(
10691069
while (src.canRead()) {
10701070
writeSuspend(1)
10711071

1072-
val delegated = resolveDelegation(this)
1073-
if (delegated !== this) return delegated.writeFullySuspend(src)
1072+
joining?.let { return resolveDelegation(this, it).writeFully(src) }
10741073

10751074
writeAsMuchAsPossible(src)
10761075
}
@@ -1314,8 +1313,7 @@ internal class ByteBufferChannel(
13141313
}
13151314

13161315
suspend override fun writeFully(src: ByteArray, offset: Int, length: Int) {
1317-
val delegated = resolveDelegation(this)
1318-
if (delegated !== this) return delegated.writeFully(src, offset, length)
1316+
joining?.let { return resolveDelegation(this, it).writeFully(src, offset, length) }
13191317

13201318
var rem = length
13211319
var off = offset
@@ -1340,8 +1338,7 @@ internal class ByteBufferChannel(
13401338
}
13411339

13421340
suspend override fun writeAvailable(src: ByteArray, offset: Int, length: Int): Int {
1343-
val delegated = resolveDelegation(this)
1344-
if (delegated !== this) return delegated.writeAvailable(src, offset, length)
1341+
joining?.let { return resolveDelegation(this, it).writeAvailable(src, offset, length) }
13451342

13461343
val size = writeAsMuchAsPossible(src, offset, length)
13471344
if (size > 0) return size
@@ -1352,8 +1349,7 @@ internal class ByteBufferChannel(
13521349
while (true) {
13531350
writeSuspend(1)
13541351

1355-
val delegated = resolveDelegation(this)
1356-
if (delegated !== this) return delegated.writeSuspend(src, offset, length)
1352+
joining?.let { return resolveDelegation(this, it).writeSuspend(src, offset, length) }
13571353

13581354
val size = writeAsMuchAsPossible(src, offset, length)
13591355
if (size > 0) return size
@@ -1387,7 +1383,8 @@ internal class ByteBufferChannel(
13871383

13881384
private suspend fun writeBlockSuspend(min: Int, block: (ByteBuffer) -> Unit) {
13891385
writeSuspend(min)
1390-
resolveDelegation(this).write(min, block)
1386+
joining?.let { return resolveDelegation(this, it).write(min, block) }
1387+
return write(min, block)
13911388
}
13921389

13931390
override suspend fun read(min: Int, block: (ByteBuffer) -> Unit) {
@@ -1420,8 +1417,7 @@ internal class ByteBufferChannel(
14201417
}
14211418

14221419
suspend override fun writePacket(packet: ByteReadPacket) {
1423-
val delegated = resolveDelegation(this)
1424-
if (delegated !== this) return delegated.writePacket(packet)
1420+
joining?.let { return resolveDelegation(this, it).writePacket(packet) }
14251421

14261422
try {
14271423
while (!packet.isEmpty) {
@@ -1433,16 +1429,17 @@ internal class ByteBufferChannel(
14331429
}
14341430

14351431
if (packet.remaining > 0) {
1436-
return resolveDelegation(this).writePacketSuspend(packet)
1432+
joining?.let { return resolveDelegation(this, it).writePacket(packet) }
1433+
return writePacketSuspend(packet)
14371434
}
14381435
}
14391436

14401437
private suspend fun writePacketSuspend(packet: ByteReadPacket) {
14411438
try {
14421439
while (!packet.isEmpty) {
14431440
writeSuspend(1)
1444-
val delegated = resolveDelegation(this)
1445-
if (delegated !== this) return delegated.writePacketSuspend(packet)
1441+
1442+
joining?.let { return resolveDelegation(this, it).writePacket(packet) }
14461443
tryWritePacketPart(packet)
14471444
}
14481445
} finally {

0 commit comments

Comments
 (0)