Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit866b721

Browse files
committed
Move update predicates to push phase
Instead of the pop phase. This ensures we do not queue up updates thatwill just end up being discarded once they are popped (which could takesome time due to latency to coderd).It also has the side effect of preserving summaries even when the queuegets too big, because now we preserve them as part of pushing, beforethey might get lost due to getting dropped while we wait on coderd.
1 parent6d40d40 commit866b721

File tree

3 files changed

+163
-68
lines changed

3 files changed

+163
-68
lines changed

‎cli/cliutil/queue.go‎

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"sync"
55

66
"golang.org/x/xerrors"
7+
8+
"github.com/coder/coder/v2/codersdk"
79
)
810

911
// Queue is a FIFO queue with a fixed size. If the size is exceeded, the first
@@ -14,6 +16,7 @@ type Queue[T any] struct {
1416
mu sync.Mutex
1517
sizeint
1618
closedbool
19+
predfunc(xT) (T,bool)
1720
}
1821

1922
// NewQueue creates a queue with the given size.
@@ -26,6 +29,13 @@ func NewQueue[T any](size int) *Queue[T] {
2629
returnq
2730
}
2831

32+
// WithPredicate adds the given predicate function, which can control what is
33+
// pushed to the queue.
34+
func (q*Queue[T])WithPredicate(predfunc(xT) (T,bool))*Queue[T] {
35+
q.pred=pred
36+
returnq
37+
}
38+
2939
// Close aborts any pending pops and makes future pushes error.
3040
func (q*Queue[T])Close() {
3141
q.mu.Lock()
@@ -41,6 +51,15 @@ func (q *Queue[T]) Push(x T) error {
4151
ifq.closed {
4252
returnxerrors.New("queue has been closed")
4353
}
54+
// Potentially mutate or skip the push using the predicate.
55+
ifq.pred!=nil {
56+
varokbool
57+
x,ok=q.pred(x)
58+
if!ok {
59+
returnnil
60+
}
61+
}
62+
// Remove the first item from the queue if it has gotten too big.
4463
iflen(q.items)>=q.size {
4564
q.items=q.items[1:]
4665
}
@@ -70,3 +89,72 @@ func (q *Queue[T]) Len() int {
7089
deferq.mu.Unlock()
7190
returnlen(q.items)
7291
}
92+
93+
typereportTaskstruct {
94+
linkstring
95+
messageIDint64
96+
selfReportedbool
97+
state codersdk.WorkspaceAppStatusState
98+
summarystring
99+
}
100+
101+
// statusQueue is a Queue that:
102+
// 1. Only pushes items that are not duplicates.
103+
// 2. Preserves the existing message and URI when one a message is not provided.
104+
// 3. Ignores "working" updates from the status watcher.
105+
typeStatusQueuestruct {
106+
Queue[reportTask]
107+
// lastMessageID is the ID of the last *user* message that we saw. A user
108+
// message only happens when interacting via the API (as opposed to
109+
// interacting with the terminal directly).
110+
lastMessageIDint64
111+
}
112+
113+
func (q*StatusQueue)Push(reportreportTask)error {
114+
q.mu.Lock()
115+
deferq.mu.Unlock()
116+
ifq.closed {
117+
returnxerrors.New("queue has been closed")
118+
}
119+
varlastReportreportTask
120+
iflen(q.items)>0 {
121+
lastReport=q.items[len(q.items)-1]
122+
}
123+
// Use "working" status if this is a new user message. If this is not a new
124+
// user message, and the status is "working" and not self-reported (meaning it
125+
// came from the screen watcher), then it means one of two things:
126+
// 1. The LLM is still working, in which case our last status will already
127+
// have been "working", so there is nothing to do.
128+
// 2. The user has interacted with the terminal directly. For now, we are
129+
// ignoring these updates. This risks missing cases where the user
130+
// manually submits a new prompt and the LLM becomes active and does not
131+
// update itself, but it avoids spamming useless status updates as the user
132+
// is typing, so the tradeoff is worth it. In the future, if we can
133+
// reliably distinguish between user and LLM activity, we can change this.
134+
ifreport.messageID>q.lastMessageID {
135+
report.state=codersdk.WorkspaceAppStatusStateWorking
136+
}elseifreport.state==codersdk.WorkspaceAppStatusStateWorking&&!report.selfReported {
137+
q.mu.Unlock()
138+
returnnil
139+
}
140+
// Preserve previous message and URI if there was no message.
141+
ifreport.summary=="" {
142+
report.summary=lastReport.summary
143+
ifreport.link=="" {
144+
report.link=lastReport.link
145+
}
146+
}
147+
// Avoid queueing duplicate updates.
148+
ifreport.state==lastReport.state&&
149+
report.link==lastReport.link&&
150+
report.summary==lastReport.summary {
151+
returnnil
152+
}
153+
// Drop the first item if the queue has gotten too big.
154+
iflen(q.items)>=q.size {
155+
q.items=q.items[1:]
156+
}
157+
q.items=append(q.items,report)
158+
q.cond.Broadcast()
159+
returnnil
160+
}

‎cli/cliutil/queue_test.go‎

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,29 @@ func TestQueue(t *testing.T) {
8282
err:=q.Push(10)
8383
require.Error(t,err)
8484
})
85+
86+
t.Run("WithPredicate",func(t*testing.T) {
87+
t.Parallel()
88+
89+
q:= cliutil.NewQueue[int](10)
90+
q.WithPredicate(func(nint) (int,bool) {
91+
ifn==2 {
92+
returnn,false
93+
}
94+
returnn+1,true
95+
})
96+
97+
fori:=0;i<5;i++ {
98+
err:=q.Push(i)
99+
require.NoError(t,err)
100+
}
101+
102+
got:= []int{}
103+
fori:=0;i<4;i++ {
104+
val,ok:=q.Pop()
105+
require.True(t,ok)
106+
got=append(got,val)
107+
}
108+
require.Equal(t, []int{1,2,4,5},got)
109+
})
85110
}

‎cli/exp_mcp.go‎

Lines changed: 50 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ func (*RootCmd) mcpConfigureCursor() *serpent.Command {
361361
returncmd
362362
}
363363

364-
typereportTaskstruct {
364+
typetaskReportstruct {
365365
linkstring
366366
messageIDint64
367367
selfReportedbool
@@ -374,7 +374,7 @@ type mcpServer struct {
374374
appStatusSlugstring
375375
client*codersdk.Client
376376
llmClient*agentapi.Client
377-
queue*cliutil.Queue[reportTask]
377+
queue*cliutil.Queue[taskReport]
378378
}
379379

380380
func (r*RootCmd)mcpServer()*serpent.Command {
@@ -388,9 +388,50 @@ func (r *RootCmd) mcpServer() *serpent.Command {
388388
return&serpent.Command{
389389
Use:"server",
390390
Handler:func(inv*serpent.Invocation)error {
391+
// lastUserMessageID is the ID of the last *user* message that we saw. A
392+
// user message only happens when interacting via the LLM agent API (as
393+
// opposed to interacting with the terminal directly).
394+
varlastUserMessageIDint64
395+
varlastReporttaskReport
396+
// Create a queue that skips duplicates and preserves summaries.
397+
queue:= cliutil.NewQueue[taskReport](512).WithPredicate(func(reporttaskReport) (taskReport,bool) {
398+
// Use "working" status if this is a new user message. If this is not a
399+
// new user message, and the status is "working" and not self-reported
400+
// (meaning it came from the screen watcher), then it means one of two
401+
// things:
402+
// 1. The LLM is still working, so there is nothing to update.
403+
// 2. The LLM stopped working, then the user has interacted with the
404+
// terminal directly. For now, we are ignoring these updates. This
405+
// risks missing cases where the user manually submits a new prompt
406+
// and the LLM becomes active and does not update itself, but it
407+
// avoids spamming useless status updates as the user is typing, so
408+
// the tradeoff is worth it. In the future, if we can reliably
409+
// distinguish between user and LLM activity, we can change this.
410+
ifreport.messageID>lastUserMessageID {
411+
report.state=codersdk.WorkspaceAppStatusStateWorking
412+
}elseifreport.state==codersdk.WorkspaceAppStatusStateWorking&&!report.selfReported {
413+
returnreport,false
414+
}
415+
// Preserve previous message and URI if there was no message.
416+
ifreport.summary=="" {
417+
report.summary=lastReport.summary
418+
ifreport.link=="" {
419+
report.link=lastReport.link
420+
}
421+
}
422+
// Avoid queueing duplicate updates.
423+
ifreport.state==lastReport.state&&
424+
report.link==lastReport.link&&
425+
report.summary==lastReport.summary {
426+
returnreport,false
427+
}
428+
lastReport=report
429+
returnreport,true
430+
})
431+
391432
srv:=&mcpServer{
392433
appStatusSlug:appStatusSlug,
393-
queue:cliutil.NewQueue[reportTask](100),
434+
queue:queue,
394435
}
395436

396437
// Display client URL separately from authentication status.
@@ -505,35 +546,6 @@ func (r *RootCmd) mcpServer() *serpent.Command {
505546
}
506547

507548
func (s*mcpServer)startReporter(ctx context.Context,inv*serpent.Invocation) {
508-
// lastMessageID is the ID of the last *user* message that we saw. A user
509-
// message only happens when interacting via the API (as opposed to
510-
// interacting with the terminal directly).
511-
varlastMessageIDint64
512-
shouldUpdate:=func(itemreportTask) codersdk.WorkspaceAppStatusState {
513-
// Always send self-reported updates.
514-
ifitem.selfReported {
515-
returnitem.state
516-
}
517-
// Always send completed states.
518-
switchitem.state {
519-
casecodersdk.WorkspaceAppStatusStateComplete,
520-
codersdk.WorkspaceAppStatusStateFailure:
521-
returnitem.state
522-
}
523-
// Always send "working" when there is a new user message, since we know the
524-
// LLM will begin work soon if it has not already.
525-
ifitem.messageID>lastMessageID {
526-
returncodersdk.WorkspaceAppStatusStateWorking
527-
}
528-
// Otherwise, if the state is "working" and there have been no new user
529-
// messages, it means either that the LLM is still working or it means the
530-
// user has interacted with the terminal directly. For now, we are ignoring
531-
// these updates. This risks missing cases where the user manually submits
532-
// a new prompt and the LLM becomes active and does not update itself, but
533-
// it avoids spamming useless status updates.
534-
return""
535-
}
536-
varlastPayload agentsdk.PatchAppStatus
537549
gofunc() {
538550
for {
539551
// TODO: Even with the queue, there is still the potential that a message
@@ -545,45 +557,15 @@ func (s *mcpServer) startReporter(ctx context.Context, inv *serpent.Invocation)
545557
return
546558
}
547559

548-
state:=shouldUpdate(item)
549-
ifstate=="" {
550-
continue
551-
}
552-
553-
ifitem.messageID!=0 {
554-
lastMessageID=item.messageID
555-
}
556-
557-
payload:= agentsdk.PatchAppStatus{
560+
err:=s.agentClient.PatchAppStatus(ctx, agentsdk.PatchAppStatus{
558561
AppSlug:s.appStatusSlug,
559562
Message:item.summary,
560563
URI:item.link,
561-
State:state,
562-
}
563-
564-
// Preserve previous message and URI if there was no message.
565-
ifpayload.Message=="" {
566-
payload.Message=lastPayload.Message
567-
ifpayload.URI=="" {
568-
payload.URI=lastPayload.URI
569-
}
570-
}
571-
572-
// Avoid sending duplicate updates.
573-
iflastPayload.State==payload.State&&
574-
lastPayload.URI==payload.URI&&
575-
lastPayload.Message==payload.Message {
576-
continue
577-
}
578-
579-
err:=s.agentClient.PatchAppStatus(ctx,payload)
564+
State:item.state,
565+
})
580566
iferr!=nil&&!errors.Is(err,context.Canceled) {
581567
cliui.Warnf(inv.Stderr,"Failed to report task status: %s",err)
582568
}
583-
584-
iferr==nil {
585-
lastPayload=payload
586-
}
587569
}
588570
}()
589571
}
@@ -607,7 +589,7 @@ func (s *mcpServer) startWatcher(ctx context.Context, inv *serpent.Invocation) {
607589
ifev.Status==agentapi.StatusStable {
608590
state=codersdk.WorkspaceAppStatusStateComplete
609591
}
610-
err:=s.queue.Push(reportTask{
592+
err:=s.queue.Push(taskReport{
611593
state:state,
612594
})
613595
iferr!=nil {
@@ -616,7 +598,7 @@ func (s *mcpServer) startWatcher(ctx context.Context, inv *serpent.Invocation) {
616598
}
617599
case agentapi.EventMessageUpdate:
618600
ifev.Role==agentapi.RoleUser {
619-
err:=s.queue.Push(reportTask{
601+
err:=s.queue.Push(taskReport{
620602
messageID:ev.Id,
621603
})
622604
iferr!=nil {
@@ -667,7 +649,7 @@ func (s *mcpServer) startServer(ctx context.Context, inv *serpent.Invocation, in
667649
// Add tool dependencies.
668650
toolOpts:= []func(*toolsdk.Deps){
669651
toolsdk.WithTaskReporter(func(args toolsdk.ReportTaskArgs)error {
670-
returns.queue.Push(reportTask{
652+
returns.queue.Push(taskReport{
671653
link:args.Link,
672654
selfReported:true,
673655
state:codersdk.WorkspaceAppStatusState(args.State),

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp