Expand Up @@ -2,13 +2,26 @@ import Foundation import GRPC import NIO import os import Semaphore import Subprocess import SwiftUI @MainActor public protocol FileSyncDaemon: ObservableObject { var state: DaemonState { get } func start() async func start() async throws(DaemonError) func stop() async func listSessions() async throws -> [FileSyncSession] func createSession(with: FileSyncSession) async throws } public struct FileSyncSession { public let id: String public let name: String public let localPath: URL public let workspace: String public let agent: String public let remotePath: URL } @MainActor Expand All @@ -17,7 +30,14 @@ public class MutagenDaemon: FileSyncDaemon { @Published public var state: DaemonState = .stopped { didSet { logger.info("daemon state changed: \(self.state.description, privacy: .public)") logger.info("daemon state set: \(self.state.description, privacy: .public)") if case .failed = state { Task { try? await cleanupGRPC() } mutagenProcess?.kill() mutagenProcess = nil } } } Expand All @@ -26,46 +46,61 @@ public class MutagenDaemon: FileSyncDaemon { private let mutagenDataDirectory: URL private let mutagenDaemonSocket: URL // Non-nil when the daemon is running private var group: MultiThreadedEventLoopGroup? private var channel: GRPCChannel? private var client: Daemon_DaemonAsyncClient? public init() { #if arch(arm64) mutagenPath = Bundle.main.url(forResource: "mutagen-darwin-arm64", withExtension: nil) #elseif arch(x86_64) mutagenPath = Bundle.main.url(forResource: "mutagen-darwin-amd64", withExtension: nil) #else fatalError("unknown architecture") #endif mutagenDataDirectory = FileManager.default.urls( for: .applicationSupportDirectory, in: .userDomainMask ).first!.appending(path: "Coder Desktop").appending(path: "Mutagen") private var client: DaemonClient? // Protect start & stop transitions against re-entrancy private let transition = AsyncSemaphore(value: 1) public init(mutagenPath: URL? = nil, mutagenDataDirectory: URL = FileManager.default.urls( for: .applicationSupportDirectory, in: .userDomainMask ).first!.appending(path: "Coder Desktop").appending(path: "Mutagen")) { self.mutagenPath = mutagenPath self.mutagenDataDirectory = mutagenDataDirectory mutagenDaemonSocket = mutagenDataDirectory.appending(path: "daemon").appending(path: "daemon.sock") // It shouldn't be fatal if the app was built without Mutagen embedded, // but file sync will be unavailable. if mutagenPath == nil { logger.warning("Mutagen not embedded in app, file sync will be unavailable") state = .unavailable return } // If there are sync sessions, the daemon should be running Task { do throws(DaemonError) { try await start() } catch { state = .failed(error) return } await stopIfNoSessions() } } public func start() async { public func start() asyncthrows(DaemonError) { if case .unavailable = state { return } // Stop an orphaned daemon, if there is one try? await connect() await stop() await transition.wait() defer { transition.signal() } logger.info("starting mutagen daemon") mutagenProcess = createMutagenProcess() // swiftlint:disable:next large_tuple let (standardOutput, standardError, waitForExit): (Pipe.AsyncBytes, Pipe.AsyncBytes, @Sendable () async -> Void) do { (standardOutput, standardError, waitForExit) = try mutagenProcess!.run() } catch { state = .failed(DaemonError.daemonStartFailure(error)) return throw .daemonStartFailure(error) } Task { Expand All @@ -85,10 +120,11 @@ public class MutagenDaemon: FileSyncDaemon { do { try await connect() } catch { state = .failed(DaemonError.daemonStartFailure(error)) return throw .daemonStartFailure(error) } try await waitForDaemonStart() state = .running logger.info( """ Expand All @@ -98,6 +134,34 @@ public class MutagenDaemon: FileSyncDaemon { ) } // The daemon takes a moment to open the socket, and we don't want to hog the main actor // so poll for it on a background thread private func waitForDaemonStart( maxAttempts: Int = 5, attemptInterval: Duration = .milliseconds(100) ) async throws(DaemonError) { do { try await Task.detached(priority: .background) { for attempt in 0 ... maxAttempts { do { _ = try await self.client!.mgmt.version( Daemon_VersionRequest(), callOptions: .init(timeLimit: .timeout(.milliseconds(500))) ) return } catch { if attempt == maxAttempts { throw error } try? await Task.sleep(for: attemptInterval) } } }.value } catch { throw .daemonStartFailure(error) } } private func connect() async throws(DaemonError) { guard client == nil else { // Already connected Expand All @@ -110,14 +174,17 @@ public class MutagenDaemon: FileSyncDaemon { transportSecurity: .plaintext, eventLoopGroup: group! ) client = Daemon_DaemonAsyncClient(channel: channel!) client = DaemonClient( mgmt: Daemon_DaemonAsyncClient(channel: channel!), sync: Synchronization_SynchronizationAsyncClient(channel: channel!) ) logger.info( "Successfully connected to mutagen daemon, socket: \(self.mutagenDaemonSocket.path, privacy: .public)" ) } catch { logger.error("Failed to connect to gRPC: \(error)") try? await cleanupGRPC() throwDaemonError .connectionFailure(error) throw .connectionFailure(error) } } Expand All @@ -132,6 +199,10 @@ public class MutagenDaemon: FileSyncDaemon { public func stop() async { if case .unavailable = state { return } await transition.wait() defer { transition.signal() } logger.info("stopping mutagen daemon") state = .stopped guard FileManager.default.fileExists(atPath: mutagenDaemonSocket.path) else { // Already stopped Expand All @@ -140,7 +211,7 @@ public class MutagenDaemon: FileSyncDaemon { // "We don't check the response or error, because the daemon // may terminate before it has a chance to send the response." _ = try? await client?.terminate( _ = try? await client?.mgmt. terminate( Daemon_TerminateRequest(), callOptions: .init(timeLimit: .timeout(.milliseconds(500))) ) Expand Down Expand Up @@ -175,6 +246,7 @@ public class MutagenDaemon: FileSyncDaemon { """ ) state = .failed(.terminatedUnexpectedly) return } } Expand All @@ -183,6 +255,55 @@ public class MutagenDaemon: FileSyncDaemon { logger.info("\(line, privacy: .public)") } } public func listSessions() async throws -> [FileSyncSession] { guard case .running = state else { return [] } // TODO: Implement return [] } public func createSession(with _: FileSyncSession) async throws { if case .stopped = state { do throws(DaemonError) { try await start() } catch { state = .failed(error) return } } // TODO: Add Session } public func deleteSession() async throws { // TODO: Delete session await stopIfNoSessions() } private func stopIfNoSessions() async { let sessions: Synchronization_ListResponse do { sessions = try await client!.sync.list(Synchronization_ListRequest.with { req in req.selection = .with { selection in selection.all = true } }) } catch { state = .failed(.daemonStartFailure(error)) return } // If there's no configured sessions, the daemon doesn't need to be running if sessions.sessionStates.isEmpty { logger.info("No sync sessions found") await stop() } } } struct DaemonClient { let mgmt: Daemon_DaemonAsyncClient let sync: Synchronization_SynchronizationAsyncClient } public enum DaemonState { Expand All @@ -191,7 +312,7 @@ public enum DaemonState { case failed(DaemonError) case unavailable var description: String { public var description: String { switch self { case .running: "Running" Expand All @@ -203,12 +324,27 @@ public enum DaemonState { "Unavailable" } } public var color: Color { switch self { case .running: .green case .stopped: .gray case .failed: .red case .unavailable: .gray } } } public enum DaemonError: Error { case daemonNotRunning case daemonStartFailure(Error) case connectionFailure(Error) case terminatedUnexpectedly case grpcFailure(Error) var description: String { switch self { Expand All @@ -218,6 +354,10 @@ public enum DaemonError: Error { "Connection failure: \(error)" case .terminatedUnexpectedly: "Daemon terminated unexpectedly" case .daemonNotRunning: "The daemon must be started first" case let .grpcFailure(error): "Failed to communicate with daemon: \(error)" } } Expand Down