diff --git a/Coder-Desktop/Coder-Desktop/Preview Content/PreviewFileSync.swift b/Coder-Desktop/Coder-Desktop/Preview Content/PreviewFileSync.swift index 8db30e3..082c144 100644 --- a/Coder-Desktop/Coder-Desktop/Preview Content/PreviewFileSync.swift +++ b/Coder-Desktop/Coder-Desktop/Preview Content/PreviewFileSync.swift @@ -21,4 +21,8 @@ final class PreviewFileSync: FileSyncDaemon { func createSession(localPath _: String, agentHost _: String, remotePath _: String) async throws(DaemonError) {} func deleteSessions(ids _: [String]) async throws(VPNLib.DaemonError) {} + + func pauseSessions(ids _: [String]) async throws(VPNLib.DaemonError) {} + + func resumeSessions(ids _: [String]) async throws(VPNLib.DaemonError) {} } diff --git a/Coder-Desktop/Coder-Desktop/Views/FileSync/FileSyncConfig.swift b/Coder-Desktop/Coder-Desktop/Views/FileSync/FileSyncConfig.swift index dc83c17..5a7257b 100644 --- a/Coder-Desktop/Coder-Desktop/Views/FileSync/FileSyncConfig.swift +++ b/Coder-Desktop/Coder-Desktop/Views/FileSync/FileSyncConfig.swift @@ -51,11 +51,15 @@ struct FileSyncConfig: View { loading = true defer { loading = false } do throws(DaemonError) { + // TODO: Support selecting & deleting multiple sessions at once try await fileSync.deleteSessions(ids: [selection!]) + if fileSync.sessionState.isEmpty { + // Last session was deleted, stop the daemon + await fileSync.stop() + } } catch { deleteError = error } - await fileSync.refreshSessions() selection = nil } } label: { @@ -65,7 +69,17 @@ struct FileSyncConfig: View { if let selectedSession = fileSync.sessionState.first(where: { $0.id == selection }) { Divider() Button { - // TODO: Pause & Unpause + Task { + // TODO: Support pausing & resuming multiple sessions at once + loading = true + defer { loading = false } + switch selectedSession.status { + case .paused: + try await fileSync.resumeSessions(ids: [selectedSession.id]) + default: + try await fileSync.pauseSessions(ids: [selectedSession.id]) + } + } } label: { switch selectedSession.status { case .paused: diff --git a/Coder-Desktop/Coder-Desktop/Views/FileSync/FileSyncSessionModal.swift b/Coder-Desktop/Coder-Desktop/Views/FileSync/FileSyncSessionModal.swift index c0c7a35..d398172 100644 --- a/Coder-Desktop/Coder-Desktop/Views/FileSync/FileSyncSessionModal.swift +++ b/Coder-Desktop/Coder-Desktop/Views/FileSync/FileSyncSessionModal.swift @@ -68,7 +68,7 @@ struct FileSyncSessionModal: View { }.disabled(loading) .alert("Error", isPresented: Binding( get: { createError != nil }, - set: { if $0 { createError = nil } } + set: { if !$0 { createError = nil } } )) {} message: { Text(createError?.description ?? "An unknown error occurred.") } @@ -83,7 +83,6 @@ struct FileSyncSessionModal: View { defer { loading = false } do throws(DaemonError) { if let existingSession { - // TODO: Support selecting & deleting multiple sessions at once try await fileSync.deleteSessions(ids: [existingSession.id]) } try await fileSync.createSession( diff --git a/Coder-Desktop/Coder-DesktopTests/Util.swift b/Coder-Desktop/Coder-DesktopTests/Util.swift index e38fe33..cad7eac 100644 --- a/Coder-Desktop/Coder-DesktopTests/Util.swift +++ b/Coder-Desktop/Coder-DesktopTests/Util.swift @@ -48,6 +48,10 @@ class MockFileSyncDaemon: FileSyncDaemon { } func createSession(localPath _: String, agentHost _: String, remotePath _: String) async throws(DaemonError) {} + + func pauseSessions(ids _: [String]) async throws(VPNLib.DaemonError) {} + + func resumeSessions(ids _: [String]) async throws(VPNLib.DaemonError) {} } extension Inspection: @unchecked Sendable, @retroactive InspectionEmissary {} diff --git a/Coder-Desktop/VPNLib/FileSync/FileSyncDaemon.swift b/Coder-Desktop/VPNLib/FileSync/FileSyncDaemon.swift index eafd4dc..2adce4b 100644 --- a/Coder-Desktop/VPNLib/FileSync/FileSyncDaemon.swift +++ b/Coder-Desktop/VPNLib/FileSync/FileSyncDaemon.swift @@ -15,6 +15,8 @@ public protocol FileSyncDaemon: ObservableObject { func refreshSessions() async func createSession(localPath: String, agentHost: String, remotePath: String) async throws(DaemonError) func deleteSessions(ids: [String]) async throws(DaemonError) + func pauseSessions(ids: [String]) async throws(DaemonError) + func resumeSessions(ids: [String]) async throws(DaemonError) } @MainActor @@ -41,6 +43,9 @@ public class MutagenDaemon: FileSyncDaemon { private let mutagenDataDirectory: URL private let mutagenDaemonSocket: URL + // Managing sync sessions could take a while, especially with prompting + let sessionMgmtReqTimeout: TimeAmount = .seconds(15) + // Non-nil when the daemon is running var client: DaemonClient? private var group: MultiThreadedEventLoopGroup? @@ -75,6 +80,10 @@ public class MutagenDaemon: FileSyncDaemon { return } await refreshSessions() + if sessionState.isEmpty { + logger.info("No sync sessions found on startup, stopping daemon") + await stop() + } } } @@ -162,7 +171,7 @@ public class MutagenDaemon: FileSyncDaemon { // Already connected return } - group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + group = MultiThreadedEventLoopGroup(numberOfThreads: 2) do { channel = try GRPCChannelPool.with( target: .unixDomainSocket(mutagenDaemonSocket.path), @@ -252,51 +261,6 @@ public class MutagenDaemon: FileSyncDaemon { logger.info("\(line, privacy: .public)") } } - - public func refreshSessions() async { - guard case .running = state else { return } - // TODO: Implement - } - - public func createSession( - localPath _: String, - agentHost _: String, - remotePath _: String - ) async throws(DaemonError) { - if case .stopped = state { - do throws(DaemonError) { - try await start() - } catch { - state = .failed(error) - throw error - } - } - // TODO: Add session - } - - public func deleteSessions(ids _: [String]) async throws(DaemonError) { - // TODO: Delete session - await stopIfNoSessions() - } - - private func stopIfNoSessions() async { - let sessions: Synchronization_ListResponse - do { - sessions = try await client!.sync.list(Synchronization_ListRequest.with { req in - req.selection = .with { selection in - selection.all = true - } - }) - } catch { - state = .failed(.daemonStartFailure(error)) - return - } - // If there's no configured sessions, the daemon doesn't need to be running - if sessions.sessionStates.isEmpty { - logger.info("No sync sessions found") - await stop() - } - } } struct DaemonClient { diff --git a/Coder-Desktop/VPNLib/FileSync/FileSyncManagement.swift b/Coder-Desktop/VPNLib/FileSync/FileSyncManagement.swift new file mode 100644 index 0000000..c826fa7 --- /dev/null +++ b/Coder-Desktop/VPNLib/FileSync/FileSyncManagement.swift @@ -0,0 +1,120 @@ +import NIOCore + +public extension MutagenDaemon { + func refreshSessions() async { + guard case .running = state else { return } + let sessions: Synchronization_ListResponse + do { + sessions = try await client!.sync.list(Synchronization_ListRequest.with { req in + req.selection = .with { selection in + selection.all = true + } + }) + } catch { + state = .failed(.grpcFailure(error)) + return + } + sessionState = sessions.sessionStates.map { FileSyncSession(state: $0) } + } + + func createSession( + localPath: String, + agentHost: String, + remotePath: String + ) async throws(DaemonError) { + if case .stopped = state { + do throws(DaemonError) { + try await start() + } catch { + state = .failed(error) + throw error + } + } + let (stream, promptID) = try await host() + defer { stream.cancel() } + let req = Synchronization_CreateRequest.with { req in + req.prompter = promptID + req.specification = .with { spec in + spec.alpha = .with { alpha in + alpha.protocol = .local + alpha.path = localPath + } + spec.beta = .with { beta in + beta.protocol = .ssh + beta.host = agentHost + beta.path = remotePath + } + // TODO: Ingest a config from somewhere + spec.configuration = Synchronization_Configuration() + spec.configurationAlpha = Synchronization_Configuration() + spec.configurationBeta = Synchronization_Configuration() + } + } + do { + // The first creation will need to transfer the agent binary + // TODO: Because this is pretty long, we should show progress updates + // using the prompter messages + _ = try await client!.sync.create(req, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout * 4))) + } catch { + throw .grpcFailure(error) + } + await refreshSessions() + } + + func deleteSessions(ids: [String]) async throws(DaemonError) { + // Terminating sessions does not require prompting, according to the + // Mutagen CLI + let (stream, promptID) = try await host(allowPrompts: false) + defer { stream.cancel() } + guard case .running = state else { return } + do { + _ = try await client!.sync.terminate(Synchronization_TerminateRequest.with { req in + req.prompter = promptID + req.selection = .with { selection in + selection.specifications = ids + } + }, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout))) + } catch { + throw .grpcFailure(error) + } + await refreshSessions() + } + + func pauseSessions(ids: [String]) async throws(DaemonError) { + // Pausing sessions does not require prompting, according to the + // Mutagen CLI + let (stream, promptID) = try await host(allowPrompts: false) + defer { stream.cancel() } + guard case .running = state else { return } + do { + _ = try await client!.sync.pause(Synchronization_PauseRequest.with { req in + req.prompter = promptID + req.selection = .with { selection in + selection.specifications = ids + } + }, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout))) + } catch { + throw .grpcFailure(error) + } + await refreshSessions() + } + + func resumeSessions(ids: [String]) async throws(DaemonError) { + // Resuming sessions does not require prompting, according to the + // Mutagen CLI + let (stream, promptID) = try await host(allowPrompts: false) + defer { stream.cancel() } + guard case .running = state else { return } + do { + _ = try await client!.sync.resume(Synchronization_ResumeRequest.with { req in + req.prompter = promptID + req.selection = .with { selection in + selection.specifications = ids + } + }, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout))) + } catch { + throw .grpcFailure(error) + } + await refreshSessions() + } +}