8
8
"io"
9
9
"net"
10
10
"sync"
11
+ "sync/atomic"
11
12
"time"
12
13
13
14
"github.com/hashicorp/yamux"
@@ -60,6 +61,8 @@ func Listen(ctx context.Context, log slog.Logger, broker string, tcpProxy proxy.
60
61
for {
61
62
err := <- ch
62
63
if errors .Is (err ,io .EOF )|| errors .Is (err ,yamux .ErrKeepAliveTimeout ) {
64
+ l .log .Warn (ctx ,"disconnected from broker" ,slog .Error (err ))
65
+
63
66
// If we hit an EOF, then the connection to the broker
64
67
// was interrupted. We'll take a short break then dial
65
68
// again.
@@ -96,12 +99,16 @@ type listener struct {
96
99
ws * websocket.Conn
97
100
connClosers []io.Closer
98
101
connClosersMut sync.Mutex
102
+
103
+ nextConnNumber int64
99
104
}
100
105
101
106
func (l * listener )dial (ctx context.Context ) (<- chan error ,error ) {
107
+ l .log .Info (ctx ,"connecting to broker" ,slog .F ("broker_url" ,l .broker ))
102
108
if l .ws != nil {
103
109
_ = l .ws .Close (websocket .StatusNormalClosure ,"new connection inbound" )
104
110
}
111
+
105
112
conn ,resp ,err := websocket .Dial (ctx ,l .broker ,nil )
106
113
if err != nil {
107
114
if resp != nil {
@@ -110,13 +117,16 @@ func (l *listener) dial(ctx context.Context) (<-chan error, error) {
110
117
return nil ,err
111
118
}
112
119
l .ws = conn
120
+
113
121
nconn := websocket .NetConn (ctx ,conn ,websocket .MessageBinary )
114
122
config := yamux .DefaultConfig ()
115
123
config .LogOutput = io .Discard
116
124
session ,err := yamux .Server (nconn ,config )
117
125
if err != nil {
118
126
return nil ,fmt .Errorf ("create multiplex: %w" ,err )
119
127
}
128
+
129
+ l .log .Info (ctx ,"broker connection established" )
120
130
errCh := make (chan error )
121
131
go func () {
122
132
defer close (errCh )
@@ -126,19 +136,21 @@ func (l *listener) dial(ctx context.Context) (<-chan error, error) {
126
136
errCh <- err
127
137
break
128
138
}
129
- go l .negotiate (conn )
139
+ go l .negotiate (ctx , conn )
130
140
}
131
141
}()
142
+
132
143
return errCh ,nil
133
144
}
134
145
135
146
// Negotiates the handshake protocol over the connection provided.
136
147
// This functions control-flow is important to readability,
137
148
// so the cognitive overload linter has been disabled.
138
149
// nolint:gocognit,nestif
139
- func (l * listener )negotiate (conn net.Conn ) {
150
+ func (l * listener )negotiate (ctx context. Context , conn net.Conn ) {
140
151
var (
141
152
err error
153
+ id = atomic .AddInt64 (& l .nextConnNumber ,1 )
142
154
decoder = json .NewDecoder (conn )
143
155
rtc * webrtc.PeerConnection
144
156
// If candidates are sent before an offer, we place them here.
@@ -148,6 +160,8 @@ func (l *listener) negotiate(conn net.Conn) {
148
160
// Sends the error provided then closes the connection.
149
161
// If RTC isn't connected, we'll close it.
150
162
closeError = func (err error ) {
163
+ l .log .Warn (ctx ,"negotiation error, closing connection" ,slog .Error (err ))
164
+
151
165
d ,_ := json .Marshal (& BrokerMessage {
152
166
Error :err .Error (),
153
167
})
@@ -162,13 +176,17 @@ func (l *listener) negotiate(conn net.Conn) {
162
176
}
163
177
)
164
178
179
+ ctx = slog .With (ctx ,slog .F ("conn_id" ,id ))
180
+ l .log .Info (ctx ,"accepted new session from broker connection, negotiating" )
181
+
165
182
for {
166
183
var msg BrokerMessage
167
184
err = decoder .Decode (& msg )
168
185
if err != nil {
169
186
closeError (err )
170
187
return
171
188
}
189
+ l .log .Debug (ctx ,"received broker message" ,slog .F ("msg" ,msg ))
172
190
173
191
if msg .Candidate != "" {
174
192
c := webrtc.ICECandidateInit {
@@ -180,6 +198,7 @@ func (l *listener) negotiate(conn net.Conn) {
180
198
continue
181
199
}
182
200
201
+ l .log .Debug (ctx ,"adding ICE candidate" ,slog .F ("c" ,c ))
183
202
err = rtc .AddICECandidate (c )
184
203
if err != nil {
185
204
closeError (fmt .Errorf ("accept ice candidate: %w" ,err ))
@@ -198,59 +217,73 @@ func (l *listener) negotiate(conn net.Conn) {
198
217
// so it will not validate.
199
218
continue
200
219
}
220
+
221
+ l .log .Debug (ctx ,"validating ICE server" ,slog .F ("s" ,server ))
201
222
err = DialICE (server ,nil )
202
223
if err != nil {
203
224
closeError (fmt .Errorf ("dial server %+v: %w" ,server .URLs ,err ))
204
225
return
205
226
}
206
227
}
228
+
207
229
rtc ,err = newPeerConnection (msg .Servers ,l .tcpProxy )
208
230
if err != nil {
209
231
closeError (err )
210
232
return
211
233
}
212
234
rtc .OnConnectionStateChange (func (pcs webrtc.PeerConnectionState ) {
235
+ l .log .Debug (ctx ,"connection state change" ,slog .F ("state" ,pcs .String ()))
213
236
if pcs == webrtc .PeerConnectionStateConnecting {
214
237
return
215
238
}
216
239
_ = conn .Close ()
217
240
})
241
+
218
242
flushCandidates := proxyICECandidates (rtc ,conn )
219
243
l .connClosersMut .Lock ()
220
244
l .connClosers = append (l .connClosers ,rtc )
221
245
l .connClosersMut .Unlock ()
222
- rtc .OnDataChannel (l .handle (msg ))
246
+ rtc .OnDataChannel (l .handle (ctx ,msg ))
247
+
248
+ l .log .Debug (ctx ,"set remote description" ,slog .F ("offer" ,* msg .Offer ))
223
249
err = rtc .SetRemoteDescription (* msg .Offer )
224
250
if err != nil {
225
251
closeError (fmt .Errorf ("apply offer: %w" ,err ))
226
252
return
227
253
}
254
+
228
255
answer ,err := rtc .CreateAnswer (nil )
229
256
if err != nil {
230
257
closeError (fmt .Errorf ("create answer: %w" ,err ))
231
258
return
232
259
}
260
+
261
+ l .log .Debug (ctx ,"set local description" ,slog .F ("answer" ,answer ))
233
262
err = rtc .SetLocalDescription (answer )
234
263
if err != nil {
235
264
closeError (fmt .Errorf ("set local answer: %w" ,err ))
236
265
return
237
266
}
238
267
flushCandidates ()
239
268
240
- data , err := json . Marshal ( & BrokerMessage {
269
+ bmsg := & BrokerMessage {
241
270
Answer :rtc .LocalDescription (),
242
- })
271
+ }
272
+ data ,err := json .Marshal (bmsg )
243
273
if err != nil {
244
274
closeError (fmt .Errorf ("marshal: %w" ,err ))
245
275
return
246
276
}
277
+
278
+ l .log .Debug (ctx ,"writing message" ,slog .F ("msg" ,bmsg ))
247
279
_ ,err = conn .Write (data )
248
280
if err != nil {
249
281
closeError (fmt .Errorf ("write: %w" ,err ))
250
282
return
251
283
}
252
284
253
285
for _ ,candidate := range pendingCandidates {
286
+ l .log .Debug (ctx ,"adding pending ICE candidate" ,slog .F ("c" ,candidate ))
254
287
err = rtc .AddICECandidate (candidate )
255
288
if err != nil {
256
289
closeError (fmt .Errorf ("add pending candidate: %w" ,err ))
@@ -262,19 +295,25 @@ func (l *listener) negotiate(conn net.Conn) {
262
295
}
263
296
}
264
297
265
- func (l * listener )handle (msg BrokerMessage )func (dc * webrtc.DataChannel ) {
298
+ // nolint:gocognit
299
+ func (l * listener )handle (ctx context.Context ,msg BrokerMessage )func (dc * webrtc.DataChannel ) {
266
300
return func (dc * webrtc.DataChannel ) {
267
301
if dc .Protocol ()== controlChannel {
268
302
// The control channel handles pings.
269
303
dc .OnOpen (func () {
304
+ l .log .Debug (ctx ,"control channel open" )
270
305
rw ,err := dc .Detach ()
271
306
if err != nil {
272
307
return
273
308
}
274
309
// We'll read and write back a single byte for ping/pongin'.
275
310
d := make ([]byte ,1 )
276
311
for {
312
+ l .log .Debug (ctx ,"sending ping" )
277
313
_ ,err = rw .Read (d )
314
+ if err != nil {
315
+ l .log .Debug (ctx ,"reading ping response failed" ,slog .Error (err ))
316
+ }
278
317
if errors .Is (err ,io .EOF ) {
279
318
return
280
319
}
@@ -287,25 +326,36 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
287
326
return
288
327
}
289
328
329
+ ctx = slog .With (ctx ,
330
+ slog .F ("dc_id" ,dc .ID ()),
331
+ slog .F ("dc_label" ,dc .Label ()),
332
+ slog .F ("dc_proto" ,dc .Protocol ()),
333
+ )
334
+
290
335
dc .OnOpen (func () {
336
+ l .log .Info (ctx ,"data channel opened" )
291
337
rw ,err := dc .Detach ()
292
338
if err != nil {
293
339
return
294
340
}
295
341
296
342
var init DialChannelResponse
297
343
sendInitMessage := func () {
344
+ l .log .Debug (ctx ,"sending dc init message" ,slog .F ("msg" ,init ))
298
345
initData ,err := json .Marshal (& init )
299
346
if err != nil {
347
+ l .log .Debug (ctx ,"failed to marshal dc init message" ,slog .Error (err ))
300
348
rw .Close ()
301
349
return
302
350
}
303
351
_ ,err = rw .Write (initData )
304
352
if err != nil {
353
+ l .log .Debug (ctx ,"failed to write dc init message" ,slog .Error (err ))
305
354
return
306
355
}
307
356
if init .Err != "" {
308
357
// If an error occurred, we're safe to close the connection.
358
+ l .log .Debug (ctx ,"closing data channel due to error" ,slog .F ("msg" ,init .Err ))
309
359
dc .Close ()
310
360
return
311
361
}
@@ -323,8 +373,10 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
323
373
return
324
374
}
325
375
376
+ l .log .Debug (ctx ,"dialing remote address" ,slog .F ("network" ,network ),slog .F ("addr" ,addr ))
326
377
nc ,err := net .Dial (network ,addr )
327
378
if err != nil {
379
+ l .log .Debug (ctx ,"failed to dial remote address" )
328
380
init .Code = CodeDialErr
329
381
init .Err = err .Error ()
330
382
if op ,ok := err .(* net.OpError );ok {
@@ -336,8 +388,10 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
336
388
if init .Err != "" {
337
389
return
338
390
}
391
+
339
392
// Must wrap the data channel inside this connection
340
393
// for buffering from the dialed endpoint to the client.
394
+ l .log .Debug (ctx ,"data channel initialized, tunnelling" )
341
395
co := & dataChannelConn {
342
396
addr :nil ,
343
397
dc :dc ,
@@ -357,6 +411,8 @@ func (l *listener) handle(msg BrokerMessage) func(dc *webrtc.DataChannel) {
357
411
358
412
// Close closes the broker socket and all created RTC connections.
359
413
func (l * listener )Close ()error {
414
+ l .log .Info (context .Background (),"listener closed" )
415
+
360
416
l .connClosersMut .Lock ()
361
417
for _ ,connCloser := range l .connClosers {
362
418
// We can ignore the error here... it doesn't