- Notifications
You must be signed in to change notification settings - Fork3
chore: add mutagen prompting gRPC#118
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Changes fromall commits
File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -19,7 +19,7 @@ public protocol FileSyncDaemon: ObservableObject { | ||
@MainActor | ||
publicclassMutagenDaemon:FileSyncDaemon{ | ||
letlogger=Logger(subsystem:Bundle.main.bundleIdentifier!, category:"mutagen") | ||
@Publishedpublicvarstate:DaemonState=.stopped{ | ||
didSet{ | ||
@@ -42,9 +42,9 @@ public class MutagenDaemon: FileSyncDaemon { | ||
privateletmutagenDaemonSocket:URL | ||
// Non-nil when the daemon is running | ||
varclient:DaemonClient? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. Making this | ||
privatevargroup:MultiThreadedEventLoopGroup? | ||
privatevarchannel:GRPCChannel? | ||
// Protect start & stop transitions against re-entrancy | ||
privatelettransition=AsyncSemaphore(value:1) | ||
@@ -171,7 +171,8 @@ public class MutagenDaemon: FileSyncDaemon { | ||
) | ||
client=DaemonClient( | ||
mgmt:Daemon_DaemonAsyncClient(channel: channel!), | ||
sync:Synchronization_SynchronizationAsyncClient(channel: channel!), | ||
prompt:Prompting_PromptingAsyncClient(channel: channel!) | ||
) | ||
logger.info( | ||
"Successfully connected to mutagen daemon, socket:\(self.mutagenDaemonSocket.path, privacy:.public)" | ||
@@ -301,6 +302,7 @@ public class MutagenDaemon: FileSyncDaemon { | ||
structDaemonClient{ | ||
letmgmt:Daemon_DaemonAsyncClient | ||
letsync:Synchronization_SynchronizationAsyncClient | ||
letprompt:Prompting_PromptingAsyncClient | ||
} | ||
publicenumDaemonState{ | ||
@@ -342,6 +344,8 @@ public enum DaemonError: Error { | ||
case connectionFailure(Error) | ||
case terminatedUnexpectedly | ||
case grpcFailure(Error) | ||
case invalidGrpcResponse(String) | ||
case unexpectedStreamClosure | ||
publicvardescription:String{ | ||
switchself{ | ||
@@ -355,6 +359,10 @@ public enum DaemonError: Error { | ||
"The daemon must be started first" | ||
caselet.grpcFailure(error): | ||
"Failed to communicate with daemon:\(error)" | ||
caselet.invalidGrpcResponse(response): | ||
"Invalid gRPC response:\(response)" | ||
case.unexpectedStreamClosure: | ||
"Unexpected stream closure" | ||
} | ||
} | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
import GRPC | ||
extensionMutagenDaemon{ | ||
typealiasPromptStream=GRPCAsyncBidirectionalStreamingCall<Prompting_HostRequest,Prompting_HostResponse> | ||
func host(allowPrompts:Bool=true)asyncthrows(DaemonError)->(PromptStream, identifier:String){ | ||
letstream= client!.prompt.makeHostCall() | ||
do{ | ||
tryawait stream.requestStream.send(.with{ reqin req.allowPrompts= allowPrompts}) | ||
}catch{ | ||
throw.grpcFailure(error) | ||
} | ||
// We can't make call `makeAsyncIterator` more than once | ||
// (as a for-loop would do implicitly) | ||
variter= stream.responseStream.makeAsyncIterator() | ||
letinitResp:Prompting_HostResponse? | ||
do{ | ||
initResp=tryawait iter.next() | ||
}catch{ | ||
throw.grpcFailure(error) | ||
} | ||
guardlet initRespelse{ | ||
throw.unexpectedStreamClosure | ||
} | ||
try initResp.ensureValid(first:true, allowPrompts: allowPrompts) | ||
Task.detached(priority:.background){ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. This block is self-contained, and we're currently doing a lot on the main actor already. I was previously running into issues on startup with the VPN code getting starved by file sync code. | ||
do{ | ||
whilelet msg=tryawait iter.next(){ | ||
try msg.ensureValid(first:false, allowPrompts: allowPrompts) | ||
varreply:Prompting_HostRequest=.init() | ||
if msg.isPrompt{ | ||
// Handle SSH key prompts | ||
if msg.message.contains("yes/no/[fingerprint]"){ | ||
reply.response="yes" | ||
} | ||
// Any other messages that require a non-empty response will | ||
// cause the create op to fail, showing an error. This is ok for now. | ||
} | ||
tryawait stream.requestStream.send(reply) | ||
} | ||
}catchlet errorasGRPCStatuswhere error.code==.cancelled{ | ||
return | ||
} catch{ | ||
self.logger.critical("Prompt stream failed:\(error)") | ||
ethanndickson marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
} | ||
} | ||
return(stream, identifier: initResp.identifier) | ||
} | ||
} |
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.