Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

chore: create & delete sync sessions over gRPC#119

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
ethanndickson merged 6 commits intomainfromethan/create-delete-grpc
Mar 28, 2025
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -21,4 +21,8 @@ final class PreviewFileSync: FileSyncDaemon {
func createSession(localPath _: String, agentHost _: String, remotePath _: String) async throws(DaemonError) {}

func deleteSessions(ids _: [String]) async throws(VPNLib.DaemonError) {}

func pauseSessions(ids _: [String]) async throws(VPNLib.DaemonError) {}

func resumeSessions(ids _: [String]) async throws(VPNLib.DaemonError) {}
}
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -51,11 +51,15 @@ struct FileSyncConfig<VPN: VPNService, FS: FileSyncDaemon>: View {
loading = true
defer { loading = false }
do throws(DaemonError) {
// TODO: Support selecting & deleting multiple sessions at once
try await fileSync.deleteSessions(ids: [selection!])
if fileSync.sessionState.isEmpty {
// Last session was deleted, stop the daemon
await fileSync.stop()
}
} catch {
deleteError = error
}
await fileSync.refreshSessions()
selection = nil
}
} label: {
Expand All@@ -65,7 +69,17 @@ struct FileSyncConfig<VPN: VPNService, FS: FileSyncDaemon>: View {
if let selectedSession = fileSync.sessionState.first(where: { $0.id == selection }) {
Divider()
Button {
// TODO: Pause & Unpause
Task {
// TODO: Support pausing & resuming multiple sessions at once
loading = true
defer { loading = false }
switch selectedSession.status {
case .paused:
try await fileSync.resumeSessions(ids: [selectedSession.id])
default:
try await fileSync.pauseSessions(ids: [selectedSession.id])
}
}
} label: {
switch selectedSession.status {
case .paused:
Expand Down
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -68,7 +68,7 @@ struct FileSyncSessionModal<VPN: VPNService, FS: FileSyncDaemon>: View {
}.disabled(loading)
.alert("Error", isPresented: Binding(
get: { createError != nil },
set: { if $0 { createError = nil } }
set: { if!$0 { createError = nil } }
)) {} message: {
Text(createError?.description ?? "An unknown error occurred.")
}
Expand All@@ -83,7 +83,6 @@ struct FileSyncSessionModal<VPN: VPNService, FS: FileSyncDaemon>: View {
defer { loading = false }
do throws(DaemonError) {
if let existingSession {
// TODO: Support selecting & deleting multiple sessions at once
try await fileSync.deleteSessions(ids: [existingSession.id])
}
try await fileSync.createSession(
Expand Down
4 changes: 4 additions & 0 deletionsCoder-Desktop/Coder-DesktopTests/Util.swift
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -48,6 +48,10 @@ class MockFileSyncDaemon: FileSyncDaemon {
}

func createSession(localPath _: String, agentHost _: String, remotePath _: String) async throws(DaemonError) {}

func pauseSessions(ids _: [String]) async throws(VPNLib.DaemonError) {}

func resumeSessions(ids _: [String]) async throws(VPNLib.DaemonError) {}
}

extension Inspection: @unchecked Sendable, @retroactive InspectionEmissary {}
56 changes: 10 additions & 46 deletionsCoder-Desktop/VPNLib/FileSync/FileSyncDaemon.swift
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -15,6 +15,8 @@ public protocol FileSyncDaemon: ObservableObject {
func refreshSessions() async
func createSession(localPath: String, agentHost: String, remotePath: String) async throws(DaemonError)
func deleteSessions(ids: [String]) async throws(DaemonError)
func pauseSessions(ids: [String]) async throws(DaemonError)
func resumeSessions(ids: [String]) async throws(DaemonError)
}

@MainActor
Expand All@@ -41,6 +43,9 @@ public class MutagenDaemon: FileSyncDaemon {
private let mutagenDataDirectory: URL
private let mutagenDaemonSocket: URL

// Managing sync sessions could take a while, especially with prompting
let sessionMgmtReqTimeout: TimeAmount = .seconds(15)

// Non-nil when the daemon is running
var client: DaemonClient?
private var group: MultiThreadedEventLoopGroup?
Expand DownExpand Up@@ -75,6 +80,10 @@ public class MutagenDaemon: FileSyncDaemon {
return
}
await refreshSessions()
if sessionState.isEmpty {
logger.info("No sync sessions found on startup, stopping daemon")
await stop()
}
}
}

Expand DownExpand Up@@ -162,7 +171,7 @@ public class MutagenDaemon: FileSyncDaemon {
// Already connected
return
}
group = MultiThreadedEventLoopGroup(numberOfThreads:1)
group = MultiThreadedEventLoopGroup(numberOfThreads:2)
Copy link
MemberAuthor

@ethanndicksonethanndicksonMar 24, 2025
edited
Loading

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

We always have two outgoing gRPC requests at once, the prompter, and the actual request we're trying to make. This creates a second OS thread for gRPC to use.

Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

tbh 2 os threads might be overkill

do {
channel = try GRPCChannelPool.with(
target: .unixDomainSocket(mutagenDaemonSocket.path),
Expand DownExpand Up@@ -252,51 +261,6 @@ public class MutagenDaemon: FileSyncDaemon {
logger.info("\(line, privacy: .public)")
}
}

public func refreshSessions() async {
guard case .running = state else { return }
// TODO: Implement
}

public func createSession(
localPath _: String,
agentHost _: String,
remotePath _: String
) async throws(DaemonError) {
if case .stopped = state {
do throws(DaemonError) {
try await start()
} catch {
state = .failed(error)
throw error
}
}
// TODO: Add session
}

public func deleteSessions(ids _: [String]) async throws(DaemonError) {
// TODO: Delete session
await stopIfNoSessions()
}

private func stopIfNoSessions() async {
let sessions: Synchronization_ListResponse
do {
sessions = try await client!.sync.list(Synchronization_ListRequest.with { req in
req.selection = .with { selection in
selection.all = true
}
})
} catch {
state = .failed(.daemonStartFailure(error))
return
}
// If there's no configured sessions, the daemon doesn't need to be running
if sessions.sessionStates.isEmpty {
logger.info("No sync sessions found")
await stop()
}
}
}

struct DaemonClient {
Expand Down
120 changes: 120 additions & 0 deletionsCoder-Desktop/VPNLib/FileSync/FileSyncManagement.swift
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
import NIOCore

public extension MutagenDaemon {
func refreshSessions() async {
guard case .running = state else { return }
let sessions: Synchronization_ListResponse
do {
sessions = try await client!.sync.list(Synchronization_ListRequest.with { req in
req.selection = .with { selection in
selection.all = true
}
})
} catch {
state = .failed(.grpcFailure(error))
return
}
sessionState = sessions.sessionStates.map { FileSyncSession(state: $0) }
}

func createSession(
localPath: String,
agentHost: String,
remotePath: String
) async throws(DaemonError) {
if case .stopped = state {
do throws(DaemonError) {
try await start()
} catch {
state = .failed(error)
throw error
}
}
let (stream, promptID) = try await host()
defer { stream.cancel() }
let req = Synchronization_CreateRequest.with { req in
req.prompter = promptID
req.specification = .with { spec in
spec.alpha = .with { alpha in
alpha.protocol = .local
alpha.path = localPath
}
spec.beta = .with { beta in
beta.protocol = .ssh
beta.host = agentHost
beta.path = remotePath
}
// TODO: Ingest a config from somewhere
spec.configuration = Synchronization_Configuration()
spec.configurationAlpha = Synchronization_Configuration()
spec.configurationBeta = Synchronization_Configuration()
}
}
do {
// The first creation will need to transfer the agent binary
// TODO: Because this is pretty long, we should show progress updates
// using the prompter messages
_ = try await client!.sync.create(req, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout * 4)))
} catch {
throw .grpcFailure(error)
}
await refreshSessions()
}

func deleteSessions(ids: [String]) async throws(DaemonError) {
// Terminating sessions does not require prompting, according to the
// Mutagen CLI
let (stream, promptID) = try await host(allowPrompts: false)
defer { stream.cancel() }
guard case .running = state else { return }
do {
_ = try await client!.sync.terminate(Synchronization_TerminateRequest.with { req in
req.prompter = promptID
req.selection = .with { selection in
selection.specifications = ids
}
}, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout)))
} catch {
throw .grpcFailure(error)
}
await refreshSessions()
}

func pauseSessions(ids: [String]) async throws(DaemonError) {
// Pausing sessions does not require prompting, according to the
// Mutagen CLI
let (stream, promptID) = try await host(allowPrompts: false)
defer { stream.cancel() }
guard case .running = state else { return }
do {
_ = try await client!.sync.pause(Synchronization_PauseRequest.with { req in
req.prompter = promptID
req.selection = .with { selection in
selection.specifications = ids
}
}, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout)))
} catch {
throw .grpcFailure(error)
}
await refreshSessions()
}

func resumeSessions(ids: [String]) async throws(DaemonError) {
// Resuming sessions does not require prompting, according to the
// Mutagen CLI
let (stream, promptID) = try await host(allowPrompts: false)
defer { stream.cancel() }
guard case .running = state else { return }
do {
_ = try await client!.sync.resume(Synchronization_ResumeRequest.with { req in
req.prompter = promptID
req.selection = .with { selection in
selection.specifications = ids
}
}, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout)))
} catch {
throw .grpcFailure(error)
}
await refreshSessions()
}
}
Loading

[8]ページ先頭

©2009-2025 Movatter.jp