@@ -21,7 +21,6 @@ import (
21
21
"sync"
22
22
"time"
23
23
24
- "github.com/armon/circbuf"
25
24
"github.com/google/uuid"
26
25
"github.com/prometheus/client_golang/prometheus"
27
26
"github.com/spf13/afero"
@@ -34,12 +33,12 @@ import (
34
33
35
34
"cdr.dev/slog"
36
35
"github.com/coder/coder/agent/agentssh"
36
+ "github.com/coder/coder/agent/reconnectingpty"
37
37
"github.com/coder/coder/buildinfo"
38
38
"github.com/coder/coder/coderd/database"
39
39
"github.com/coder/coder/coderd/gitauth"
40
40
"github.com/coder/coder/codersdk"
41
41
"github.com/coder/coder/codersdk/agentsdk"
42
- "github.com/coder/coder/pty"
43
42
"github.com/coder/coder/tailnet"
44
43
"github.com/coder/retry"
45
44
)
@@ -87,9 +86,6 @@ type Agent interface {
87
86
}
88
87
89
88
func New (options Options )Agent {
90
- if options .ReconnectingPTYTimeout == 0 {
91
- options .ReconnectingPTYTimeout = 5 * time .Minute
92
- }
93
89
if options .Filesystem == nil {
94
90
options .Filesystem = afero .NewOsFs ()
95
91
}
@@ -1042,14 +1038,14 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
1042
1038
logger .Debug (ctx ,"session closed" )
1043
1039
}()
1044
1040
1045
- var rpty * reconnectingPTY
1046
- sendConnected := make (chan * reconnectingPTY ,1 )
1041
+ var rpty * reconnectingpty. ReconnectingPTY
1042
+ sendConnected := make (chan * reconnectingpty. ReconnectingPTY ,1 )
1047
1043
// On store, reserve this ID to prevent multiple concurrent new connections.
1048
1044
waitReady ,ok := a .reconnectingPTYs .LoadOrStore (msg .ID ,sendConnected )
1049
1045
if ok {
1050
1046
close (sendConnected )// Unused.
1051
1047
logger .Debug (ctx ,"connecting to existing session" )
1052
- c ,ok := waitReady .(chan * reconnectingPTY )
1048
+ c ,ok := waitReady .(chan * reconnectingpty. ReconnectingPTY )
1053
1049
if ! ok {
1054
1050
return xerrors .Errorf ("found invalid type in reconnecting pty map: %T" ,waitReady )
1055
1051
}
@@ -1075,169 +1071,37 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
1075
1071
a .metrics .reconnectingPTYErrors .WithLabelValues ("create_command" ).Add (1 )
1076
1072
return xerrors .Errorf ("create command: %w" ,err )
1077
1073
}
1078
- cmd .Env = append (cmd .Env ,"TERM=xterm-256color" )
1079
1074
1080
- // Default to buffer 64KiB.
1081
- circularBuffer ,err := circbuf .NewBuffer (64 << 10 )
1082
- if err != nil {
1083
- return xerrors .Errorf ("create circular buffer: %w" ,err )
1075
+ // The ablity to select the backend type is mostly for tests.
1076
+ backendType := msg .BackendType
1077
+ if backendType == "" {
1078
+ _ ,err = exec .LookPath ("screen" )
1079
+ if err == nil {
1080
+ backendType = codersdk .ReconnectingPTYBackendTypeScreen
1081
+ }else {
1082
+ backendType = codersdk .ReconnectingPTYBackendTypeBuffered
1083
+ }
1084
1084
}
1085
1085
1086
- ptty ,process ,err := pty .Start (cmd )
1087
- if err != nil {
1088
- a .metrics .reconnectingPTYErrors .WithLabelValues ("start_command" ).Add (1 )
1089
- return xerrors .Errorf ("start command: %w" ,err )
1090
- }
1086
+ rpty = reconnectingpty .New (ctx ,cmd ,& reconnectingpty.Options {
1087
+ BackendType :backendType ,
1088
+ Timeout :a .reconnectingPTYTimeout ,
1089
+ Metrics :a .metrics .reconnectingPTYErrors ,
1090
+ Logger :logger ,
1091
+ })
1091
1092
1092
- ctx ,cancel := context .WithCancel (ctx )
1093
- rpty = & reconnectingPTY {
1094
- activeConns :map [string ]net.Conn {
1095
- // We have to put the connection in the map instantly otherwise
1096
- // the connection won't be closed if the process instantly dies.
1097
- connectionID :conn ,
1098
- },
1099
- ptty :ptty ,
1100
- // Timeouts created with an after func can be reset!
1101
- timeout :time .AfterFunc (a .reconnectingPTYTimeout ,cancel ),
1102
- circularBuffer :circularBuffer ,
1103
- }
1104
- // We don't need to separately monitor for the process exiting.
1105
- // When it exits, our ptty.OutputReader() will return EOF after
1106
- // reading all process output.
1107
1093
if err = a .trackConnGoroutine (func () {
1108
- buffer := make ([]byte ,1024 )
1109
- for {
1110
- read ,err := rpty .ptty .OutputReader ().Read (buffer )
1111
- if err != nil {
1112
- // When the PTY is closed, this is triggered.
1113
- // Error is typically a benign EOF, so only log for debugging.
1114
- if errors .Is (err ,io .EOF ) {
1115
- logger .Debug (ctx ,"unable to read pty output, command might have exited" ,slog .Error (err ))
1116
- }else {
1117
- logger .Warn (ctx ,"unable to read pty output, command might have exited" ,slog .Error (err ))
1118
- a .metrics .reconnectingPTYErrors .WithLabelValues ("output_reader" ).Add (1 )
1119
- }
1120
- break
1121
- }
1122
- part := buffer [:read ]
1123
- rpty .circularBufferMutex .Lock ()
1124
- _ ,err = rpty .circularBuffer .Write (part )
1125
- rpty .circularBufferMutex .Unlock ()
1126
- if err != nil {
1127
- logger .Error (ctx ,"write to circular buffer" ,slog .Error (err ))
1128
- break
1129
- }
1130
- rpty .activeConnsMutex .Lock ()
1131
- for cid ,conn := range rpty .activeConns {
1132
- _ ,err = conn .Write (part )
1133
- if err != nil {
1134
- logger .Warn (ctx ,
1135
- "error writing to active conn" ,
1136
- slog .F ("other_conn_id" ,cid ),
1137
- slog .Error (err ),
1138
- )
1139
- a .metrics .reconnectingPTYErrors .WithLabelValues ("write" ).Add (1 )
1140
- }
1141
- }
1142
- rpty .activeConnsMutex .Unlock ()
1143
- }
1144
-
1145
- // Cleanup the process, PTY, and delete it's
1146
- // ID from memory.
1147
- _ = process .Kill ()
1148
- rpty .Close ()
1094
+ rpty .Wait ()
1149
1095
a .reconnectingPTYs .Delete (msg .ID )
1150
1096
});err != nil {
1151
- _ = process .Kill ()
1152
- _ = ptty .Close ()
1097
+ rpty .Close (err .Error ())
1153
1098
return xerrors .Errorf ("start routine: %w" ,err )
1154
1099
}
1100
+
1155
1101
connected = true
1156
1102
sendConnected <- rpty
1157
1103
}
1158
- // Resize the PTY to initial height + width.
1159
- err := rpty .ptty .Resize (msg .Height ,msg .Width )
1160
- if err != nil {
1161
- // We can continue after this, it's not fatal!
1162
- logger .Error (ctx ,"reconnecting PTY initial resize failed, but will continue" ,slog .Error (err ))
1163
- a .metrics .reconnectingPTYErrors .WithLabelValues ("resize" ).Add (1 )
1164
- }
1165
- // Write any previously stored data for the TTY.
1166
- rpty .circularBufferMutex .RLock ()
1167
- prevBuf := slices .Clone (rpty .circularBuffer .Bytes ())
1168
- rpty .circularBufferMutex .RUnlock ()
1169
- // Note that there is a small race here between writing buffered
1170
- // data and storing conn in activeConns. This is likely a very minor
1171
- // edge case, but we should look into ways to avoid it. Holding
1172
- // activeConnsMutex would be one option, but holding this mutex
1173
- // while also holding circularBufferMutex seems dangerous.
1174
- _ ,err = conn .Write (prevBuf )
1175
- if err != nil {
1176
- a .metrics .reconnectingPTYErrors .WithLabelValues ("write" ).Add (1 )
1177
- return xerrors .Errorf ("write buffer to conn: %w" ,err )
1178
- }
1179
- // Multiple connections to the same TTY are permitted.
1180
- // This could easily be used for terminal sharing, but
1181
- // we do it because it's a nice user experience to
1182
- // copy/paste a terminal URL and have it _just work_.
1183
- rpty .activeConnsMutex .Lock ()
1184
- rpty .activeConns [connectionID ]= conn
1185
- rpty .activeConnsMutex .Unlock ()
1186
- // Resetting this timeout prevents the PTY from exiting.
1187
- rpty .timeout .Reset (a .reconnectingPTYTimeout )
1188
-
1189
- ctx ,cancelFunc := context .WithCancel (ctx )
1190
- defer cancelFunc ()
1191
- heartbeat := time .NewTicker (a .reconnectingPTYTimeout / 2 )
1192
- defer heartbeat .Stop ()
1193
- go func () {
1194
- // Keep updating the activity while this
1195
- // connection is alive!
1196
- for {
1197
- select {
1198
- case <- ctx .Done ():
1199
- return
1200
- case <- heartbeat .C :
1201
- }
1202
- rpty .timeout .Reset (a .reconnectingPTYTimeout )
1203
- }
1204
- }()
1205
- defer func () {
1206
- // After this connection ends, remove it from
1207
- // the PTYs active connections. If it isn't
1208
- // removed, all PTY data will be sent to it.
1209
- rpty .activeConnsMutex .Lock ()
1210
- delete (rpty .activeConns ,connectionID )
1211
- rpty .activeConnsMutex .Unlock ()
1212
- }()
1213
- decoder := json .NewDecoder (conn )
1214
- var req codersdk.ReconnectingPTYRequest
1215
- for {
1216
- err = decoder .Decode (& req )
1217
- if xerrors .Is (err ,io .EOF ) {
1218
- return nil
1219
- }
1220
- if err != nil {
1221
- logger .Warn (ctx ,"reconnecting PTY failed with read error" ,slog .Error (err ))
1222
- return nil
1223
- }
1224
- _ ,err = rpty .ptty .InputWriter ().Write ([]byte (req .Data ))
1225
- if err != nil {
1226
- logger .Warn (ctx ,"reconnecting PTY failed with write error" ,slog .Error (err ))
1227
- a .metrics .reconnectingPTYErrors .WithLabelValues ("input_writer" ).Add (1 )
1228
- return nil
1229
- }
1230
- // Check if a resize needs to happen!
1231
- if req .Height == 0 || req .Width == 0 {
1232
- continue
1233
- }
1234
- err = rpty .ptty .Resize (req .Height ,req .Width )
1235
- if err != nil {
1236
- // We can continue after this, it's not fatal!
1237
- logger .Error (ctx ,"reconnecting PTY resize failed, but will continue" ,slog .Error (err ))
1238
- a .metrics .reconnectingPTYErrors .WithLabelValues ("resize" ).Add (1 )
1239
- }
1240
- }
1104
+ return rpty .Attach (ctx ,connectionID ,conn ,msg .Height ,msg .Width )
1241
1105
}
1242
1106
1243
1107
// startReportingConnectionStats runs the connection stats reporting goroutine.
@@ -1455,31 +1319,6 @@ lifecycleWaitLoop:
1455
1319
return nil
1456
1320
}
1457
1321
1458
- type reconnectingPTY struct {
1459
- activeConnsMutex sync.Mutex
1460
- activeConns map [string ]net.Conn
1461
-
1462
- circularBuffer * circbuf.Buffer
1463
- circularBufferMutex sync.RWMutex
1464
- timeout * time.Timer
1465
- ptty pty.PTYCmd
1466
- }
1467
-
1468
- // Close ends all connections to the reconnecting
1469
- // PTY and clear the circular buffer.
1470
- func (r * reconnectingPTY )Close () {
1471
- r .activeConnsMutex .Lock ()
1472
- defer r .activeConnsMutex .Unlock ()
1473
- for _ ,conn := range r .activeConns {
1474
- _ = conn .Close ()
1475
- }
1476
- _ = r .ptty .Close ()
1477
- r .circularBufferMutex .Lock ()
1478
- r .circularBuffer .Reset ()
1479
- r .circularBufferMutex .Unlock ()
1480
- r .timeout .Stop ()
1481
- }
1482
-
1483
1322
// userHomeDir returns the home directory of the current user, giving
1484
1323
// priority to the $HOME environment variable.
1485
1324
func userHomeDir () (string ,error ) {