@@ -2,13 +2,26 @@ import Foundation
22import GRPC
33import NIO
44import os
5+ import Semaphore
56import Subprocess
7+ import SwiftUI
68
79@MainActor
810public protocol FileSyncDaemon : ObservableObject {
911var state : DaemonState { get }
10- func start( ) async
12+ func start( ) async throws ( DaemonError )
1113func stop( ) async
14+ func listSessions( ) async throws -> [ FileSyncSession ]
15+ func createSession( with: FileSyncSession ) async throws
16+ }
17+
18+ public struct FileSyncSession {
19+ public let id : String
20+ public let name : String
21+ public let localPath : URL
22+ public let workspace : String
23+ public let agent : String
24+ public let remotePath : URL
1225}
1326
1427@MainActor
@@ -17,7 +30,14 @@ public class MutagenDaemon: FileSyncDaemon {
1730
1831@Published public var state : DaemonState = . stopped{
1932 didSet{
20- logger. info ( " daemon state changed: \( self . state. description, privacy: . public) " )
33+ logger. info ( " daemon state set: \( self . state. description, privacy: . public) " )
34+ if case. failed= state{
35+ Task {
36+ try ? await cleanupGRPC ( )
37+ }
38+ mutagenProcess? . kill ( )
39+ mutagenProcess= nil
40+ }
2141}
2242}
2343
@@ -26,46 +46,61 @@ public class MutagenDaemon: FileSyncDaemon {
2646private let mutagenDataDirectory : URL
2747private let mutagenDaemonSocket : URL
2848
49+ // Non-nil when the daemon is running
2950private var group : MultiThreadedEventLoopGroup ?
3051private var channel : GRPCChannel ?
31- private var client : Daemon_DaemonAsyncClient ?
32-
33- public init ( ) {
34- #if arch(arm64)
35- mutagenPath= Bundle . main. url ( forResource: " mutagen-darwin-arm64 " , withExtension: nil )
36- #elseif arch(x86_64)
37- mutagenPath= Bundle . main. url ( forResource: " mutagen-darwin-amd64 " , withExtension: nil )
38- #else
39- fatalError ( " unknown architecture " )
40- #endif
41- mutagenDataDirectory= FileManager . default. urls (
42- for: . applicationSupportDirectory,
43- in: . userDomainMask
44- ) . first!. appending ( path: " Coder Desktop " ) . appending ( path: " Mutagen " )
52+ private var client : DaemonClient ?
53+
54+ // Protect start & stop transitions against re-entrancy
55+ private let transition = AsyncSemaphore ( value: 1 )
56+
57+ public init ( mutagenPath: URL ? = nil ,
58+ mutagenDataDirectory: URL = FileManager . default. urls (
59+ for: . applicationSupportDirectory,
60+ in: . userDomainMask
61+ ) . first!. appending ( path: " Coder Desktop " ) . appending ( path: " Mutagen " ) )
62+ {
63+ self . mutagenPath= mutagenPath
64+ self . mutagenDataDirectory= mutagenDataDirectory
4565 mutagenDaemonSocket= mutagenDataDirectory. appending ( path: " daemon " ) . appending ( path: " daemon.sock " )
4666 // It shouldn't be fatal if the app was built without Mutagen embedded,
4767 // but file sync will be unavailable.
4868if mutagenPath== nil {
4969 logger. warning ( " Mutagen not embedded in app, file sync will be unavailable " )
5070 state= . unavailable
71+ return
72+ }
73+
74+ // If there are sync sessions, the daemon should be running
75+ Task {
76+ do throws ( DaemonError) {
77+ try await start ( )
78+ } catch{
79+ state= . failed( error)
80+ return
81+ }
82+ await stopIfNoSessions ( )
5183}
5284}
5385
54- public func start( ) async {
86+ public func start( ) async throws ( DaemonError ) {
5587if case. unavailable= state{ return }
5688
5789 // Stop an orphaned daemon, if there is one
5890try ? await connect ( )
5991await stop ( )
6092
93+ await transition. wait ( )
94+ defer { transition. signal ( ) }
95+ logger. info ( " starting mutagen daemon " )
96+
6197 mutagenProcess= createMutagenProcess ( )
6298 // swiftlint:disable:next large_tuple
6399let ( standardOutput, standardError, waitForExit) : ( Pipe . AsyncBytes , Pipe . AsyncBytes , @Sendable ( ) async -> Void )
64100do {
65101( standardOutput, standardError, waitForExit) = try mutagenProcess!. run ( )
66102} catch {
67- state= . failed( DaemonError . daemonStartFailure ( error) )
68- return
103+ throw . daemonStartFailure( error)
69104}
70105
71106Task {
@@ -85,10 +120,11 @@ public class MutagenDaemon: FileSyncDaemon {
85120do {
86121try await connect ( )
87122} catch {
88- state= . failed( DaemonError . daemonStartFailure ( error) )
89- return
123+ throw . daemonStartFailure( error)
90124}
91125
126+ try await waitForDaemonStart ( )
127+
92128 state= . running
93129 logger. info (
94130"""
@@ -98,6 +134,34 @@ public class MutagenDaemon: FileSyncDaemon {
98134)
99135}
100136
137+ // The daemon takes a moment to open the socket, and we don't want to hog the main actor
138+ // so poll for it on a background thread
139+ private func waitForDaemonStart(
140+ maxAttempts: Int = 5 ,
141+ attemptInterval: Duration = . milliseconds( 100 )
142+ ) async throws ( DaemonError) {
143+ do {
144+ try await Task . detached ( priority: . background) {
145+ for attempt in 0 ... maxAttempts{
146+ do {
147+ _= try await self . client!. mgmt. version (
148+ Daemon_VersionRequest ( ) ,
149+ callOptions: . init( timeLimit: . timeout( . milliseconds( 500 ) ) )
150+ )
151+ return
152+ } catch {
153+ if attempt== maxAttempts{
154+ throw error
155+ }
156+ try ? await Task . sleep ( for: attemptInterval)
157+ }
158+ }
159+ } . value
160+ } catch {
161+ throw . daemonStartFailure( error)
162+ }
163+ }
164+
101165private func connect( ) async throws ( DaemonError) {
102166guard client== nil else {
103167 // Already connected
@@ -110,14 +174,17 @@ public class MutagenDaemon: FileSyncDaemon {
110174 transportSecurity: . plaintext,
111175 eventLoopGroup: group!
112176)
113- client= Daemon_DaemonAsyncClient ( channel: channel!)
177+ client= DaemonClient (
178+ mgmt: Daemon_DaemonAsyncClient ( channel: channel!) ,
179+ sync: Synchronization_SynchronizationAsyncClient ( channel: channel!)
180+ )
114181 logger. info (
115182" Successfully connected to mutagen daemon, socket: \( self . mutagenDaemonSocket. path, privacy: . public) "
116183)
117184} catch {
118185 logger. error ( " Failed to connect to gRPC: \( error) " )
119186try ? await cleanupGRPC ( )
120- throw DaemonError . connectionFailure ( error)
187+ throw . connectionFailure( error)
121188}
122189}
123190
@@ -132,6 +199,10 @@ public class MutagenDaemon: FileSyncDaemon {
132199
133200public func stop( ) async {
134201if case. unavailable= state{ return }
202+ await transition. wait ( )
203+ defer { transition. signal ( ) }
204+ logger. info ( " stopping mutagen daemon " )
205+
135206 state= . stopped
136207guard FileManager . default. fileExists ( atPath: mutagenDaemonSocket. path) else {
137208 // Already stopped
@@ -140,7 +211,7 @@ public class MutagenDaemon: FileSyncDaemon {
140211
141212 // "We don't check the response or error, because the daemon
142213 // may terminate before it has a chance to send the response."
143- _= try ? await client? . terminate (
214+ _= try ? await client? . mgmt . terminate (
144215Daemon_TerminateRequest ( ) ,
145216 callOptions: . init( timeLimit: . timeout( . milliseconds( 500 ) ) )
146217)
@@ -175,6 +246,7 @@ public class MutagenDaemon: FileSyncDaemon {
175246"""
176247)
177248 state= . failed( . terminatedUnexpectedly)
249+ return
178250}
179251}
180252
@@ -183,6 +255,55 @@ public class MutagenDaemon: FileSyncDaemon {
183255 logger. info ( " \( line, privacy: . public) " )
184256}
185257}
258+
259+ public func listSessions( ) async throws -> [ FileSyncSession ] {
260+ guard case. running= stateelse {
261+ return [ ]
262+ }
263+ // TODO: Implement
264+ return [ ]
265+ }
266+
267+ public func createSession( with _: FileSyncSession ) async throws {
268+ if case. stopped= state{
269+ do throws ( DaemonError) {
270+ try await start ( )
271+ } catch{
272+ state= . failed( error)
273+ return
274+ }
275+ }
276+ // TODO: Add Session
277+ }
278+
279+ public func deleteSession( ) async throws {
280+ // TODO: Delete session
281+ await stopIfNoSessions ( )
282+ }
283+
284+ private func stopIfNoSessions( ) async {
285+ let sessions : Synchronization_ListResponse
286+ do {
287+ sessions= try await client!. sync. list ( Synchronization_ListRequest . with { reqin
288+ req. selection= . with{ selectionin
289+ selection. all= true
290+ }
291+ } )
292+ } catch {
293+ state= . failed( . daemonStartFailure( error) )
294+ return
295+ }
296+ // If there's no configured sessions, the daemon doesn't need to be running
297+ if sessions. sessionStates. isEmpty{
298+ logger. info ( " No sync sessions found " )
299+ await stop ( )
300+ }
301+ }
302+ }
303+
304+ struct DaemonClient {
305+ let mgmt : Daemon_DaemonAsyncClient
306+ let sync : Synchronization_SynchronizationAsyncClient
186307}
187308
188309public enum DaemonState {
@@ -191,7 +312,7 @@ public enum DaemonState {
191312case failed( DaemonError )
192313case unavailable
193314
194- var description : String {
315+ public var description : String {
195316switch self {
196317case . running:
197318" Running "
@@ -203,12 +324,27 @@ public enum DaemonState {
203324" Unavailable "
204325}
205326}
327+
328+ public var color : Color {
329+ switch self {
330+ case . running:
331+ . green
332+ case . stopped:
333+ . gray
334+ case . failed:
335+ . red
336+ case . unavailable:
337+ . gray
338+ }
339+ }
206340}
207341
208342public enum DaemonError : Error {
343+ case daemonNotRunning
209344case daemonStartFailure( Error )
210345case connectionFailure( Error )
211346case terminatedUnexpectedly
347+ case grpcFailure( Error )
212348
213349var description : String {
214350switch self {
@@ -218,6 +354,10 @@ public enum DaemonError: Error {
218354" Connection failure: \( error) "
219355case . terminatedUnexpectedly:
220356" Daemon terminated unexpectedly "
357+ case . daemonNotRunning:
358+ " The daemon must be started first "
359+ case let . grpcFailure( error) :
360+ " Failed to communicate with daemon: \( error) "
221361}
222362}
223363