- Notifications
You must be signed in to change notification settings - Fork928
feat: Add web terminal with reconnecting TTYs#1186
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Merged
Changes fromall commits
Commits
Show all changes
8 commits Select commitHold shift + click to select a range
31f27bc
feat: Add web terminal with reconnecting TTYs
kylecarbs15d843e
Add xstate service
kylecarbscb5ae98
Add the webpage for accessing a web terminal
kylecarbs229c7e4
Add terminal page tests
kylecarbs621aeb1
Merge branch 'main' into webterm
kylecarbs3e1a0a4
Use Ticker instead of Timer
kylecarbs4ef7106
Active Windows mode on Windows
kylecarbs19c7b54
Merge branch 'main' into webterm
kylecarbsFile filter
Filter by extension
Conversations
Failed to load comments.
Loading
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Jump to file
Failed to load files.
Loading
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
There are no files selected for viewing
4 changes: 4 additions & 0 deletions.vscode/settings.json
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.Learn more about bidirectional Unicode characters
269 changes: 256 additions & 13 deletionsagent/agent.go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -4,6 +4,7 @@ import ( | ||
"context" | ||
"crypto/rand" | ||
"crypto/rsa" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"io" | ||
@@ -12,10 +13,14 @@ import ( | ||
"os/exec" | ||
"os/user" | ||
"runtime" | ||
"strconv" | ||
"strings" | ||
"sync" | ||
"time" | ||
"github.com/armon/circbuf" | ||
"github.com/google/uuid" | ||
gsyslog "github.com/hashicorp/go-syslog" | ||
"go.uber.org/atomic" | ||
@@ -33,6 +38,11 @@ import ( | ||
"golang.org/x/xerrors" | ||
) | ||
type Options struct { | ||
ReconnectingPTYTimeout time.Duration | ||
Logger slog.Logger | ||
} | ||
type Metadata struct { | ||
OwnerEmail string `json:"owner_email"` | ||
OwnerUsername string `json:"owner_username"` | ||
@@ -42,13 +52,20 @@ type Metadata struct { | ||
type Dialer func(ctx context.Context, logger slog.Logger) (Metadata, *peerbroker.Listener, error) | ||
func New(dialer Dialer, options *Options) io.Closer { | ||
if options == nil { | ||
options = &Options{} | ||
} | ||
if options.ReconnectingPTYTimeout == 0 { | ||
options.ReconnectingPTYTimeout = 5 * time.Minute | ||
} | ||
ctx, cancelFunc := context.WithCancel(context.Background()) | ||
server := &agent{ | ||
dialer: dialer, | ||
reconnectingPTYTimeout: options.ReconnectingPTYTimeout, | ||
logger: options.Logger, | ||
closeCancel: cancelFunc, | ||
closed: make(chan struct{}), | ||
} | ||
server.init(ctx) | ||
return server | ||
@@ -58,6 +75,9 @@ type agent struct { | ||
dialer Dialer | ||
logger slog.Logger | ||
reconnectingPTYs sync.Map | ||
reconnectingPTYTimeout time.Duration | ||
connCloseWait sync.WaitGroup | ||
closeCancel context.CancelFunc | ||
closeMutex sync.Mutex | ||
@@ -196,6 +216,8 @@ func (a *agent) handlePeerConn(ctx context.Context, conn *peer.Conn) { | ||
switch channel.Protocol() { | ||
case "ssh": | ||
go a.sshServer.HandleConn(channel.NetConn()) | ||
case "reconnecting-pty": | ||
go a.handleReconnectingPTY(ctx, channel.Label(), channel.NetConn()) | ||
default: | ||
a.logger.Warn(ctx, "unhandled protocol from channel", | ||
slog.F("protocol", channel.Protocol()), | ||
@@ -282,22 +304,25 @@ func (a *agent) init(ctx context.Context) { | ||
go a.run(ctx) | ||
} | ||
// createCommand processes raw command input with OpenSSH-like behavior. | ||
// If the rawCommand provided is empty, it will default to the users shell. | ||
// This injects environment variables specified by the user at launch too. | ||
func (a *agent) createCommand(ctx context.Context, rawCommand string, env []string) (*exec.Cmd, error) { | ||
currentUser, err := user.Current() | ||
if err != nil { | ||
returnnil,xerrors.Errorf("get current user: %w", err) | ||
} | ||
username := currentUser.Username | ||
shell, err := usershell.Get(username) | ||
if err != nil { | ||
returnnil,xerrors.Errorf("get user shell: %w", err) | ||
} | ||
// gliderlabs/ssh returns a command slice of zero | ||
// when a shell is requested. | ||
command :=rawCommand | ||
if len(command) == 0 { | ||
command = shell | ||
} | ||
@@ -307,11 +332,11 @@ func (a *agent) handleSSHSession(session ssh.Session) error { | ||
if runtime.GOOS == "windows" { | ||
caller = "/c" | ||
} | ||
cmd := exec.CommandContext(ctx, shell, caller, command) | ||
cmd.Env = append(os.Environ(),env...) | ||
executablePath, err := os.Executable() | ||
if err != nil { | ||
returnnil,xerrors.Errorf("getting os executable: %w", err) | ||
} | ||
// Git on Windows resolves with UNIX-style paths. | ||
// If using backslashes, it's unable to find the executable. | ||
@@ -332,6 +357,14 @@ func (a *agent) handleSSHSession(session ssh.Session) error { | ||
} | ||
} | ||
} | ||
return cmd, nil | ||
} | ||
func (a *agent) handleSSHSession(session ssh.Session) error { | ||
cmd, err := a.createCommand(session.Context(), session.RawCommand(), session.Environ()) | ||
if err != nil { | ||
return err | ||
} | ||
sshPty, windowSize, isPty := session.Pty() | ||
if isPty { | ||
@@ -381,6 +414,194 @@ func (a *agent) handleSSHSession(session ssh.Session) error { | ||
return cmd.Wait() | ||
} | ||
func (a *agent) handleReconnectingPTY(ctx context.Context, rawID string, conn net.Conn) { | ||
defer conn.Close() | ||
// The ID format is referenced in conn.go. | ||
// <uuid>:<height>:<width> | ||
idParts := strings.Split(rawID, ":") | ||
if len(idParts) != 3 { | ||
a.logger.Warn(ctx, "client sent invalid id format", slog.F("raw-id", rawID)) | ||
return | ||
} | ||
id := idParts[0] | ||
// Enforce a consistent format for IDs. | ||
_, err := uuid.Parse(id) | ||
if err != nil { | ||
a.logger.Warn(ctx, "client sent reconnection token that isn't a uuid", slog.F("id", id), slog.Error(err)) | ||
return | ||
} | ||
// Parse the initial terminal dimensions. | ||
height, err := strconv.Atoi(idParts[1]) | ||
if err != nil { | ||
a.logger.Warn(ctx, "client sent invalid height", slog.F("id", id), slog.F("height", idParts[1])) | ||
return | ||
} | ||
width, err := strconv.Atoi(idParts[2]) | ||
if err != nil { | ||
a.logger.Warn(ctx, "client sent invalid width", slog.F("id", id), slog.F("width", idParts[2])) | ||
return | ||
} | ||
var rpty *reconnectingPTY | ||
rawRPTY, ok := a.reconnectingPTYs.Load(id) | ||
if ok { | ||
rpty, ok = rawRPTY.(*reconnectingPTY) | ||
if !ok { | ||
a.logger.Warn(ctx, "found invalid type in reconnecting pty map", slog.F("id", id)) | ||
} | ||
} else { | ||
// Empty command will default to the users shell! | ||
cmd, err := a.createCommand(ctx, "", nil) | ||
if err != nil { | ||
a.logger.Warn(ctx, "create reconnecting pty command", slog.Error(err)) | ||
return | ||
} | ||
cmd.Env = append(cmd.Env, "TERM=xterm-256color") | ||
ptty, process, err := pty.Start(cmd) | ||
if err != nil { | ||
a.logger.Warn(ctx, "start reconnecting pty command", slog.F("id", id)) | ||
} | ||
// Default to buffer 64KB. | ||
circularBuffer, err := circbuf.NewBuffer(64 * 1024) | ||
if err != nil { | ||
a.logger.Warn(ctx, "create circular buffer", slog.Error(err)) | ||
return | ||
} | ||
a.closeMutex.Lock() | ||
a.connCloseWait.Add(1) | ||
a.closeMutex.Unlock() | ||
ctx, cancelFunc := context.WithCancel(ctx) | ||
rpty = &reconnectingPTY{ | ||
activeConns: make(map[string]net.Conn), | ||
ptty: ptty, | ||
// Timeouts created with an after func can be reset! | ||
timeout: time.AfterFunc(a.reconnectingPTYTimeout, cancelFunc), | ||
circularBuffer: circularBuffer, | ||
} | ||
a.reconnectingPTYs.Store(id, rpty) | ||
go func() { | ||
// CommandContext isn't respected for Windows PTYs right now, | ||
// so we need to manually track the lifecycle. | ||
// When the context has been completed either: | ||
// 1. The timeout completed. | ||
// 2. The parent context was canceled. | ||
<-ctx.Done() | ||
_ = process.Kill() | ||
}() | ||
go func() { | ||
// If the process dies randomly, we should | ||
// close the pty. | ||
_, _ = process.Wait() | ||
rpty.Close() | ||
}() | ||
go func() { | ||
buffer := make([]byte, 1024) | ||
for { | ||
read, err := rpty.ptty.Output().Read(buffer) | ||
if err != nil { | ||
// When the PTY is closed, this is triggered. | ||
break | ||
} | ||
part := buffer[:read] | ||
_, err = rpty.circularBuffer.Write(part) | ||
if err != nil { | ||
a.logger.Error(ctx, "reconnecting pty write buffer", slog.Error(err), slog.F("id", id)) | ||
break | ||
} | ||
rpty.activeConnsMutex.Lock() | ||
for _, conn := range rpty.activeConns { | ||
_, _ = conn.Write(part) | ||
} | ||
rpty.activeConnsMutex.Unlock() | ||
} | ||
// Cleanup the process, PTY, and delete it's | ||
// ID from memory. | ||
_ = process.Kill() | ||
rpty.Close() | ||
a.reconnectingPTYs.Delete(id) | ||
a.connCloseWait.Done() | ||
}() | ||
} | ||
// Resize the PTY to initial height + width. | ||
err = rpty.ptty.Resize(uint16(height), uint16(width)) | ||
if err != nil { | ||
// We can continue after this, it's not fatal! | ||
a.logger.Error(ctx, "resize reconnecting pty", slog.F("id", id), slog.Error(err)) | ||
} | ||
// Write any previously stored data for the TTY. | ||
_, err = conn.Write(rpty.circularBuffer.Bytes()) | ||
if err != nil { | ||
a.logger.Warn(ctx, "write reconnecting pty buffer", slog.F("id", id), slog.Error(err)) | ||
return | ||
} | ||
connectionID := uuid.NewString() | ||
// Multiple connections to the same TTY are permitted. | ||
// This could easily be used for terminal sharing, but | ||
// we do it because it's a nice user experience to | ||
// copy/paste a terminal URL and have it _just work_. | ||
rpty.activeConnsMutex.Lock() | ||
rpty.activeConns[connectionID] = conn | ||
rpty.activeConnsMutex.Unlock() | ||
// Resetting this timeout prevents the PTY from exiting. | ||
rpty.timeout.Reset(a.reconnectingPTYTimeout) | ||
ctx, cancelFunc := context.WithCancel(ctx) | ||
defer cancelFunc() | ||
code-asher marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
heartbeat := time.NewTicker(a.reconnectingPTYTimeout / 2) | ||
defer heartbeat.Stop() | ||
go func() { | ||
// Keep updating the activity while this | ||
// connection is alive! | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
case <-heartbeat.C: | ||
} | ||
rpty.timeout.Reset(a.reconnectingPTYTimeout) | ||
} | ||
}() | ||
defer func() { | ||
// After this connection ends, remove it from | ||
// the PTYs active connections. If it isn't | ||
// removed, all PTY data will be sent to it. | ||
rpty.activeConnsMutex.Lock() | ||
delete(rpty.activeConns, connectionID) | ||
rpty.activeConnsMutex.Unlock() | ||
}() | ||
decoder := json.NewDecoder(conn) | ||
var req ReconnectingPTYRequest | ||
for { | ||
err = decoder.Decode(&req) | ||
if xerrors.Is(err, io.EOF) { | ||
return | ||
} | ||
if err != nil { | ||
a.logger.Warn(ctx, "reconnecting pty buffer read error", slog.F("id", id), slog.Error(err)) | ||
return | ||
} | ||
_, err = rpty.ptty.Input().Write([]byte(req.Data)) | ||
if err != nil { | ||
a.logger.Warn(ctx, "write to reconnecting pty", slog.F("id", id), slog.Error(err)) | ||
return | ||
} | ||
// Check if a resize needs to happen! | ||
if req.Height == 0 || req.Width == 0 { | ||
continue | ||
} | ||
err = rpty.ptty.Resize(req.Height, req.Width) | ||
if err != nil { | ||
// We can continue after this, it's not fatal! | ||
a.logger.Error(ctx, "resize reconnecting pty", slog.F("id", id), slog.Error(err)) | ||
} | ||
} | ||
} | ||
// isClosed returns whether the API is closed or not. | ||
func (a *agent) isClosed() bool { | ||
select { | ||
@@ -403,3 +624,25 @@ func (a *agent) Close() error { | ||
a.connCloseWait.Wait() | ||
return nil | ||
} | ||
type reconnectingPTY struct { | ||
activeConnsMutex sync.Mutex | ||
activeConns map[string]net.Conn | ||
circularBuffer *circbuf.Buffer | ||
timeout *time.Timer | ||
ptty pty.PTY | ||
} | ||
// Close ends all connections to the reconnecting | ||
// PTY and clear the circular buffer. | ||
func (r *reconnectingPTY) Close() { | ||
r.activeConnsMutex.Lock() | ||
defer r.activeConnsMutex.Unlock() | ||
code-asher marked this conversation as resolved. Show resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
for _, conn := range r.activeConns { | ||
_ = conn.Close() | ||
} | ||
_ = r.ptty.Close() | ||
r.circularBuffer.Reset() | ||
r.timeout.Stop() | ||
} |
Oops, something went wrong.
Uh oh!
There was an error while loading.Please reload this page.
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.