Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

feat: implement agent socket api, client and cli (#20758)#20976

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Open
SasSwart wants to merge1 commit intorelease/2.29
base:release/2.29
Choose a base branch
Loading
fromjjs/release-2.29-scriptordering
Open
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletionsagent/agent.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -41,6 +41,7 @@ import (
"github.com/coder/coder/v2/agent/agentcontainers"
"github.com/coder/coder/v2/agent/agentexec"
"github.com/coder/coder/v2/agent/agentscripts"
"github.com/coder/coder/v2/agent/agentsocket"
"github.com/coder/coder/v2/agent/agentssh"
"github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/agent/proto/resourcesmonitor"
Expand DownExpand Up@@ -97,6 +98,8 @@ type Options struct {
Devcontainers bool
DevcontainerAPIOptions []agentcontainers.Option // Enable Devcontainers for these to be effective.
Clock quartz.Clock
SocketServerEnabled bool
SocketPath string // Path for the agent socket server socket
}

type Client interface {
Expand DownExpand Up@@ -202,6 +205,8 @@ func New(options Options) Agent {

devcontainers: options.Devcontainers,
containerAPIOptions: options.DevcontainerAPIOptions,
socketPath: options.SocketPath,
socketServerEnabled: options.SocketServerEnabled,
}
// Initially, we have a closed channel, reflecting the fact that we are not initially connected.
// Each time we connect we replace the channel (while holding the closeMutex) with a new one
Expand DownExpand Up@@ -279,6 +284,10 @@ type agent struct {
devcontainers bool
containerAPIOptions []agentcontainers.Option
containerAPI *agentcontainers.API

socketServerEnabled bool
socketPath string
socketServer *agentsocket.Server
}

func (a *agent) TailnetConn() *tailnet.Conn {
Expand DownExpand Up@@ -358,9 +367,32 @@ func (a *agent) init() {
s.ExperimentalContainers = a.devcontainers
},
)

a.initSocketServer()

go a.runLoop()
}

// initSocketServer initializes server that allows direct communication with a workspace agent using IPC.
func (a *agent) initSocketServer() {
if !a.socketServerEnabled {
a.logger.Info(a.hardCtx, "socket server is disabled")
return
}

server, err := agentsocket.NewServer(
a.logger.Named("socket"),
agentsocket.WithPath(a.socketPath),
)
if err != nil {
a.logger.Warn(a.hardCtx, "failed to create socket server", slog.Error(err), slog.F("path", a.socketPath))
return
}

a.socketServer = server
a.logger.Debug(a.hardCtx, "socket server started", slog.F("path", a.socketPath))
}

// runLoop attempts to start the agent in a retry loop.
// Coder may be offline temporarily, a connection issue
// may be happening, but regardless after the intermittent
Expand DownExpand Up@@ -1928,13 +1960,20 @@ func (a *agent) Close() error {
lifecycleState = codersdk.WorkspaceAgentLifecycleShutdownError
}
}

a.setLifecycle(lifecycleState)

err = a.scriptRunner.Close()
if err != nil {
a.logger.Error(a.hardCtx, "script runner close", slog.Error(err))
}

if a.socketServer != nil {
if err := a.socketServer.Close(); err != nil {
a.logger.Error(a.hardCtx, "socket server close", slog.Error(err))
}
}

if err := a.containerAPI.Close(); err != nil {
a.logger.Error(a.hardCtx, "container API close", slog.Error(err))
}
Expand Down
146 changes: 146 additions & 0 deletionsagent/agentsocket/client.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
package agentsocket

import (
"context"

"golang.org/x/xerrors"
"storj.io/drpc"
"storj.io/drpc/drpcconn"

"github.com/coder/coder/v2/agent/agentsocket/proto"
"github.com/coder/coder/v2/agent/unit"
)

// Option represents a configuration option for NewClient.
type Option func(*options)

type options struct {
path string
}

// WithPath sets the socket path. If not provided or empty, the client will
// auto-discover the default socket path.
func WithPath(path string) Option {
return func(opts *options) {
if path == "" {
return
}
opts.path = path
}
}

// Client provides a client for communicating with the workspace agentsocket API.
type Client struct {
client proto.DRPCAgentSocketClient
conn drpc.Conn
}

// NewClient creates a new socket client and opens a connection to the socket.
// If path is not provided via WithPath or is empty, it will auto-discover the
// default socket path.
func NewClient(ctx context.Context, opts ...Option) (*Client, error) {
options := &options{}
for _, opt := range opts {
opt(options)
}

conn, err := dialSocket(ctx, options.path)
if err != nil {
return nil, xerrors.Errorf("connect to socket: %w", err)
}

drpcConn := drpcconn.New(conn)
client := proto.NewDRPCAgentSocketClient(drpcConn)

return &Client{
client: client,
conn: drpcConn,
}, nil
}

// Close closes the socket connection.
func (c *Client) Close() error {
return c.conn.Close()
}

// Ping sends a ping request to the agent.
func (c *Client) Ping(ctx context.Context) error {
_, err := c.client.Ping(ctx, &proto.PingRequest{})
return err
}

// SyncStart starts a unit in the dependency graph.
func (c *Client) SyncStart(ctx context.Context, unitName unit.ID) error {
_, err := c.client.SyncStart(ctx, &proto.SyncStartRequest{
Unit: string(unitName),
})
return err
}

// SyncWant declares a dependency between units.
func (c *Client) SyncWant(ctx context.Context, unitName, dependsOn unit.ID) error {
_, err := c.client.SyncWant(ctx, &proto.SyncWantRequest{
Unit: string(unitName),
DependsOn: string(dependsOn),
})
return err
}

// SyncComplete marks a unit as complete in the dependency graph.
func (c *Client) SyncComplete(ctx context.Context, unitName unit.ID) error {
_, err := c.client.SyncComplete(ctx, &proto.SyncCompleteRequest{
Unit: string(unitName),
})
return err
}

// SyncReady requests whether a unit is ready to be started. That is, all dependencies are satisfied.
func (c *Client) SyncReady(ctx context.Context, unitName unit.ID) (bool, error) {
resp, err := c.client.SyncReady(ctx, &proto.SyncReadyRequest{
Unit: string(unitName),
})
return resp.Ready, err
}

// SyncStatus gets the status of a unit and its dependencies.
func (c *Client) SyncStatus(ctx context.Context, unitName unit.ID) (SyncStatusResponse, error) {
resp, err := c.client.SyncStatus(ctx, &proto.SyncStatusRequest{
Unit: string(unitName),
})
if err != nil {
return SyncStatusResponse{}, err
}

var dependencies []DependencyInfo
for _, dep := range resp.Dependencies {
dependencies = append(dependencies, DependencyInfo{
DependsOn: unit.ID(dep.DependsOn),
RequiredStatus: unit.Status(dep.RequiredStatus),
CurrentStatus: unit.Status(dep.CurrentStatus),
IsSatisfied: dep.IsSatisfied,
})
}

return SyncStatusResponse{
UnitName: unitName,
Status: unit.Status(resp.Status),
IsReady: resp.IsReady,
Dependencies: dependencies,
}, nil
}

// SyncStatusResponse contains the status information for a unit.
type SyncStatusResponse struct {
UnitName unit.ID `table:"unit,default_sort" json:"unit_name"`
Status unit.Status `table:"status" json:"status"`
IsReady bool `table:"ready" json:"is_ready"`
Dependencies []DependencyInfo `table:"dependencies" json:"dependencies"`
}

// DependencyInfo contains information about a unit dependency.
type DependencyInfo struct {
DependsOn unit.ID `table:"depends on,default_sort" json:"depends_on"`
RequiredStatus unit.Status `table:"required status" json:"required_status"`
CurrentStatus unit.Status `table:"current status" json:"current_status"`
IsSatisfied bool `table:"satisfied" json:"is_satisfied"`
}
69 changes: 11 additions & 58 deletionsagent/agentsocket/server.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -7,8 +7,6 @@ import (
"sync"

"golang.org/x/xerrors"

"github.com/hashicorp/yamux"
"storj.io/drpc/drpcmux"
"storj.io/drpc/drpcserver"

Expand All@@ -33,11 +31,17 @@ type Server struct {
wg sync.WaitGroup
}

func NewServer(path string, logger slog.Logger) (*Server, error) {
// NewServer creates a new agent socket server.
func NewServer(logger slog.Logger, opts ...Option) (*Server, error) {
options := &options{}
for _, opt := range opts {
opt(options)
}

logger = logger.Named("agentsocket-server")
server := &Server{
logger: logger,
path: path,
path:options.path,
service: &DRPCAgentSocketService{
logger: logger,
unitManager: unit.NewManager(),
Expand All@@ -61,14 +65,6 @@ func NewServer(path string, logger slog.Logger) (*Server, error) {
},
})

if server.path == "" {
var err error
server.path, err = getDefaultSocketPath()
if err != nil {
return nil, xerrors.Errorf("get default socket path: %w", err)
}
}

listener, err := createSocket(server.path)
if err != nil {
return nil, xerrors.Errorf("create socket: %w", err)
Expand All@@ -91,6 +87,7 @@ func NewServer(path string, logger slog.Logger) (*Server, error) {
return server, nil
}

// Close stops the server and cleans up resources.
func (s *Server) Close() error {
s.mu.Lock()

Expand DownExpand Up@@ -134,52 +131,8 @@ func (s *Server) acceptConnections() {
return
}

for {
select {
case <-s.ctx.Done():
return
default:
}

conn, err := listener.Accept()
if err != nil {
s.logger.Warn(s.ctx, "error accepting connection", slog.Error(err))
continue
}

s.mu.Lock()
if s.listener == nil {
s.mu.Unlock()
_ = conn.Close()
return
}
s.wg.Add(1)
s.mu.Unlock()

go func() {
defer s.wg.Done()
s.handleConnection(conn)
}()
}
}

func (s *Server) handleConnection(conn net.Conn) {
defer conn.Close()

s.logger.Debug(s.ctx, "new connection accepted", slog.F("remote_addr", conn.RemoteAddr()))

config := yamux.DefaultConfig()
config.LogOutput = nil
config.Logger = slog.Stdlib(s.ctx, s.logger.Named("agentsocket-yamux"), slog.LevelInfo)
session, err := yamux.Server(conn, config)
if err != nil {
s.logger.Warn(s.ctx, "failed to create yamux session", slog.Error(err))
return
}
defer session.Close()

err = s.drpcServer.Serve(s.ctx, session)
err := s.drpcServer.Serve(s.ctx, listener)
if err != nil {
s.logger.Debug(s.ctx, "drpc server finished", slog.Error(err))
s.logger.Warn(s.ctx, "error servingdrpc server", slog.Error(err))
}
}
Loading
Loading

[8]ページ先頭

©2009-2025 Movatter.jp