Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitce627bf

Browse files
SasSwartmafredri
andauthored
feat: implement agent socket api, client and cli (#20758)
closes:#10352closes:coder/internal#1094closes:coder/internal#1095In this pull request, we enable a new set of experimental cli commandsgrouped under `coder exp sync`.These commands allow any process acting within a coder workspace toinform the coder agent of its requirements and execution progress. Thecoder agent will then relay this information to other processes thathave subscribed.These commands are:```# Check if this feature is enabled in your environment coder exp sync ping# express that your unit depends on anothercoder exp sync want <unit> <dependency_unit> # express that your unit intends to start a portion of the script that requires # other units to have completed first. This command blocks until all dependencies have been metcoder exp sync start <unit> # express that your unit has completes its work, allowing dependent units to begin their executioncoder exp sync complete <unit>```Example:In order to automatically run claude code in a new workspace, it mustfirst have a git repository cloned. The scripts responsible for cloningthe repository and for running claude code would coordinate in thefollowing way:```bash# Script A: Claude code# Inform the agent that the claude script wants the git script.# That is, the git script must have completed before the claude script can begin its executioncoder exp sync want claude git# Inform the agent that we would now like to begin execution of claude.# This command will block until the git script (and any other defined dependencies)# have completedcoder exp sync start claude# Now we run claude code and any other commands we needclaude ...# Once our script has completed, we inform the agent, so that any scripts that depend on this one# may begin their executioncoder exp sync complete claude``````bash# Script B: Git# Because the git script does not have any dependencies, we can simply inform the agent that we # intend to startcoder exp sync start gitgit clone ssh://git@github.com/coder/coder# Once the repository have been cloned, we inform the agent that this script is complete, so that# scripts that depend on it may begin their execution.coder exp sync complete git```Notes:* Unit names (ie. `claude` and `git`) given as input to the synccommands are arbitrary strings. You do not have to conform to specificidentifiers. We recommend naming your scripts descriptively, butsuccinctly.* Scripts unit names should be well documented. Other scripts will needto know the names you've chosen in order to depend on yours. Therefore,you---------Co-authored-by: Mathias Fredriksson <mafredri@gmail.com>
1 parentee58f40 commitce627bf

File tree

37 files changed

+1313
-276
lines changed

37 files changed

+1313
-276
lines changed

‎agent/agent.go‎

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import (
4141
"github.com/coder/coder/v2/agent/agentcontainers"
4242
"github.com/coder/coder/v2/agent/agentexec"
4343
"github.com/coder/coder/v2/agent/agentscripts"
44+
"github.com/coder/coder/v2/agent/agentsocket"
4445
"github.com/coder/coder/v2/agent/agentssh"
4546
"github.com/coder/coder/v2/agent/proto"
4647
"github.com/coder/coder/v2/agent/proto/resourcesmonitor"
@@ -97,6 +98,8 @@ type Options struct {
9798
Devcontainersbool
9899
DevcontainerAPIOptions []agentcontainers.Option// Enable Devcontainers for these to be effective.
99100
Clock quartz.Clock
101+
SocketServerEnabledbool
102+
SocketPathstring// Path for the agent socket server socket
100103
}
101104

102105
typeClientinterface {
@@ -202,6 +205,8 @@ func New(options Options) Agent {
202205

203206
devcontainers:options.Devcontainers,
204207
containerAPIOptions:options.DevcontainerAPIOptions,
208+
socketPath:options.SocketPath,
209+
socketServerEnabled:options.SocketServerEnabled,
205210
}
206211
// Initially, we have a closed channel, reflecting the fact that we are not initially connected.
207212
// Each time we connect we replace the channel (while holding the closeMutex) with a new one
@@ -279,6 +284,10 @@ type agent struct {
279284
devcontainersbool
280285
containerAPIOptions []agentcontainers.Option
281286
containerAPI*agentcontainers.API
287+
288+
socketServerEnabledbool
289+
socketPathstring
290+
socketServer*agentsocket.Server
282291
}
283292

284293
func (a*agent)TailnetConn()*tailnet.Conn {
@@ -358,9 +367,32 @@ func (a *agent) init() {
358367
s.ExperimentalContainers=a.devcontainers
359368
},
360369
)
370+
371+
a.initSocketServer()
372+
361373
goa.runLoop()
362374
}
363375

376+
// initSocketServer initializes server that allows direct communication with a workspace agent using IPC.
377+
func (a*agent)initSocketServer() {
378+
if!a.socketServerEnabled {
379+
a.logger.Info(a.hardCtx,"socket server is disabled")
380+
return
381+
}
382+
383+
server,err:=agentsocket.NewServer(
384+
a.logger.Named("socket"),
385+
agentsocket.WithPath(a.socketPath),
386+
)
387+
iferr!=nil {
388+
a.logger.Warn(a.hardCtx,"failed to create socket server",slog.Error(err),slog.F("path",a.socketPath))
389+
return
390+
}
391+
392+
a.socketServer=server
393+
a.logger.Debug(a.hardCtx,"socket server started",slog.F("path",a.socketPath))
394+
}
395+
364396
// runLoop attempts to start the agent in a retry loop.
365397
// Coder may be offline temporarily, a connection issue
366398
// may be happening, but regardless after the intermittent
@@ -1928,13 +1960,20 @@ func (a *agent) Close() error {
19281960
lifecycleState=codersdk.WorkspaceAgentLifecycleShutdownError
19291961
}
19301962
}
1963+
19311964
a.setLifecycle(lifecycleState)
19321965

19331966
err=a.scriptRunner.Close()
19341967
iferr!=nil {
19351968
a.logger.Error(a.hardCtx,"script runner close",slog.Error(err))
19361969
}
19371970

1971+
ifa.socketServer!=nil {
1972+
iferr:=a.socketServer.Close();err!=nil {
1973+
a.logger.Error(a.hardCtx,"socket server close",slog.Error(err))
1974+
}
1975+
}
1976+
19381977
iferr:=a.containerAPI.Close();err!=nil {
19391978
a.logger.Error(a.hardCtx,"container API close",slog.Error(err))
19401979
}

‎agent/agentsocket/client.go‎

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
package agentsocket
2+
3+
import (
4+
"context"
5+
6+
"golang.org/x/xerrors"
7+
"storj.io/drpc"
8+
"storj.io/drpc/drpcconn"
9+
10+
"github.com/coder/coder/v2/agent/agentsocket/proto"
11+
"github.com/coder/coder/v2/agent/unit"
12+
)
13+
14+
// Option represents a configuration option for NewClient.
15+
typeOptionfunc(*options)
16+
17+
typeoptionsstruct {
18+
pathstring
19+
}
20+
21+
// WithPath sets the socket path. If not provided or empty, the client will
22+
// auto-discover the default socket path.
23+
funcWithPath(pathstring)Option {
24+
returnfunc(opts*options) {
25+
ifpath=="" {
26+
return
27+
}
28+
opts.path=path
29+
}
30+
}
31+
32+
// Client provides a client for communicating with the workspace agentsocket API.
33+
typeClientstruct {
34+
client proto.DRPCAgentSocketClient
35+
conn drpc.Conn
36+
}
37+
38+
// NewClient creates a new socket client and opens a connection to the socket.
39+
// If path is not provided via WithPath or is empty, it will auto-discover the
40+
// default socket path.
41+
funcNewClient(ctx context.Context,opts...Option) (*Client,error) {
42+
options:=&options{}
43+
for_,opt:=rangeopts {
44+
opt(options)
45+
}
46+
47+
conn,err:=dialSocket(ctx,options.path)
48+
iferr!=nil {
49+
returnnil,xerrors.Errorf("connect to socket: %w",err)
50+
}
51+
52+
drpcConn:=drpcconn.New(conn)
53+
client:=proto.NewDRPCAgentSocketClient(drpcConn)
54+
55+
return&Client{
56+
client:client,
57+
conn:drpcConn,
58+
},nil
59+
}
60+
61+
// Close closes the socket connection.
62+
func (c*Client)Close()error {
63+
returnc.conn.Close()
64+
}
65+
66+
// Ping sends a ping request to the agent.
67+
func (c*Client)Ping(ctx context.Context)error {
68+
_,err:=c.client.Ping(ctx,&proto.PingRequest{})
69+
returnerr
70+
}
71+
72+
// SyncStart starts a unit in the dependency graph.
73+
func (c*Client)SyncStart(ctx context.Context,unitName unit.ID)error {
74+
_,err:=c.client.SyncStart(ctx,&proto.SyncStartRequest{
75+
Unit:string(unitName),
76+
})
77+
returnerr
78+
}
79+
80+
// SyncWant declares a dependency between units.
81+
func (c*Client)SyncWant(ctx context.Context,unitName,dependsOn unit.ID)error {
82+
_,err:=c.client.SyncWant(ctx,&proto.SyncWantRequest{
83+
Unit:string(unitName),
84+
DependsOn:string(dependsOn),
85+
})
86+
returnerr
87+
}
88+
89+
// SyncComplete marks a unit as complete in the dependency graph.
90+
func (c*Client)SyncComplete(ctx context.Context,unitName unit.ID)error {
91+
_,err:=c.client.SyncComplete(ctx,&proto.SyncCompleteRequest{
92+
Unit:string(unitName),
93+
})
94+
returnerr
95+
}
96+
97+
// SyncReady requests whether a unit is ready to be started. That is, all dependencies are satisfied.
98+
func (c*Client)SyncReady(ctx context.Context,unitName unit.ID) (bool,error) {
99+
resp,err:=c.client.SyncReady(ctx,&proto.SyncReadyRequest{
100+
Unit:string(unitName),
101+
})
102+
returnresp.Ready,err
103+
}
104+
105+
// SyncStatus gets the status of a unit and its dependencies.
106+
func (c*Client)SyncStatus(ctx context.Context,unitName unit.ID) (SyncStatusResponse,error) {
107+
resp,err:=c.client.SyncStatus(ctx,&proto.SyncStatusRequest{
108+
Unit:string(unitName),
109+
})
110+
iferr!=nil {
111+
returnSyncStatusResponse{},err
112+
}
113+
114+
vardependencies []DependencyInfo
115+
for_,dep:=rangeresp.Dependencies {
116+
dependencies=append(dependencies,DependencyInfo{
117+
DependsOn:unit.ID(dep.DependsOn),
118+
RequiredStatus:unit.Status(dep.RequiredStatus),
119+
CurrentStatus:unit.Status(dep.CurrentStatus),
120+
IsSatisfied:dep.IsSatisfied,
121+
})
122+
}
123+
124+
returnSyncStatusResponse{
125+
UnitName:unitName,
126+
Status:unit.Status(resp.Status),
127+
IsReady:resp.IsReady,
128+
Dependencies:dependencies,
129+
},nil
130+
}
131+
132+
// SyncStatusResponse contains the status information for a unit.
133+
typeSyncStatusResponsestruct {
134+
UnitName unit.ID`table:"unit,default_sort" json:"unit_name"`
135+
Status unit.Status`table:"status" json:"status"`
136+
IsReadybool`table:"ready" json:"is_ready"`
137+
Dependencies []DependencyInfo`table:"dependencies" json:"dependencies"`
138+
}
139+
140+
// DependencyInfo contains information about a unit dependency.
141+
typeDependencyInfostruct {
142+
DependsOn unit.ID`table:"depends on,default_sort" json:"depends_on"`
143+
RequiredStatus unit.Status`table:"required status" json:"required_status"`
144+
CurrentStatus unit.Status`table:"current status" json:"current_status"`
145+
IsSatisfiedbool`table:"satisfied" json:"is_satisfied"`
146+
}

‎agent/agentsocket/server.go‎

Lines changed: 11 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ import (
77
"sync"
88

99
"golang.org/x/xerrors"
10-
11-
"github.com/hashicorp/yamux"
1210
"storj.io/drpc/drpcmux"
1311
"storj.io/drpc/drpcserver"
1412

@@ -33,11 +31,17 @@ type Server struct {
3331
wg sync.WaitGroup
3432
}
3533

36-
funcNewServer(pathstring,logger slog.Logger) (*Server,error) {
34+
// NewServer creates a new agent socket server.
35+
funcNewServer(logger slog.Logger,opts...Option) (*Server,error) {
36+
options:=&options{}
37+
for_,opt:=rangeopts {
38+
opt(options)
39+
}
40+
3741
logger=logger.Named("agentsocket-server")
3842
server:=&Server{
3943
logger:logger,
40-
path:path,
44+
path:options.path,
4145
service:&DRPCAgentSocketService{
4246
logger:logger,
4347
unitManager:unit.NewManager(),
@@ -61,14 +65,6 @@ func NewServer(path string, logger slog.Logger) (*Server, error) {
6165
},
6266
})
6367

64-
ifserver.path=="" {
65-
varerrerror
66-
server.path,err=getDefaultSocketPath()
67-
iferr!=nil {
68-
returnnil,xerrors.Errorf("get default socket path: %w",err)
69-
}
70-
}
71-
7268
listener,err:=createSocket(server.path)
7369
iferr!=nil {
7470
returnnil,xerrors.Errorf("create socket: %w",err)
@@ -91,6 +87,7 @@ func NewServer(path string, logger slog.Logger) (*Server, error) {
9187
returnserver,nil
9288
}
9389

90+
// Close stops the server and cleans up resources.
9491
func (s*Server)Close()error {
9592
s.mu.Lock()
9693

@@ -134,52 +131,8 @@ func (s *Server) acceptConnections() {
134131
return
135132
}
136133

137-
for {
138-
select {
139-
case<-s.ctx.Done():
140-
return
141-
default:
142-
}
143-
144-
conn,err:=listener.Accept()
145-
iferr!=nil {
146-
s.logger.Warn(s.ctx,"error accepting connection",slog.Error(err))
147-
continue
148-
}
149-
150-
s.mu.Lock()
151-
ifs.listener==nil {
152-
s.mu.Unlock()
153-
_=conn.Close()
154-
return
155-
}
156-
s.wg.Add(1)
157-
s.mu.Unlock()
158-
159-
gofunc() {
160-
defers.wg.Done()
161-
s.handleConnection(conn)
162-
}()
163-
}
164-
}
165-
166-
func (s*Server)handleConnection(conn net.Conn) {
167-
deferconn.Close()
168-
169-
s.logger.Debug(s.ctx,"new connection accepted",slog.F("remote_addr",conn.RemoteAddr()))
170-
171-
config:=yamux.DefaultConfig()
172-
config.LogOutput=nil
173-
config.Logger=slog.Stdlib(s.ctx,s.logger.Named("agentsocket-yamux"),slog.LevelInfo)
174-
session,err:=yamux.Server(conn,config)
175-
iferr!=nil {
176-
s.logger.Warn(s.ctx,"failed to create yamux session",slog.Error(err))
177-
return
178-
}
179-
defersession.Close()
180-
181-
err=s.drpcServer.Serve(s.ctx,session)
134+
err:=s.drpcServer.Serve(s.ctx,listener)
182135
iferr!=nil {
183-
s.logger.Debug(s.ctx,"drpc server finished",slog.Error(err))
136+
s.logger.Warn(s.ctx,"error servingdrpc server",slog.Error(err))
184137
}
185138
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp