Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit6e551ab

Browse files
authored
Merge branch 'main' into jakehwll/ai-bridge-observability
2 parents5d2cd7a +f8d9a80 commit6e551ab

File tree

65 files changed

+2509
-316
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+2509
-316
lines changed

‎agent/agent.go‎

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import (
4141
"github.com/coder/coder/v2/agent/agentcontainers"
4242
"github.com/coder/coder/v2/agent/agentexec"
4343
"github.com/coder/coder/v2/agent/agentscripts"
44+
"github.com/coder/coder/v2/agent/agentsocket"
4445
"github.com/coder/coder/v2/agent/agentssh"
4546
"github.com/coder/coder/v2/agent/proto"
4647
"github.com/coder/coder/v2/agent/proto/resourcesmonitor"
@@ -97,6 +98,8 @@ type Options struct {
9798
Devcontainersbool
9899
DevcontainerAPIOptions []agentcontainers.Option// Enable Devcontainers for these to be effective.
99100
Clock quartz.Clock
101+
SocketServerEnabledbool
102+
SocketPathstring// Path for the agent socket server socket
100103
}
101104

102105
typeClientinterface {
@@ -202,6 +205,8 @@ func New(options Options) Agent {
202205

203206
devcontainers:options.Devcontainers,
204207
containerAPIOptions:options.DevcontainerAPIOptions,
208+
socketPath:options.SocketPath,
209+
socketServerEnabled:options.SocketServerEnabled,
205210
}
206211
// Initially, we have a closed channel, reflecting the fact that we are not initially connected.
207212
// Each time we connect we replace the channel (while holding the closeMutex) with a new one
@@ -279,6 +284,10 @@ type agent struct {
279284
devcontainersbool
280285
containerAPIOptions []agentcontainers.Option
281286
containerAPI*agentcontainers.API
287+
288+
socketServerEnabledbool
289+
socketPathstring
290+
socketServer*agentsocket.Server
282291
}
283292

284293
func (a*agent)TailnetConn()*tailnet.Conn {
@@ -358,9 +367,32 @@ func (a *agent) init() {
358367
s.ExperimentalContainers=a.devcontainers
359368
},
360369
)
370+
371+
a.initSocketServer()
372+
361373
goa.runLoop()
362374
}
363375

376+
// initSocketServer initializes server that allows direct communication with a workspace agent using IPC.
377+
func (a*agent)initSocketServer() {
378+
if!a.socketServerEnabled {
379+
a.logger.Info(a.hardCtx,"socket server is disabled")
380+
return
381+
}
382+
383+
server,err:=agentsocket.NewServer(
384+
a.logger.Named("socket"),
385+
agentsocket.WithPath(a.socketPath),
386+
)
387+
iferr!=nil {
388+
a.logger.Warn(a.hardCtx,"failed to create socket server",slog.Error(err),slog.F("path",a.socketPath))
389+
return
390+
}
391+
392+
a.socketServer=server
393+
a.logger.Debug(a.hardCtx,"socket server started",slog.F("path",a.socketPath))
394+
}
395+
364396
// runLoop attempts to start the agent in a retry loop.
365397
// Coder may be offline temporarily, a connection issue
366398
// may be happening, but regardless after the intermittent
@@ -1928,13 +1960,20 @@ func (a *agent) Close() error {
19281960
lifecycleState=codersdk.WorkspaceAgentLifecycleShutdownError
19291961
}
19301962
}
1963+
19311964
a.setLifecycle(lifecycleState)
19321965

19331966
err=a.scriptRunner.Close()
19341967
iferr!=nil {
19351968
a.logger.Error(a.hardCtx,"script runner close",slog.Error(err))
19361969
}
19371970

1971+
ifa.socketServer!=nil {
1972+
iferr:=a.socketServer.Close();err!=nil {
1973+
a.logger.Error(a.hardCtx,"socket server close",slog.Error(err))
1974+
}
1975+
}
1976+
19381977
iferr:=a.containerAPI.Close();err!=nil {
19391978
a.logger.Error(a.hardCtx,"container API close",slog.Error(err))
19401979
}

‎agent/agentsocket/client.go‎

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
package agentsocket
2+
3+
import (
4+
"context"
5+
6+
"golang.org/x/xerrors"
7+
"storj.io/drpc"
8+
"storj.io/drpc/drpcconn"
9+
10+
"github.com/coder/coder/v2/agent/agentsocket/proto"
11+
"github.com/coder/coder/v2/agent/unit"
12+
)
13+
14+
// Option represents a configuration option for NewClient.
15+
typeOptionfunc(*options)
16+
17+
typeoptionsstruct {
18+
pathstring
19+
}
20+
21+
// WithPath sets the socket path. If not provided or empty, the client will
22+
// auto-discover the default socket path.
23+
funcWithPath(pathstring)Option {
24+
returnfunc(opts*options) {
25+
ifpath=="" {
26+
return
27+
}
28+
opts.path=path
29+
}
30+
}
31+
32+
// Client provides a client for communicating with the workspace agentsocket API.
33+
typeClientstruct {
34+
client proto.DRPCAgentSocketClient
35+
conn drpc.Conn
36+
}
37+
38+
// NewClient creates a new socket client and opens a connection to the socket.
39+
// If path is not provided via WithPath or is empty, it will auto-discover the
40+
// default socket path.
41+
funcNewClient(ctx context.Context,opts...Option) (*Client,error) {
42+
options:=&options{}
43+
for_,opt:=rangeopts {
44+
opt(options)
45+
}
46+
47+
conn,err:=dialSocket(ctx,options.path)
48+
iferr!=nil {
49+
returnnil,xerrors.Errorf("connect to socket: %w",err)
50+
}
51+
52+
drpcConn:=drpcconn.New(conn)
53+
client:=proto.NewDRPCAgentSocketClient(drpcConn)
54+
55+
return&Client{
56+
client:client,
57+
conn:drpcConn,
58+
},nil
59+
}
60+
61+
// Close closes the socket connection.
62+
func (c*Client)Close()error {
63+
returnc.conn.Close()
64+
}
65+
66+
// Ping sends a ping request to the agent.
67+
func (c*Client)Ping(ctx context.Context)error {
68+
_,err:=c.client.Ping(ctx,&proto.PingRequest{})
69+
returnerr
70+
}
71+
72+
// SyncStart starts a unit in the dependency graph.
73+
func (c*Client)SyncStart(ctx context.Context,unitName unit.ID)error {
74+
_,err:=c.client.SyncStart(ctx,&proto.SyncStartRequest{
75+
Unit:string(unitName),
76+
})
77+
returnerr
78+
}
79+
80+
// SyncWant declares a dependency between units.
81+
func (c*Client)SyncWant(ctx context.Context,unitName,dependsOn unit.ID)error {
82+
_,err:=c.client.SyncWant(ctx,&proto.SyncWantRequest{
83+
Unit:string(unitName),
84+
DependsOn:string(dependsOn),
85+
})
86+
returnerr
87+
}
88+
89+
// SyncComplete marks a unit as complete in the dependency graph.
90+
func (c*Client)SyncComplete(ctx context.Context,unitName unit.ID)error {
91+
_,err:=c.client.SyncComplete(ctx,&proto.SyncCompleteRequest{
92+
Unit:string(unitName),
93+
})
94+
returnerr
95+
}
96+
97+
// SyncReady requests whether a unit is ready to be started. That is, all dependencies are satisfied.
98+
func (c*Client)SyncReady(ctx context.Context,unitName unit.ID) (bool,error) {
99+
resp,err:=c.client.SyncReady(ctx,&proto.SyncReadyRequest{
100+
Unit:string(unitName),
101+
})
102+
returnresp.Ready,err
103+
}
104+
105+
// SyncStatus gets the status of a unit and its dependencies.
106+
func (c*Client)SyncStatus(ctx context.Context,unitName unit.ID) (SyncStatusResponse,error) {
107+
resp,err:=c.client.SyncStatus(ctx,&proto.SyncStatusRequest{
108+
Unit:string(unitName),
109+
})
110+
iferr!=nil {
111+
returnSyncStatusResponse{},err
112+
}
113+
114+
vardependencies []DependencyInfo
115+
for_,dep:=rangeresp.Dependencies {
116+
dependencies=append(dependencies,DependencyInfo{
117+
DependsOn:unit.ID(dep.DependsOn),
118+
RequiredStatus:unit.Status(dep.RequiredStatus),
119+
CurrentStatus:unit.Status(dep.CurrentStatus),
120+
IsSatisfied:dep.IsSatisfied,
121+
})
122+
}
123+
124+
returnSyncStatusResponse{
125+
UnitName:unitName,
126+
Status:unit.Status(resp.Status),
127+
IsReady:resp.IsReady,
128+
Dependencies:dependencies,
129+
},nil
130+
}
131+
132+
// SyncStatusResponse contains the status information for a unit.
133+
typeSyncStatusResponsestruct {
134+
UnitName unit.ID`table:"unit,default_sort" json:"unit_name"`
135+
Status unit.Status`table:"status" json:"status"`
136+
IsReadybool`table:"ready" json:"is_ready"`
137+
Dependencies []DependencyInfo`table:"dependencies" json:"dependencies"`
138+
}
139+
140+
// DependencyInfo contains information about a unit dependency.
141+
typeDependencyInfostruct {
142+
DependsOn unit.ID`table:"depends on,default_sort" json:"depends_on"`
143+
RequiredStatus unit.Status`table:"required status" json:"required_status"`
144+
CurrentStatus unit.Status`table:"current status" json:"current_status"`
145+
IsSatisfiedbool`table:"satisfied" json:"is_satisfied"`
146+
}

