@@ -2,6 +2,7 @@ import Foundation
2
2
import GRPC
3
3
import NIO
4
4
import os
5
+ import Subprocess
5
6
6
7
@MainActor
7
8
public protocol FileSyncDaemon : ObservableObject {
@@ -20,8 +21,7 @@ public class MutagenDaemon: FileSyncDaemon {
20
21
}
21
22
}
22
23
23
- private var mutagenProcess : Process ?
24
- private var mutagenPipe : Pipe ?
24
+ private var mutagenProcess : Subprocess ?
25
25
private let mutagenPath : URL !
26
26
private let mutagenDataDirectory : URL
27
27
private let mutagenDaemonSocket : URL
@@ -58,24 +58,42 @@ public class MutagenDaemon: FileSyncDaemon {
58
58
try ? await connect ( )
59
59
await stop ( )
60
60
61
- ( mutagenProcess, mutagenPipe) = createMutagenProcess ( )
61
+ mutagenProcess = createMutagenProcess ( )
62
+ // swiftlint:disable:next large_tuple
63
+ let ( standardOutput, standardError, waitForExit) : ( Pipe . AsyncBytes , Pipe . AsyncBytes , @Sendable ( ) async -> Void )
62
64
do {
63
- try mutagenProcess? . run ( )
65
+ ( standardOutput , standardError , waitForExit ) = try mutagenProcess! . run ( )
64
66
} catch {
65
67
state = . failed( DaemonError . daemonStartFailure ( error) )
68
+ return
69
+ }
70
+
71
+ Task {
72
+ await streamHandler ( io: standardOutput)
73
+ logger. info ( " standard output stream closed " )
74
+ }
75
+
76
+ Task {
77
+ await streamHandler ( io: standardError)
78
+ logger. info ( " standard error stream closed " )
79
+ }
80
+
81
+ Task {
82
+ await terminationHandler ( waitForExit: waitForExit)
66
83
}
67
84
68
85
do {
69
86
try await connect ( )
70
87
} catch {
71
88
state = . failed( DaemonError . daemonStartFailure ( error) )
89
+ return
72
90
}
73
91
74
92
state = . running
75
93
logger. info (
76
94
"""
77
95
mutagen daemon started, pid:
78
- \( self . mutagenProcess? . processIdentifier . description ?? " unknown " , privacy: . public)
96
+ \( self . mutagenProcess? . pid . description ?? " unknown " , privacy: . public)
79
97
"""
80
98
)
81
99
}
@@ -129,46 +147,39 @@ public class MutagenDaemon: FileSyncDaemon {
129
147
130
148
try ? await cleanupGRPC ( )
131
149
132
- mutagenProcess? . terminate ( )
150
+ mutagenProcess? . kill ( )
151
+ mutagenProcess = nil
133
152
logger. info ( " Daemon stopped and gRPC connection closed " )
134
153
}
135
154
136
- private func createMutagenProcess( ) -> ( Process , Pipe ) {
137
- let outputPipe = Pipe ( )
138
- outputPipe. fileHandleForReading. readabilityHandler = logOutput
139
- let process = Process ( )
140
- process. executableURL = mutagenPath
141
- process. arguments = [ " daemon " , " run " ]
142
- logger. info ( " setting mutagen data directory: \( self . mutagenDataDirectory. path, privacy: . public) " )
155
+ private func createMutagenProcess( ) -> Subprocess {
156
+ let process = Subprocess ( [ mutagenPath. path, " daemon " , " run " ] )
143
157
process. environment = [
144
158
" MUTAGEN_DATA_DIRECTORY " : mutagenDataDirectory. path,
145
159
]
146
- process. standardOutput = outputPipe
147
- process. standardError = outputPipe
148
- process. terminationHandler = terminationHandler
149
- return ( process, outputPipe)
160
+ logger. info ( " setting mutagen data directory: \( self . mutagenDataDirectory. path, privacy: . public) " )
161
+ return process
150
162
}
151
163
152
- private nonisolated func terminationHandler( process _: Process ) {
153
- Task { @MainActor in
154
- self . mutagenPipe? . fileHandleForReading. readabilityHandler = nil
155
- mutagenProcess = nil
164
+ private func terminationHandler( waitForExit: @Sendable ( ) async -> Void ) async {
165
+ await waitForExit ( )
156
166
157
- try ? await cleanupGRPC ( )
158
-
159
- switch self . state {
160
- case . stopped:
161
- logger. info ( " mutagen daemon stopped " )
162
- return
163
- default :
164
- logger. error ( " mutagen daemon exited unexpectedly " )
165
- self . state = . failed( . terminatedUnexpectedly)
166
- }
167
+ switch state {
168
+ case . stopped:
169
+ logger. info ( " mutagen daemon stopped " )
170
+ default :
171
+ logger. error (
172
+ """
173
+ mutagen daemon exited unexpectedly with code:
174
+ \( self . mutagenProcess? . exitCode. description ?? " unknown " )
175
+ """
176
+ )
177
+ state = . failed( . terminatedUnexpectedly)
167
178
}
168
179
}
169
180
170
- private nonisolated func logOutput ( pipe : FileHandle ) {
171
- if let line = String ( data : pipe . availableData , encoding : . utf8 ) , line != " " {
181
+ private func streamHandler ( io : Pipe . AsyncBytes ) async {
182
+ for await line in io . lines {
172
183
logger. info ( " \( line, privacy: . public) " )
173
184
}
174
185
}
0 commit comments