Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf8d7628

Browse files
committed
Add status watcher to MCP server
Since we can now get status updates from two places, they are placed ina queue so we can handle them one at a time.
1 parentb02460d commitf8d7628

File tree

6 files changed

+456
-35
lines changed

6 files changed

+456
-35
lines changed

‎cli/exp_mcp.go

Lines changed: 187 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"encoding/json"
77
"errors"
8+
"net/url"
89
"os"
910
"path/filepath"
1011
"slices"
@@ -15,8 +16,10 @@ import (
1516
"github.com/spf13/afero"
1617
"golang.org/x/xerrors"
1718

19+
agentapi"github.com/coder/agentapi-sdk-go"
1820
"github.com/coder/coder/v2/buildinfo"
1921
"github.com/coder/coder/v2/cli/cliui"
22+
"github.com/coder/coder/v2/cli/cliutil"
2023
"github.com/coder/coder/v2/codersdk"
2124
"github.com/coder/coder/v2/codersdk/agentsdk"
2225
"github.com/coder/coder/v2/codersdk/toolsdk"
@@ -25,6 +28,7 @@ import (
2528

2629
const (
2730
envAppStatusSlug="CODER_MCP_APP_STATUS_SLUG"
31+
envLLMAgentURL="CODER_MCP_LLM_AGENT_URL"
2832
)
2933

3034
func (r*RootCmd)mcpCommand()*serpent.Command {
@@ -347,10 +351,20 @@ func (*RootCmd) mcpConfigureCursor() *serpent.Command {
347351
returncmd
348352
}
349353

354+
typereportTaskstruct {
355+
linkstring
356+
messageIDint64
357+
selfReportedbool
358+
state codersdk.WorkspaceAppStatusState
359+
summarystring
360+
}
361+
350362
typemcpServerstruct {
351363
agentClient*agentsdk.Client
352364
appStatusSlugstring
353365
client*codersdk.Client
366+
llmClient*agentapi.Client
367+
queue*cliutil.Queue[reportTask]
354368
}
355369

356370
func (r*RootCmd)mcpServer()*serpent.Command {
@@ -359,12 +373,14 @@ func (r *RootCmd) mcpServer() *serpent.Command {
359373
instructionsstring
360374
allowedTools []string
361375
appStatusSlugstring
376+
llmAgentURL url.URL
362377
)
363378
return&serpent.Command{
364379
Use:"server",
365380
Handler:func(inv*serpent.Invocation)error {
366381
srv:=&mcpServer{
367382
appStatusSlug:appStatusSlug,
383+
queue: cliutil.NewQueue[reportTask](10),
368384
}
369385

370386
// Display client URL separately from authentication status.
@@ -408,8 +424,36 @@ func (r *RootCmd) mcpServer() *serpent.Command {
408424
cliui.Infof(inv.Stderr,"Task reporter : Enabled")
409425
}
410426

411-
// Start the server.
412-
returnsrv.start(inv,instructions,allowedTools)
427+
// Try to create a client for the LLM agent API, which is used to get the
428+
// screen status to make the status reporting more robust. No auth
429+
// needed, so no validation.
430+
ifllmAgentURL.String()=="" {
431+
cliui.Infof(inv.Stderr,"LLM agent URL : Not configured")
432+
}else {
433+
cliui.Infof(inv.Stderr,"LLM agent URL : %s",llmAgentURL.String())
434+
llmClient,err:=agentapi.NewClient(llmAgentURL.String())
435+
iferr!=nil {
436+
cliui.Infof(inv.Stderr,"Screen events : Disabled")
437+
cliui.Warnf(inv.Stderr,"%s must be set",envLLMAgentURL)
438+
}else {
439+
cliui.Infof(inv.Stderr,"Screen events : Enabled")
440+
srv.llmClient=llmClient
441+
}
442+
}
443+
444+
ctx,cancel:=context.WithCancel(inv.Context())
445+
defercancel()
446+
defersrv.queue.Close()
447+
448+
cliui.Infof(inv.Stderr,"Failed to watch screen events")
449+
// Start the reporter, watcher, and server.
450+
ifsrv.agentClient!=nil&&appStatusSlug!="" {
451+
srv.startReporter(ctx,inv)
452+
ifsrv.llmClient!=nil {
453+
srv.startWatcher(ctx,inv)
454+
}
455+
}
456+
returnsrv.startServer(ctx,inv,instructions,allowedTools)
413457
},
414458
Short:"Start the Coder MCP server.",
415459
Middleware:serpent.Chain(
@@ -438,14 +482,142 @@ func (r *RootCmd) mcpServer() *serpent.Command {
438482
Value:serpent.StringOf(&appStatusSlug),
439483
Default:"",
440484
},
485+
{
486+
Flag:"llm-agent-url",
487+
Description:"The URL of the LLM agent API, used to listen for status updates.",
488+
Env:envLLMAgentURL,
489+
Value:serpent.URLOf(&llmAgentURL),
490+
},
441491
},
442492
}
443493
}
444494

445-
func (s*mcpServer)start(inv*serpent.Invocation,instructionsstring,allowedTools []string)error {
446-
ctx,cancel:=context.WithCancel(inv.Context())
447-
defercancel()
495+
func (s*mcpServer)startReporter(ctx context.Context,inv*serpent.Invocation) {
496+
varlastMessageIDint64
497+
shouldUpdate:=func(itemreportTask) codersdk.WorkspaceAppStatusState {
498+
// Always send self-reported updates.
499+
ifitem.selfReported {
500+
returnitem.state
501+
}
502+
// Always send completed states.
503+
switchitem.state {
504+
casecodersdk.WorkspaceAppStatusStateComplete,
505+
codersdk.WorkspaceAppStatusStateFailure:
506+
returnitem.state
507+
}
508+
// Always send "working" when there is a new message, since this means the
509+
// user submitted a message through the API and we know the LLM will begin
510+
// work soon if it has not already.
511+
ifitem.messageID>lastMessageID {
512+
returncodersdk.WorkspaceAppStatusStateWorking
513+
}
514+
// Otherwise, if the state is "working" and there have been no new messages,
515+
// it means either that the LLM is still working or it means the user has
516+
// interacted with the terminal directly. For now, we are ignoring these
517+
// updates. This risks missing cases where the user manually submits a new
518+
// prompt and the LLM becomes active and does not update itself, but it
519+
// avoids spamming useless status updates.
520+
return""
521+
}
522+
varlastPayload agentsdk.PatchAppStatus
523+
gofunc() {
524+
for {
525+
// TODO: Even with the queue, there is still the potential that a message
526+
// from the screen watcher and a message from the LLM could arrive out of
527+
// order if the timing is just right. We might want to wait a bit, then
528+
// check if the status has changed before committing.
529+
item,ok:=s.queue.Pop()
530+
if!ok {
531+
return
532+
}
533+
534+
state:=shouldUpdate(item)
535+
ifstate=="" {
536+
continue
537+
}
538+
539+
ifitem.messageID!=0 {
540+
lastMessageID=item.messageID
541+
}
542+
543+
payload:= agentsdk.PatchAppStatus{
544+
AppSlug:s.appStatusSlug,
545+
Message:item.summary,
546+
URI:item.link,
547+
State:state,
548+
}
549+
550+
// Preserve previous message and URI.
551+
ifpayload.Message=="" {
552+
payload.Message=lastPayload.Message
553+
}
554+
ifpayload.URI=="" {
555+
payload.URI=lastPayload.URI
556+
}
557+
558+
// Avoid sending duplicate updates.
559+
iflastPayload.State==payload.State&&
560+
lastPayload.URI==payload.URI&&
561+
lastPayload.Message==payload.Message {
562+
continue
563+
}
564+
565+
err:=s.agentClient.PatchAppStatus(ctx,payload)
566+
iferr!=nil&&!errors.Is(err,context.Canceled) {
567+
cliui.Warnf(inv.Stderr,"Failed to report task status: %s",err)
568+
}
569+
570+
lastPayload=payload
571+
}
572+
}()
573+
}
574+
575+
func (s*mcpServer)startWatcher(ctx context.Context,inv*serpent.Invocation) {
576+
eventsCh,errCh,err:=s.llmClient.SubscribeEvents(ctx)
577+
iferr!=nil {
578+
cliui.Warnf(inv.Stderr,"Failed to watch screen events: %s",err)
579+
return
580+
}
581+
gofunc() {
582+
for {
583+
select {
584+
case<-ctx.Done():
585+
return
586+
caseevent:=<-eventsCh:
587+
switchev:=event.(type) {
588+
case agentapi.EventStatusChange:
589+
// If the screen is stable, assume complete.
590+
state:=codersdk.WorkspaceAppStatusStateWorking
591+
ifev.Status==agentapi.StatusStable {
592+
state=codersdk.WorkspaceAppStatusStateComplete
593+
}
594+
err:=s.queue.Push(reportTask{
595+
state:state,
596+
})
597+
iferr!=nil {
598+
cliui.Warnf(inv.Stderr,"Failed to queue update: %s",err)
599+
return
600+
}
601+
case agentapi.EventMessageUpdate:
602+
err:=s.queue.Push(reportTask{
603+
messageID:ev.Id,
604+
})
605+
iferr!=nil {
606+
cliui.Warnf(inv.Stderr,"Failed to queue update: %s",err)
607+
return
608+
}
609+
}
610+
caseerr:=<-errCh:
611+
if!errors.Is(err,context.Canceled) {
612+
cliui.Warnf(inv.Stderr,"Received error from screen event watcher: %s",err)
613+
}
614+
return
615+
}
616+
}
617+
}()
618+
}
448619

620+
func (s*mcpServer)startServer(ctx context.Context,inv*serpent.Invocation,instructionsstring,allowedTools []string)error {
449621
cliui.Infof(inv.Stderr,"Starting MCP server")
450622

451623
cliui.Infof(inv.Stderr,"Instructions : %q",instructions)
@@ -476,8 +648,16 @@ func (s *mcpServer) start(inv *serpent.Invocation, instructions string, allowedT
476648

477649
// Add tool dependencies.
478650
toolOpts:= []func(*toolsdk.Deps){
479-
toolsdk.WithAgentClient(s.agentClient),
480-
toolsdk.WithAppStatusSlug(s.appStatusSlug),
651+
toolsdk.WithTaskReporter(func(args toolsdk.ReportTaskArgs)error {
652+
// TODO: Is it OK to just push and return or should we wait for it to
653+
// actually get disatched to return any request errors?
654+
returns.queue.Push(reportTask{
655+
link:args.Link,
656+
selfReported:true,
657+
state:codersdk.WorkspaceAppStatusState(args.State),
658+
summary:args.Summary,
659+
})
660+
}),
481661
}
482662

483663
toolDeps,err:=toolsdk.NewDeps(s.client,toolOpts...)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp