import Foundation import GRPC import NIO import os import Semaphore import Subprocess import SwiftUI @MainActor public protocol FileSyncDaemon: ObservableObject { var state: DaemonState { get } var sessionState: [FileSyncSession] { get } var logFile: URL { get } func tryStart() async func stop() async func refreshSessions() async func createSession( arg: CreateSyncSessionRequest, promptCallback: (@MainActor (String) -> Void)? ) async throws(DaemonError) func deleteSessions(ids: [String]) async throws(DaemonError) func pauseSessions(ids: [String]) async throws(DaemonError) func resumeSessions(ids: [String]) async throws(DaemonError) func resetSessions(ids: [String]) async throws(DaemonError) } @MainActor public class MutagenDaemon: FileSyncDaemon { let logger = Logger(subsystem: Bundle.main.bundleIdentifier!, category: "mutagen") @Published public var state: DaemonState = .stopped { didSet { logger.info("daemon state set: \(self.state.description, privacy: .public)") if case .failed = state { Task { try? await cleanupGRPC() } mutagenProcess?.kill() mutagenProcess = nil } } } @Published public var sessionState: [FileSyncSession] = [] private var mutagenProcess: Subprocess? private let mutagenPath: URL! private let mutagenDataDirectory: URL private let mutagenDaemonSocket: URL public let logFile: URL // Managing sync sessions could take a while, especially with prompting let sessionMgmtReqTimeout: TimeAmount = .seconds(15) // Non-nil when the daemon is running var client: DaemonClient? private var group: MultiThreadedEventLoopGroup? private var channel: GRPCChannel? private var waitForExit: (@Sendable () async -> Void)? // Protect start & stop transitions against re-entrancy private let transition = AsyncSemaphore(value: 1) public init(mutagenPath: URL? = nil, mutagenDataDirectory: URL = FileManager.default.urls( for: .applicationSupportDirectory, in: .userDomainMask ).first!.appending(path: "Coder Desktop").appending(path: "Mutagen")) { self.mutagenPath = mutagenPath self.mutagenDataDirectory = mutagenDataDirectory mutagenDaemonSocket = mutagenDataDirectory.appending(path: "daemon").appending(path: "daemon.sock") logFile = mutagenDataDirectory.appending(path: "daemon.log") // It shouldn't be fatal if the app was built without Mutagen embedded, // but file sync will be unavailable. if mutagenPath == nil { logger.warning("Mutagen not embedded in app, file sync will be unavailable") state = .unavailable return } } public func tryStart() async { if case .failed = state { state = .stopped } do throws(DaemonError) { try await start() } catch { state = .failed(error) return } await refreshSessions() if sessionState.isEmpty { logger.info("No sync sessions found on startup, stopping daemon") await stop() } } func start() async throws(DaemonError) { if case .unavailable = state { return } // Stop an orphaned daemon, if there is one try? await connect() await stop() // Creating the same process twice from Swift will crash the MainActor, // so we need to wait for an earlier process to die await waitForExit?() await transition.wait() defer { transition.signal() } logger.info("starting mutagen daemon") mutagenProcess = createMutagenProcess() let (standardError, waitForExit): (Pipe.AsyncBytes, @Sendable () async -> Void) do { (_, standardError, waitForExit) = try mutagenProcess!.run() } catch { throw .daemonStartFailure(error) } self.waitForExit = waitForExit Task { await handleDaemonLogs(io: standardError) logger.info("standard error stream closed") } Task { await terminationHandler(waitForExit: waitForExit) } do { try await connect() } catch { throw .daemonStartFailure(error) } try await waitForDaemonStart() state = .running logger.info( """ mutagen daemon started, pid: \(self.mutagenProcess?.pid.description ?? "unknown", privacy: .public) """ ) } // The daemon takes a moment to open the socket, and we don't want to hog the main actor // so poll for it on a background thread private func waitForDaemonStart( maxAttempts: Int = 5, attemptInterval: Duration = .milliseconds(100) ) async throws(DaemonError) { do { try await Task.detached(priority: .background) { for attempt in 0 ... maxAttempts { do { _ = try await self.client!.mgmt.version( Daemon_VersionRequest(), callOptions: .init(timeLimit: .timeout(.milliseconds(500))) ) return } catch { if attempt == maxAttempts { throw error } try? await Task.sleep(for: attemptInterval) } } }.value } catch { throw .daemonStartFailure(error) } } private func connect() async throws(DaemonError) { guard client == nil else { // Already connected return } group = MultiThreadedEventLoopGroup(numberOfThreads: 2) do { channel = try GRPCChannelPool.with( target: .unixDomainSocket(mutagenDaemonSocket.path), transportSecurity: .plaintext, eventLoopGroup: group! ) client = DaemonClient( mgmt: Daemon_DaemonAsyncClient(channel: channel!), sync: Synchronization_SynchronizationAsyncClient(channel: channel!), prompt: Prompting_PromptingAsyncClient(channel: channel!) ) logger.info( "Successfully connected to mutagen daemon, socket: \(self.mutagenDaemonSocket.path, privacy: .public)" ) } catch { logger.error("Failed to connect to gRPC: \(error)") try? await cleanupGRPC() throw .connectionFailure(error) } } private func cleanupGRPC() async throws { try? await channel?.close().get() try? await group?.shutdownGracefully() client = nil channel = nil group = nil } public func stop() async { if case .unavailable = state { return } await transition.wait() defer { transition.signal() } logger.info("stopping mutagen daemon") state = .stopped guard FileManager.default.fileExists(atPath: mutagenDaemonSocket.path) else { // Already stopped return } // "We don't check the response or error, because the daemon // may terminate before it has a chance to send the response." _ = try? await client?.mgmt.terminate( Daemon_TerminateRequest(), callOptions: .init(timeLimit: .timeout(.milliseconds(500))) ) try? await cleanupGRPC() mutagenProcess?.kill() mutagenProcess = nil logger.info("Daemon stopped and gRPC connection closed") } private func createMutagenProcess() -> Subprocess { let process = Subprocess([mutagenPath.path, "daemon", "run"]) process.environment = [ "MUTAGEN_DATA_DIRECTORY": mutagenDataDirectory.path, "MUTAGEN_SSH_PATH": "/usr/bin", ] logger.info("setting mutagen data directory: \(self.mutagenDataDirectory.path, privacy: .public)") return process } private func terminationHandler(waitForExit: @Sendable () async -> Void) async { await waitForExit() switch state { case .stopped: logger.info("mutagen daemon stopped") default: logger.error( """ mutagen daemon exited unexpectedly with code: \(self.mutagenProcess?.exitCode.description ?? "unknown") """ ) state = .failed(.terminatedUnexpectedly) return } } private func handleDaemonLogs(io: Pipe.AsyncBytes) async { if !FileManager.default.fileExists(atPath: logFile.path) { guard FileManager.default.createFile(atPath: logFile.path, contents: nil) else { logger.error("Failed to create log file") return } } guard let fileHandle = try? FileHandle(forWritingTo: logFile) else { logger.error("Failed to open log file for writing") return } for await line in io.lines { logger.info("\(line, privacy: .public)") do { try fileHandle.write(contentsOf: Data("\(line)\n".utf8)) } catch { logger.error("Failed to write to daemon log file: \(error)") } } try? fileHandle.close() } } struct DaemonClient { let mgmt: Daemon_DaemonAsyncClient let sync: Synchronization_SynchronizationAsyncClient let prompt: Prompting_PromptingAsyncClient } public enum DaemonState { case running case stopped case failed(DaemonError) case unavailable public var description: String { switch self { case .running: "Running" case .stopped: "Stopped" case let .failed(error): "\(error.description)" case .unavailable: "Unavailable" } } public var color: Color { switch self { case .running: .green case .stopped: .gray case .failed: .red case .unavailable: .gray } } // `if case`s are a pain to work with: they're not bools (such as for ORing) // and you can't negate them without doing `if case .. {} else`. public var isFailed: Bool { if case .failed = self { return true } return false } } public enum DaemonError: Error { case daemonNotRunning case daemonStartFailure(Error) case connectionFailure(Error) case terminatedUnexpectedly case grpcFailure(Error) case invalidGrpcResponse(String) case unexpectedStreamClosure public var description: String { switch self { case let .daemonStartFailure(error): "Daemon start failure: \(error)" case let .connectionFailure(error): "Connection failure: \(error)" case .terminatedUnexpectedly: "Daemon terminated unexpectedly" case .daemonNotRunning: "The daemon must be started first" case let .grpcFailure(error): "Failed to communicate with daemon: \(error)" case let .invalidGrpcResponse(response): "Invalid gRPC response: \(response)" case .unexpectedStreamClosure: "Unexpected stream closure" } } public var localizedDescription: String { description } }