Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

fix: fix goroutine leak in log streaming over websocket#15709

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
spikecurtis merged 1 commit intomainfromspike/14881-read-from-json-websockets
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletionscoderd/provisionerjobs.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -15,6 +15,7 @@ import (
"nhooyr.io/websocket"

"cdr.dev/slog"
"github.com/coder/coder/v2/codersdk/wsjson"

"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/db2sdk"
Expand DownExpand Up@@ -312,6 +313,7 @@ type logFollower struct {
r *http.Request
rw http.ResponseWriter
conn *websocket.Conn
enc *wsjson.Encoder[codersdk.ProvisionerJobLog]

jobID uuid.UUID
after int64
Expand DownExpand Up@@ -391,6 +393,7 @@ func (f *logFollower) follow() {
}
defer f.conn.Close(websocket.StatusNormalClosure, "done")
go httpapi.Heartbeat(f.ctx, f.conn)
f.enc = wsjson.NewEncoder[codersdk.ProvisionerJobLog](f.conn, websocket.MessageText)

// query for logs once right away, so we can get historical data from before
// subscription
Expand DownExpand Up@@ -488,11 +491,7 @@ func (f *logFollower) query() error {
return xerrors.Errorf("error fetching logs: %w", err)
}
for _, log := range logs {
logB, err := json.Marshal(convertProvisionerJobLog(log))
if err != nil {
return xerrors.Errorf("error marshaling log: %w", err)
}
err = f.conn.Write(f.ctx, websocket.MessageText, logB)
err := f.enc.Encode(convertProvisionerJobLog(log))
if err != nil {
return xerrors.Errorf("error writing to websocket: %w", err)
}
Expand Down
24 changes: 7 additions & 17 deletionscoderd/workspaceagents.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -39,6 +39,7 @@ import (
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/codersdk/workspacesdk"
"github.com/coder/coder/v2/codersdk/wsjson"
"github.com/coder/coder/v2/tailnet"
"github.com/coder/coder/v2/tailnet/proto"
)
Expand DownExpand Up@@ -396,11 +397,9 @@ func (api *API) workspaceAgentLogs(rw http.ResponseWriter, r *http.Request) {
}
go httpapi.Heartbeat(ctx, conn)

ctx, wsNetConn:= codersdk.WebsocketNetConn(ctx,conn, websocket.MessageText)
deferwsNetConn.Close() // Also closes conn.
encoder:=wsjson.NewEncoder[[]codersdk.WorkspaceAgentLog](conn, websocket.MessageText)
deferencoder.Close(websocket.StatusNormalClosure)

// The Go stdlib JSON encoder appends a newline character after message write.
encoder := json.NewEncoder(wsNetConn)
err = encoder.Encode(convertWorkspaceAgentLogs(logs))
if err != nil {
return
Expand DownExpand Up@@ -740,16 +739,8 @@ func (api *API) derpMapUpdates(rw http.ResponseWriter, r *http.Request) {
})
return
}
ctx, nconn := codersdk.WebsocketNetConn(ctx, ws, websocket.MessageBinary)
defer nconn.Close()

// Slurp all packets from the connection into io.Discard so pongs get sent
// by the websocket package. We don't do any reads ourselves so this is
// necessary.
go func() {
_, _ = io.Copy(io.Discard, nconn)
_ = nconn.Close()
}()
encoder := wsjson.NewEncoder[*tailcfg.DERPMap](ws, websocket.MessageBinary)
defer encoder.Close(websocket.StatusGoingAway)

go func(ctx context.Context) {
// TODO(mafredri): Is this too frequent? Use separate ping disconnect timeout?
Expand All@@ -767,7 +758,7 @@ func (api *API) derpMapUpdates(rw http.ResponseWriter, r *http.Request) {
err := ws.Ping(ctx)
cancel()
if err != nil {
_ =nconn.Close()
_ =ws.Close(websocket.StatusGoingAway, "ping failed")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

👍🏻

return
}
}
Expand All@@ -780,9 +771,8 @@ func (api *API) derpMapUpdates(rw http.ResponseWriter, r *http.Request) {
for {
derpMap := api.DERPMap()
if lastDERPMap == nil || !tailnet.CompareDERPMaps(lastDERPMap, derpMap) {
err :=json.NewEncoder(nconn).Encode(derpMap)
err :=encoder.Encode(derpMap)
if err != nil {
_ = nconn.Close()
return
}
lastDERPMap = derpMap
Expand Down
33 changes: 3 additions & 30 deletionscodersdk/provisionerdaemons.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -19,6 +19,7 @@ import (

"github.com/coder/coder/v2/buildinfo"
"github.com/coder/coder/v2/codersdk/drpc"
"github.com/coder/coder/v2/codersdk/wsjson"
"github.com/coder/coder/v2/provisionerd/proto"
"github.com/coder/coder/v2/provisionerd/runner"
)
Expand DownExpand Up@@ -162,36 +163,8 @@ func (c *Client) provisionerJobLogsAfter(ctx context.Context, path string, after
}
return nil, nil, ReadBodyAsError(res)
}
logs := make(chan ProvisionerJobLog)
closed := make(chan struct{})
go func() {
defer close(closed)
defer close(logs)
defer conn.Close(websocket.StatusGoingAway, "")
var log ProvisionerJobLog
for {
msgType, msg, err := conn.Read(ctx)
if err != nil {
return
}
if msgType != websocket.MessageText {
return
}
err = json.Unmarshal(msg, &log)
if err != nil {
return
}
select {
case <-ctx.Done():
return
case logs <- log:
}
}
}()
return logs, closeFunc(func() error {
<-closed
return nil
}), nil
d := wsjson.NewDecoder[ProvisionerJobLog](conn, websocket.MessageText, c.logger)
return d.Chan(), d, nil
}

// ServeProvisionerDaemonRequest are the parameters to call ServeProvisionerDaemon with
Expand Down
29 changes: 3 additions & 26 deletionscodersdk/workspaceagents.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -15,6 +15,7 @@ import (
"nhooyr.io/websocket"

"github.com/coder/coder/v2/coderd/tracing"
"github.com/coder/coder/v2/codersdk/wsjson"
)

type WorkspaceAgentStatus string
Expand DownExpand Up@@ -454,30 +455,6 @@ func (c *Client) WorkspaceAgentLogsAfter(ctx context.Context, agentID uuid.UUID,
}
return nil, nil, ReadBodyAsError(res)
}
logChunks := make(chan []WorkspaceAgentLog, 1)
closed := make(chan struct{})
ctx, wsNetConn := WebsocketNetConn(ctx, conn, websocket.MessageText)
decoder := json.NewDecoder(wsNetConn)
go func() {
defer close(closed)
defer close(logChunks)
defer conn.Close(websocket.StatusGoingAway, "")
for {
var logs []WorkspaceAgentLog
err = decoder.Decode(&logs)
if err != nil {
return
}
select {
case <-ctx.Done():
return
case logChunks <- logs:
}
}
}()
return logChunks, closeFunc(func() error {
_ = wsNetConn.Close()
<-closed
return nil
}), nil
d := wsjson.NewDecoder[[]WorkspaceAgentLog](conn, websocket.MessageText, c.logger)
return d.Chan(), d, nil
}
75 changes: 75 additions & 0 deletionscodersdk/wsjson/decoder.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
package wsjson

import (
"context"
"encoding/json"
"sync/atomic"

"nhooyr.io/websocket"

"cdr.dev/slog"
)

type Decoder[T any] struct {
conn *websocket.Conn
typ websocket.MessageType
ctx context.Context
cancel context.CancelFunc
chanCalled atomic.Bool
logger slog.Logger
}

// Chan starts the decoder reading from the websocket and returns a channel for reading the
// resulting values. The chan T is closed if the underlying websocket is closed, or we encounter an
// error. We also close the underlying websocket if we encounter an error reading or decoding.
func (d *Decoder[T]) Chan() <-chan T {
if !d.chanCalled.CompareAndSwap(false, true) {
panic("chan called more than once")
}
values := make(chan T, 1)
go func() {
defer close(values)
defer d.conn.Close(websocket.StatusGoingAway, "")
for {
// we don't use d.ctx here because it only gets canceled after closing the connection
// and a "connection closed" type error is more clear than context canceled.
typ, b, err := d.conn.Read(context.Background())
if err != nil {
// might be benign like EOF, so just log at debug
d.logger.Debug(d.ctx, "error reading from websocket", slog.Error(err))
return
}
if typ != d.typ {
d.logger.Error(d.ctx, "websocket type mismatch while decoding")
return
}
var value T
err = json.Unmarshal(b, &value)
if err != nil {
d.logger.Error(d.ctx, "error unmarshalling", slog.Error(err))
return
}
select {
case values <- value:
// OK
case <-d.ctx.Done():
return
}
}
}()
return values
}

// nolint: revive // complains that Encoder has the same function name
func (d *Decoder[T]) Close() error {
err := d.conn.Close(websocket.StatusNormalClosure, "")
d.cancel()
return err
}

// NewDecoder creates a JSON-over-websocket decoder for type T, which must be deserializable from
// JSON.
func NewDecoder[T any](conn *websocket.Conn, typ websocket.MessageType, logger slog.Logger) *Decoder[T] {
ctx, cancel := context.WithCancel(context.Background())
return &Decoder[T]{conn: conn, ctx: ctx, cancel: cancel, typ: typ, logger: logger}
}
42 changes: 42 additions & 0 deletionscodersdk/wsjson/encoder.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
package wsjson

import (
"context"
"encoding/json"

"golang.org/x/xerrors"
"nhooyr.io/websocket"
)

type Encoder[T any] struct {
conn *websocket.Conn
typ websocket.MessageType
}

func (e *Encoder[T]) Encode(v T) error {
w, err := e.conn.Writer(context.Background(), e.typ)
if err != nil {
return xerrors.Errorf("get websocket writer: %w", err)
}
defer w.Close()
j := json.NewEncoder(w)
err = j.Encode(v)
if err != nil {
return xerrors.Errorf("encode json: %w", err)
}
return nil
}

func (e *Encoder[T]) Close(c websocket.StatusCode) error {
return e.conn.Close(c, "")
}

// NewEncoder creates a JSON-over websocket encoder for the type T, which must be JSON-serializable.
// You may then call Encode() to send objects over the websocket. Creating an Encoder closes the
// websocket for reading, turning it into a unidirectional write stream of JSON-encoded objects.
func NewEncoder[T any](conn *websocket.Conn, typ websocket.MessageType) *Encoder[T] {
// Here we close the websocket for reading, so that the websocket library will handle pings and
// close frames.
_ = conn.CloseRead(context.Background())
return &Encoder[T]{conn: conn, typ: typ}
}
Loading

[8]ページ先頭

©2009-2025 Movatter.jp