@@ -2,6 +2,7 @@ import Foundation
22import GRPC
33import NIO
44import os
5+ import Subprocess
56
67@MainActor
78public protocol FileSyncDaemon : ObservableObject {
@@ -20,8 +21,7 @@ public class MutagenDaemon: FileSyncDaemon {
2021}
2122}
2223
23- private var mutagenProcess : Process ?
24- private var mutagenPipe : Pipe ?
24+ private var mutagenProcess : Subprocess ?
2525private let mutagenPath : URL !
2626private let mutagenDataDirectory : URL
2727private let mutagenDaemonSocket : URL
@@ -58,24 +58,42 @@ public class MutagenDaemon: FileSyncDaemon {
5858try ? await connect ( )
5959await stop ( )
6060
61- ( mutagenProcess, mutagenPipe) = createMutagenProcess ( )
61+ mutagenProcess= createMutagenProcess ( )
62+ // swiftlint:disable:next large_tuple
63+ let ( standardOutput, standardError, waitForExit) : ( Pipe . AsyncBytes , Pipe . AsyncBytes , @Sendable ( ) async -> Void )
6264do {
63- try mutagenProcess? . run ( )
65+ ( standardOutput , standardError , waitForExit ) = try mutagenProcess! . run ( )
6466} catch {
6567 state= . failed( DaemonError . daemonStartFailure ( error) )
68+ return
69+ }
70+
71+ Task {
72+ await streamHandler ( io: standardOutput)
73+ logger. info ( " standard output stream closed " )
74+ }
75+
76+ Task {
77+ await streamHandler ( io: standardError)
78+ logger. info ( " standard error stream closed " )
79+ }
80+
81+ Task {
82+ await terminationHandler ( waitForExit: waitForExit)
6683}
6784
6885do {
6986try await connect ( )
7087} catch {
7188 state= . failed( DaemonError . daemonStartFailure ( error) )
89+ return
7290}
7391
7492 state= . running
7593 logger. info (
7694"""
7795 mutagen daemon started, pid:
78- \( self . mutagenProcess? . processIdentifier . description?? " unknown " , privacy: . public)
96+ \( self . mutagenProcess? . pid . description?? " unknown " , privacy: . public)
7997"""
8098)
8199}
@@ -129,46 +147,39 @@ public class MutagenDaemon: FileSyncDaemon {
129147
130148try ? await cleanupGRPC ( )
131149
132- mutagenProcess? . terminate ( )
150+ mutagenProcess? . kill ( )
151+ mutagenProcess= nil
133152 logger. info ( " Daemon stopped and gRPC connection closed " )
134153}
135154
136- private func createMutagenProcess( ) -> ( Process , Pipe ) {
137- let outputPipe = Pipe ( )
138- outputPipe. fileHandleForReading. readabilityHandler= logOutput
139- let process = Process ( )
140- process. executableURL= mutagenPath
141- process. arguments= [ " daemon " , " run " ]
142- logger. info ( " setting mutagen data directory: \( self . mutagenDataDirectory. path, privacy: . public) " )
155+ private func createMutagenProcess( ) -> Subprocess {
156+ let process = Subprocess ( [ mutagenPath. path, " daemon " , " run " ] )
143157 process. environment= [
144158" MUTAGEN_DATA_DIRECTORY " : mutagenDataDirectory. path,
145159]
146- process. standardOutput= outputPipe
147- process. standardError= outputPipe
148- process. terminationHandler= terminationHandler
149- return ( process, outputPipe)
160+ logger. info ( " setting mutagen data directory: \( self . mutagenDataDirectory. path, privacy: . public) " )
161+ return process
150162}
151163
152- private nonisolated func terminationHandler( process _: Process ) {
153- Task { @MainActor in
154- self . mutagenPipe? . fileHandleForReading. readabilityHandler= nil
155- mutagenProcess= nil
164+ private func terminationHandler( waitForExit: @Sendable ( ) async -> Void ) async {
165+ await waitForExit ( )
156166
157- try ? await cleanupGRPC ( )
158-
159- switch self . state{
160- case . stopped:
161- logger. info ( " mutagen daemon stopped " )
162- return
163- default :
164- logger. error ( " mutagen daemon exited unexpectedly " )
165- self . state= . failed( . terminatedUnexpectedly)
166- }
167+ switch state{
168+ case . stopped:
169+ logger. info ( " mutagen daemon stopped " )
170+ default :
171+ logger. error (
172+ """
173+ mutagen daemon exited unexpectedly with code:
174+ \( self . mutagenProcess? . exitCode. description?? " unknown " )
175+ """
176+ )
177+ state= . failed( . terminatedUnexpectedly)
167178}
168179}
169180
170- private nonisolated func logOutput ( pipe : FileHandle ) {
171- if let line= String ( data : pipe . availableData , encoding : . utf8 ) , line != " " {
181+ private func streamHandler ( io : Pipe . AsyncBytes ) async {
182+ for await line in io . lines {
172183 logger. info ( " \( line, privacy: . public) " )
173184}
174185}