@@ -33,6 +33,7 @@ import (
33
33
34
34
"cdr.dev/slog"
35
35
"github.com/coder/coder/agent/usershell"
36
+ "github.com/coder/coder/codersdk"
36
37
"github.com/coder/coder/pty"
37
38
"github.com/coder/coder/tailnet"
38
39
"github.com/coder/retry"
@@ -49,55 +50,41 @@ const (
49
50
MagicSessionErrorCode = 229
50
51
)
51
52
52
- var (
53
- // tailnetIP is a static IPv6 address with the Tailscale prefix that is used to route
54
- // connections from clients to this node. A dynamic address is not required because a Tailnet
55
- // client only dials a single agent at a time.
56
- tailnetIP = netip .MustParseAddr ("fd7a:115c:a1e0:49d6:b259:b7ac:b1b2:48f4" )
57
- tailnetSSHPort = 1
58
- tailnetReconnectingPTYPort = 2
59
- tailnetSpeedtestPort = 3
60
- )
61
-
62
53
type Options struct {
63
- CoordinatorDialer CoordinatorDialer
64
- FetchMetadata FetchMetadata
65
-
66
- StatsReporter StatsReporter
67
- ReconnectingPTYTimeout time.Duration
68
- EnvironmentVariables map [string ]string
69
- Logger slog.Logger
70
- }
71
-
72
- type Metadata struct {
73
- DERPMap * tailcfg.DERPMap `json:"derpmap"`
74
- EnvironmentVariables map [string ]string `json:"environment_variables"`
75
- StartupScript string `json:"startup_script"`
76
- Directory string `json:"directory"`
54
+ CoordinatorDialer CoordinatorDialer
55
+ FetchMetadata FetchMetadata
56
+ StatsReporter StatsReporter
57
+ WorkspaceAgentApps WorkspaceAgentApps
58
+ PostWorkspaceAgentAppHealth PostWorkspaceAgentAppHealth
59
+ ReconnectingPTYTimeout time.Duration
60
+ EnvironmentVariables map [string ]string
61
+ Logger slog.Logger
77
62
}
78
63
79
64
// CoordinatorDialer is a function that constructs a new broker.
80
65
// A dialer must be passed in to allow for reconnects.
81
- type CoordinatorDialer func (ctx context.Context ) (net.Conn ,error )
66
+ type CoordinatorDialer func (context.Context ) (net.Conn ,error )
82
67
83
68
// FetchMetadata is a function to obtain metadata for the agent.
84
- type FetchMetadata func (ctx context.Context ) (Metadata ,error )
69
+ type FetchMetadata func (context.Context ) (codersdk. WorkspaceAgentMetadata ,error )
85
70
86
71
func New (options Options ) io.Closer {
87
72
if options .ReconnectingPTYTimeout == 0 {
88
73
options .ReconnectingPTYTimeout = 5 * time .Minute
89
74
}
90
75
ctx ,cancelFunc := context .WithCancel (context .Background ())
91
76
server := & agent {
92
- reconnectingPTYTimeout :options .ReconnectingPTYTimeout ,
93
- logger :options .Logger ,
94
- closeCancel :cancelFunc ,
95
- closed :make (chan struct {}),
96
- envVars :options .EnvironmentVariables ,
97
- coordinatorDialer :options .CoordinatorDialer ,
98
- fetchMetadata :options .FetchMetadata ,
99
- stats :& Stats {},
100
- statsReporter :options .StatsReporter ,
77
+ reconnectingPTYTimeout :options .ReconnectingPTYTimeout ,
78
+ logger :options .Logger ,
79
+ closeCancel :cancelFunc ,
80
+ closed :make (chan struct {}),
81
+ envVars :options .EnvironmentVariables ,
82
+ coordinatorDialer :options .CoordinatorDialer ,
83
+ fetchMetadata :options .FetchMetadata ,
84
+ stats :& Stats {},
85
+ statsReporter :options .StatsReporter ,
86
+ workspaceAgentApps :options .WorkspaceAgentApps ,
87
+ postWorkspaceAgentAppHealth :options .PostWorkspaceAgentAppHealth ,
101
88
}
102
89
server .init (ctx )
103
90
return server
@@ -120,14 +107,16 @@ type agent struct {
120
107
fetchMetadata FetchMetadata
121
108
sshServer * ssh.Server
122
109
123
- network * tailnet.Conn
124
- coordinatorDialer CoordinatorDialer
125
- stats * Stats
126
- statsReporter StatsReporter
110
+ network * tailnet.Conn
111
+ coordinatorDialer CoordinatorDialer
112
+ stats * Stats
113
+ statsReporter StatsReporter
114
+ workspaceAgentApps WorkspaceAgentApps
115
+ postWorkspaceAgentAppHealth PostWorkspaceAgentAppHealth
127
116
}
128
117
129
118
func (a * agent )run (ctx context.Context ) {
130
- var metadata Metadata
119
+ var metadata codersdk. WorkspaceAgentMetadata
131
120
var err error
132
121
// An exponential back-off occurs when the connection is failing to dial.
133
122
// This is to prevent server spam in case of a coderd outage.
@@ -168,6 +157,10 @@ func (a *agent) run(ctx context.Context) {
168
157
if metadata .DERPMap != nil {
169
158
go a .runTailnet (ctx ,metadata .DERPMap )
170
159
}
160
+
161
+ if a .workspaceAgentApps != nil && a .postWorkspaceAgentAppHealth != nil {
162
+ go NewWorkspaceAppHealthReporter (a .logger ,a .workspaceAgentApps ,a .postWorkspaceAgentAppHealth )(ctx )
163
+ }
171
164
}
172
165
173
166
func (a * agent )runTailnet (ctx context.Context ,derpMap * tailcfg.DERPMap ) {
@@ -182,7 +175,7 @@ func (a *agent) runTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) {
182
175
}
183
176
var err error
184
177
a .network ,err = tailnet .NewConn (& tailnet.Options {
185
- Addresses : []netip.Prefix {netip .PrefixFrom (tailnetIP ,128 )},
178
+ Addresses : []netip.Prefix {netip .PrefixFrom (codersdk . TailnetIP ,128 )},
186
179
DERPMap :derpMap ,
187
180
Logger :a .logger .Named ("tailnet" ),
188
181
})
@@ -199,7 +192,7 @@ func (a *agent) runTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) {
199
192
})
200
193
go a .runCoordinator (ctx )
201
194
202
- sshListener ,err := a .network .Listen ("tcp" ,":" + strconv .Itoa (tailnetSSHPort ))
195
+ sshListener ,err := a .network .Listen ("tcp" ,":" + strconv .Itoa (codersdk . TailnetSSHPort ))
203
196
if err != nil {
204
197
a .logger .Critical (ctx ,"listen for ssh" ,slog .Error (err ))
205
198
return
@@ -213,7 +206,7 @@ func (a *agent) runTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) {
213
206
go a .sshServer .HandleConn (a .stats .wrapConn (conn ))
214
207
}
215
208
}()
216
- reconnectingPTYListener ,err := a .network .Listen ("tcp" ,":" + strconv .Itoa (tailnetReconnectingPTYPort ))
209
+ reconnectingPTYListener ,err := a .network .Listen ("tcp" ,":" + strconv .Itoa (codersdk . TailnetReconnectingPTYPort ))
217
210
if err != nil {
218
211
a .logger .Critical (ctx ,"listen for reconnecting pty" ,slog .Error (err ))
219
212
return
@@ -239,15 +232,15 @@ func (a *agent) runTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) {
239
232
if err != nil {
240
233
continue
241
234
}
242
- var msg reconnectingPTYInit
235
+ var msg codersdk. ReconnectingPTYInit
243
236
err = json .Unmarshal (data ,& msg )
244
237
if err != nil {
245
238
continue
246
239
}
247
240
go a .handleReconnectingPTY (ctx ,msg ,conn )
248
241
}
249
242
}()
250
- speedtestListener ,err := a .network .Listen ("tcp" ,":" + strconv .Itoa (tailnetSpeedtestPort ))
243
+ speedtestListener ,err := a .network .Listen ("tcp" ,":" + strconv .Itoa (codersdk . TailnetSpeedtestPort ))
251
244
if err != nil {
252
245
a .logger .Critical (ctx ,"listen for speedtest" ,slog .Error (err ))
253
246
return
@@ -272,6 +265,15 @@ func (a *agent) runTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) {
272
265
273
266
// runCoordinator listens for nodes and updates the self-node as it changes.
274
267
func (a * agent )runCoordinator (ctx context.Context ) {
268
+ for {
269
+ reconnect := a .runCoordinatorWithRetry (ctx )
270
+ if ! reconnect {
271
+ return
272
+ }
273
+ }
274
+ }
275
+
276
+ func (a * agent )runCoordinatorWithRetry (ctx context.Context ) (reconnect bool ) {
275
277
var coordinator net.Conn
276
278
var err error
277
279
// An exponential back-off occurs when the connection is failing to dial.
@@ -280,38 +282,38 @@ func (a *agent) runCoordinator(ctx context.Context) {
280
282
coordinator ,err = a .coordinatorDialer (ctx )
281
283
if err != nil {
282
284
if errors .Is (err ,context .Canceled ) {
283
- return
285
+ return false
284
286
}
285
287
if a .isClosed () {
286
- return
288
+ return false
287
289
}
288
290
a .logger .Warn (context .Background (),"failed to dial" ,slog .Error (err ))
289
291
continue
290
292
}
293
+ //nolint:revive // Defer is ok because we're exiting this loop.
294
+ defer coordinator .Close ()
291
295
a .logger .Info (context .Background (),"connected to coordination server" )
292
296
break
293
297
}
294
298
select {
295
299
case <- ctx .Done ():
296
- return
300
+ return false
297
301
default :
298
302
}
299
- defer coordinator .Close ()
300
303
sendNodes ,errChan := tailnet .ServeCoordinator (coordinator ,a .network .UpdateNodes )
301
304
a .network .SetNodeCallback (sendNodes )
302
305
select {
303
306
case <- ctx .Done ():
304
- return
307
+ return false
305
308
case err := <- errChan :
306
309
if a .isClosed () {
307
- return
310
+ return false
308
311
}
309
312
if errors .Is (err ,context .Canceled ) {
310
- return
313
+ return false
311
314
}
312
315
a .logger .Debug (ctx ,"node broker accept exited; restarting connection" ,slog .Error (err ))
313
- a .runCoordinator (ctx )
314
- return
316
+ return true
315
317
}
316
318
}
317
319
@@ -434,7 +436,7 @@ func (a *agent) init(ctx context.Context) {
434
436
435
437
go a .run (ctx )
436
438
if a .statsReporter != nil {
437
- cl ,err := a .statsReporter (ctx ,a .logger ,func ()* Stats {
439
+ cl ,err := a .statsReporter (ctx ,a .logger ,func ()* codersdk. AgentStats {
438
440
return a .stats .Copy ()
439
441
})
440
442
if err != nil {
@@ -469,7 +471,7 @@ func (a *agent) createCommand(ctx context.Context, rawCommand string, env []stri
469
471
if rawMetadata == nil {
470
472
return nil ,xerrors .Errorf ("no metadata was provided: %w" ,err )
471
473
}
472
- metadata ,valid := rawMetadata .(Metadata )
474
+ metadata ,valid := rawMetadata .(codersdk. WorkspaceAgentMetadata )
473
475
if ! valid {
474
476
return nil ,xerrors .Errorf ("metadata is the wrong type: %T" ,metadata )
475
477
}
@@ -625,7 +627,7 @@ func (a *agent) handleSSHSession(session ssh.Session) (retErr error) {
625
627
return cmd .Wait ()
626
628
}
627
629
628
- func (a * agent )handleReconnectingPTY (ctx context.Context ,msg reconnectingPTYInit ,conn net.Conn ) {
630
+ func (a * agent )handleReconnectingPTY (ctx context.Context ,msg codersdk. ReconnectingPTYInit ,conn net.Conn ) {
629
631
defer conn .Close ()
630
632
631
633
var rpty * reconnectingPTY
@@ -766,7 +768,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, msg reconnectingPTYIn
766
768
rpty .activeConnsMutex .Unlock ()
767
769
}()
768
770
decoder := json .NewDecoder (conn )
769
- var req ReconnectingPTYRequest
771
+ var req codersdk. ReconnectingPTYRequest
770
772
for {
771
773
err = decoder .Decode (& req )
772
774
if xerrors .Is (err ,io .EOF ) {