Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

fix: use screen for reconnecting terminal sessions if available#8640

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
code-asher merged 30 commits intomainfromasher/reconnection-with-screen
Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
Show all changes
30 commits
Select commitHold shift + click to select a range
0b8499d
Add screen backend for reconnecting ptys
code-asherJul 18, 2023
9cb043c
Fix leaking goroutine in wait
code-asherAug 8, 2023
19633c9
Remove connection_id from reconnecting PTY
code-asherAug 8, 2023
8f4956f
Remove error from close return
code-asherAug 8, 2023
ee887b9
Refactor reconnecting PTY backends
code-asherAug 8, 2023
1697070
Merge remote-tracking branch 'github/main' into asher/reconnection-wi…
code-asherAug 8, 2023
a2149fc
Remove extra mutex unlock
code-asherAug 8, 2023
e3c808b
Fix heartbeat typo in comment
code-asherAug 8, 2023
369a36e
Tweak connection close
code-asherAug 8, 2023
72d405c
Linter fixes
code-asherAug 8, 2023
e37fc0f
Clear active conns on close
code-asherAug 9, 2023
61a4253
Avoid useless buffer reset on close
code-asherAug 9, 2023
c7978db
Move lifecycle after buffer and process are set
code-asherAug 9, 2023
a083b31
Add info logs for starting, stopping, and attaching
code-asherAug 9, 2023
bb40f78
Do not hold mutex while waiting for state in screen
code-asherAug 9, 2023
089e1f9
Remove incorrect statement about closing on Attach
code-asherAug 9, 2023
ee67045
Remove backend type from SDK/API
code-asherAug 9, 2023
a6bcdd2
Use PATH to test buffered reconnecting pty
code-asherAug 9, 2023
3ff1510
Do not hold mutex while waiting for state
code-asherAug 9, 2023
a781173
Avoid clobbering attach error with close errors
code-asherAug 10, 2023
7a8ec2e
Immediately read screen process
code-asherAug 10, 2023
56ca7ac
Fix incorrect logger context on reconnecting PTY
code-asherAug 10, 2023
9b88a68
Protect map and state with the same mutex
code-asherAug 10, 2023
56e71c9
Fix incorrect test comment
code-asherAug 11, 2023
d4170ca
Attempt fixing flake with 'echo test' command
code-asherAug 11, 2023
34c5c1a
Avoid wait callback when context expires
code-asherAug 11, 2023
88a6b96
Remove err from wait callback
code-asherAug 11, 2023
1fd4e9a
Add state wait func where caller holds the lock
code-asherAug 14, 2023
57f464a
Merge remote-tracking branch 'github/main' into asher/reconnection-wi…
code-asherAug 14, 2023
968526d
Remove unused fn
code-asherAug 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 19 additions & 193 deletionsagent/agent.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -21,7 +21,6 @@ import (
"sync"
"time"

"github.com/armon/circbuf"
"github.com/go-chi/chi/v5"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
Expand All@@ -36,12 +35,12 @@ import (

"cdr.dev/slog"
"github.com/coder/coder/agent/agentssh"
"github.com/coder/coder/agent/reconnectingpty"
"github.com/coder/coder/buildinfo"
"github.com/coder/coder/coderd/database"
"github.com/coder/coder/coderd/gitauth"
"github.com/coder/coder/codersdk"
"github.com/coder/coder/codersdk/agentsdk"
"github.com/coder/coder/pty"
"github.com/coder/coder/tailnet"
"github.com/coder/retry"
)
Expand DownExpand Up@@ -92,9 +91,6 @@ type Agent interface {
}

func New(options Options) Agent {
if options.ReconnectingPTYTimeout == 0 {
options.ReconnectingPTYTimeout = 5 * time.Minute
}
if options.Filesystem == nil {
options.Filesystem = afero.NewOsFs()
}
Expand DownExpand Up@@ -1075,8 +1071,8 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
defer a.connCountReconnectingPTY.Add(-1)

connectionID := uuid.NewString()
logger= logger.With(slog.F("message_id", msg.ID), slog.F("connection_id", connectionID))
logger.Debug(ctx, "starting handler")
connLogger := logger.With(slog.F("message_id", msg.ID), slog.F("connection_id", connectionID))
connLogger.Debug(ctx, "starting handler")

defer func() {
if err := retErr; err != nil {
Expand All@@ -1087,22 +1083,22 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
// If the agent is closed, we don't want to
// log this as an error since it's expected.
if closed {
logger.Debug(ctx, "reconnectingPTY failed withsession error (agent closed)", slog.Error(err))
connLogger.Debug(ctx, "reconnectingpty failed withattach error (agent closed)", slog.Error(err))
} else {
logger.Error(ctx, "reconnectingPTY failed withsession error", slog.Error(err))
connLogger.Error(ctx, "reconnectingpty failed withattach error", slog.Error(err))
}
}
logger.Debug(ctx, "session closed")
connLogger.Debug(ctx, "reconnecting pty connection closed")
}()

var rpty*reconnectingPTY
sendConnected := make(chan*reconnectingPTY, 1)
var rptyreconnectingpty.ReconnectingPTY
sendConnected := make(chanreconnectingpty.ReconnectingPTY, 1)
// On store, reserve this ID to prevent multiple concurrent new connections.
waitReady, ok := a.reconnectingPTYs.LoadOrStore(msg.ID, sendConnected)
if ok {
close(sendConnected) // Unused.
logger.Debug(ctx, "connecting to existingsession")
c, ok := waitReady.(chan*reconnectingPTY)
connLogger.Debug(ctx, "connecting to existingreconnecting pty")
c, ok := waitReady.(chanreconnectingpty.ReconnectingPTY)
if !ok {
return xerrors.Errorf("found invalid type in reconnecting pty map: %T", waitReady)
}
Expand All@@ -1112,7 +1108,7 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
}
c <- rpty // Put it back for the next reconnect.
} else {
logger.Debug(ctx, "creating newsession")
connLogger.Debug(ctx, "creating newreconnecting pty")

connected := false
defer func() {
Expand All@@ -1128,169 +1124,24 @@ func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, m
a.metrics.reconnectingPTYErrors.WithLabelValues("create_command").Add(1)
return xerrors.Errorf("create command: %w", err)
}
cmd.Env = append(cmd.Env, "TERM=xterm-256color")

// Default to buffer 64KiB.
circularBuffer, err := circbuf.NewBuffer(64 << 10)
if err != nil {
return xerrors.Errorf("create circular buffer: %w", err)
}

ptty, process, err := pty.Start(cmd)
if err != nil {
a.metrics.reconnectingPTYErrors.WithLabelValues("start_command").Add(1)
return xerrors.Errorf("start command: %w", err)
}
rpty = reconnectingpty.New(ctx, cmd, &reconnectingpty.Options{
Timeout: a.reconnectingPTYTimeout,
Metrics: a.metrics.reconnectingPTYErrors,
}, logger.With(slog.F("message_id", msg.ID)))

ctx, cancel := context.WithCancel(ctx)
rpty = &reconnectingPTY{
activeConns: map[string]net.Conn{
// We have to put the connection in the map instantly otherwise
// the connection won't be closed if the process instantly dies.
connectionID: conn,
},
ptty: ptty,
// Timeouts created with an after func can be reset!
timeout: time.AfterFunc(a.reconnectingPTYTimeout, cancel),
circularBuffer: circularBuffer,
}
// We don't need to separately monitor for the process exiting.
// When it exits, our ptty.OutputReader() will return EOF after
// reading all process output.
if err = a.trackConnGoroutine(func() {
buffer := make([]byte, 1024)
for {
read, err := rpty.ptty.OutputReader().Read(buffer)
if err != nil {
// When the PTY is closed, this is triggered.
// Error is typically a benign EOF, so only log for debugging.
if errors.Is(err, io.EOF) {
logger.Debug(ctx, "unable to read pty output, command might have exited", slog.Error(err))
} else {
logger.Warn(ctx, "unable to read pty output, command might have exited", slog.Error(err))
a.metrics.reconnectingPTYErrors.WithLabelValues("output_reader").Add(1)
}
break
}
part := buffer[:read]
rpty.circularBufferMutex.Lock()
_, err = rpty.circularBuffer.Write(part)
rpty.circularBufferMutex.Unlock()
if err != nil {
logger.Error(ctx, "write to circular buffer", slog.Error(err))
break
}
rpty.activeConnsMutex.Lock()
for cid, conn := range rpty.activeConns {
_, err = conn.Write(part)
if err != nil {
logger.Warn(ctx,
"error writing to active conn",
slog.F("other_conn_id", cid),
slog.Error(err),
)
a.metrics.reconnectingPTYErrors.WithLabelValues("write").Add(1)
}
}
rpty.activeConnsMutex.Unlock()
}

// Cleanup the process, PTY, and delete it's
// ID from memory.
_ = process.Kill()
rpty.Close()
rpty.Wait()
a.reconnectingPTYs.Delete(msg.ID)
}); err != nil {
_ = process.Kill()
_ = ptty.Close()
rpty.Close(err.Error())
return xerrors.Errorf("start routine: %w", err)
}

connected = true
sendConnected <- rpty
}
// Resize the PTY to initial height + width.
err := rpty.ptty.Resize(msg.Height, msg.Width)
if err != nil {
// We can continue after this, it's not fatal!
logger.Error(ctx, "reconnecting PTY initial resize failed, but will continue", slog.Error(err))
a.metrics.reconnectingPTYErrors.WithLabelValues("resize").Add(1)
}
// Write any previously stored data for the TTY.
rpty.circularBufferMutex.RLock()
prevBuf := slices.Clone(rpty.circularBuffer.Bytes())
rpty.circularBufferMutex.RUnlock()
// Note that there is a small race here between writing buffered
// data and storing conn in activeConns. This is likely a very minor
// edge case, but we should look into ways to avoid it. Holding
// activeConnsMutex would be one option, but holding this mutex
// while also holding circularBufferMutex seems dangerous.
_, err = conn.Write(prevBuf)
if err != nil {
a.metrics.reconnectingPTYErrors.WithLabelValues("write").Add(1)
return xerrors.Errorf("write buffer to conn: %w", err)
}
// Multiple connections to the same TTY are permitted.
// This could easily be used for terminal sharing, but
// we do it because it's a nice user experience to
// copy/paste a terminal URL and have it _just work_.
rpty.activeConnsMutex.Lock()
rpty.activeConns[connectionID] = conn
rpty.activeConnsMutex.Unlock()
// Resetting this timeout prevents the PTY from exiting.
rpty.timeout.Reset(a.reconnectingPTYTimeout)

ctx, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()
heartbeat := time.NewTicker(a.reconnectingPTYTimeout / 2)
defer heartbeat.Stop()
go func() {
// Keep updating the activity while this
// connection is alive!
for {
select {
case <-ctx.Done():
return
case <-heartbeat.C:
}
rpty.timeout.Reset(a.reconnectingPTYTimeout)
}
}()
defer func() {
// After this connection ends, remove it from
// the PTYs active connections. If it isn't
// removed, all PTY data will be sent to it.
rpty.activeConnsMutex.Lock()
delete(rpty.activeConns, connectionID)
rpty.activeConnsMutex.Unlock()
}()
decoder := json.NewDecoder(conn)
var req codersdk.ReconnectingPTYRequest
for {
err = decoder.Decode(&req)
if xerrors.Is(err, io.EOF) {
return nil
}
if err != nil {
logger.Warn(ctx, "reconnecting PTY failed with read error", slog.Error(err))
return nil
}
_, err = rpty.ptty.InputWriter().Write([]byte(req.Data))
if err != nil {
logger.Warn(ctx, "reconnecting PTY failed with write error", slog.Error(err))
a.metrics.reconnectingPTYErrors.WithLabelValues("input_writer").Add(1)
return nil
}
// Check if a resize needs to happen!
if req.Height == 0 || req.Width == 0 {
continue
}
err = rpty.ptty.Resize(req.Height, req.Width)
if err != nil {
// We can continue after this, it's not fatal!
logger.Error(ctx, "reconnecting PTY resize failed, but will continue", slog.Error(err))
a.metrics.reconnectingPTYErrors.WithLabelValues("resize").Add(1)
}
}
return rpty.Attach(ctx, connectionID, conn, msg.Height, msg.Width, connLogger)
}

// startReportingConnectionStats runs the connection stats reporting goroutine.
Expand DownExpand Up@@ -1541,31 +1392,6 @@ lifecycleWaitLoop:
return nil
}

type reconnectingPTY struct {
activeConnsMutex sync.Mutex
activeConns map[string]net.Conn

circularBuffer *circbuf.Buffer
circularBufferMutex sync.RWMutex
timeout *time.Timer
ptty pty.PTYCmd
}

// Close ends all connections to the reconnecting
// PTY and clear the circular buffer.
func (r *reconnectingPTY) Close() {
r.activeConnsMutex.Lock()
defer r.activeConnsMutex.Unlock()
for _, conn := range r.activeConns {
_ = conn.Close()
}
_ = r.ptty.Close()
r.circularBufferMutex.Lock()
r.circularBuffer.Reset()
r.circularBufferMutex.Unlock()
r.timeout.Stop()
}

// userHomeDir returns the home directory of the current user, giving
// priority to the $HOME environment variable.
func userHomeDir() (string, error) {
Expand Down
Loading

[8]ページ先頭

©2009-2025 Movatter.jp