- Notifications
You must be signed in to change notification settings - Fork3
chore: create & delete sync sessions over gRPC#119
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Changes fromall commits
cb25fe5
1278070
dfd1bc8
1b64444
91a5b36
58f9775
File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -15,6 +15,8 @@ public protocol FileSyncDaemon: ObservableObject { | ||
func refreshSessions() async | ||
func createSession(localPath: String, agentHost: String, remotePath: String) async throws(DaemonError) | ||
func deleteSessions(ids: [String]) async throws(DaemonError) | ||
func pauseSessions(ids: [String]) async throws(DaemonError) | ||
func resumeSessions(ids: [String]) async throws(DaemonError) | ||
} | ||
@MainActor | ||
@@ -41,6 +43,9 @@ public class MutagenDaemon: FileSyncDaemon { | ||
private let mutagenDataDirectory: URL | ||
private let mutagenDaemonSocket: URL | ||
// Managing sync sessions could take a while, especially with prompting | ||
let sessionMgmtReqTimeout: TimeAmount = .seconds(15) | ||
// Non-nil when the daemon is running | ||
var client: DaemonClient? | ||
private var group: MultiThreadedEventLoopGroup? | ||
@@ -75,6 +80,10 @@ public class MutagenDaemon: FileSyncDaemon { | ||
return | ||
} | ||
await refreshSessions() | ||
if sessionState.isEmpty { | ||
logger.info("No sync sessions found on startup, stopping daemon") | ||
await stop() | ||
} | ||
} | ||
} | ||
@@ -162,7 +171,7 @@ public class MutagenDaemon: FileSyncDaemon { | ||
// Already connected | ||
return | ||
} | ||
group = MultiThreadedEventLoopGroup(numberOfThreads:2) | ||
MemberAuthor
| ||
do { | ||
channel = try GRPCChannelPool.with( | ||
target: .unixDomainSocket(mutagenDaemonSocket.path), | ||
@@ -252,51 +261,6 @@ public class MutagenDaemon: FileSyncDaemon { | ||
logger.info("\(line, privacy: .public)") | ||
} | ||
} | ||
} | ||
struct DaemonClient { | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
import NIOCore | ||
public extension MutagenDaemon { | ||
func refreshSessions() async { | ||
guard case .running = state else { return } | ||
let sessions: Synchronization_ListResponse | ||
do { | ||
sessions = try await client!.sync.list(Synchronization_ListRequest.with { req in | ||
req.selection = .with { selection in | ||
selection.all = true | ||
} | ||
}) | ||
} catch { | ||
state = .failed(.grpcFailure(error)) | ||
return | ||
} | ||
sessionState = sessions.sessionStates.map { FileSyncSession(state: $0) } | ||
} | ||
func createSession( | ||
localPath: String, | ||
agentHost: String, | ||
remotePath: String | ||
) async throws(DaemonError) { | ||
if case .stopped = state { | ||
do throws(DaemonError) { | ||
try await start() | ||
} catch { | ||
state = .failed(error) | ||
throw error | ||
} | ||
} | ||
let (stream, promptID) = try await host() | ||
defer { stream.cancel() } | ||
let req = Synchronization_CreateRequest.with { req in | ||
req.prompter = promptID | ||
req.specification = .with { spec in | ||
spec.alpha = .with { alpha in | ||
alpha.protocol = .local | ||
alpha.path = localPath | ||
} | ||
spec.beta = .with { beta in | ||
beta.protocol = .ssh | ||
beta.host = agentHost | ||
beta.path = remotePath | ||
} | ||
// TODO: Ingest a config from somewhere | ||
spec.configuration = Synchronization_Configuration() | ||
spec.configurationAlpha = Synchronization_Configuration() | ||
spec.configurationBeta = Synchronization_Configuration() | ||
} | ||
} | ||
do { | ||
// The first creation will need to transfer the agent binary | ||
// TODO: Because this is pretty long, we should show progress updates | ||
// using the prompter messages | ||
_ = try await client!.sync.create(req, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout * 4))) | ||
} catch { | ||
throw .grpcFailure(error) | ||
} | ||
await refreshSessions() | ||
} | ||
func deleteSessions(ids: [String]) async throws(DaemonError) { | ||
// Terminating sessions does not require prompting, according to the | ||
// Mutagen CLI | ||
let (stream, promptID) = try await host(allowPrompts: false) | ||
defer { stream.cancel() } | ||
guard case .running = state else { return } | ||
do { | ||
_ = try await client!.sync.terminate(Synchronization_TerminateRequest.with { req in | ||
req.prompter = promptID | ||
req.selection = .with { selection in | ||
selection.specifications = ids | ||
} | ||
}, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout))) | ||
} catch { | ||
throw .grpcFailure(error) | ||
} | ||
await refreshSessions() | ||
} | ||
func pauseSessions(ids: [String]) async throws(DaemonError) { | ||
// Pausing sessions does not require prompting, according to the | ||
// Mutagen CLI | ||
let (stream, promptID) = try await host(allowPrompts: false) | ||
defer { stream.cancel() } | ||
guard case .running = state else { return } | ||
do { | ||
_ = try await client!.sync.pause(Synchronization_PauseRequest.with { req in | ||
req.prompter = promptID | ||
req.selection = .with { selection in | ||
selection.specifications = ids | ||
} | ||
}, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout))) | ||
} catch { | ||
throw .grpcFailure(error) | ||
} | ||
await refreshSessions() | ||
} | ||
func resumeSessions(ids: [String]) async throws(DaemonError) { | ||
// Resuming sessions does not require prompting, according to the | ||
// Mutagen CLI | ||
let (stream, promptID) = try await host(allowPrompts: false) | ||
defer { stream.cancel() } | ||
guard case .running = state else { return } | ||
do { | ||
_ = try await client!.sync.resume(Synchronization_ResumeRequest.with { req in | ||
req.prompter = promptID | ||
req.selection = .with { selection in | ||
selection.specifications = ids | ||
} | ||
}, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout))) | ||
} catch { | ||
throw .grpcFailure(error) | ||
} | ||
await refreshSessions() | ||
} | ||
} |
Uh oh!
There was an error while loading.Please reload this page.