8
8
"io"
9
9
"net"
10
10
"net/url"
11
+ "os"
11
12
"sync"
12
13
"time"
13
14
@@ -16,11 +17,18 @@ import (
16
17
"golang.org/x/net/proxy"
17
18
"nhooyr.io/websocket"
18
19
20
+ "cdr.dev/slog"
21
+ "cdr.dev/slog/sloggers/sloghuman"
22
+
19
23
"cdr.dev/coder-cli/coder-sdk"
20
24
)
21
25
22
26
// DialOptions are configurable options for a wsnet connection.
23
27
type DialOptions struct {
28
+ // Logger is an optional logger to use for logging mostly debug messages. If
29
+ // set to nil, nothing will be logged.
30
+ Log * slog.Logger
31
+
24
32
// ICEServers is an array of STUN or TURN servers to use for negotiation purposes.
25
33
// See: https://developer.mozilla.org/en-US/docs/Web/API/RTCConfiguration/iceServers
26
34
ICEServers []webrtc.ICEServer
@@ -36,6 +44,17 @@ type DialOptions struct {
36
44
37
45
// DialWebsocket dials the broker with a WebSocket and negotiates a connection.
38
46
func DialWebsocket (ctx context.Context ,broker string ,netOpts * DialOptions ,wsOpts * websocket.DialOptions ) (* Dialer ,error ) {
47
+ if netOpts == nil {
48
+ netOpts = & DialOptions {}
49
+ }
50
+ if netOpts .Log == nil {
51
+ // This logger will log nothing.
52
+ log := slog .Make ()
53
+ netOpts .Log = & log
54
+ }
55
+ log := * netOpts .Log
56
+
57
+ log .Debug (ctx ,"connecting to broker" ,slog .F ("broker" ,broker ))
39
58
conn ,resp ,err := websocket .Dial (ctx ,broker ,wsOpts )
40
59
if err != nil {
41
60
if resp != nil {
@@ -46,6 +65,8 @@ func DialWebsocket(ctx context.Context, broker string, netOpts *DialOptions, wsO
46
65
}
47
66
return nil ,fmt .Errorf ("dial websocket: %w" ,err )
48
67
}
68
+ log .Debug (ctx ,"connected to broker" )
69
+
49
70
nconn := websocket .NetConn (ctx ,conn ,websocket .MessageBinary )
50
71
defer func () {
51
72
_ = nconn .Close ()
@@ -60,6 +81,11 @@ func Dial(ctx context.Context, conn net.Conn, options *DialOptions) (*Dialer, er
60
81
if options == nil {
61
82
options = & DialOptions {}
62
83
}
84
+ if options .Log == nil {
85
+ log := slog .Make (sloghuman .Sink (os .Stderr )).Leveled (slog .LevelInfo ).Named ("wsnet_dial" )
86
+ options .Log = & log
87
+ }
88
+ log := * options .Log
63
89
if options .ICEServers == nil {
64
90
options .ICEServers = []webrtc.ICEServer {}
65
91
}
@@ -71,13 +97,20 @@ func Dial(ctx context.Context, conn net.Conn, options *DialOptions) (*Dialer, er
71
97
token :options .TURNProxyAuthToken ,
72
98
}
73
99
}
100
+
101
+ log .Debug (ctx ,"creating peer connection" ,slog .F ("options" ,options ),slog .F ("turn_proxy" ,turnProxy ))
74
102
rtc ,err := newPeerConnection (options .ICEServers ,turnProxy )
75
103
if err != nil {
76
104
return nil ,fmt .Errorf ("create peer connection: %w" ,err )
77
105
}
106
+ log .Debug (ctx ,"created peer connection" )
107
+ rtc .OnConnectionStateChange (func (pcs webrtc.PeerConnectionState ) {
108
+ log .Debug (ctx ,"connection state change" ,slog .F ("state" ,pcs .String ()))
109
+ })
78
110
79
111
flushCandidates := proxyICECandidates (rtc ,conn )
80
112
113
+ log .Debug (ctx ,"creating control channel" ,slog .F ("proto" ,controlChannel ))
81
114
ctrl ,err := rtc .CreateDataChannel (controlChannel ,& webrtc.DataChannelInit {
82
115
Protocol :stringPtr (controlChannel ),
83
116
Ordered :boolPtr (true ),
@@ -90,6 +123,7 @@ func Dial(ctx context.Context, conn net.Conn, options *DialOptions) (*Dialer, er
90
123
if err != nil {
91
124
return nil ,fmt .Errorf ("create offer: %w" ,err )
92
125
}
126
+ log .Debug (ctx ,"created offer" ,slog .F ("offer" ,offer ))
93
127
err = rtc .SetLocalDescription (offer )
94
128
if err != nil {
95
129
return nil ,fmt .Errorf ("set local offer: %w" ,err )
@@ -100,21 +134,25 @@ func Dial(ctx context.Context, conn net.Conn, options *DialOptions) (*Dialer, er
100
134
turnProxyURL = options .TURNProxyURL .String ()
101
135
}
102
136
103
- offerMessage , err := json . Marshal ( & BrokerMessage {
137
+ bmsg := BrokerMessage {
104
138
Offer :& offer ,
105
139
Servers :options .ICEServers ,
106
140
TURNProxyURL :turnProxyURL ,
107
- })
141
+ }
142
+ log .Debug (ctx ,"sending offer message" ,slog .F ("msg" ,bmsg ))
143
+ offerMessage ,err := json .Marshal (& bmsg )
108
144
if err != nil {
109
145
return nil ,fmt .Errorf ("marshal offer message: %w" ,err )
110
146
}
147
+
111
148
_ ,err = conn .Write (offerMessage )
112
149
if err != nil {
113
150
return nil ,fmt .Errorf ("write offer: %w" ,err )
114
151
}
115
152
flushCandidates ()
116
153
117
154
dialer := & Dialer {
155
+ log :log ,
118
156
conn :conn ,
119
157
ctrl :ctrl ,
120
158
rtc :rtc ,
@@ -128,6 +166,7 @@ func Dial(ctx context.Context, conn net.Conn, options *DialOptions) (*Dialer, er
128
166
// inside a workspace. The opposing end of the WebSocket messages
129
167
// should be proxied with a Listener.
130
168
type Dialer struct {
169
+ log slog.Logger
131
170
conn net.Conn
132
171
ctrl * webrtc.DataChannel
133
172
ctrlrw datachannel.ReadWriteCloser
@@ -152,20 +191,25 @@ func (d *Dialer) negotiate(ctx context.Context) (err error) {
152
191
defer func () {
153
192
_ = d .conn .Close ()
154
193
}()
155
- err := waitForConnectionOpen (ctx ,d .rtc )
194
+
195
+ err := waitForConnectionOpen (context .Background (),d .rtc )
156
196
if err != nil {
197
+ d .log .Debug (ctx ,"negotiation error" ,slog .Error (err ))
157
198
if errors .Is (err ,context .DeadlineExceeded ) {
158
199
_ = d .conn .Close ()
159
200
}
160
- errCh <- err
201
+ errCh <- fmt . Errorf ( "wait for connection to open: %w" , err )
161
202
return
162
203
}
204
+
163
205
d .rtc .OnConnectionStateChange (func (pcs webrtc.PeerConnectionState ) {
164
206
if pcs == webrtc .PeerConnectionStateConnected {
207
+ d .log .Debug (ctx ,"connected" )
165
208
return
166
209
}
167
210
168
211
// Close connections opened when RTC was alive.
212
+ d .log .Warn (ctx ,"closing connections due to connection state change" ,slog .F ("pcs" ,pcs .String ()))
169
213
d .connClosersMut .Lock ()
170
214
defer d .connClosersMut .Unlock ()
171
215
for _ ,connCloser := range d .connClosers {
@@ -175,6 +219,7 @@ func (d *Dialer) negotiate(ctx context.Context) (err error) {
175
219
})
176
220
}()
177
221
222
+ d .log .Debug (ctx ,"beginning negotiation" )
178
223
for {
179
224
var msg BrokerMessage
180
225
err = decoder .Decode (& msg )
@@ -184,6 +229,8 @@ func (d *Dialer) negotiate(ctx context.Context) (err error) {
184
229
if err != nil {
185
230
return fmt .Errorf ("read: %w" ,err )
186
231
}
232
+ d .log .Debug (ctx ,"got message from handshake conn" ,slog .F ("msg" ,msg ))
233
+
187
234
if msg .Candidate != "" {
188
235
c := webrtc.ICECandidateInit {
189
236
Candidate :msg .Candidate ,
@@ -192,17 +239,22 @@ func (d *Dialer) negotiate(ctx context.Context) (err error) {
192
239
pendingCandidates = append (pendingCandidates ,c )
193
240
continue
194
241
}
242
+
243
+ d .log .Debug (ctx ,"adding remote ICE candidate" ,slog .F ("c" ,c ))
195
244
err = d .rtc .AddICECandidate (c )
196
245
if err != nil {
197
246
return fmt .Errorf ("accept ice candidate: %s: %w" ,msg .Candidate ,err )
198
247
}
199
248
continue
200
249
}
250
+
201
251
if msg .Answer != nil {
252
+ d .log .Debug (ctx ,"received answer" ,slog .F ("a" ,* msg .Answer ))
202
253
err = d .rtc .SetRemoteDescription (* msg .Answer )
203
254
if err != nil {
204
255
return fmt .Errorf ("set answer: %w" ,err )
205
256
}
257
+
206
258
for _ ,candidate := range pendingCandidates {
207
259
err = d .rtc .AddICECandidate (candidate )
208
260
if err != nil {
@@ -212,11 +264,15 @@ func (d *Dialer) negotiate(ctx context.Context) (err error) {
212
264
pendingCandidates = nil
213
265
continue
214
266
}
267
+
215
268
if msg .Error != "" {
216
- return errors .New (msg .Error )
269
+ d .log .Debug (ctx ,"got error from peer" ,slog .F ("err" ,msg .Error ))
270
+ return fmt .Errorf ("error from peer: %v" ,msg .Error )
217
271
}
272
+
218
273
return fmt .Errorf ("unhandled message: %+v" ,msg )
219
274
}
275
+
220
276
return <- errCh
221
277
}
222
278
@@ -234,6 +290,7 @@ func (d *Dialer) activeConnections() int {
234
290
// Close closes the RTC connection.
235
291
// All data channels dialed will be closed.
236
292
func (d * Dialer )Close ()error {
293
+ d .log .Debug (context .Background (),"close called" )
237
294
return d .rtc .Close ()
238
295
}
239
296
@@ -242,6 +299,7 @@ func (d *Dialer) Ping(ctx context.Context) error {
242
299
if d .ctrl .ReadyState ()== webrtc .DataChannelStateClosed || d .ctrl .ReadyState ()== webrtc .DataChannelStateClosing {
243
300
return webrtc .ErrConnectionClosed
244
301
}
302
+
245
303
// Since we control the client and server we could open this
246
304
// data channel with `Negotiated` true to reduce traffic being
247
305
// sent when the RTC connection is opened.
@@ -257,6 +315,7 @@ func (d *Dialer) Ping(ctx context.Context) error {
257
315
}
258
316
d .pingMut .Lock ()
259
317
defer d .pingMut .Unlock ()
318
+ d .log .Debug (ctx ,"sending ping" )
260
319
_ ,err = d .ctrlrw .Write ([]byte {'a' })
261
320
if err != nil {
262
321
return fmt .Errorf ("write: %w" ,err )
@@ -281,13 +340,18 @@ func (d *Dialer) Ping(ctx context.Context) error {
281
340
282
341
// DialContext dials the network and address on the remote listener.
283
342
func (d * Dialer )DialContext (ctx context.Context ,network ,address string ) (net.Conn ,error ) {
343
+ proto := fmt .Sprintf ("%s:%s" ,network ,address )
344
+ ctx = slog .With (ctx ,slog .F ("proto" ,proto ))
345
+
346
+ d .log .Debug (ctx ,"opening data channel" )
284
347
dc ,err := d .rtc .CreateDataChannel ("proxy" ,& webrtc.DataChannelInit {
285
348
Ordered :boolPtr (network != "udp" ),
286
- Protocol :stringPtr ( fmt . Sprintf ( "%s:%s" , network , address )) ,
349
+ Protocol :& proto ,
287
350
})
288
351
if err != nil {
289
352
return nil ,fmt .Errorf ("create data channel: %w" ,err )
290
353
}
354
+
291
355
d .connClosersMut .Lock ()
292
356
d .connClosers = append (d .connClosers ,dc )
293
357
d .connClosersMut .Unlock ()
@@ -296,10 +360,18 @@ func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.
296
360
if err != nil {
297
361
return nil ,fmt .Errorf ("wait for open: %w" ,err )
298
362
}
363
+
364
+ ctx = slog .With (ctx ,slog .F ("dc_id" ,dc .ID ()))
365
+ d .log .Debug (ctx ,"data channel opened" )
366
+
299
367
rw ,err := dc .Detach ()
300
368
if err != nil {
301
369
return nil ,fmt .Errorf ("detach: %w" ,err )
302
370
}
371
+ d .log .Debug (ctx ,"data channel detached" )
372
+
373
+ ctx ,cancel := context .WithTimeout (ctx ,time .Second * 5 )
374
+ defer cancel ()
303
375
304
376
errCh := make (chan error )
305
377
go func () {
@@ -309,6 +381,7 @@ func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.
309
381
errCh <- fmt .Errorf ("read dial response: %w" ,err )
310
382
return
311
383
}
384
+ d .log .Debug (ctx ,"dial response" ,slog .F ("res" ,res ))
312
385
if res .Err == "" {
313
386
close (errCh )
314
387
return
@@ -323,8 +396,7 @@ func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.
323
396
}
324
397
errCh <- err
325
398
}()
326
- ctx ,cancel := context .WithTimeout (ctx ,time .Second * 5 )
327
- defer cancel ()
399
+
328
400
select {
329
401
case err := <- errCh :
330
402
if err != nil {
@@ -343,5 +415,7 @@ func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.
343
415
rw :rw ,
344
416
}
345
417
c .init ()
418
+
419
+ d .log .Debug (ctx ,"dial channel ready" )
346
420
return c ,nil
347
421
}