Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings
This repository was archived by the owner on Aug 30, 2024. It is now read-only.
/coder-v1-cliPublic archive

Commitb1d9ef4

Browse files
authored
chore: Remove Closed chan in favor of reconnecting DataChannels (#393)
* chore: Remove Closed chan in favor of reconnecting DataChannels* Evict quickly* Fix closed connection caching* Fix HTTP check* Switch on HTTP(s)
1 parent7f0e87d commitb1d9ef4

File tree

4 files changed

+25
-74
lines changed

4 files changed

+25
-74
lines changed

‎wsnet/cache.go

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"sync"
77
"time"
88

9+
"github.com/pion/webrtc/v3"
910
"golang.org/x/sync/singleflight"
1011
)
1112

@@ -39,7 +40,7 @@ type DialerCache struct {
3940

4041
// init starts the ticker for evicting connections.
4142
func (d*DialerCache)init() {
42-
ticker:=time.NewTicker(time.Second*30)
43+
ticker:=time.NewTicker(time.Second*5)
4344
deferticker.Stop()
4445
for {
4546
select {
@@ -62,17 +63,11 @@ func (d *DialerCache) evict() {
6263
gofunc() {
6364
deferwg.Done()
6465

65-
evict:=false
66-
select {
67-
case<-dialer.Closed():
66+
// If we're no longer signaling, the connection is pending close.
67+
evict:=dialer.rtc.SignalingState()==webrtc.SignalingStateClosed
68+
ifdialer.activeConnections()==0&&time.Since(d.atime[key])>=d.ttl {
6869
evict=true
69-
default:
70-
}
71-
ifdialer.ActiveConnections()==0&&time.Since(d.atime[key])>=d.ttl {
72-
evict=true
73-
}
74-
// If we're already evicting there's no point in trying to ping.
75-
if!evict {
70+
}else {
7671
ctx,cancel:=context.WithTimeout(context.Background(),time.Second*15)
7772
defercancel()
7873
err:=dialer.Ping(ctx)
@@ -116,17 +111,12 @@ func (d *DialerCache) Dial(ctx context.Context, key string, dialerFunc func() (*
116111
dialer,ok:=d.dialers[key]
117112
d.mut.RUnlock()
118113
ifok {
119-
closed:=false
120-
select {
121-
case<-dialer.Closed():
122-
closed=true
123-
default:
124-
}
125-
if!closed {
126-
d.mut.Lock()
127-
d.atime[key]=time.Now()
128-
d.mut.Unlock()
114+
d.mut.Lock()
115+
d.atime[key]=time.Now()
116+
d.mut.Unlock()
129117

118+
// The connection is pending close here...
119+
ifdialer.rtc.SignalingState()!=webrtc.SignalingStateClosed {
130120
returndialer,true,nil
131121
}
132122
}

‎wsnet/conn.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package wsnet
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"net"
78
"net/http"
@@ -73,9 +74,13 @@ func (t *turnProxyDialer) Dial(network, addr string) (c net.Conn, err error) {
7374

7475
// Copy the baseURL so we can adjust path.
7576
url:=*t.baseURL
76-
url.Scheme="wss"
77-
ifurl.Scheme==httpScheme {
77+
switchurl.Scheme{
78+
case"http":
7879
url.Scheme="ws"
80+
case"https":
81+
url.Scheme="wss"
82+
default:
83+
returnnil,errors.New("invalid turn url addr scheme provided")
7984
}
8085
url.Path="/api/private/turn"
8186
conn,resp,err:=websocket.Dial(ctx,url.String(),&websocket.DialOptions{

‎wsnet/dial.go

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,6 @@ func Dial(conn net.Conn, options *DialOptions) (*Dialer, error) {
118118
conn:conn,
119119
ctrl:ctrl,
120120
rtc:rtc,
121-
closedChan:make(chanstruct{}),
122121
connClosers: []io.Closer{ctrl},
123122
}
124123

@@ -134,7 +133,6 @@ type Dialer struct {
134133
ctrlrw datachannel.ReadWriteCloser
135134
rtc*webrtc.PeerConnection
136135

137-
closedChanchanstruct{}
138136
connClosers []io.Closer
139137
connClosersMut sync.Mutex
140138
pingMut sync.Mutex
@@ -161,25 +159,17 @@ func (d *Dialer) negotiate() (err error) {
161159
return
162160
}
163161
d.rtc.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) {
164-
ifpcs!=webrtc.PeerConnectionStateDisconnected {
162+
ifpcs==webrtc.PeerConnectionStateConnected {
165163
return
166164
}
167165

168-
// Close connections openedwhile the RTC was alive.
166+
// Close connections openedwhen RTC was alive.
169167
d.connClosersMut.Lock()
170168
deferd.connClosersMut.Unlock()
171169
for_,connCloser:=ranged.connClosers {
172170
_=connCloser.Close()
173171
}
174172
d.connClosers=make([]io.Closer,0)
175-
176-
select {
177-
case<-d.closedChan:
178-
return
179-
default:
180-
}
181-
close(d.closedChan)
182-
_=d.rtc.Close()
183173
})
184174
}()
185175

@@ -228,15 +218,9 @@ func (d *Dialer) negotiate() (err error) {
228218
return<-errCh
229219
}
230220

231-
// Closed returns a channel that closes when
232-
// the connection is closed.
233-
func (d*Dialer)Closed()<-chanstruct{} {
234-
returnd.closedChan
235-
}
236-
237221
// ActiveConnections returns the amount of active connections.
238222
// DialContext opens a connection, and close will end it.
239-
func (d*Dialer)ActiveConnections()int {
223+
func (d*Dialer)activeConnections()int {
240224
stats,ok:=d.rtc.GetStats().GetConnectionStats(d.rtc)
241225
if!ok {
242226
return-1
@@ -248,12 +232,6 @@ func (d *Dialer) ActiveConnections() int {
248232
// Close closes the RTC connection.
249233
// All data channels dialed will be closed.
250234
func (d*Dialer)Close()error {
251-
select {
252-
case<-d.closedChan:
253-
returnnil
254-
default:
255-
}
256-
close(d.closedChan)
257235
returnd.rtc.Close()
258236
}
259237

‎wsnet/dial_test.go

Lines changed: 4 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"net"
1010
"strconv"
1111
"testing"
12-
"time"
1312

1413
"cdr.dev/slog/sloggers/slogtest"
1514
"github.com/pion/ice/v2"
@@ -223,27 +222,6 @@ func TestDial(t *testing.T) {
223222
assert.ErrorIs(t,err,io.EOF)
224223
})
225224

226-
t.Run("Closed",func(t*testing.T) {
227-
t.Parallel()
228-
229-
connectAddr,listenAddr:=createDumbBroker(t)
230-
l,err:=Listen(context.Background(),slogtest.Make(t,nil),listenAddr,"")
231-
require.NoError(t,err)
232-
deferl.Close()
233-
234-
dialer,err:=DialWebsocket(context.Background(),connectAddr,nil,nil)
235-
require.NoError(t,err)
236-
gofunc() {
237-
_=dialer.Close()
238-
}()
239-
240-
select {
241-
case<-dialer.Closed():
242-
case<-time.NewTimer(time.Second).C:
243-
t.Error("didn't close in time")
244-
}
245-
})
246-
247225
t.Run("Active Connections",func(t*testing.T) {
248226
t.Parallel()
249227

@@ -266,14 +244,14 @@ func TestDial(t *testing.T) {
266244
t.Error(err)
267245
}
268246
conn,_:=dialer.DialContext(context.Background(),listener.Addr().Network(),listener.Addr().String())
269-
assert.Equal(t,1,dialer.ActiveConnections())
247+
assert.Equal(t,1,dialer.activeConnections())
270248
_=conn.Close()
271-
assert.Equal(t,0,dialer.ActiveConnections())
249+
assert.Equal(t,0,dialer.activeConnections())
272250
_,_=dialer.DialContext(context.Background(),listener.Addr().Network(),listener.Addr().String())
273251
conn,_=dialer.DialContext(context.Background(),listener.Addr().Network(),listener.Addr().String())
274-
assert.Equal(t,2,dialer.ActiveConnections())
252+
assert.Equal(t,2,dialer.activeConnections())
275253
_=conn.Close()
276-
assert.Equal(t,1,dialer.ActiveConnections())
254+
assert.Equal(t,1,dialer.activeConnections())
277255
})
278256
}
279257

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp