@@ -58,32 +58,28 @@ type tailnetAPIConnector struct {
58
58
coordinateURL string
59
59
dialOptions * websocket.DialOptions
60
60
conn tailnetConn
61
+ customDialFn func () (proto.DRPCTailnetClient ,error )
62
+
63
+ clientMu sync.RWMutex
64
+ client proto.DRPCTailnetClient
61
65
62
66
connected chan error
63
67
isFirst bool
64
68
closed chan struct {}
65
69
}
66
70
67
- // runTailnetAPIConnector creates and runs a tailnetAPIConnector
68
- func runTailnetAPIConnector (
69
- ctx context.Context ,logger slog.Logger ,
70
- agentID uuid.UUID ,coordinateURL string ,dialOptions * websocket.DialOptions ,
71
- conn tailnetConn ,
72
- )* tailnetAPIConnector {
73
- tac := & tailnetAPIConnector {
71
+ // Create a new tailnetAPIConnector without running it
72
+ func newTailnetAPIConnector (ctx context.Context ,logger slog.Logger ,agentID uuid.UUID ,coordinateURL string ,dialOptions * websocket.DialOptions )* tailnetAPIConnector {
73
+ return & tailnetAPIConnector {
74
74
ctx :ctx ,
75
75
logger :logger ,
76
76
agentID :agentID ,
77
77
coordinateURL :coordinateURL ,
78
78
dialOptions :dialOptions ,
79
- conn :conn ,
79
+ conn :nil ,
80
80
connected :make (chan error ,1 ),
81
81
closed :make (chan struct {}),
82
82
}
83
- tac .gracefulCtx ,tac .cancelGracefulCtx = context .WithCancel (context .Background ())
84
- go tac .manageGracefulTimeout ()
85
- go tac .run ()
86
- return tac
87
83
}
88
84
89
85
// manageGracefulTimeout allows the gracefulContext to last 1 second longer than the main context
@@ -99,21 +95,27 @@ func (tac *tailnetAPIConnector) manageGracefulTimeout() {
99
95
}
100
96
}
101
97
102
- func (tac * tailnetAPIConnector )run () {
103
- tac .isFirst = true
104
- defer close (tac .closed )
105
- for retrier := retry .New (50 * time .Millisecond ,10 * time .Second );retrier .Wait (tac .ctx ); {
106
- tailnetClient ,err := tac .dial ()
107
- if xerrors .Is (err ,& codersdk.Error {}) {
108
- return
109
- }
110
- if err != nil {
111
- continue
98
+ // Runs a tailnetAPIConnector using the provided connection
99
+ func (tac * tailnetAPIConnector )runConnector (conn tailnetConn ) {
100
+ tac .conn = conn
101
+ tac .gracefulCtx ,tac .cancelGracefulCtx = context .WithCancel (context .Background ())
102
+ go tac .manageGracefulTimeout ()
103
+ go func () {
104
+ tac .isFirst = true
105
+ defer close (tac .closed )
106
+ for retrier := retry .New (50 * time .Millisecond ,10 * time .Second );retrier .Wait (tac .ctx ); {
107
+ tailnetClient ,err := tac .dial ()
108
+ if err != nil {
109
+ continue
110
+ }
111
+ tac .clientMu .Lock ()
112
+ tac .client = tailnetClient
113
+ tac .clientMu .Unlock ()
114
+ tac .logger .Debug (tac .ctx ,"obtained tailnet API v2+ client" )
115
+ tac .coordinateAndDERPMap (tailnetClient )
116
+ tac .logger .Debug (tac .ctx ,"tailnet API v2+ connection lost" )
112
117
}
113
- tac .logger .Debug (tac .ctx ,"obtained tailnet API v2+ client" )
114
- tac .coordinateAndDERPMap (tailnetClient )
115
- tac .logger .Debug (tac .ctx ,"tailnet API v2+ connection lost" )
116
- }
118
+ }()
117
119
}
118
120
119
121
var permanentErrorStatuses = []int {
@@ -123,6 +125,10 @@ var permanentErrorStatuses = []int{
123
125
}
124
126
125
127
func (tac * tailnetAPIConnector )dial () (proto.DRPCTailnetClient ,error ) {
128
+ if tac .customDialFn != nil {
129
+ return tac .customDialFn ()
130
+ }
131
+
126
132
tac .logger .Debug (tac .ctx ,"dialing Coder tailnet v2+ API" )
127
133
// nolint:bodyclose
128
134
ws ,res ,err := websocket .Dial (tac .ctx ,tac .coordinateURL ,tac .dialOptions )
@@ -194,7 +200,10 @@ func (tac *tailnetAPIConnector) coordinateAndDERPMap(client proto.DRPCTailnetCli
194
200
// we do NOT want to gracefully disconnect on the coordinate() routine. So, we'll just
195
201
// close the underlying connection. This will trigger a retry of the control plane in
196
202
// run().
203
+ tac .clientMu .Lock ()
197
204
client .DRPCConn ().Close ()
205
+ tac .client = nil
206
+ tac .clientMu .Unlock ()
198
207
// Note that derpMap() logs it own errors, we don't bother here.
199
208
}
200
209
}()
@@ -258,3 +267,18 @@ func (tac *tailnetAPIConnector) derpMap(client proto.DRPCTailnetClient) error {
258
267
tac .conn .SetDERPMap (dm )
259
268
}
260
269
}
270
+
271
+ func (tac * tailnetAPIConnector )SendTelemetryEvent (event * proto.TelemetryEvent ) {
272
+ tac .clientMu .RLock ()
273
+ // We hold the lock for the entire telemetry request, but this would only block
274
+ // a coordinate retry, and closing the connection.
275
+ defer tac .clientMu .RUnlock ()
276
+ if tac .client == nil {
277
+ return
278
+ }
279
+ ctx ,cancel := context .WithTimeout (tac .ctx ,5 * time .Second )
280
+ defer cancel ()
281
+ _ ,_ = tac .client .PostTelemetry (ctx ,& proto.TelemetryRequest {
282
+ Events : []* proto.TelemetryEvent {event },
283
+ })
284
+ }