‎agent/agentsocket/server.go‎

Lines changed: 11 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ import (
77
"sync"
88

99
"golang.org/x/xerrors"
10-
11-
"github.com/hashicorp/yamux"
1210
"storj.io/drpc/drpcmux"
1311
"storj.io/drpc/drpcserver"
1412

@@ -33,11 +31,17 @@ type Server struct {
3331
wg sync.WaitGroup
3432
}
3533

36-
funcNewServer(pathstring,logger slog.Logger) (*Server,error) {
34+
// NewServer creates a new agent socket server.
35+
funcNewServer(logger slog.Logger,opts...Option) (*Server,error) {
36+
options:=&options{}
37+
for_,opt:=rangeopts {
38+
opt(options)
39+
}
40+
3741
logger=logger.Named("agentsocket-server")
3842
server:=&Server{
3943
logger:logger,
40-
path:path,
44+
path:options.path,
4145
service:&DRPCAgentSocketService{
4246
logger:logger,
4347
unitManager:unit.NewManager(),
@@ -61,14 +65,6 @@ func NewServer(path string, logger slog.Logger) (*Server, error) {
6165
},
6266
})
6367

64-
ifserver.path=="" {
65-
varerrerror
66-
server.path,err=getDefaultSocketPath()
67-
iferr!=nil {
68-
returnnil,xerrors.Errorf("get default socket path: %w",err)
69-
}
70-
}
71-
7268
listener,err:=createSocket(server.path)
7369
iferr!=nil {
7470
returnnil,xerrors.Errorf("create socket: %w",err)
@@ -91,6 +87,7 @@ func NewServer(path string, logger slog.Logger) (*Server, error) {
9187
returnserver,nil
9288
}
9389

90+
// Close stops the server and cleans up resources.
9491
func (s*Server)Close()error {
9592
s.mu.Lock()
9693

@@ -134,52 +131,8 @@ func (s *Server) acceptConnections() {
134131
return
135132
}
136133

137-
for {
138-
select {
139-
case<-s.ctx.Done():
140-
return
141-
default:
142-
}
143-
144-
conn,err:=listener.Accept()
145-
iferr!=nil {
146-
s.logger.Warn(s.ctx,"error accepting connection",slog.Error(err))
147-
continue
148-
}
149-
150-
s.mu.Lock()
151-
ifs.listener==nil {
152-
s.mu.Unlock()
153-
_=conn.Close()
154-
return
155-
}
156-
s.wg.Add(1)
157-
s.mu.Unlock()
158-
159-
gofunc() {
160-
defers.wg.Done()
161-
s.handleConnection(conn)
162-
}()
163-
}
164-
}
165-
166-
func (s*Server)handleConnection(conn net.Conn) {
167-
deferconn.Close()
168-
169-
s.logger.Debug(s.ctx,"new connection accepted",slog.F("remote_addr",conn.RemoteAddr()))
170-
171-
config:=yamux.DefaultConfig()
172-
config.LogOutput=nil
173-
config.Logger=slog.Stdlib(s.ctx,s.logger.Named("agentsocket-yamux"),slog.LevelInfo)
174-
session,err:=yamux.Server(conn,config)
175-
iferr!=nil {
176-
s.logger.Warn(s.ctx,"failed to create yamux session",slog.Error(err))
177-
return
178-
}
179-
defersession.Close()
180-
181-
err=s.drpcServer.Serve(s.ctx,session)
134+
err:=s.drpcServer.Serve(s.ctx,listener)
182135
iferr!=nil {
183-
s.logger.Debug(s.ctx,"drpc server finished",slog.Error(err))
136+
s.logger.Warn(s.ctx,"error servingdrpc server",slog.Error(err))
184137
}
185138
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp