-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathFileSyncManagement.swift
180 lines (168 loc) · 6.28 KB
/
FileSyncManagement.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import NIOCore
public extension MutagenDaemon {
func refreshSessions() async {
guard case .running = state else { return }
let sessions: Synchronization_ListResponse
do {
sessions = try await client!.sync.list(Synchronization_ListRequest.with { req in
req.selection = .with { selection in
selection.all = true
}
})
} catch {
state = .failed(.grpcFailure(error))
return
}
sessionState = sessions.sessionStates.map { FileSyncSession(state: $0) }
}
func createSession(
arg: CreateSyncSessionRequest,
promptCallback: (@MainActor (String) -> Void)? = nil
) async throws(DaemonError) {
if case .stopped = state {
do throws(DaemonError) {
try await start()
} catch {
state = .failed(error)
throw error
}
}
let (stream, promptID) = try await host(promptCallback: promptCallback)
defer { stream.cancel() }
let req = Synchronization_CreateRequest.with { req in
req.prompter = promptID
req.specification = .with { spec in
spec.alpha = arg.alpha.mutagenURL
spec.beta = arg.beta.mutagenURL
// TODO: Ingest configs from somewhere
spec.configuration = .with {
// ALWAYS ignore VCS directories for now
// https://mutagen.io/documentation/synchronization/version-control-systems/
$0.ignoreVcsmode = .ignore
}
spec.configurationAlpha = Synchronization_Configuration()
spec.configurationBeta = Synchronization_Configuration()
}
}
do {
// The first creation will need to transfer the agent binary
// TODO: Because this is pretty long, we should show progress updates
// using the prompter messages
_ = try await client!.sync.create(req, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout * 4)))
} catch {
throw .grpcFailure(error)
}
await refreshSessions()
}
func deleteSessions(ids: [String]) async throws(DaemonError) {
// Terminating sessions does not require prompting, according to the
// Mutagen CLI
do {
let (stream, promptID) = try await host(allowPrompts: false)
defer { stream.cancel() }
guard case .running = state else { return }
do {
_ = try await client!.sync.terminate(Synchronization_TerminateRequest.with { req in
req.prompter = promptID
req.selection = .with { selection in
selection.specifications = ids
}
}, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout)))
} catch {
throw .grpcFailure(error)
}
}
await refreshSessions()
if sessionState.isEmpty {
// Last session was deleted, stop the daemon
await stop()
}
}
func pauseSessions(ids: [String]) async throws(DaemonError) {
// Pausing sessions does not require prompting, according to the
// Mutagen CLI
let (stream, promptID) = try await host(allowPrompts: false)
defer { stream.cancel() }
guard case .running = state else { return }
do {
_ = try await client!.sync.pause(Synchronization_PauseRequest.with { req in
req.prompter = promptID
req.selection = .with { selection in
selection.specifications = ids
}
}, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout)))
} catch {
throw .grpcFailure(error)
}
await refreshSessions()
}
func resumeSessions(ids: [String]) async throws(DaemonError) {
// Resuming sessions does use prompting, as it may start a new SSH connection
let (stream, promptID) = try await host(allowPrompts: true)
defer { stream.cancel() }
guard case .running = state else { return }
do {
_ = try await client!.sync.resume(Synchronization_ResumeRequest.with { req in
req.prompter = promptID
req.selection = .with { selection in
selection.specifications = ids
}
}, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout)))
} catch {
throw .grpcFailure(error)
}
await refreshSessions()
}
func resetSessions(ids: [String]) async throws(DaemonError) {
// Resetting a session involves pausing & resuming, so it does use prompting
let (stream, promptID) = try await host(allowPrompts: true)
defer { stream.cancel() }
guard case .running = state else { return }
do {
_ = try await client!.sync.reset(Synchronization_ResetRequest.with { req in
req.prompter = promptID
req.selection = .with { selection in
selection.specifications = ids
}
}, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout)))
} catch {
throw .grpcFailure(error)
}
await refreshSessions()
}
}
public struct CreateSyncSessionRequest {
public let alpha: Endpoint
public let beta: Endpoint
public init(alpha: Endpoint, beta: Endpoint) {
self.alpha = alpha
self.beta = beta
}
}
public struct Endpoint {
public let path: String
public let protocolKind: ProtocolKind
public init(path: String, protocolKind: ProtocolKind) {
self.path = path
self.protocolKind = protocolKind
}
public enum ProtocolKind {
case local
case ssh(host: String)
}
var mutagenURL: Url_URL {
switch protocolKind {
case .local:
.with { url in
url.path = path
url.protocol = .local
}
case let .ssh(host):
.with { url in
url.path = path
url.protocol = .ssh
url.host = host
}
}
}
}