Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit6463de0

Browse files
chore: create & delete sync sessions over gRPC (#119)
Closes#63.
1 parent2669a1c commit6463de0

File tree

6 files changed

+155
-50
lines changed

6 files changed

+155
-50
lines changed

‎Coder-Desktop/Coder-Desktop/Preview Content/PreviewFileSync.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,8 @@ final class PreviewFileSync: FileSyncDaemon {
2121
func createSession(localPath _:String, agentHost _:String, remotePath _:String)asyncthrows(DaemonError){}
2222

2323
func deleteSessions(ids _:[String])asyncthrows(VPNLib.DaemonError){}
24+
25+
func pauseSessions(ids _:[String])asyncthrows(VPNLib.DaemonError){}
26+
27+
func resumeSessions(ids _:[String])asyncthrows(VPNLib.DaemonError){}
2428
}

‎Coder-Desktop/Coder-Desktop/Views/FileSync/FileSyncConfig.swift

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,15 @@ struct FileSyncConfig<VPN: VPNService, FS: FileSyncDaemon>: View {
5151
loading=true
5252
defer{ loading=false}
5353
dothrows(DaemonError){
54+
// TODO: Support selecting & deleting multiple sessions at once
5455
tryawait fileSync.deleteSessions(ids:[selection!])
56+
if fileSync.sessionState.isEmpty{
57+
// Last session was deleted, stop the daemon
58+
await fileSync.stop()
59+
}
5560
} catch{
5661
deleteError= error
5762
}
58-
await fileSync.refreshSessions()
5963
selection=nil
6064
}
6165
} label:{
@@ -65,7 +69,17 @@ struct FileSyncConfig<VPN: VPNService, FS: FileSyncDaemon>: View {
6569
iflet selectedSession= fileSync.sessionState.first(where:{ $0.id== selection}){
6670
Divider()
6771
Button{
68-
// TODO: Pause & Unpause
72+
Task{
73+
// TODO: Support pausing & resuming multiple sessions at once
74+
loading=true
75+
defer{ loading=false}
76+
switch selectedSession.status{
77+
case.paused:
78+
tryawait fileSync.resumeSessions(ids:[selectedSession.id])
79+
default:
80+
tryawait fileSync.pauseSessions(ids:[selectedSession.id])
81+
}
82+
}
6983
} label:{
7084
switch selectedSession.status{
7185
case.paused:

‎Coder-Desktop/Coder-Desktop/Views/FileSync/FileSyncSessionModal.swift

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ struct FileSyncSessionModal<VPN: VPNService, FS: FileSyncDaemon>: View {
6868
}.disabled(loading)
6969
.alert("Error", isPresented:Binding(
7070
get:{ createError!=nil},
71-
set:{if $0{ createError=nil}}
71+
set:{if!$0{ createError=nil}}
7272
)){} message:{
7373
Text(createError?.description??"An unknown error occurred.")
7474
}
@@ -83,7 +83,6 @@ struct FileSyncSessionModal<VPN: VPNService, FS: FileSyncDaemon>: View {
8383
defer{ loading=false}
8484
dothrows(DaemonError){
8585
iflet existingSession{
86-
// TODO: Support selecting & deleting multiple sessions at once
8786
tryawait fileSync.deleteSessions(ids:[existingSession.id])
8887
}
8988
tryawait fileSync.createSession(

‎Coder-Desktop/Coder-DesktopTests/Util.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ class MockFileSyncDaemon: FileSyncDaemon {
4848
}
4949

5050
func createSession(localPath _:String, agentHost _:String, remotePath _:String)asyncthrows(DaemonError){}
51+
52+
func pauseSessions(ids _:[String])asyncthrows(VPNLib.DaemonError){}
53+
54+
func resumeSessions(ids _:[String])asyncthrows(VPNLib.DaemonError){}
5155
}
5256

5357
extensionInspection:@uncheckedSendable,@retroactiveInspectionEmissary{}

‎Coder-Desktop/VPNLib/FileSync/FileSyncDaemon.swift

Lines changed: 10 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ public protocol FileSyncDaemon: ObservableObject {
1515
func refreshSessions()async
1616
func createSession(localPath:String, agentHost:String, remotePath:String)asyncthrows(DaemonError)
1717
func deleteSessions(ids:[String])asyncthrows(DaemonError)
18+
func pauseSessions(ids:[String])asyncthrows(DaemonError)
19+
func resumeSessions(ids:[String])asyncthrows(DaemonError)
1820
}
1921

2022
@MainActor
@@ -41,6 +43,9 @@ public class MutagenDaemon: FileSyncDaemon {
4143
privateletmutagenDataDirectory:URL
4244
privateletmutagenDaemonSocket:URL
4345

46+
// Managing sync sessions could take a while, especially with prompting
47+
letsessionMgmtReqTimeout:TimeAmount=.seconds(15)
48+
4449
// Non-nil when the daemon is running
4550
varclient:DaemonClient?
4651
privatevargroup:MultiThreadedEventLoopGroup?
@@ -75,6 +80,10 @@ public class MutagenDaemon: FileSyncDaemon {
7580
return
7681
}
7782
awaitrefreshSessions()
83+
if sessionState.isEmpty{
84+
logger.info("No sync sessions found on startup, stopping daemon")
85+
awaitstop()
86+
}
7887
}
7988
}
8089

@@ -162,7 +171,7 @@ public class MutagenDaemon: FileSyncDaemon {
162171
// Already connected
163172
return
164173
}
165-
group=MultiThreadedEventLoopGroup(numberOfThreads:1)
174+
group=MultiThreadedEventLoopGroup(numberOfThreads:2)
166175
do{
167176
channel=tryGRPCChannelPool.with(
168177
target:.unixDomainSocket(mutagenDaemonSocket.path),
@@ -252,51 +261,6 @@ public class MutagenDaemon: FileSyncDaemon {
252261
logger.info("\(line, privacy:.public)")
253262
}
254263
}
255-
256-
publicfunc refreshSessions()async{
257-
guard case.running= stateelse{return}
258-
// TODO: Implement
259-
}
260-
261-
publicfunc createSession(
262-
localPath _:String,
263-
agentHost _:String,
264-
remotePath _:String
265-
)asyncthrows(DaemonError){
266-
if case.stopped= state{
267-
dothrows(DaemonError){
268-
tryawaitstart()
269-
} catch{
270-
state=.failed(error)
271-
throw error
272-
}
273-
}
274-
// TODO: Add session
275-
}
276-
277-
publicfunc deleteSessions(ids _:[String])asyncthrows(DaemonError){
278-
// TODO: Delete session
279-
awaitstopIfNoSessions()
280-
}
281-
282-
privatefunc stopIfNoSessions()async{
283-
letsessions:Synchronization_ListResponse
284-
do{
285-
sessions=tryawait client!.sync.list(Synchronization_ListRequest.with{ reqin
286-
req.selection=.with{ selectionin
287-
selection.all=true
288-
}
289-
})
290-
}catch{
291-
state=.failed(.daemonStartFailure(error))
292-
return
293-
}
294-
// If there's no configured sessions, the daemon doesn't need to be running
295-
if sessions.sessionStates.isEmpty{
296-
logger.info("No sync sessions found")
297-
awaitstop()
298-
}
299-
}
300264
}
301265

302266
structDaemonClient{
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
import NIOCore
2+
3+
publicextensionMutagenDaemon{
4+
func refreshSessions()async{
5+
guard case.running= stateelse{return}
6+
letsessions:Synchronization_ListResponse
7+
do{
8+
sessions=tryawait client!.sync.list(Synchronization_ListRequest.with{ reqin
9+
req.selection=.with{ selectionin
10+
selection.all=true
11+
}
12+
})
13+
}catch{
14+
state=.failed(.grpcFailure(error))
15+
return
16+
}
17+
sessionState= sessions.sessionStates.map{FileSyncSession(state: $0)}
18+
}
19+
20+
func createSession(
21+
localPath:String,
22+
agentHost:String,
23+
remotePath:String
24+
)asyncthrows(DaemonError){
25+
if case.stopped= state{
26+
dothrows(DaemonError){
27+
tryawaitstart()
28+
} catch{
29+
state=.failed(error)
30+
throw error
31+
}
32+
}
33+
let(stream, promptID)=tryawaithost()
34+
defer{ stream.cancel()}
35+
letreq=Synchronization_CreateRequest.with{ reqin
36+
req.prompter= promptID
37+
req.specification=.with{ specin
38+
spec.alpha=.with{ alphain
39+
alpha.protocol=.local
40+
alpha.path= localPath
41+
}
42+
spec.beta=.with{ betain
43+
beta.protocol=.ssh
44+
beta.host= agentHost
45+
beta.path= remotePath
46+
}
47+
// TODO: Ingest a config from somewhere
48+
spec.configuration=Synchronization_Configuration()
49+
spec.configurationAlpha=Synchronization_Configuration()
50+
spec.configurationBeta=Synchronization_Configuration()
51+
}
52+
}
53+
do{
54+
// The first creation will need to transfer the agent binary
55+
// TODO: Because this is pretty long, we should show progress updates
56+
// using the prompter messages
57+
_=tryawait client!.sync.create(req, callOptions:.init(timeLimit:.timeout(sessionMgmtReqTimeout*4)))
58+
}catch{
59+
throw.grpcFailure(error)
60+
}
61+
awaitrefreshSessions()
62+
}
63+
64+
func deleteSessions(ids:[String])asyncthrows(DaemonError){
65+
// Terminating sessions does not require prompting, according to the
66+
// Mutagen CLI
67+
let(stream, promptID)=tryawaithost(allowPrompts:false)
68+
defer{ stream.cancel()}
69+
guard case.running= stateelse{return}
70+
do{
71+
_=tryawait client!.sync.terminate(Synchronization_TerminateRequest.with{ reqin
72+
req.prompter= promptID
73+
req.selection=.with{ selectionin
74+
selection.specifications= ids
75+
}
76+
}, callOptions:.init(timeLimit:.timeout(sessionMgmtReqTimeout)))
77+
}catch{
78+
throw.grpcFailure(error)
79+
}
80+
awaitrefreshSessions()
81+
}
82+
83+
func pauseSessions(ids:[String])asyncthrows(DaemonError){
84+
// Pausing sessions does not require prompting, according to the
85+
// Mutagen CLI
86+
let(stream, promptID)=tryawaithost(allowPrompts:false)
87+
defer{ stream.cancel()}
88+
guard case.running= stateelse{return}
89+
do{
90+
_=tryawait client!.sync.pause(Synchronization_PauseRequest.with{ reqin
91+
req.prompter= promptID
92+
req.selection=.with{ selectionin
93+
selection.specifications= ids
94+
}
95+
}, callOptions:.init(timeLimit:.timeout(sessionMgmtReqTimeout)))
96+
}catch{
97+
throw.grpcFailure(error)
98+
}
99+
awaitrefreshSessions()
100+
}
101+
102+
func resumeSessions(ids:[String])asyncthrows(DaemonError){
103+
// Resuming sessions does not require prompting, according to the
104+
// Mutagen CLI
105+
let(stream, promptID)=tryawaithost(allowPrompts:false)
106+
defer{ stream.cancel()}
107+
guard case.running= stateelse{return}
108+
do{
109+
_=tryawait client!.sync.resume(Synchronization_ResumeRequest.with{ reqin
110+
req.prompter= promptID
111+
req.selection=.with{ selectionin
112+
selection.specifications= ids
113+
}
114+
}, callOptions:.init(timeLimit:.timeout(sessionMgmtReqTimeout)))
115+
}catch{
116+
throw.grpcFailure(error)
117+
}
118+
awaitrefreshSessions()
119+
}
120+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp