Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings
This repository was archived by the owner on Aug 30, 2024. It is now read-only.
/coder-v1-cliPublic archive

Commit6af7193

Browse files
committed
Add more logging to wsnet listener
1 parentb87b4c6 commit6af7193

File tree

1 file changed

+62
-6
lines changed

1 file changed

+62
-6
lines changed

‎wsnet/listen.go

Lines changed: 62 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"net"
1010
"net/url"
1111
"sync"
12+
"sync/atomic"
1213
"time"
1314

1415
"github.com/hashicorp/yamux"
@@ -61,6 +62,8 @@ func Listen(ctx context.Context, log slog.Logger, broker string, turnProxyAuthTo
6162
for {
6263
err:=<-ch
6364
iferrors.Is(err,io.EOF)||errors.Is(err,yamux.ErrKeepAliveTimeout) {
65+
l.log.Warn(ctx,"disconnected from broker",slog.Error(err))
66+
6467
// If we hit an EOF, then the connection to the broker
6568
// was interrupted. We'll take a short break then dial
6669
// again.
@@ -97,12 +100,16 @@ type listener struct {
97100
ws*websocket.Conn
98101
connClosers []io.Closer
99102
connClosersMut sync.Mutex
103+
104+
nextConnNumberint64
100105
}
101106

102107
func (l*listener)dial(ctx context.Context) (<-chanerror,error) {
108+
l.log.Info(ctx,"connecting to broker",slog.F("broker_url",l.broker))
103109
ifl.ws!=nil {
104110
_=l.ws.Close(websocket.StatusNormalClosure,"new connection inbound")
105111
}
112+
106113
conn,resp,err:=websocket.Dial(ctx,l.broker,nil)
107114
iferr!=nil {
108115
ifresp!=nil {
@@ -111,13 +118,16 @@ func (l *listener) dial(ctx context.Context) (<-chan error, error) {
111118
returnnil,err
112119
}
113120
l.ws=conn
121+
114122
nconn:=websocket.NetConn(ctx,conn,websocket.MessageBinary)
115123
config:=yamux.DefaultConfig()
116124
config.LogOutput=io.Discard
117125
session,err:=yamux.Server(nconn,config)
118126
iferr!=nil {
119127
returnnil,fmt.Errorf("create multiplex: %w",err)
120128
}
129+
130+
l.log.Info(ctx,"broker connection established")
121131
errCh:=make(chanerror)
122132
gofunc() {
123133
deferclose(errCh)
@@ -127,19 +137,21 @@ func (l *listener) dial(ctx context.Context) (<-chan error, error) {
127137
errCh<-err
128138
break
129139
}
130-
gol.negotiate(conn)
140+
gol.negotiate(ctx,conn)
131141
}
132142
}()
143+
133144
returnerrCh,nil
134145
}
135146

136147
// Negotiates the handshake protocol over the connection provided.
137148
// This functions control-flow is important to readability,
138149
// so the cognitive overload linter has been disabled.
139150
// nolint:gocognit,nestif
140-
func (l*listener)negotiate(conn net.Conn) {
151+
func (l*listener)negotiate(ctx context.Context,conn net.Conn) {
141152
var (
142153
errerror
154+
id=atomic.AddInt64(&l.nextConnNumber,1)
143155
decoder=json.NewDecoder(conn)
144156
rtc*webrtc.PeerConnection
145157
// If candidates are sent before an offer, we place them here.
@@ -149,6 +161,8 @@ func (l *listener) negotiate(conn net.Conn) {
149161
// Sends the error provided then closes the connection.
150162
// If RTC isn't connected, we'll close it.
151163
closeError=func(errerror) {
164+
l.log.Warn(ctx,"negotiation error, closing connection",slog.Error(err))
165+
152166
d,_:=json.Marshal(&BrokerMessage{
153167
Error:err.Error(),
154168
})
@@ -163,13 +177,17 @@ func (l *listener) negotiate(conn net.Conn) {
163177
}
164178
)
165179

180+
ctx=slog.With(ctx,slog.F("conn_id",id))
181+
l.log.Info(ctx,"accepted new session from broker connection, negotiating")
182+
166183
for {
167184
varmsgBrokerMessage
168185
err=decoder.Decode(&msg)
169186
iferr!=nil {
170187
closeError(err)
171188
return
172189
}
190+
l.log.Debug(ctx,"received broker message",slog.F("msg",msg))
173191

174192
ifmsg.Candidate!="" {
175193
c:= webrtc.ICECandidateInit{
@@ -181,6 +199,7 @@ func (l *listener) negotiate(conn net.Conn) {
181199
continue
182200
}
183201

202+
l.log.Debug(ctx,"adding ICE candidate",slog.F("c",c))
184203
err=rtc.AddICECandidate(c)
185204
iferr!=nil {
186205
closeError(fmt.Errorf("accept ice candidate: %w",err))
@@ -199,12 +218,15 @@ func (l *listener) negotiate(conn net.Conn) {
199218
// so it will not validate.
200219
continue
201220
}
221+
222+
l.log.Debug(ctx,"validating ICE server",slog.F("s",server))
202223
err=DialICE(server,nil)
203224
iferr!=nil {
204225
closeError(fmt.Errorf("dial server %+v: %w",server.URLs,err))
205226
return
206227
}
207228
}
229+
208230
varturnProxy proxy.Dialer
209231
ifmsg.TURNProxyURL!="" {
210232
u,err:=url.Parse(msg.TURNProxyURL)
@@ -223,47 +245,58 @@ func (l *listener) negotiate(conn net.Conn) {
223245
return
224246
}
225247
rtc.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) {
248+
l.log.Debug(ctx,"connection state change",slog.F("state",pcs.String()))
226249
ifpcs==webrtc.PeerConnectionStateConnecting {
227250
return
228251
}
229252
_=conn.Close()
230253
})
254+
231255
flushCandidates:=proxyICECandidates(rtc,conn)
232256
l.connClosersMut.Lock()
233257
l.connClosers=append(l.connClosers,rtc)
234258
l.connClosersMut.Unlock()
235-
rtc.OnDataChannel(l.handle(msg))
259+
rtc.OnDataChannel(l.handle(ctx,msg))
260+
261+
l.log.Debug(ctx,"set remote description",slog.F("offer",*msg.Offer))
236262
err=rtc.SetRemoteDescription(*msg.Offer)
237263
iferr!=nil {
238264
closeError(fmt.Errorf("apply offer: %w",err))
239265
return
240266
}
267+
241268
answer,err:=rtc.CreateAnswer(nil)
242269
iferr!=nil {
243270
closeError(fmt.Errorf("create answer: %w",err))
244271
return
245272
}
273+
274+
l.log.Debug(ctx,"set local description",slog.F("answer",answer))
246275
err=rtc.SetLocalDescription(answer)
247276
iferr!=nil {
248277
closeError(fmt.Errorf("set local answer: %w",err))
249278
return
250279
}
251280
flushCandidates()
252281

253-
data,err:=json.Marshal(&BrokerMessage{
282+
bmsg:=&BrokerMessage{
254283
Answer:rtc.LocalDescription(),
255-
})
284+
}
285+
data,err:=json.Marshal(bmsg)
256286
iferr!=nil {
257287
closeError(fmt.Errorf("marshal: %w",err))
258288
return
259289
}
290+
291+
l.log.Debug(ctx,"writing message",slog.F("msg",bmsg))
260292
_,err=conn.Write(data)
261293
iferr!=nil {
262294
closeError(fmt.Errorf("write: %w",err))
263295
return
264296
}
265297

266298
for_,candidate:=rangependingCandidates {
299+
l.log.Debug(ctx,"adding pending ICE candidate",slog.F("c",candidate))
267300
err=rtc.AddICECandidate(candidate)
268301
iferr!=nil {
269302
closeError(fmt.Errorf("add pending candidate: %w",err))
@@ -275,19 +308,25 @@ func (l *listener) negotiate(conn net.Conn) {
275308
}
276309
}
277310

278-
func (l*listener)handle(msgBrokerMessage)func(dc*webrtc.DataChannel) {
311+
// nolint:gocognit
312+
func (l*listener)handle(ctx context.Context,msgBrokerMessage)func(dc*webrtc.DataChannel) {
279313
returnfunc(dc*webrtc.DataChannel) {
280314
ifdc.Protocol()==controlChannel {
281315
// The control channel handles pings.
282316
dc.OnOpen(func() {
317+
l.log.Debug(ctx,"control channel open")
283318
rw,err:=dc.Detach()
284319
iferr!=nil {
285320
return
286321
}
287322
// We'll read and write back a single byte for ping/pongin'.
288323
d:=make([]byte,1)
289324
for {
325+
l.log.Debug(ctx,"sending ping")
290326
_,err=rw.Read(d)
327+
iferr!=nil {
328+
l.log.Debug(ctx,"reading ping response failed",slog.Error(err))
329+
}
291330
iferrors.Is(err,io.EOF) {
292331
return
293332
}
@@ -300,25 +339,36 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
300339
return
301340
}
302341

342+
ctx=slog.With(ctx,
343+
slog.F("dc_id",dc.ID()),
344+
slog.F("dc_label",dc.Label()),
345+
slog.F("dc_proto",dc.Protocol()),
346+
)
347+
303348
dc.OnOpen(func() {
349+
l.log.Info(ctx,"data channel opened")
304350
rw,err:=dc.Detach()
305351
iferr!=nil {
306352
return
307353
}
308354

309355
varinitDialChannelResponse
310356
sendInitMessage:=func() {
357+
l.log.Debug(ctx,"sending dc init message",slog.F("msg",init))
311358
initData,err:=json.Marshal(&init)
312359
iferr!=nil {
360+
l.log.Debug(ctx,"failed to marshal dc init message",slog.Error(err))
313361
rw.Close()
314362
return
315363
}
316364
_,err=rw.Write(initData)
317365
iferr!=nil {
366+
l.log.Debug(ctx,"failed to write dc init message",slog.Error(err))
318367
return
319368
}
320369
ifinit.Err!="" {
321370
// If an error occurred, we're safe to close the connection.
371+
l.log.Debug(ctx,"closing data channel due to error",slog.F("msg",init.Err))
322372
dc.Close()
323373
return
324374
}
@@ -336,8 +386,10 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
336386
return
337387
}
338388

389+
l.log.Debug(ctx,"dialing remote address",slog.F("network",network),slog.F("addr",addr))
339390
nc,err:=net.Dial(network,addr)
340391
iferr!=nil {
392+
l.log.Debug(ctx,"failed to dial remote address")
341393
init.Code=CodeDialErr
342394
init.Err=err.Error()
343395
ifop,ok:=err.(*net.OpError);ok {
@@ -349,8 +401,10 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
349401
ifinit.Err!="" {
350402
return
351403
}
404+
352405
// Must wrap the data channel inside this connection
353406
// for buffering from the dialed endpoint to the client.
407+
l.log.Debug(ctx,"data channel initialized, tunnelling")
354408
co:=&dataChannelConn{
355409
addr:nil,
356410
dc:dc,
@@ -370,6 +424,8 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
370424

371425
// Close closes the broker socket and all created RTC connections.
372426
func (l*listener)Close()error {
427+
l.log.Info(context.Background(),"listener closed")
428+
373429
l.connClosersMut.Lock()
374430
for_,connCloser:=rangel.connClosers {
375431
// We can ignore the error here... it doesn't

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp