Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf696ff4

Browse files
committed
Add queue util
1 parentea55f84 commitf696ff4

File tree

2 files changed

+157
-0
lines changed

2 files changed

+157
-0
lines changed

‎cli/cliutil/queue.go‎

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package cliutil
2+
3+
import (
4+
"sync"
5+
6+
"golang.org/x/xerrors"
7+
)
8+
9+
// Queue is a FIFO queue with a fixed size. If the size is exceeded, the first
10+
// item is dropped.
11+
typeQueue[Tany]struct {
12+
cond*sync.Cond
13+
items []T
14+
mu sync.Mutex
15+
sizeint
16+
closedbool
17+
}
18+
19+
// NewQueue creates a queue with the given size.
20+
funcNewQueue[Tany](sizeint)*Queue[T] {
21+
q:=&Queue[T]{
22+
items:make([]T,0,size),
23+
size:size,
24+
}
25+
q.cond=sync.NewCond(&q.mu)
26+
returnq
27+
}
28+
29+
// Close aborts any pending pops and makes future pushes error.
30+
func (q*Queue[T])Close() {
31+
q.mu.Lock()
32+
deferq.mu.Unlock()
33+
q.closed=true
34+
q.cond.Broadcast()
35+
}
36+
37+
// Push adds an item to the queue. If closed, returns an error.
38+
func (q*Queue[T])Push(xT)error {
39+
q.mu.Lock()
40+
deferq.mu.Unlock()
41+
ifq.closed {
42+
returnxerrors.New("queue has been closed")
43+
}
44+
iflen(q.items)>=q.size {
45+
q.items=q.items[1:]
46+
}
47+
q.items=append(q.items,x)
48+
q.cond.Broadcast()
49+
returnnil
50+
}
51+
52+
// Pop removes and returns the first item from the queue, waiting until there is
53+
// something to pop if necessary. If closed, returns false.
54+
func (q*Queue[T])Pop() (T,bool) {
55+
varheadT
56+
q.mu.Lock()
57+
deferq.mu.Unlock()
58+
forlen(q.items)==0&&!q.closed {
59+
q.cond.Wait()
60+
}
61+
ifq.closed {
62+
returnhead,false
63+
}
64+
head,q.items=q.items[0],q.items[1:]
65+
returnhead,true
66+
}
67+
68+
func (q*Queue[T])Len()int {
69+
q.mu.Lock()
70+
deferq.mu.Unlock()
71+
returnlen(q.items)
72+
}

‎cli/cliutil/queue_test.go‎

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package cliutil_test
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
"github.com/stretchr/testify/require"
8+
9+
"github.com/coder/coder/v2/cli/cliutil"
10+
)
11+
12+
funcTestQueue(t*testing.T) {
13+
t.Parallel()
14+
15+
t.Run("DropsFirst",func(t*testing.T) {
16+
t.Parallel()
17+
18+
q:= cliutil.NewQueue[int](10)
19+
require.Equal(t,0,q.Len())
20+
21+
fori:=0;i<20;i++ {
22+
err:=q.Push(i)
23+
require.NoError(t,err)
24+
ifi<10 {
25+
require.Equal(t,i+1,q.Len())
26+
}else {
27+
require.Equal(t,10,q.Len())
28+
}
29+
}
30+
31+
val,ok:=q.Pop()
32+
require.True(t,ok)
33+
require.Equal(t,10,val)
34+
require.Equal(t,9,q.Len())
35+
})
36+
37+
t.Run("Pop",func(t*testing.T) {
38+
t.Parallel()
39+
40+
q:= cliutil.NewQueue[int](10)
41+
fori:=0;i<5;i++ {
42+
err:=q.Push(i)
43+
require.NoError(t,err)
44+
}
45+
46+
// No blocking, should pop immediately.
47+
fori:=0;i<5;i++ {
48+
val,ok:=q.Pop()
49+
require.True(t,ok)
50+
require.Equal(t,i,val)
51+
}
52+
53+
// Pop should block until the next push.
54+
gofunc() {
55+
err:=q.Push(55)
56+
assert.NoError(t,err)
57+
}()
58+
59+
item,ok:=q.Pop()
60+
require.True(t,ok)
61+
require.Equal(t,55,item)
62+
})
63+
64+
t.Run("Close",func(t*testing.T) {
65+
t.Parallel()
66+
67+
q:= cliutil.NewQueue[int](10)
68+
69+
done:=make(chanbool)
70+
gofunc() {
71+
_,ok:=q.Pop()
72+
done<-ok
73+
}()
74+
75+
q.Close()
76+
77+
require.False(t,<-done)
78+
79+
_,ok:=q.Pop()
80+
require.False(t,ok)
81+
82+
err:=q.Push(10)
83+
require.Error(t,err)
84+
})
85+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp