From fbc4a84cfcad0982f1fc9794101c2c4d59a4fc4f Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Thu, 19 Dec 2024 18:25:24 +1100 Subject: [PATCH 1/5] chore: refactor speaker & handshaker into actors --- Coder Desktop/Proto/Receiver.swift | 2 +- Coder Desktop/Proto/Speaker.swift | 129 ++++++++++++++------ Coder Desktop/ProtoTests/SpeakerTests.swift | 101 ++++++--------- 3 files changed, 124 insertions(+), 108 deletions(-) diff --git a/Coder Desktop/Proto/Receiver.swift b/Coder Desktop/Proto/Receiver.swift index 6a279e6..2797bad 100644 --- a/Coder Desktop/Proto/Receiver.swift +++ b/Coder Desktop/Proto/Receiver.swift @@ -57,7 +57,7 @@ actor Receiver { /// Starts reading protocol messages from the `DispatchIO` channel and returns them as an `AsyncStream` of messages. /// On read or decoding error, it logs and closes the stream. - func messages() throws -> AsyncStream { + func messages() throws(ReceiveError) -> AsyncStream { if running { throw ReceiveError.alreadyRunning } diff --git a/Coder Desktop/Proto/Speaker.swift b/Coder Desktop/Proto/Speaker.swift index ca0740d..6184858 100644 --- a/Coder Desktop/Proto/Speaker.swift +++ b/Coder Desktop/Proto/Speaker.swift @@ -6,7 +6,7 @@ let newLine = 0x0A let headerPreamble = "codervpn" /// A message that has the `rpc` property for recording participation in a unary RPC. -protocol RPCMessage { +protocol RPCMessage: Sendable { var rpc: Vpn_RPC { get set } /// Returns true if `rpc` has been explicitly set. var hasRpc: Bool { get } @@ -49,8 +49,8 @@ struct ProtoVersion: CustomStringConvertible, Equatable, Codable { } } -/// An abstract base class for implementations that need to communicate using the VPN protocol. -class Speaker { +/// An actor that communicates using the VPN protocol +actor Speaker { private let logger = Logger(subsystem: "com.coder.Coder-Desktop", category: "proto") private let writeFD: FileHandle private let readFD: FileHandle @@ -59,6 +59,8 @@ class Speaker { private let sender: Sender private let receiver: Receiver private let secretary = RPCSecretary() + private var messageBuffer: MessageBuffer = .init() + private var readLoopTask: Task? let role: ProtoRole /// Creates an instance that communicates over the provided file handles. @@ -93,41 +95,45 @@ class Speaker { try _ = await hndsh.handshake() } - /// Reads and handles protocol messages. - func readLoop() async throws { - for try await msg in try await receiver.messages() { - guard msg.hasRpc else { - handleMessage(msg) - continue - } - guard msg.rpc.msgID == 0 else { - let req = RPCRequest(req: msg, sender: sender) - handleRPC(req) - continue - } - guard msg.rpc.responseTo == 0 else { - logger.debug("got RPC reply for msgID \(msg.rpc.responseTo)") - do throws(RPCError) { - try await self.secretary.route(reply: msg) - } catch { - logger.error( - "couldn't route RPC reply for \(msg.rpc.responseTo): \(error)") + public func start() { + guard readLoopTask == nil else { + logger.error("speaker is already running") + return + } + readLoopTask = Task { + do throws(ReceiveError) { + for try await msg in try await self.receiver.messages() { + guard msg.hasRpc else { + await messageBuffer.push(.message(msg)) + continue + } + guard msg.rpc.msgID == 0 else { + let req = RPCRequest(req: msg, sender: self.sender) + await messageBuffer.push(.RPC(req)) + continue + } + guard msg.rpc.responseTo == 0 else { + self.logger.debug("got RPC reply for msgID \(msg.rpc.responseTo)") + do throws(RPCError) { + try await self.secretary.route(reply: msg) + } catch { + self.logger.error( + "couldn't route RPC reply for \(msg.rpc.responseTo): \(error)") + } + continue + } } - continue + } catch { + self.logger.error("failed to receive messages: \(error)") } } } - /// Handles a single non-RPC message. It is expected that subclasses override this method with their own handlers. - func handleMessage(_ msg: RecvMsg) { - // just log - logger.debug("got non-RPC message \(msg.textFormatString())") - } - - /// Handle a single RPC request. It is expected that subclasses override this method with their own handlers. - func handleRPC(_ req: RPCRequest) { - // just log - logger.debug("got RPC message \(req.msg.textFormatString())") + func wait() async throws { + guard let task = readLoopTask else { + return + } + try await task.value } /// Send a unary RPC message and handle the response @@ -166,10 +172,51 @@ class Speaker { logger.error("failed to close read file handle: \(error)") } } + + enum IncomingMessage { + case message(RecvMsg) + case RPC(RPCRequest) + } + + private actor MessageBuffer { + private var messages: [IncomingMessage] = [] + private var continuations: [CheckedContinuation] = [] + + func push(_ message: IncomingMessage?) { + if let continuation = continuations.first { + continuations.removeFirst() + continuation.resume(returning: message) + } else if let message = message { + messages.append(message) + } + } + + func next() async -> IncomingMessage? { + if let message = messages.first { + messages.removeFirst() + return message + } + return await withCheckedContinuation { continuation in + continuations.append(continuation) + } + } + } } -/// A class that performs the initial VPN protocol handshake and version negotiation. -class Handshaker: @unchecked Sendable { +extension Speaker: AsyncSequence, AsyncIteratorProtocol { + typealias Element = IncomingMessage + + public nonisolated func makeAsyncIterator() -> Speaker { + self + } + + func next() async throws -> IncomingMessage? { + return await messageBuffer.next() + } +} + +/// An actor performs the initial VPN protocol handshake and version negotiation. +actor Handshaker { private let writeFD: FileHandle private let dispatch: DispatchIO private var theirData: Data = .init() @@ -193,17 +240,19 @@ class Handshaker: @unchecked Sendable { func handshake() async throws -> ProtoVersion { // kick off the read async before we try to write, synchronously, so we don't deadlock, both // waiting to write with nobody reading. - async let theirs = try withCheckedThrowingContinuation { cont in - continuation = cont - // send in a nil read to kick us off - handleRead(false, nil, 0) + let readTask = Task { + try await withCheckedThrowingContinuation { cont in + self.continuation = cont + // send in a nil read to kick us off + self.handleRead(false, nil, 0) + } } let vStr = versions.map { $0.description }.joined(separator: ",") let ours = String(format: "\(headerPreamble) \(role) \(vStr)\n") try writeFD.write(contentsOf: ours.data(using: .utf8)!) - let theirData = try await theirs + let theirData = try await readTask.value guard let theirsString = String(bytes: theirData, encoding: .utf8) else { throw HandshakeError.invalidHeader(", @unchecked Sendable { - private var msgHandler: CheckedContinuation? - override func handleMessage(_ msg: Vpn_ManagerMessage) { - msgHandler?.resume(returning: msg) - } - - /// Runs the given closure asynchronously and returns the next non-RPC message received. - func expectMessage(with closure: - @escaping @Sendable () async -> Void) async throws -> Vpn_ManagerMessage - { - return try await withCheckedThrowingContinuation { continuation in - msgHandler = continuation - Task { - await closure() - } - } - } - - private var rpcHandler: CheckedContinuation, Error>? - override func handleRPC(_ req: RPCRequest) { - rpcHandler?.resume(returning: req) - } - - /// Runs the given closure asynchronously and return the next non-RPC message received - func expectRPC(with closure: - @escaping @Sendable () async -> Void) async throws -> - RPCRequest - { - return try await withCheckedThrowingContinuation { continuation in - rpcHandler = continuation - Task { - await closure() - } - } - } -} - @Suite(.timeLimit(.minutes(1))) struct SpeakerTests: Sendable { let pipeMT = Pipe() let pipeTM = Pipe() - let uut: TestTunnel + let uut: Speaker let sender: Sender let dispatch: DispatchIO let receiver: Receiver @@ -53,7 +14,7 @@ struct SpeakerTests: Sendable { init() { let queue = DispatchQueue.global(qos: .utility) - uut = TestTunnel( + uut = Speaker( writeFD: pipeTM.fileHandleForWriting, readFD: pipeMT.fileHandleForReading ) @@ -79,39 +40,45 @@ struct SpeakerTests: Sendable { } @Test func handleSingleMessage() async throws { - async let readDone: () = try uut.readLoop() + await uut.start() - let got = try await uut.expectMessage { - var s = Vpn_ManagerMessage() - s.start = Vpn_StartRequest() - await #expect(throws: Never.self) { - try await sender.send(s) - } + var s = Vpn_ManagerMessage() + s.start = Vpn_StartRequest() + await #expect(throws: Never.self) { + try await sender.send(s) } - #expect(got.msg == .start(Vpn_StartRequest())) + let got = try #require(await uut.next()) + guard case let .message(msg) = got else { + Issue.record("Received unexpected message from speaker") + return + } + #expect(msg.msg == .start(Vpn_StartRequest())) try await sender.close() - try await readDone + try await uut.wait() } @Test func handleRPC() async throws { - async let readDone: () = try uut.readLoop() + await uut.start() - let got = try await uut.expectRPC { - var s = Vpn_ManagerMessage() - s.start = Vpn_StartRequest() - s.rpc = Vpn_RPC() - s.rpc.msgID = 33 - await #expect(throws: Never.self) { - try await sender.send(s) - } + var s = Vpn_ManagerMessage() + s.start = Vpn_StartRequest() + s.rpc = Vpn_RPC() + s.rpc.msgID = 33 + await #expect(throws: Never.self) { + try await sender.send(s) + } + let got = try #require(await uut.next()) + guard case let .RPC(req) = got else { + Issue.record("Received unexpected message from speaker") + return } - #expect(got.msg.msg == .start(Vpn_StartRequest())) - #expect(got.msg.rpc.msgID == 33) + #expect(req.msg.msg == .start(Vpn_StartRequest())) + #expect(req.msg.rpc.msgID == 33) var reply = Vpn_TunnelMessage() reply.start = Vpn_StartResponse() reply.rpc.responseTo = 33 - try await got.sendReply(reply) - uut.closeWrite() + try await req.sendReply(reply) + await uut.closeWrite() var count = 0 await #expect(throws: Never.self) { @@ -122,11 +89,11 @@ struct SpeakerTests: Sendable { #expect(count == 1) } try await sender.close() - try await readDone + try await uut.wait() } @Test func sendRPCs() async throws { - async let readDone: () = try uut.readLoop() + await uut.start() async let managerDone = Task { var count = 0 @@ -148,9 +115,9 @@ struct SpeakerTests: Sendable { let got = try await uut.unaryRPC(req) #expect(got.networkSettings.errorMessage == "test \(i)") } - uut.closeWrite() + await uut.closeWrite() _ = await managerDone try await sender.close() - try await readDone + try await uut.wait() } } From 8388b40bcd08e236c403358207ac506b845bc861 Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Thu, 19 Dec 2024 18:44:44 +1100 Subject: [PATCH 2/5] rethrow readloop error --- Coder Desktop/Proto/Speaker.swift | 1 + 1 file changed, 1 insertion(+) diff --git a/Coder Desktop/Proto/Speaker.swift b/Coder Desktop/Proto/Speaker.swift index 6184858..8a2b71e 100644 --- a/Coder Desktop/Proto/Speaker.swift +++ b/Coder Desktop/Proto/Speaker.swift @@ -125,6 +125,7 @@ actor Speaker { } } catch { self.logger.error("failed to receive messages: \(error)") + throw error } } } From 6af612154fae119a1973510e32548251cb4006e9 Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Mon, 6 Jan 2025 17:09:38 +1100 Subject: [PATCH 3/5] remove buffer --- Coder Desktop/Proto/Speaker.swift | 62 +++++++-------------- Coder Desktop/ProtoTests/SpeakerTests.swift | 13 ++--- 2 files changed, 23 insertions(+), 52 deletions(-) diff --git a/Coder Desktop/Proto/Speaker.swift b/Coder Desktop/Proto/Speaker.swift index 8a2b71e..37ca7ea 100644 --- a/Coder Desktop/Proto/Speaker.swift +++ b/Coder Desktop/Proto/Speaker.swift @@ -95,48 +95,6 @@ actor Speaker { try _ = await hndsh.handshake() } - public func start() { - guard readLoopTask == nil else { - logger.error("speaker is already running") - return - } - readLoopTask = Task { - do throws(ReceiveError) { - for try await msg in try await self.receiver.messages() { - guard msg.hasRpc else { - await messageBuffer.push(.message(msg)) - continue - } - guard msg.rpc.msgID == 0 else { - let req = RPCRequest(req: msg, sender: self.sender) - await messageBuffer.push(.RPC(req)) - continue - } - guard msg.rpc.responseTo == 0 else { - self.logger.debug("got RPC reply for msgID \(msg.rpc.responseTo)") - do throws(RPCError) { - try await self.secretary.route(reply: msg) - } catch { - self.logger.error( - "couldn't route RPC reply for \(msg.rpc.responseTo): \(error)") - } - continue - } - } - } catch { - self.logger.error("failed to receive messages: \(error)") - throw error - } - } - } - - func wait() async throws { - guard let task = readLoopTask else { - return - } - try await task.value - } - /// Send a unary RPC message and handle the response func unaryRPC(_ req: SendMsg) async throws -> RecvMsg { return try await withCheckedThrowingContinuation { continuation in @@ -212,7 +170,25 @@ extension Speaker: AsyncSequence, AsyncIteratorProtocol { } func next() async throws -> IncomingMessage? { - return await messageBuffer.next() + for try await msg in try await receiver.messages() { + guard msg.hasRpc else { + return .message(msg) + } + guard msg.rpc.msgID == 0 else { + return .RPC(RPCRequest(req: msg, sender: sender)) + } + guard msg.rpc.responseTo == 0 else { + logger.debug("got RPC reply for msgID \(msg.rpc.responseTo)") + do throws(RPCError) { + try await self.secretary.route(reply: msg) + } catch { + logger.error( + "couldn't route RPC reply for \(msg.rpc.responseTo): \(error)") + } + continue + } + } + return nil } } diff --git a/Coder Desktop/ProtoTests/SpeakerTests.swift b/Coder Desktop/ProtoTests/SpeakerTests.swift index 2453821..3a0527e 100644 --- a/Coder Desktop/ProtoTests/SpeakerTests.swift +++ b/Coder Desktop/ProtoTests/SpeakerTests.swift @@ -40,8 +40,6 @@ struct SpeakerTests: Sendable { } @Test func handleSingleMessage() async throws { - await uut.start() - var s = Vpn_ManagerMessage() s.start = Vpn_StartRequest() await #expect(throws: Never.self) { @@ -54,12 +52,9 @@ struct SpeakerTests: Sendable { } #expect(msg.msg == .start(Vpn_StartRequest())) try await sender.close() - try await uut.wait() } @Test func handleRPC() async throws { - await uut.start() - var s = Vpn_ManagerMessage() s.start = Vpn_StartRequest() s.rpc = Vpn_RPC() @@ -89,12 +84,13 @@ struct SpeakerTests: Sendable { #expect(count == 1) } try await sender.close() - try await uut.wait() } @Test func sendRPCs() async throws { - await uut.start() - + // Speaker must be reading from the receiver for `unaryRPC` to return + Task { + for try await _ in uut {} + } async let managerDone = Task { var count = 0 for try await req in try await receiver.messages() { @@ -118,6 +114,5 @@ struct SpeakerTests: Sendable { await uut.closeWrite() _ = await managerDone try await sender.close() - try await uut.wait() } } From a1ab87c3470f249c1cab1f640b48bc515879f5ba Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Mon, 6 Jan 2025 17:14:37 +1100 Subject: [PATCH 4/5] rm dead code --- Coder Desktop/Proto/Speaker.swift | 26 -------------------------- 1 file changed, 26 deletions(-) diff --git a/Coder Desktop/Proto/Speaker.swift b/Coder Desktop/Proto/Speaker.swift index 37ca7ea..6751aee 100644 --- a/Coder Desktop/Proto/Speaker.swift +++ b/Coder Desktop/Proto/Speaker.swift @@ -59,8 +59,6 @@ actor Speaker { private let sender: Sender private let receiver: Receiver private let secretary = RPCSecretary() - private var messageBuffer: MessageBuffer = .init() - private var readLoopTask: Task? let role: ProtoRole /// Creates an instance that communicates over the provided file handles. @@ -136,30 +134,6 @@ actor Speaker { case message(RecvMsg) case RPC(RPCRequest) } - - private actor MessageBuffer { - private var messages: [IncomingMessage] = [] - private var continuations: [CheckedContinuation] = [] - - func push(_ message: IncomingMessage?) { - if let continuation = continuations.first { - continuations.removeFirst() - continuation.resume(returning: message) - } else if let message = message { - messages.append(message) - } - } - - func next() async -> IncomingMessage? { - if let message = messages.first { - messages.removeFirst() - return message - } - return await withCheckedContinuation { continuation in - continuations.append(continuation) - } - } - } } extension Speaker: AsyncSequence, AsyncIteratorProtocol { From f5b96c4083cfb24e985766c9b9e4444efbb83b80 Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Mon, 6 Jan 2025 17:17:53 +1100 Subject: [PATCH 5/5] fixup --- Coder Desktop/ProtoTests/SpeakerTests.swift | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Coder Desktop/ProtoTests/SpeakerTests.swift b/Coder Desktop/ProtoTests/SpeakerTests.swift index 3a0527e..3d90769 100644 --- a/Coder Desktop/ProtoTests/SpeakerTests.swift +++ b/Coder Desktop/ProtoTests/SpeakerTests.swift @@ -88,7 +88,7 @@ struct SpeakerTests: Sendable { @Test func sendRPCs() async throws { // Speaker must be reading from the receiver for `unaryRPC` to return - Task { + let readDone = Task { for try await _ in uut {} } async let managerDone = Task { @@ -114,5 +114,6 @@ struct SpeakerTests: Sendable { await uut.closeWrite() _ = await managerDone try await sender.close() + try await readDone.value } }