Skip to content

Commit 0a8a211

Browse files
committed
KTOR-1159 Add support for UDP sockets on native
1 parent f2c50d0 commit 0a8a211

File tree

16 files changed

+497
-109
lines changed

16 files changed

+497
-109
lines changed

ktor-io/api/ktor-io.api

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,12 @@ public abstract interface class io/ktor/utils/io/WriterSuspendSession : io/ktor/
397397
public abstract fun tryAwait (ILkotlin/coroutines/Continuation;)Ljava/lang/Object;
398398
}
399399

400+
public abstract interface class io/ktor/utils/io/bits/Allocator {
401+
public abstract fun alloc-SK3TCg8 (I)Ljava/nio/ByteBuffer;
402+
public abstract fun alloc-SK3TCg8 (J)Ljava/nio/ByteBuffer;
403+
public abstract fun free-3GNKZMM (Ljava/nio/ByteBuffer;)V
404+
}
405+
400406
public final class io/ktor/utils/io/bits/ByteOrderJVMKt {
401407
public static final fun reverseByteOrder (D)D
402408
public static final fun reverseByteOrder (F)F
@@ -1104,6 +1110,16 @@ public final class io/ktor/utils/io/core/CopyKt {
11041110
public static final fun copyTo (Lio/ktor/utils/io/core/Input;Lio/ktor/utils/io/core/Output;)J
11051111
}
11061112

1113+
public final class io/ktor/utils/io/core/DefaultBufferPool : io/ktor/utils/io/pool/DefaultPool {
1114+
public fun <init> ()V
1115+
public fun <init> (IILio/ktor/utils/io/bits/Allocator;)V
1116+
public synthetic fun <init> (IILio/ktor/utils/io/bits/Allocator;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
1117+
public synthetic fun clearInstance (Ljava/lang/Object;)Ljava/lang/Object;
1118+
public synthetic fun disposeInstance (Ljava/lang/Object;)V
1119+
public synthetic fun produceInstance ()Ljava/lang/Object;
1120+
public synthetic fun validateInstance (Ljava/lang/Object;)V
1121+
}
1122+
11071123
public abstract interface annotation class io/ktor/utils/io/core/ExperimentalIoApi : java/lang/annotation/Annotation {
11081124
}
11091125

ktor-io/common/src/io/ktor/utils/io/bits/MemoryFactory.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,10 @@ public inline fun <R> withMemory(size: Long, block: (Memory) -> R): R {
5353
internal expect object DefaultAllocator : Allocator
5454

5555
@DangerousInternalIoApi
56-
internal interface Allocator {
57-
fun alloc(size: Int): Memory
56+
public interface Allocator {
57+
public fun alloc(size: Int): Memory
5858

59-
fun alloc(size: Long): Memory
59+
public fun alloc(size: Long): Memory
6060

61-
fun free(instance: Memory)
61+
public fun free(instance: Memory)
6262
}

ktor-io/common/src/io/ktor/utils/io/core/BufferFactory.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package io.ktor.utils.io.core
22

33
import io.ktor.utils.io.bits.*
4-
import io.ktor.utils.io.core.internal.ChunkBuffer
4+
import io.ktor.utils.io.core.internal.*
55
import io.ktor.utils.io.pool.DefaultPool
66
import io.ktor.utils.io.pool.ObjectPool
77
import kotlin.native.concurrent.ThreadLocal
@@ -54,8 +54,9 @@ internal inline fun <R> withChunkBuffer(pool: ObjectPool<ChunkBuffer>, block: Ch
5454
@Suppress("DEPRECATION")
5555
internal val DefaultChunkedBufferPool: ObjectPool<IoBuffer> = DefaultBufferPool()
5656

57+
@DangerousInternalIoApi
5758
@Suppress("DEPRECATION")
58-
internal class DefaultBufferPool(
59+
public class DefaultBufferPool(
5960
private val bufferSize: Int = DEFAULT_BUFFER_SIZE,
6061

6162
capacity: Int = 1000,

ktor-network/posix/test/SocketTest.kt renamed to ktor-network/common/test/io/ktor/network/sockets/tests/TcpSocketTest.kt

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,23 @@
1-
package io.ktor.network.tests
1+
/*
2+
* Copyright 2014-2020 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package io.ktor.network.sockets.tests
26

37
import io.ktor.network.selector.*
48
import io.ktor.network.sockets.*
9+
import io.ktor.test.dispatcher.*
10+
import io.ktor.util.*
511
import io.ktor.utils.io.*
612
import io.ktor.utils.io.core.*
713
import kotlinx.coroutines.*
814
import kotlin.test.*
915

10-
11-
class SocketTest {
16+
class TcpSocketTest {
1217

1318
@Test
14-
fun testEcho() = runBlocking {
19+
fun testEcho() = testSuspend {
20+
if (!PlatformUtils.IS_JVM && !PlatformUtils.IS_NATIVE) return@testSuspend
1521
SelectorManager().use { selector ->
1622
val tcp = aSocket(selector).tcp()
1723
val server = tcp.bind("127.0.0.1", 8000)

ktor-network/common/test/io/ktor/network/sockets/tests/UdpBroadcastTest.kt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@ class UdpBroadcastTest {
1717

1818
@Test
1919
fun testBroadcastSuccessful() = testSuspend {
20-
if (!PlatformUtils.IS_JVM) return@testSuspend
20+
if (!PlatformUtils.IS_JVM && !PlatformUtils.IS_NATIVE) return@testSuspend
2121
withTimeout(15000) {
22-
SelectorManager().use { selector ->
22+
// TODO: Calling [use] instead of [let] causes [UdpBroadcastTest] to get stuck on native.
23+
SelectorManager().let /*use*/ { selector ->
2324
val serverSocket = CompletableDeferred<BoundDatagramSocket>()
2425
val server = launch {
2526
aSocket(selector)
@@ -36,7 +37,7 @@ class UdpBroadcastTest {
3637

3738
aSocket(selector)
3839
.udp()
39-
.bind {
40+
.bind(NetworkAddress("0.0.0.0", 0)) {
4041
broadcast = true
4142
}
4243
.use { socket ->
@@ -56,7 +57,7 @@ class UdpBroadcastTest {
5657
// TODO: this test does not catch the permission denied exception
5758
// @Test
5859
// fun testBroadcastFails() = testSuspend {
59-
// if (!PlatformUtils.IS_JVM) return@testSuspend
60+
// if (!PlatformUtils.IS_JVM && !PlatformUtils.IS_NATIVE) return@testSuspend
6061
// withTimeout(15000) {
6162
// SelectorManager().use { selector ->
6263
// assertFails {
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2014-2020 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package io.ktor.network.sockets.tests
6+
7+
import io.ktor.network.selector.*
8+
import io.ktor.network.sockets.*
9+
import io.ktor.test.dispatcher.*
10+
import io.ktor.util.*
11+
import io.ktor.util.network.*
12+
import io.ktor.utils.io.core.*
13+
import kotlinx.coroutines.*
14+
import kotlin.test.*
15+
16+
class UdpSocketTest {
17+
18+
@Test
19+
fun test(): Unit = testSuspend {
20+
if (!PlatformUtils.IS_JVM && !PlatformUtils.IS_NATIVE) return@testSuspend
21+
withTimeout(15000) {
22+
// TODO: Calling [use] instead of [let] causes [UdpSocketTest] to get stuck on native.
23+
SelectorManager().let /*use*/ { selector ->
24+
aSocket(selector).udp().bind(NetworkAddress("127.0.0.1", 8000)) {
25+
reuseAddress = true
26+
reusePort = true
27+
}.use { socket ->
28+
// Send messages to localhost
29+
launch {
30+
val address = NetworkAddress("127.0.0.1", 8000)
31+
repeat(10) {
32+
val bytePacket = buildPacket { append("hello") }
33+
val data = Datagram(bytePacket, address)
34+
socket.send(data)
35+
}
36+
}
37+
// Receive messages from localhost
38+
repeat(10) {
39+
val incoming = socket.receive()
40+
assertEquals("hello", incoming.packet.readText())
41+
}
42+
}
43+
}
44+
}
45+
}
46+
}

ktor-network/posix/interop/network.def

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
11
package=io.ktor.network.interop
2+
compilerOpts.ios = -DM_DARWIN
3+
compilerOpts.osx = -DM_DARWIN
4+
compilerOpts.watchos = -DM_DARWIN
5+
compilerOpts.tvos = -DM_DARWIN
6+
compilerOpts.mingw = -DMINGW
27
---
38
#include <sys/select.h>
49

@@ -17,3 +22,17 @@ static inline void select_fd_clear(fd_set *set) {
1722
static inline int select_fd_isset(int descriptor, fd_set* set) {
1823
return FD_ISSET(descriptor, set);
1924
}
25+
26+
#if defined(M_DARWIN)
27+
#include <arpa/inet.h>
28+
static inline const char *ktor_inet_ntop(int af, const void *src, char *dst, socklen_t size) {
29+
return inet_ntop(af, src, dst, size);
30+
}
31+
#elif defined(MINGW)
32+
// TODO
33+
#else
34+
#include <arpa/inet.h>
35+
static inline const char *ktor_inet_ntop(int af, const void *src, char *dst, socklen_t size) {
36+
return inet_ntop(af, src, dst, size);
37+
}
38+
#endif
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2014-2020 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package io.ktor.network.sockets
6+
7+
import io.ktor.network.selector.*
8+
import io.ktor.utils.io.*
9+
import io.ktor.utils.io.errors.*
10+
import kotlinx.cinterop.*
11+
import kotlinx.coroutines.*
12+
import platform.posix.*
13+
14+
internal fun CoroutineScope.attachForReadingImpl(
15+
channel: ByteChannel,
16+
descriptor: Int,
17+
selectable: Selectable,
18+
selector: SelectorManager
19+
): WriterJob = writer(Dispatchers.Unconfined, channel) {
20+
while (!channel.isClosedForWrite) {
21+
val count = channel.write { memory, startIndex, endIndex ->
22+
val bufferStart = memory.pointer + startIndex
23+
val size = endIndex - startIndex
24+
val bytesRead = recv(descriptor, bufferStart, size.convert(), 0).toInt()
25+
26+
when (bytesRead) {
27+
0 -> channel.close()
28+
-1 -> {
29+
if (errno == EAGAIN) {
30+
return@write 0
31+
}
32+
throw PosixException.forErrno()
33+
}
34+
}
35+
36+
bytesRead
37+
}
38+
39+
if (count == 0 && !channel.isClosedForWrite) {
40+
selector.select(selectable, SelectInterest.READ)
41+
}
42+
43+
channel.flush()
44+
}
45+
}.apply {
46+
invokeOnCompletion {
47+
shutdown(descriptor, SHUT_RD)
48+
}
49+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2014-2020 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package io.ktor.network.sockets
6+
7+
import io.ktor.network.selector.*
8+
import io.ktor.utils.io.*
9+
import io.ktor.utils.io.errors.*
10+
import kotlinx.cinterop.*
11+
import kotlinx.coroutines.*
12+
import platform.posix.*
13+
import kotlin.math.*
14+
15+
internal fun CoroutineScope.attachForWritingImpl(
16+
channel: ByteChannel,
17+
descriptor: Int,
18+
selectable: Selectable,
19+
selector: SelectorManager
20+
): ReaderJob = reader(Dispatchers.Unconfined, channel) {
21+
var sockedClosed = false
22+
var needSelect = false
23+
var total = 0
24+
while (!sockedClosed && !channel.isClosedForRead) {
25+
val count = channel.read { memory, start, stop ->
26+
val bufferStart = memory.pointer + start
27+
val remaining = stop - start
28+
val bytesWritten = if (remaining > 0) {
29+
send(descriptor, bufferStart, remaining.convert(), 0).toInt()
30+
} else 0
31+
32+
when (bytesWritten) {
33+
0 -> sockedClosed = true
34+
-1 -> {
35+
if (errno == EAGAIN) {
36+
needSelect = true
37+
} else {
38+
throw PosixException.forErrno()
39+
}
40+
}
41+
}
42+
43+
max(0, bytesWritten)
44+
}
45+
46+
total += count
47+
if (!sockedClosed && needSelect) {
48+
selector.select(selectable, SelectInterest.WRITE)
49+
needSelect = false
50+
}
51+
}
52+
53+
if (!channel.isClosedForRead) {
54+
val availableForRead = channel.availableForRead
55+
val cause = IOException("Failed writing to closed socket. Some bytes remaining: $availableForRead")
56+
channel.cancel(cause)
57+
}
58+
59+
}.apply {
60+
invokeOnCompletion {
61+
shutdown(descriptor, SHUT_WR)
62+
}
63+
}

ktor-network/posix/src/io/ktor/network/sockets/ConnectUtilsNative.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ internal actual suspend fun connect(
2525
connect(descriptor, address, size).check()
2626
}
2727

28-
fcntl(descriptor, F_SETFL, O_NONBLOCK).check()
28+
assignOptions(descriptor, socketOptions)
29+
nonBlocking(descriptor)
2930

3031
val localAddress = getLocalAddress(descriptor)
3132

@@ -53,15 +54,15 @@ internal actual fun bind(
5354
val address = localAddress?.address ?: getAnyLocalAddress()
5455
val descriptor = socket(address.family.convert(), SOCK_STREAM, 0).check()
5556

56-
fcntl(descriptor, F_SETFL, O_NONBLOCK).check { it == 0 }
57+
assignOptions(descriptor, socketOptions)
58+
nonBlocking(descriptor)
5759

5860
address.nativeAddress { address, size ->
5961
bind(descriptor, address, size).check()
6062
}
6163

6264
listen(descriptor, DEFAULT_BACKLOG_SIZE).check()
6365

64-
6566
return TCPServerSocketNative(
6667
descriptor,
6768
selector,

0 commit comments

Comments
 (0)