Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit4bd5609

Browse files
authored
feat: add status watcher to MCP server (#18320)
This is meant to complement the existing task reporter since the LLMdoes not call it reliably.It also includes refactoring to use the common agent flags/env vars.
1 parent5bcde58 commit4bd5609

File tree

12 files changed

+929
-184
lines changed

12 files changed

+929
-184
lines changed

‎cli/cliutil/queue.go

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
package cliutil
2+
3+
import (
4+
"sync"
5+
6+
"golang.org/x/xerrors"
7+
8+
"github.com/coder/coder/v2/codersdk"
9+
)
10+
11+
// Queue is a FIFO queue with a fixed size. If the size is exceeded, the first
12+
// item is dropped.
13+
typeQueue[Tany]struct {
14+
cond*sync.Cond
15+
items []T
16+
mu sync.Mutex
17+
sizeint
18+
closedbool
19+
predfunc(xT) (T,bool)
20+
}
21+
22+
// NewQueue creates a queue with the given size.
23+
funcNewQueue[Tany](sizeint)*Queue[T] {
24+
q:=&Queue[T]{
25+
items:make([]T,0,size),
26+
size:size,
27+
}
28+
q.cond=sync.NewCond(&q.mu)
29+
returnq
30+
}
31+
32+
// WithPredicate adds the given predicate function, which can control what is
33+
// pushed to the queue.
34+
func (q*Queue[T])WithPredicate(predfunc(xT) (T,bool))*Queue[T] {
35+
q.pred=pred
36+
returnq
37+
}
38+
39+
// Close aborts any pending pops and makes future pushes error.
40+
func (q*Queue[T])Close() {
41+
q.mu.Lock()
42+
deferq.mu.Unlock()
43+
q.closed=true
44+
q.cond.Broadcast()
45+
}
46+
47+
// Push adds an item to the queue. If closed, returns an error.
48+
func (q*Queue[T])Push(xT)error {
49+
q.mu.Lock()
50+
deferq.mu.Unlock()
51+
ifq.closed {
52+
returnxerrors.New("queue has been closed")
53+
}
54+
// Potentially mutate or skip the push using the predicate.
55+
ifq.pred!=nil {
56+
varokbool
57+
x,ok=q.pred(x)
58+
if!ok {
59+
returnnil
60+
}
61+
}
62+
// Remove the first item from the queue if it has gotten too big.
63+
iflen(q.items)>=q.size {
64+
q.items=q.items[1:]
65+
}
66+
q.items=append(q.items,x)
67+
q.cond.Broadcast()
68+
returnnil
69+
}
70+
71+
// Pop removes and returns the first item from the queue, waiting until there is
72+
// something to pop if necessary. If closed, returns false.
73+
func (q*Queue[T])Pop() (T,bool) {
74+
varheadT
75+
q.mu.Lock()
76+
deferq.mu.Unlock()
77+
forlen(q.items)==0&&!q.closed {
78+
q.cond.Wait()
79+
}
80+
ifq.closed {
81+
returnhead,false
82+
}
83+
head,q.items=q.items[0],q.items[1:]
84+
returnhead,true
85+
}
86+
87+
func (q*Queue[T])Len()int {
88+
q.mu.Lock()
89+
deferq.mu.Unlock()
90+
returnlen(q.items)
91+
}
92+
93+
typereportTaskstruct {
94+
linkstring
95+
messageIDint64
96+
selfReportedbool
97+
state codersdk.WorkspaceAppStatusState
98+
summarystring
99+
}
100+
101+
// statusQueue is a Queue that:
102+
// 1. Only pushes items that are not duplicates.
103+
// 2. Preserves the existing message and URI when one a message is not provided.
104+
// 3. Ignores "working" updates from the status watcher.
105+
typeStatusQueuestruct {
106+
Queue[reportTask]
107+
// lastMessageID is the ID of the last *user* message that we saw. A user
108+
// message only happens when interacting via the API (as opposed to
109+
// interacting with the terminal directly).
110+
lastMessageIDint64
111+
}
112+
113+
func (q*StatusQueue)Push(reportreportTask)error {
114+
q.mu.Lock()
115+
deferq.mu.Unlock()
116+
ifq.closed {
117+
returnxerrors.New("queue has been closed")
118+
}
119+
varlastReportreportTask
120+
iflen(q.items)>0 {
121+
lastReport=q.items[len(q.items)-1]
122+
}
123+
// Use "working" status if this is a new user message. If this is not a new
124+
// user message, and the status is "working" and not self-reported (meaning it
125+
// came from the screen watcher), then it means one of two things:
126+
// 1. The LLM is still working, in which case our last status will already
127+
// have been "working", so there is nothing to do.
128+
// 2. The user has interacted with the terminal directly. For now, we are
129+
// ignoring these updates. This risks missing cases where the user
130+
// manually submits a new prompt and the LLM becomes active and does not
131+
// update itself, but it avoids spamming useless status updates as the user
132+
// is typing, so the tradeoff is worth it. In the future, if we can
133+
// reliably distinguish between user and LLM activity, we can change this.
134+
ifreport.messageID>q.lastMessageID {
135+
report.state=codersdk.WorkspaceAppStatusStateWorking
136+
}elseifreport.state==codersdk.WorkspaceAppStatusStateWorking&&!report.selfReported {
137+
q.mu.Unlock()
138+
returnnil
139+
}
140+
// Preserve previous message and URI if there was no message.
141+
ifreport.summary=="" {
142+
report.summary=lastReport.summary
143+
ifreport.link=="" {
144+
report.link=lastReport.link
145+
}
146+
}
147+
// Avoid queueing duplicate updates.
148+
ifreport.state==lastReport.state&&
149+
report.link==lastReport.link&&
150+
report.summary==lastReport.summary {
151+
returnnil
152+
}
153+
// Drop the first item if the queue has gotten too big.
154+
iflen(q.items)>=q.size {
155+
q.items=q.items[1:]
156+
}
157+
q.items=append(q.items,report)
158+
q.cond.Broadcast()
159+
returnnil
160+
}

‎cli/cliutil/queue_test.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package cliutil_test
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
"github.com/stretchr/testify/require"
8+
9+
"github.com/coder/coder/v2/cli/cliutil"
10+
)
11+
12+
funcTestQueue(t*testing.T) {
13+
t.Parallel()
14+
15+
t.Run("DropsFirst",func(t*testing.T) {
16+
t.Parallel()
17+
18+
q:= cliutil.NewQueue[int](10)
19+
require.Equal(t,0,q.Len())
20+
21+
fori:=0;i<20;i++ {
22+
err:=q.Push(i)
23+
require.NoError(t,err)
24+
ifi<10 {
25+
require.Equal(t,i+1,q.Len())
26+
}else {
27+
require.Equal(t,10,q.Len())
28+
}
29+
}
30+
31+
val,ok:=q.Pop()
32+
require.True(t,ok)
33+
require.Equal(t,10,val)
34+
require.Equal(t,9,q.Len())
35+
})
36+
37+
t.Run("Pop",func(t*testing.T) {
38+
t.Parallel()
39+
40+
q:= cliutil.NewQueue[int](10)
41+
fori:=0;i<5;i++ {
42+
err:=q.Push(i)
43+
require.NoError(t,err)
44+
}
45+
46+
// No blocking, should pop immediately.
47+
fori:=0;i<5;i++ {
48+
val,ok:=q.Pop()
49+
require.True(t,ok)
50+
require.Equal(t,i,val)
51+
}
52+
53+
// Pop should block until the next push.
54+
gofunc() {
55+
err:=q.Push(55)
56+
assert.NoError(t,err)
57+
}()
58+
59+
item,ok:=q.Pop()
60+
require.True(t,ok)
61+
require.Equal(t,55,item)
62+
})
63+
64+
t.Run("Close",func(t*testing.T) {
65+
t.Parallel()
66+
67+
q:= cliutil.NewQueue[int](10)
68+
69+
done:=make(chanbool)
70+
gofunc() {
71+
_,ok:=q.Pop()
72+
done<-ok
73+
}()
74+
75+
q.Close()
76+
77+
require.False(t,<-done)
78+
79+
_,ok:=q.Pop()
80+
require.False(t,ok)
81+
82+
err:=q.Push(10)
83+
require.Error(t,err)
84+
})
85+
86+
t.Run("WithPredicate",func(t*testing.T) {
87+
t.Parallel()
88+
89+
q:= cliutil.NewQueue[int](10)
90+
q.WithPredicate(func(nint) (int,bool) {
91+
ifn==2 {
92+
returnn,false
93+
}
94+
returnn+1,true
95+
})
96+
97+
fori:=0;i<5;i++ {
98+
err:=q.Push(i)
99+
require.NoError(t,err)
100+
}
101+
102+
got:= []int{}
103+
fori:=0;i<4;i++ {
104+
val,ok:=q.Pop()
105+
require.True(t,ok)
106+
got=append(got,val)
107+
}
108+
require.Equal(t, []int{1,2,4,5},got)
109+
})
110+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp