-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathFileSyncManagement.swift
137 lines (131 loc) · 5.19 KB
/
FileSyncManagement.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import NIOCore
public extension MutagenDaemon {
func refreshSessions() async {
guard case .running = state else { return }
let sessions: Synchronization_ListResponse
do {
sessions = try await client!.sync.list(Synchronization_ListRequest.with { req in
req.selection = .with { selection in
selection.all = true
}
})
} catch {
state = .failed(.grpcFailure(error))
return
}
sessionState = sessions.sessionStates.map { FileSyncSession(state: $0) }
}
func createSession(
localPath: String,
agentHost: String,
remotePath: String
) async throws(DaemonError) {
if case .stopped = state {
do throws(DaemonError) {
try await start()
} catch {
state = .failed(error)
throw error
}
}
let (stream, promptID) = try await host()
defer { stream.cancel() }
let req = Synchronization_CreateRequest.with { req in
req.prompter = promptID
req.specification = .with { spec in
spec.alpha = .with { alpha in
alpha.protocol = .local
alpha.path = localPath
}
spec.beta = .with { beta in
beta.protocol = .ssh
beta.host = agentHost
beta.path = remotePath
}
// TODO: Ingest a config from somewhere
spec.configuration = Synchronization_Configuration()
spec.configurationAlpha = Synchronization_Configuration()
spec.configurationBeta = Synchronization_Configuration()
}
}
do {
// The first creation will need to transfer the agent binary
// TODO: Because this is pretty long, we should show progress updates
// using the prompter messages
_ = try await client!.sync.create(req, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout * 4)))
} catch {
throw .grpcFailure(error)
}
await refreshSessions()
}
func deleteSessions(ids: [String]) async throws(DaemonError) {
// Terminating sessions does not require prompting, according to the
// Mutagen CLI
let (stream, promptID) = try await host(allowPrompts: false)
defer { stream.cancel() }
guard case .running = state else { return }
do {
_ = try await client!.sync.terminate(Synchronization_TerminateRequest.with { req in
req.prompter = promptID
req.selection = .with { selection in
selection.specifications = ids
}
}, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout)))
} catch {
throw .grpcFailure(error)
}
await refreshSessions()
}
func pauseSessions(ids: [String]) async throws(DaemonError) {
// Pausing sessions does not require prompting, according to the
// Mutagen CLI
let (stream, promptID) = try await host(allowPrompts: false)
defer { stream.cancel() }
guard case .running = state else { return }
do {
_ = try await client!.sync.pause(Synchronization_PauseRequest.with { req in
req.prompter = promptID
req.selection = .with { selection in
selection.specifications = ids
}
}, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout)))
} catch {
throw .grpcFailure(error)
}
await refreshSessions()
}
func resumeSessions(ids: [String]) async throws(DaemonError) {
// Resuming sessions does use prompting, as it may start a new SSH connection
let (stream, promptID) = try await host(allowPrompts: true)
defer { stream.cancel() }
guard case .running = state else { return }
do {
_ = try await client!.sync.resume(Synchronization_ResumeRequest.with { req in
req.prompter = promptID
req.selection = .with { selection in
selection.specifications = ids
}
}, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout)))
} catch {
throw .grpcFailure(error)
}
await refreshSessions()
}
func resetSessions(ids: [String]) async throws(DaemonError) {
// Resetting a session involves pausing & resuming, so it does use prompting
let (stream, promptID) = try await host(allowPrompts: true)
defer { stream.cancel() }
guard case .running = state else { return }
do {
_ = try await client!.sync.reset(Synchronization_ResetRequest.with { req in
req.prompter = promptID
req.selection = .with { selection in
selection.specifications = ids
}
}, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout)))
} catch {
throw .grpcFailure(error)
}
await refreshSessions()
}
}