- Notifications
You must be signed in to change notification settings - Fork5
chore: add mutagen prompting gRPC#118
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Changes fromall commits
File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -19,7 +19,7 @@ public protocol FileSyncDaemon: ObservableObject { | ||
@MainActor | ||
public class MutagenDaemon: FileSyncDaemon { | ||
let logger = Logger(subsystem: Bundle.main.bundleIdentifier!, category: "mutagen") | ||
@Published public var state: DaemonState = .stopped { | ||
didSet { | ||
@@ -42,9 +42,9 @@ public class MutagenDaemon: FileSyncDaemon { | ||
private let mutagenDaemonSocket: URL | ||
// Non-nil when the daemon is running | ||
var client: DaemonClient? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. Making this | ||
private var group: MultiThreadedEventLoopGroup? | ||
private var channel: GRPCChannel? | ||
// Protect start & stop transitions against re-entrancy | ||
private let transition = AsyncSemaphore(value: 1) | ||
@@ -171,7 +171,8 @@ public class MutagenDaemon: FileSyncDaemon { | ||
) | ||
client = DaemonClient( | ||
mgmt: Daemon_DaemonAsyncClient(channel: channel!), | ||
sync: Synchronization_SynchronizationAsyncClient(channel: channel!), | ||
prompt: Prompting_PromptingAsyncClient(channel: channel!) | ||
) | ||
logger.info( | ||
"Successfully connected to mutagen daemon, socket: \(self.mutagenDaemonSocket.path, privacy: .public)" | ||
@@ -301,6 +302,7 @@ public class MutagenDaemon: FileSyncDaemon { | ||
struct DaemonClient { | ||
let mgmt: Daemon_DaemonAsyncClient | ||
let sync: Synchronization_SynchronizationAsyncClient | ||
let prompt: Prompting_PromptingAsyncClient | ||
} | ||
public enum DaemonState { | ||
@@ -342,6 +344,8 @@ public enum DaemonError: Error { | ||
case connectionFailure(Error) | ||
case terminatedUnexpectedly | ||
case grpcFailure(Error) | ||
case invalidGrpcResponse(String) | ||
case unexpectedStreamClosure | ||
public var description: String { | ||
switch self { | ||
@@ -355,6 +359,10 @@ public enum DaemonError: Error { | ||
"The daemon must be started first" | ||
case let .grpcFailure(error): | ||
"Failed to communicate with daemon: \(error)" | ||
case let .invalidGrpcResponse(response): | ||
"Invalid gRPC response: \(response)" | ||
case .unexpectedStreamClosure: | ||
"Unexpected stream closure" | ||
} | ||
} | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
import GRPC | ||
extension MutagenDaemon { | ||
typealias PromptStream = GRPCAsyncBidirectionalStreamingCall<Prompting_HostRequest, Prompting_HostResponse> | ||
func host(allowPrompts: Bool = true) async throws(DaemonError) -> (PromptStream, identifier: String) { | ||
let stream = client!.prompt.makeHostCall() | ||
do { | ||
try await stream.requestStream.send(.with { req in req.allowPrompts = allowPrompts }) | ||
} catch { | ||
throw .grpcFailure(error) | ||
} | ||
// We can't make call `makeAsyncIterator` more than once | ||
// (as a for-loop would do implicitly) | ||
var iter = stream.responseStream.makeAsyncIterator() | ||
let initResp: Prompting_HostResponse? | ||
do { | ||
initResp = try await iter.next() | ||
} catch { | ||
throw .grpcFailure(error) | ||
} | ||
guard let initResp else { | ||
throw .unexpectedStreamClosure | ||
} | ||
try initResp.ensureValid(first: true, allowPrompts: allowPrompts) | ||
Task.detached(priority: .background) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. This block is self-contained, and we're currently doing a lot on the main actor already. I was previously running into issues on startup with the VPN code getting starved by file sync code. | ||
do { | ||
while let msg = try await iter.next() { | ||
try msg.ensureValid(first: false, allowPrompts: allowPrompts) | ||
var reply: Prompting_HostRequest = .init() | ||
if msg.isPrompt { | ||
// Handle SSH key prompts | ||
if msg.message.contains("yes/no/[fingerprint]") { | ||
reply.response = "yes" | ||
} | ||
// Any other messages that require a non-empty response will | ||
// cause the create op to fail, showing an error. This is ok for now. | ||
} | ||
try await stream.requestStream.send(reply) | ||
} | ||
} catch let error as GRPCStatus where error.code == .cancelled { | ||
return | ||
} catch { | ||
self.logger.critical("Prompt stream failed: \(error)") | ||
ethanndickson marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
} | ||
} | ||
return (stream, identifier: initResp.identifier) | ||
} | ||
} |
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.