Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings
This repository was archived by the owner on Aug 30, 2024. It is now read-only.
/coder-v1-cliPublic archive

Commit27567c4

Browse files
committed
Add more logging to wsnet listener
1 parent38b6e71 commit27567c4

File tree

1 file changed

+62
-6
lines changed

1 file changed

+62
-6
lines changed

‎wsnet/listen.go

Lines changed: 62 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"io"
99
"net"
1010
"sync"
11+
"sync/atomic"
1112
"time"
1213

1314
"github.com/hashicorp/yamux"
@@ -60,6 +61,8 @@ func Listen(ctx context.Context, log slog.Logger, broker string, tcpProxy proxy.
6061
for {
6162
err:=<-ch
6263
iferrors.Is(err,io.EOF)||errors.Is(err,yamux.ErrKeepAliveTimeout) {
64+
l.log.Warn(ctx,"disconnected from broker",slog.Error(err))
65+
6366
// If we hit an EOF, then the connection to the broker
6467
// was interrupted. We'll take a short break then dial
6568
// again.
@@ -96,12 +99,16 @@ type listener struct {
9699
ws*websocket.Conn
97100
connClosers []io.Closer
98101
connClosersMut sync.Mutex
102+
103+
nextConnNumberint64
99104
}
100105

101106
func (l*listener)dial(ctx context.Context) (<-chanerror,error) {
107+
l.log.Info(ctx,"connecting to broker",slog.F("broker_url",l.broker))
102108
ifl.ws!=nil {
103109
_=l.ws.Close(websocket.StatusNormalClosure,"new connection inbound")
104110
}
111+
105112
conn,resp,err:=websocket.Dial(ctx,l.broker,nil)
106113
iferr!=nil {
107114
ifresp!=nil {
@@ -110,13 +117,16 @@ func (l *listener) dial(ctx context.Context) (<-chan error, error) {
110117
returnnil,err
111118
}
112119
l.ws=conn
120+
113121
nconn:=websocket.NetConn(ctx,conn,websocket.MessageBinary)
114122
config:=yamux.DefaultConfig()
115123
config.LogOutput=io.Discard
116124
session,err:=yamux.Server(nconn,config)
117125
iferr!=nil {
118126
returnnil,fmt.Errorf("create multiplex: %w",err)
119127
}
128+
129+
l.log.Info(ctx,"broker connection established")
120130
errCh:=make(chanerror)
121131
gofunc() {
122132
deferclose(errCh)
@@ -126,19 +136,21 @@ func (l *listener) dial(ctx context.Context) (<-chan error, error) {
126136
errCh<-err
127137
break
128138
}
129-
gol.negotiate(conn)
139+
gol.negotiate(ctx,conn)
130140
}
131141
}()
142+
132143
returnerrCh,nil
133144
}
134145

135146
// Negotiates the handshake protocol over the connection provided.
136147
// This functions control-flow is important to readability,
137148
// so the cognitive overload linter has been disabled.
138149
// nolint:gocognit,nestif
139-
func (l*listener)negotiate(conn net.Conn) {
150+
func (l*listener)negotiate(ctx context.Context,conn net.Conn) {
140151
var (
141152
errerror
153+
id=atomic.AddInt64(&l.nextConnNumber,1)
142154
decoder=json.NewDecoder(conn)
143155
rtc*webrtc.PeerConnection
144156
// If candidates are sent before an offer, we place them here.
@@ -148,6 +160,8 @@ func (l *listener) negotiate(conn net.Conn) {
148160
// Sends the error provided then closes the connection.
149161
// If RTC isn't connected, we'll close it.
150162
closeError=func(errerror) {
163+
l.log.Warn(ctx,"negotiation error, closing connection",slog.Error(err))
164+
151165
d,_:=json.Marshal(&BrokerMessage{
152166
Error:err.Error(),
153167
})
@@ -162,13 +176,17 @@ func (l *listener) negotiate(conn net.Conn) {
162176
}
163177
)
164178

179+
ctx=slog.With(ctx,slog.F("conn_id",id))
180+
l.log.Info(ctx,"accepted new session from broker connection, negotiating")
181+
165182
for {
166183
varmsgBrokerMessage
167184
err=decoder.Decode(&msg)
168185
iferr!=nil {
169186
closeError(err)
170187
return
171188
}
189+
l.log.Debug(ctx,"received broker message",slog.F("msg",msg))
172190

173191
ifmsg.Candidate!="" {
174192
c:= webrtc.ICECandidateInit{
@@ -180,6 +198,7 @@ func (l *listener) negotiate(conn net.Conn) {
180198
continue
181199
}
182200

201+
l.log.Debug(ctx,"adding ICE candidate",slog.F("c",c))
183202
err=rtc.AddICECandidate(c)
184203
iferr!=nil {
185204
closeError(fmt.Errorf("accept ice candidate: %w",err))
@@ -198,59 +217,73 @@ func (l *listener) negotiate(conn net.Conn) {
198217
// so it will not validate.
199218
continue
200219
}
220+
221+
l.log.Debug(ctx,"validating ICE server",slog.F("s",server))
201222
err=DialICE(server,nil)
202223
iferr!=nil {
203224
closeError(fmt.Errorf("dial server %+v: %w",server.URLs,err))
204225
return
205226
}
206227
}
228+
207229
rtc,err=newPeerConnection(msg.Servers,l.tcpProxy)
208230
iferr!=nil {
209231
closeError(err)
210232
return
211233
}
212234
rtc.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) {
235+
l.log.Debug(ctx,"connection state change",slog.F("state",pcs.String()))
213236
ifpcs==webrtc.PeerConnectionStateConnecting {
214237
return
215238
}
216239
_=conn.Close()
217240
})
241+
218242
flushCandidates:=proxyICECandidates(rtc,conn)
219243
l.connClosersMut.Lock()
220244
l.connClosers=append(l.connClosers,rtc)
221245
l.connClosersMut.Unlock()
222-
rtc.OnDataChannel(l.handle(msg))
246+
rtc.OnDataChannel(l.handle(ctx,msg))
247+
248+
l.log.Debug(ctx,"set remote description",slog.F("offer",*msg.Offer))
223249
err=rtc.SetRemoteDescription(*msg.Offer)
224250
iferr!=nil {
225251
closeError(fmt.Errorf("apply offer: %w",err))
226252
return
227253
}
254+
228255
answer,err:=rtc.CreateAnswer(nil)
229256
iferr!=nil {
230257
closeError(fmt.Errorf("create answer: %w",err))
231258
return
232259
}
260+
261+
l.log.Debug(ctx,"set local description",slog.F("answer",answer))
233262
err=rtc.SetLocalDescription(answer)
234263
iferr!=nil {
235264
closeError(fmt.Errorf("set local answer: %w",err))
236265
return
237266
}
238267
flushCandidates()
239268

240-
data,err:=json.Marshal(&BrokerMessage{
269+
bmsg:=&BrokerMessage{
241270
Answer:rtc.LocalDescription(),
242-
})
271+
}
272+
data,err:=json.Marshal(bmsg)
243273
iferr!=nil {
244274
closeError(fmt.Errorf("marshal: %w",err))
245275
return
246276
}
277+
278+
l.log.Debug(ctx,"writing message",slog.F("msg",bmsg))
247279
_,err=conn.Write(data)
248280
iferr!=nil {
249281
closeError(fmt.Errorf("write: %w",err))
250282
return
251283
}
252284

253285
for_,candidate:=rangependingCandidates {
286+
l.log.Debug(ctx,"adding pending ICE candidate",slog.F("c",candidate))
254287
err=rtc.AddICECandidate(candidate)
255288
iferr!=nil {
256289
closeError(fmt.Errorf("add pending candidate: %w",err))
@@ -262,19 +295,25 @@ func (l *listener) negotiate(conn net.Conn) {
262295
}
263296
}
264297

265-
func (l*listener)handle(msgBrokerMessage)func(dc*webrtc.DataChannel) {
298+
// nolint:gocognit
299+
func (l*listener)handle(ctx context.Context,msgBrokerMessage)func(dc*webrtc.DataChannel) {
266300
returnfunc(dc*webrtc.DataChannel) {
267301
ifdc.Protocol()==controlChannel {
268302
// The control channel handles pings.
269303
dc.OnOpen(func() {
304+
l.log.Debug(ctx,"control channel open")
270305
rw,err:=dc.Detach()
271306
iferr!=nil {
272307
return
273308
}
274309
// We'll read and write back a single byte for ping/pongin'.
275310
d:=make([]byte,1)
276311
for {
312+
l.log.Debug(ctx,"sending ping")
277313
_,err=rw.Read(d)
314+
iferr!=nil {
315+
l.log.Debug(ctx,"reading ping response failed",slog.Error(err))
316+
}
278317
iferrors.Is(err,io.EOF) {
279318
return
280319
}
@@ -287,25 +326,36 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
287326
return
288327
}
289328

329+
ctx=slog.With(ctx,
330+
slog.F("dc_id",dc.ID()),
331+
slog.F("dc_label",dc.Label()),
332+
slog.F("dc_proto",dc.Protocol()),
333+
)
334+
290335
dc.OnOpen(func() {
336+
l.log.Info(ctx,"data channel opened")
291337
rw,err:=dc.Detach()
292338
iferr!=nil {
293339
return
294340
}
295341

296342
varinitDialChannelResponse
297343
sendInitMessage:=func() {
344+
l.log.Debug(ctx,"sending dc init message",slog.F("msg",init))
298345
initData,err:=json.Marshal(&init)
299346
iferr!=nil {
347+
l.log.Debug(ctx,"failed to marshal dc init message",slog.Error(err))
300348
rw.Close()
301349
return
302350
}
303351
_,err=rw.Write(initData)
304352
iferr!=nil {
353+
l.log.Debug(ctx,"failed to write dc init message",slog.Error(err))
305354
return
306355
}
307356
ifinit.Err!="" {
308357
// If an error occurred, we're safe to close the connection.
358+
l.log.Debug(ctx,"closing data channel due to error",slog.F("msg",init.Err))
309359
dc.Close()
310360
return
311361
}
@@ -323,8 +373,10 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
323373
return
324374
}
325375

376+
l.log.Debug(ctx,"dialing remote address",slog.F("network",network),slog.F("addr",addr))
326377
nc,err:=net.Dial(network,addr)
327378
iferr!=nil {
379+
l.log.Debug(ctx,"failed to dial remote address")
328380
init.Code=CodeDialErr
329381
init.Err=err.Error()
330382
ifop,ok:=err.(*net.OpError);ok {
@@ -336,8 +388,10 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
336388
ifinit.Err!="" {
337389
return
338390
}
391+
339392
// Must wrap the data channel inside this connection
340393
// for buffering from the dialed endpoint to the client.
394+
l.log.Debug(ctx,"data channel initialized, tunnelling")
341395
co:=&dataChannelConn{
342396
addr:nil,
343397
dc:dc,
@@ -357,6 +411,8 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
357411

358412
// Close closes the broker socket and all created RTC connections.
359413
func (l*listener)Close()error {
414+
l.log.Info(context.Background(),"listener closed")
415+
360416
l.connClosersMut.Lock()
361417
for_,connCloser:=rangel.connClosers {
362418
// We can ignore the error here... it doesn't

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp