9
9
"net"
10
10
"net/url"
11
11
"sync"
12
+ "sync/atomic"
12
13
"time"
13
14
14
15
"github.com/hashicorp/yamux"
@@ -61,6 +62,8 @@ func Listen(ctx context.Context, log slog.Logger, broker string, turnProxyAuthTo
61
62
for {
62
63
err := <- ch
63
64
if errors .Is (err ,io .EOF )|| errors .Is (err ,yamux .ErrKeepAliveTimeout ) {
65
+ l .log .Warn (ctx ,"disconnected from broker" ,slog .Error (err ))
66
+
64
67
// If we hit an EOF, then the connection to the broker
65
68
// was interrupted. We'll take a short break then dial
66
69
// again.
@@ -97,12 +100,16 @@ type listener struct {
97
100
ws * websocket.Conn
98
101
connClosers []io.Closer
99
102
connClosersMut sync.Mutex
103
+
104
+ nextConnNumber int64
100
105
}
101
106
102
107
func (l * listener )dial (ctx context.Context ) (<- chan error ,error ) {
108
+ l .log .Info (ctx ,"connecting to broker" ,slog .F ("broker_url" ,l .broker ))
103
109
if l .ws != nil {
104
110
_ = l .ws .Close (websocket .StatusNormalClosure ,"new connection inbound" )
105
111
}
112
+
106
113
conn ,resp ,err := websocket .Dial (ctx ,l .broker ,nil )
107
114
if err != nil {
108
115
if resp != nil {
@@ -111,13 +118,16 @@ func (l *listener) dial(ctx context.Context) (<-chan error, error) {
111
118
return nil ,err
112
119
}
113
120
l .ws = conn
121
+
114
122
nconn := websocket .NetConn (ctx ,conn ,websocket .MessageBinary )
115
123
config := yamux .DefaultConfig ()
116
124
config .LogOutput = io .Discard
117
125
session ,err := yamux .Server (nconn ,config )
118
126
if err != nil {
119
127
return nil ,fmt .Errorf ("create multiplex: %w" ,err )
120
128
}
129
+
130
+ l .log .Info (ctx ,"broker connection established" )
121
131
errCh := make (chan error )
122
132
go func () {
123
133
defer close (errCh )
@@ -127,19 +137,21 @@ func (l *listener) dial(ctx context.Context) (<-chan error, error) {
127
137
errCh <- err
128
138
break
129
139
}
130
- go l .negotiate (conn )
140
+ go l .negotiate (ctx , conn )
131
141
}
132
142
}()
143
+
133
144
return errCh ,nil
134
145
}
135
146
136
147
// Negotiates the handshake protocol over the connection provided.
137
148
// This functions control-flow is important to readability,
138
149
// so the cognitive overload linter has been disabled.
139
150
// nolint:gocognit,nestif
140
- func (l * listener )negotiate (conn net.Conn ) {
151
+ func (l * listener )negotiate (ctx context. Context , conn net.Conn ) {
141
152
var (
142
153
err error
154
+ id = atomic .AddInt64 (& l .nextConnNumber ,1 )
143
155
decoder = json .NewDecoder (conn )
144
156
rtc * webrtc.PeerConnection
145
157
// If candidates are sent before an offer, we place them here.
@@ -149,6 +161,8 @@ func (l *listener) negotiate(conn net.Conn) {
149
161
// Sends the error provided then closes the connection.
150
162
// If RTC isn't connected, we'll close it.
151
163
closeError = func (err error ) {
164
+ l .log .Warn (ctx ,"negotiation error, closing connection" ,slog .Error (err ))
165
+
152
166
d ,_ := json .Marshal (& BrokerMessage {
153
167
Error :err .Error (),
154
168
})
@@ -163,13 +177,17 @@ func (l *listener) negotiate(conn net.Conn) {
163
177
}
164
178
)
165
179
180
+ ctx = slog .With (ctx ,slog .F ("conn_id" ,id ))
181
+ l .log .Info (ctx ,"accepted new session from broker connection, negotiating" )
182
+
166
183
for {
167
184
var msg BrokerMessage
168
185
err = decoder .Decode (& msg )
169
186
if err != nil {
170
187
closeError (err )
171
188
return
172
189
}
190
+ l .log .Debug (ctx ,"received broker message" ,slog .F ("msg" ,msg ))
173
191
174
192
if msg .Candidate != "" {
175
193
c := webrtc.ICECandidateInit {
@@ -181,6 +199,7 @@ func (l *listener) negotiate(conn net.Conn) {
181
199
continue
182
200
}
183
201
202
+ l .log .Debug (ctx ,"adding ICE candidate" ,slog .F ("c" ,c ))
184
203
err = rtc .AddICECandidate (c )
185
204
if err != nil {
186
205
closeError (fmt .Errorf ("accept ice candidate: %w" ,err ))
@@ -199,12 +218,15 @@ func (l *listener) negotiate(conn net.Conn) {
199
218
// so it will not validate.
200
219
continue
201
220
}
221
+
222
+ l .log .Debug (ctx ,"validating ICE server" ,slog .F ("s" ,server ))
202
223
err = DialICE (server ,nil )
203
224
if err != nil {
204
225
closeError (fmt .Errorf ("dial server %+v: %w" ,server .URLs ,err ))
205
226
return
206
227
}
207
228
}
229
+
208
230
var turnProxy proxy.Dialer
209
231
if msg .TURNProxyURL != "" {
210
232
u ,err := url .Parse (msg .TURNProxyURL )
@@ -223,47 +245,58 @@ func (l *listener) negotiate(conn net.Conn) {
223
245
return
224
246
}
225
247
rtc .OnConnectionStateChange (func (pcs webrtc.PeerConnectionState ) {
248
+ l .log .Debug (ctx ,"connection state change" ,slog .F ("state" ,pcs .String ()))
226
249
if pcs == webrtc .PeerConnectionStateConnecting {
227
250
return
228
251
}
229
252
_ = conn .Close ()
230
253
})
254
+
231
255
flushCandidates := proxyICECandidates (rtc ,conn )
232
256
l .connClosersMut .Lock ()
233
257
l .connClosers = append (l .connClosers ,rtc )
234
258
l .connClosersMut .Unlock ()
235
- rtc .OnDataChannel (l .handle (msg ))
259
+ rtc .OnDataChannel (l .handle (ctx ,msg ))
260
+
261
+ l .log .Debug (ctx ,"set remote description" ,slog .F ("offer" ,* msg .Offer ))
236
262
err = rtc .SetRemoteDescription (* msg .Offer )
237
263
if err != nil {
238
264
closeError (fmt .Errorf ("apply offer: %w" ,err ))
239
265
return
240
266
}
267
+
241
268
answer ,err := rtc .CreateAnswer (nil )
242
269
if err != nil {
243
270
closeError (fmt .Errorf ("create answer: %w" ,err ))
244
271
return
245
272
}
273
+
274
+ l .log .Debug (ctx ,"set local description" ,slog .F ("answer" ,answer ))
246
275
err = rtc .SetLocalDescription (answer )
247
276
if err != nil {
248
277
closeError (fmt .Errorf ("set local answer: %w" ,err ))
249
278
return
250
279
}
251
280
flushCandidates ()
252
281
253
- data , err := json . Marshal ( & BrokerMessage {
282
+ bmsg := & BrokerMessage {
254
283
Answer :rtc .LocalDescription (),
255
- })
284
+ }
285
+ data ,err := json .Marshal (bmsg )
256
286
if err != nil {
257
287
closeError (fmt .Errorf ("marshal: %w" ,err ))
258
288
return
259
289
}
290
+
291
+ l .log .Debug (ctx ,"writing message" ,slog .F ("msg" ,bmsg ))
260
292
_ ,err = conn .Write (data )
261
293
if err != nil {
262
294
closeError (fmt .Errorf ("write: %w" ,err ))
263
295
return
264
296
}
265
297
266
298
for _ ,candidate := range pendingCandidates {
299
+ l .log .Debug (ctx ,"adding pending ICE candidate" ,slog .F ("c" ,candidate ))
267
300
err = rtc .AddICECandidate (candidate )
268
301
if err != nil {
269
302
closeError (fmt .Errorf ("add pending candidate: %w" ,err ))
@@ -275,19 +308,25 @@ func (l *listener) negotiate(conn net.Conn) {
275
308
}
276
309
}
277
310
278
- func (l * listener )handle (msg BrokerMessage )func (dc * webrtc.DataChannel ) {
311
+ // nolint:gocognit
312
+ func (l * listener )handle (ctx context.Context ,msg BrokerMessage )func (dc * webrtc.DataChannel ) {
279
313
return func (dc * webrtc.DataChannel ) {
280
314
if dc .Protocol ()== controlChannel {
281
315
// The control channel handles pings.
282
316
dc .OnOpen (func () {
317
+ l .log .Debug (ctx ,"control channel open" )
283
318
rw ,err := dc .Detach ()
284
319
if err != nil {
285
320
return
286
321
}
287
322
// We'll read and write back a single byte for ping/pongin'.
288
323
d := make ([]byte ,1 )
289
324
for {
325
+ l .log .Debug (ctx ,"sending ping" )
290
326
_ ,err = rw .Read (d )
327
+ if err != nil {
328
+ l .log .Debug (ctx ,"reading ping response failed" ,slog .Error (err ))
329
+ }
291
330
if errors .Is (err ,io .EOF ) {
292
331
return
293
332
}
@@ -300,25 +339,36 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
300
339
return
301
340
}
302
341
342
+ ctx = slog .With (ctx ,
343
+ slog .F ("dc_id" ,dc .ID ()),
344
+ slog .F ("dc_label" ,dc .Label ()),
345
+ slog .F ("dc_proto" ,dc .Protocol ()),
346
+ )
347
+
303
348
dc .OnOpen (func () {
349
+ l .log .Info (ctx ,"data channel opened" )
304
350
rw ,err := dc .Detach ()
305
351
if err != nil {
306
352
return
307
353
}
308
354
309
355
var init DialChannelResponse
310
356
sendInitMessage := func () {
357
+ l .log .Debug (ctx ,"sending dc init message" ,slog .F ("msg" ,init ))
311
358
initData ,err := json .Marshal (& init )
312
359
if err != nil {
360
+ l .log .Debug (ctx ,"failed to marshal dc init message" ,slog .Error (err ))
313
361
rw .Close ()
314
362
return
315
363
}
316
364
_ ,err = rw .Write (initData )
317
365
if err != nil {
366
+ l .log .Debug (ctx ,"failed to write dc init message" ,slog .Error (err ))
318
367
return
319
368
}
320
369
if init .Err != "" {
321
370
// If an error occurred, we're safe to close the connection.
371
+ l .log .Debug (ctx ,"closing data channel due to error" ,slog .F ("msg" ,init .Err ))
322
372
dc .Close ()
323
373
return
324
374
}
@@ -336,8 +386,10 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
336
386
return
337
387
}
338
388
389
+ l .log .Debug (ctx ,"dialing remote address" ,slog .F ("network" ,network ),slog .F ("addr" ,addr ))
339
390
nc ,err := net .Dial (network ,addr )
340
391
if err != nil {
392
+ l .log .Debug (ctx ,"failed to dial remote address" )
341
393
init .Code = CodeDialErr
342
394
init .Err = err .Error ()
343
395
if op ,ok := err .(* net.OpError );ok {
@@ -349,8 +401,10 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
349
401
if init .Err != "" {
350
402
return
351
403
}
404
+
352
405
// Must wrap the data channel inside this connection
353
406
// for buffering from the dialed endpoint to the client.
407
+ l .log .Debug (ctx ,"data channel initialized, tunnelling" )
354
408
co := & dataChannelConn {
355
409
addr :nil ,
356
410
dc :dc ,
@@ -370,6 +424,8 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
370
424
371
425
// Close closes the broker socket and all created RTC connections.
372
426
func (l * listener )Close ()error {
427
+ l .log .Info (context .Background (),"listener closed" )
428
+
373
429
l .connClosersMut .Lock ()
374
430
for _ ,connCloser := range l .connClosers {
375
431
// We can ignore the error here... it doesn't