Skip to content

Commit 9a2f1cf

Browse files
committed
chore: manage mutagen daemon lifecycle
1 parent b7ccbca commit 9a2f1cf

16 files changed

+693
-7
lines changed

.swiftlint.yml

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# TODO: Remove this once the grpc-swift-protobuf generator adds a lint disable comment
2+
excluded:
3+
- "**/*.pb.swift"
4+
- "**/*.grpc.swift"

Coder Desktop/.swiftformat

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
--selfrequired log,info,error,debug,critical,fault
2-
--exclude **.pb.swift
2+
--exclude **.pb.swift,**.grpc.swift
33
--condassignment always

Coder Desktop/Coder Desktop/Coder_DesktopApp.swift

+15-2
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,12 @@ class AppDelegate: NSObject, NSApplicationDelegate {
3030
private var menuBar: MenuBarController?
3131
let vpn: CoderVPNService
3232
let state: AppState
33+
let fileSyncDaemon: MutagenDaemon
3334

3435
override init() {
3536
vpn = CoderVPNService()
3637
state = AppState(onChange: vpn.configureTunnelProviderProtocol)
38+
fileSyncDaemon = MutagenDaemon()
3739
}
3840

3941
func applicationDidFinishLaunching(_: Notification) {
@@ -56,14 +58,25 @@ class AppDelegate: NSObject, NSApplicationDelegate {
5658
state.reconfigure()
5759
}
5860
}
61+
// TODO: Start the daemon only once a file sync is configured
62+
Task {
63+
try? await fileSyncDaemon.start()
64+
}
5965
}
6066

6167
// This function MUST eventually call `NSApp.reply(toApplicationShouldTerminate: true)`
6268
// or return `.terminateNow`
6369
func applicationShouldTerminate(_: NSApplication) -> NSApplication.TerminateReply {
64-
if !state.stopVPNOnQuit { return .terminateNow }
6570
Task {
66-
await vpn.stop()
71+
let vpnStop = Task {
72+
if !state.stopVPNOnQuit {
73+
await vpn.stop()
74+
}
75+
}
76+
let fileSyncStop = Task {
77+
try? await fileSyncDaemon.stop()
78+
}
79+
_ = await (vpnStop.value, fileSyncStop.value)
6780
NSApp.reply(toApplicationShouldTerminate: true)
6881
}
6982
return .terminateLater
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
import Foundation
2+
import GRPC
3+
import NIO
4+
import os
5+
6+
@MainActor
7+
protocol FileSyncDaemon: ObservableObject {
8+
var state: DaemonState { get }
9+
func start() async throws
10+
func stop() async throws
11+
}
12+
13+
@MainActor
14+
class MutagenDaemon: FileSyncDaemon {
15+
private let logger = Logger(subsystem: Bundle.main.bundleIdentifier!, category: "mutagen")
16+
17+
@Published var state: DaemonState = .stopped
18+
19+
private var mutagenProcess: Process?
20+
private var mutagenPipe: Pipe?
21+
private let mutagenPath: URL
22+
private let mutagenDataDirectory: URL
23+
private let mutagenDaemonSocket: URL
24+
25+
private var group: MultiThreadedEventLoopGroup?
26+
private var channel: GRPCChannel?
27+
private var client: Daemon_DaemonAsyncClient?
28+
29+
init() {
30+
#if arch(arm64)
31+
mutagenPath = Bundle.main.url(forResource: "mutagen-darwin-arm64", withExtension: nil)!
32+
#elseif arch(x86_64)
33+
mutagenPath = Bundle.main.url(forResource: "mutagen-darwin-amd64", withExtension: nil)!
34+
#else
35+
fatalError("unknown architecture")
36+
#endif
37+
mutagenDataDirectory = FileManager.default.urls(
38+
for: .applicationSupportDirectory,
39+
in: .userDomainMask
40+
).first!.appending(path: "Coder Desktop").appending(path: "Mutagen")
41+
mutagenDaemonSocket = mutagenDataDirectory.appending(path: "daemon").appending(path: "daemon.sock")
42+
// It shouldn't be fatal if the app was built without Mutagen embedded,
43+
// but file sync will be unavailable.
44+
if !FileManager.default.fileExists(atPath: mutagenPath.path) {
45+
logger.warning("Mutagen not embedded in app, file sync will be unavailable")
46+
state = .unavailable
47+
}
48+
}
49+
50+
func start() async throws {
51+
if case .unavailable = state { return }
52+
53+
// Stop an orphaned daemon, if there is one
54+
try? await connect()
55+
try? await stop()
56+
57+
(mutagenProcess, mutagenPipe) = createMutagenProcess()
58+
do {
59+
try mutagenProcess?.run()
60+
} catch {
61+
state = .failed("Failed to start file sync daemon: \(error)")
62+
throw MutagenDaemonError.daemonStartFailure(error)
63+
}
64+
65+
try await connect()
66+
67+
state = .running
68+
}
69+
70+
private func connect() async throws {
71+
guard client == nil else {
72+
// Already connected
73+
return
74+
}
75+
group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
76+
do {
77+
channel = try GRPCChannelPool.with(
78+
target: .unixDomainSocket(mutagenDaemonSocket.path),
79+
transportSecurity: .plaintext,
80+
eventLoopGroup: group!
81+
)
82+
client = Daemon_DaemonAsyncClient(channel: channel!)
83+
logger.info("Successfully connected to mutagen daemon via gRPC")
84+
} catch {
85+
logger.error("Failed to connect to gRPC: \(error)")
86+
try await cleanupGRPC()
87+
throw MutagenDaemonError.connectionFailure(error)
88+
}
89+
}
90+
91+
private func cleanupGRPC() async throws {
92+
try? await channel?.close().get()
93+
try? await group?.shutdownGracefully()
94+
95+
client = nil
96+
channel = nil
97+
group = nil
98+
}
99+
100+
func stop() async throws {
101+
if case .unavailable = state { return }
102+
state = .stopped
103+
guard FileManager.default.fileExists(atPath: mutagenDaemonSocket.path) else {
104+
return
105+
}
106+
107+
// "We don't check the response or error, because the daemon
108+
// may terminate before it has a chance to send the response."
109+
_ = try? await client?.terminate(
110+
Daemon_TerminateRequest(),
111+
callOptions: .init(timeLimit: .timeout(.milliseconds(500)))
112+
)
113+
114+
// Clean up gRPC connection
115+
try? await cleanupGRPC()
116+
117+
// Ensure the process is terminated
118+
mutagenProcess?.terminate()
119+
logger.info("Daemon stopped and gRPC connection closed")
120+
}
121+
122+
private func createMutagenProcess() -> (Process, Pipe) {
123+
let outputPipe = Pipe()
124+
outputPipe.fileHandleForReading.readabilityHandler = logOutput
125+
let process = Process()
126+
process.executableURL = mutagenPath
127+
process.arguments = ["daemon", "run"]
128+
process.environment = [
129+
"MUTAGEN_DATA_DIRECTORY": mutagenDataDirectory.path,
130+
]
131+
process.standardOutput = outputPipe
132+
process.standardError = outputPipe
133+
process.terminationHandler = terminationHandler
134+
return (process, outputPipe)
135+
}
136+
137+
private nonisolated func terminationHandler(process _: Process) {
138+
Task { @MainActor in
139+
self.mutagenPipe?.fileHandleForReading.readabilityHandler = nil
140+
mutagenProcess = nil
141+
142+
try? await cleanupGRPC()
143+
144+
switch self.state {
145+
case .stopped:
146+
logger.info("mutagen daemon stopped")
147+
return
148+
default:
149+
logger.error("mutagen daemon exited unexpectedly")
150+
self.state = .failed("File sync daemon terminated unexpectedly")
151+
}
152+
}
153+
}
154+
155+
private nonisolated func logOutput(pipe: FileHandle) {
156+
if let line = String(data: pipe.availableData, encoding: .utf8), line != "" {
157+
logger.info("\(line)")
158+
}
159+
}
160+
}
161+
162+
enum DaemonState {
163+
case running
164+
case stopped
165+
case failed(String)
166+
case unavailable
167+
}
168+
169+
enum MutagenDaemonError: Error {
170+
case daemonStartFailure(Error)
171+
case connectionFailure(Error)
172+
}

0 commit comments

Comments
 (0)