diff --git a/Coder-Desktop/Coder-Desktop/Coder_DesktopApp.swift b/Coder-Desktop/Coder-Desktop/Coder_DesktopApp.swift index 091a1c25..f2c7b20a 100644 --- a/Coder-Desktop/Coder-Desktop/Coder_DesktopApp.swift +++ b/Coder-Desktop/Coder-Desktop/Coder_DesktopApp.swift @@ -36,11 +36,18 @@ class AppDelegate: NSObject, NSApplicationDelegate { override init() { vpn = CoderVPNService() state = AppState(onChange: vpn.configureTunnelProviderProtocol) - fileSyncDaemon = MutagenDaemon() if state.startVPNOnLaunch { vpn.startWhenReady = true } vpn.installSystemExtension() + #if arch(arm64) + let mutagenBinary = "mutagen-darwin-arm64" + #elseif arch(x86_64) + let mutagenBinary = "mutagen-darwin-amd64" + #endif + fileSyncDaemon = MutagenDaemon( + mutagenPath: Bundle.main.url(forResource: mutagenBinary, withExtension: nil) + ) } func applicationDidFinishLaunching(_: Notification) { @@ -73,10 +80,6 @@ class AppDelegate: NSObject, NSApplicationDelegate { state.reconfigure() } } - // TODO: Start the daemon only once a file sync is configured - Task { - await fileSyncDaemon.start() - } } deinit { diff --git a/Coder-Desktop/VPNLib/FileSync/FileSyncDaemon.swift b/Coder-Desktop/VPNLib/FileSync/FileSyncDaemon.swift index 9324c076..68446940 100644 --- a/Coder-Desktop/VPNLib/FileSync/FileSyncDaemon.swift +++ b/Coder-Desktop/VPNLib/FileSync/FileSyncDaemon.swift @@ -2,13 +2,26 @@ import Foundation import GRPC import NIO import os +import Semaphore import Subprocess +import SwiftUI @MainActor public protocol FileSyncDaemon: ObservableObject { var state: DaemonState { get } - func start() async + func start() async throws(DaemonError) func stop() async + func listSessions() async throws -> [FileSyncSession] + func createSession(with: FileSyncSession) async throws +} + +public struct FileSyncSession { + public let id: String + public let name: String + public let localPath: URL + public let workspace: String + public let agent: String + public let remotePath: URL } @MainActor @@ -17,7 +30,14 @@ public class MutagenDaemon: FileSyncDaemon { @Published public var state: DaemonState = .stopped { didSet { - logger.info("daemon state changed: \(self.state.description, privacy: .public)") + logger.info("daemon state set: \(self.state.description, privacy: .public)") + if case .failed = state { + Task { + try? await cleanupGRPC() + } + mutagenProcess?.kill() + mutagenProcess = nil + } } } @@ -26,46 +46,61 @@ public class MutagenDaemon: FileSyncDaemon { private let mutagenDataDirectory: URL private let mutagenDaemonSocket: URL + // Non-nil when the daemon is running private var group: MultiThreadedEventLoopGroup? private var channel: GRPCChannel? - private var client: Daemon_DaemonAsyncClient? - - public init() { - #if arch(arm64) - mutagenPath = Bundle.main.url(forResource: "mutagen-darwin-arm64", withExtension: nil) - #elseif arch(x86_64) - mutagenPath = Bundle.main.url(forResource: "mutagen-darwin-amd64", withExtension: nil) - #else - fatalError("unknown architecture") - #endif - mutagenDataDirectory = FileManager.default.urls( - for: .applicationSupportDirectory, - in: .userDomainMask - ).first!.appending(path: "Coder Desktop").appending(path: "Mutagen") + private var client: DaemonClient? + + // Protect start & stop transitions against re-entrancy + private let transition = AsyncSemaphore(value: 1) + + public init(mutagenPath: URL? = nil, + mutagenDataDirectory: URL = FileManager.default.urls( + for: .applicationSupportDirectory, + in: .userDomainMask + ).first!.appending(path: "Coder Desktop").appending(path: "Mutagen")) + { + self.mutagenPath = mutagenPath + self.mutagenDataDirectory = mutagenDataDirectory mutagenDaemonSocket = mutagenDataDirectory.appending(path: "daemon").appending(path: "daemon.sock") // It shouldn't be fatal if the app was built without Mutagen embedded, // but file sync will be unavailable. if mutagenPath == nil { logger.warning("Mutagen not embedded in app, file sync will be unavailable") state = .unavailable + return + } + + // If there are sync sessions, the daemon should be running + Task { + do throws(DaemonError) { + try await start() + } catch { + state = .failed(error) + return + } + await stopIfNoSessions() } } - public func start() async { + public func start() async throws(DaemonError) { if case .unavailable = state { return } // Stop an orphaned daemon, if there is one try? await connect() await stop() + await transition.wait() + defer { transition.signal() } + logger.info("starting mutagen daemon") + mutagenProcess = createMutagenProcess() // swiftlint:disable:next large_tuple let (standardOutput, standardError, waitForExit): (Pipe.AsyncBytes, Pipe.AsyncBytes, @Sendable () async -> Void) do { (standardOutput, standardError, waitForExit) = try mutagenProcess!.run() } catch { - state = .failed(DaemonError.daemonStartFailure(error)) - return + throw .daemonStartFailure(error) } Task { @@ -85,10 +120,11 @@ public class MutagenDaemon: FileSyncDaemon { do { try await connect() } catch { - state = .failed(DaemonError.daemonStartFailure(error)) - return + throw .daemonStartFailure(error) } + try await waitForDaemonStart() + state = .running logger.info( """ @@ -98,6 +134,34 @@ public class MutagenDaemon: FileSyncDaemon { ) } + // The daemon takes a moment to open the socket, and we don't want to hog the main actor + // so poll for it on a background thread + private func waitForDaemonStart( + maxAttempts: Int = 5, + attemptInterval: Duration = .milliseconds(100) + ) async throws(DaemonError) { + do { + try await Task.detached(priority: .background) { + for attempt in 0 ... maxAttempts { + do { + _ = try await self.client!.mgmt.version( + Daemon_VersionRequest(), + callOptions: .init(timeLimit: .timeout(.milliseconds(500))) + ) + return + } catch { + if attempt == maxAttempts { + throw error + } + try? await Task.sleep(for: attemptInterval) + } + } + }.value + } catch { + throw .daemonStartFailure(error) + } + } + private func connect() async throws(DaemonError) { guard client == nil else { // Already connected @@ -110,14 +174,17 @@ public class MutagenDaemon: FileSyncDaemon { transportSecurity: .plaintext, eventLoopGroup: group! ) - client = Daemon_DaemonAsyncClient(channel: channel!) + client = DaemonClient( + mgmt: Daemon_DaemonAsyncClient(channel: channel!), + sync: Synchronization_SynchronizationAsyncClient(channel: channel!) + ) logger.info( "Successfully connected to mutagen daemon, socket: \(self.mutagenDaemonSocket.path, privacy: .public)" ) } catch { logger.error("Failed to connect to gRPC: \(error)") try? await cleanupGRPC() - throw DaemonError.connectionFailure(error) + throw .connectionFailure(error) } } @@ -132,6 +199,10 @@ public class MutagenDaemon: FileSyncDaemon { public func stop() async { if case .unavailable = state { return } + await transition.wait() + defer { transition.signal() } + logger.info("stopping mutagen daemon") + state = .stopped guard FileManager.default.fileExists(atPath: mutagenDaemonSocket.path) else { // Already stopped @@ -140,7 +211,7 @@ public class MutagenDaemon: FileSyncDaemon { // "We don't check the response or error, because the daemon // may terminate before it has a chance to send the response." - _ = try? await client?.terminate( + _ = try? await client?.mgmt.terminate( Daemon_TerminateRequest(), callOptions: .init(timeLimit: .timeout(.milliseconds(500))) ) @@ -175,6 +246,7 @@ public class MutagenDaemon: FileSyncDaemon { """ ) state = .failed(.terminatedUnexpectedly) + return } } @@ -183,6 +255,55 @@ public class MutagenDaemon: FileSyncDaemon { logger.info("\(line, privacy: .public)") } } + + public func listSessions() async throws -> [FileSyncSession] { + guard case .running = state else { + return [] + } + // TODO: Implement + return [] + } + + public func createSession(with _: FileSyncSession) async throws { + if case .stopped = state { + do throws(DaemonError) { + try await start() + } catch { + state = .failed(error) + return + } + } + // TODO: Add Session + } + + public func deleteSession() async throws { + // TODO: Delete session + await stopIfNoSessions() + } + + private func stopIfNoSessions() async { + let sessions: Synchronization_ListResponse + do { + sessions = try await client!.sync.list(Synchronization_ListRequest.with { req in + req.selection = .with { selection in + selection.all = true + } + }) + } catch { + state = .failed(.daemonStartFailure(error)) + return + } + // If there's no configured sessions, the daemon doesn't need to be running + if sessions.sessionStates.isEmpty { + logger.info("No sync sessions found") + await stop() + } + } +} + +struct DaemonClient { + let mgmt: Daemon_DaemonAsyncClient + let sync: Synchronization_SynchronizationAsyncClient } public enum DaemonState { @@ -191,7 +312,7 @@ public enum DaemonState { case failed(DaemonError) case unavailable - var description: String { + public var description: String { switch self { case .running: "Running" @@ -203,12 +324,27 @@ public enum DaemonState { "Unavailable" } } + + public var color: Color { + switch self { + case .running: + .green + case .stopped: + .gray + case .failed: + .red + case .unavailable: + .gray + } + } } public enum DaemonError: Error { + case daemonNotRunning case daemonStartFailure(Error) case connectionFailure(Error) case terminatedUnexpectedly + case grpcFailure(Error) var description: String { switch self { @@ -218,6 +354,10 @@ public enum DaemonError: Error { "Connection failure: \(error)" case .terminatedUnexpectedly: "Daemon terminated unexpectedly" + case .daemonNotRunning: + "The daemon must be started first" + case let .grpcFailure(error): + "Failed to communicate with daemon: \(error)" } } diff --git a/Coder-Desktop/project.yml b/Coder-Desktop/project.yml index c3c53f99..fb38d35a 100644 --- a/Coder-Desktop/project.yml +++ b/Coder-Desktop/project.yml @@ -116,7 +116,10 @@ packages: exactVersion: 1.24.2 Subprocess: url: https://github.com/jamf/Subprocess - revision: 9d67b79 + revision: 9d67b79 + Semaphore: + url: https://github.com/groue/Semaphore/ + exactVersion: 0.1.0 targets: Coder Desktop: @@ -276,6 +279,7 @@ targets: product: SwiftProtobufPluginLibrary - package: GRPC - package: Subprocess + - package: Semaphore - target: CoderSDK embed: false