Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit6f02c02

Browse files
committed
feat: use agent v2 API to send agent logs
1 parent14d0583 commit6f02c02

File tree

8 files changed

+118
-375
lines changed

8 files changed

+118
-375
lines changed

‎agent/agent.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ type Client interface {
9191
ConnectRPC(ctx context.Context) (drpc.Conn,error)
9292
PostLifecycle(ctx context.Context,state agentsdk.PostLifecycleRequest)error
9393
PostMetadata(ctx context.Context,req agentsdk.PostMetadataRequest)error
94-
PatchLogs(ctx context.Context,req agentsdk.PatchLogs)error
9594
RewriteDERPMap(derpMap*tailcfg.DERPMap)
9695
}
9796

@@ -165,6 +164,7 @@ func New(options Options) Agent {
165164
syscaller:options.Syscaller,
166165
modifiedProcs:options.ModifiedProcesses,
167166
processManagementTick:options.ProcessManagementTick,
167+
logSender:newLogSender(options.Logger),
168168

169169
prometheusRegistry:prometheusRegistry,
170170
metrics:newAgentMetrics(prometheusRegistry),
@@ -215,6 +215,7 @@ type agent struct {
215215
network*tailnet.Conn
216216
addresses []netip.Prefix
217217
statsReporter*statsReporter
218+
logSender*logSender
218219

219220
connCountReconnectingPTY atomic.Int64
220221

@@ -245,11 +246,11 @@ func (a *agent) init(ctx context.Context) {
245246
sshSrv.ServiceBanner=&a.serviceBanner
246247
a.sshServer=sshSrv
247248
a.scriptRunner=agentscripts.New(agentscripts.Options{
248-
LogDir:a.logDir,
249-
Logger:a.logger,
250-
SSHServer:sshSrv,
251-
Filesystem:a.filesystem,
252-
PatchLogs:a.client.PatchLogs,
249+
LogDir:a.logDir,
250+
Logger:a.logger,
251+
SSHServer:sshSrv,
252+
Filesystem:a.filesystem,
253+
GetScriptLogger:a.logSender.getScriptLogger,
253254
})
254255
// Register runner metrics. If the prom registry is nil, the metrics
255256
// will not report anywhere.
@@ -876,6 +877,15 @@ func (a *agent) run(ctx context.Context) error {
876877
returnnil
877878
})
878879

880+
eg.Go(func()error {
881+
a.logger.Debug(egCtx,"running send logs loop")
882+
err:=a.logSender.sendLoop(egCtx,aAPI)
883+
iferr!=nil {
884+
returnxerrors.Errorf("send logs loop: %w",err)
885+
}
886+
returnnil
887+
})
888+
879889
returneg.Wait()
880890
}
881891

‎agent/agentscripts/agentscripts.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313
"sync/atomic"
1414
"time"
1515

16+
"github.com/google/uuid"
17+
1618
"github.com/prometheus/client_golang/prometheus"
1719
"github.com/robfig/cron/v3"
1820
"github.com/spf13/afero"
@@ -41,13 +43,18 @@ var (
4143
parser=cron.NewParser(cron.Second|cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.DowOptional)
4244
)
4345

46+
typeScriptLoggerinterface {
47+
Send(ctx context.Context,log...agentsdk.Log)error
48+
Flush(context.Context)error
49+
}
50+
4451
// Options are a set of options for the runner.
4552
typeOptionsstruct {
46-
LogDirstring
47-
Logger slog.Logger
48-
SSHServer*agentssh.Server
49-
Filesystem afero.Fs
50-
PatchLogsfunc(ctx context.Context,req agentsdk.PatchLogs)error
53+
LogDirstring
54+
Loggerslog.Logger
55+
SSHServer*agentssh.Server
56+
Filesystemafero.Fs
57+
GetScriptLoggerfunc(logSourceID uuid.UUID)ScriptLogger
5158
}
5259

5360
// New creates a runner for the provided scripts.
@@ -238,20 +245,20 @@ func (r *Runner) run(ctx context.Context, script codersdk.WorkspaceAgentScript)
238245
cmd.WaitDelay=10*time.Second
239246
cmd.Cancel=cmdCancel(cmd)
240247

241-
send,flushAndClose:=agentsdk.LogsSender(script.LogSourceID,r.PatchLogs,logger)
248+
scriptLogger:=r.GetScriptLogger(script.LogSourceID)
242249
// If ctx is canceled here (or in a writer below), we may be
243250
// discarding logs, but that's okay because we're shutting down
244251
// anyway. We could consider creating a new context here if we
245252
// want better control over flush during shutdown.
246253
deferfunc() {
247-
iferr:=flushAndClose(ctx);err!=nil {
254+
iferr:=scriptLogger.Flush(ctx);err!=nil {
248255
logger.Warn(ctx,"flush startup logs failed",slog.Error(err))
249256
}
250257
}()
251258

252-
infoW:=agentsdk.LogsWriter(ctx,send,script.LogSourceID,codersdk.LogLevelInfo)
259+
infoW:=agentsdk.LogsWriter(ctx,scriptLogger.Send,script.LogSourceID,codersdk.LogLevelInfo)
253260
deferinfoW.Close()
254-
errW:=agentsdk.LogsWriter(ctx,send,script.LogSourceID,codersdk.LogLevelError)
261+
errW:=agentsdk.LogsWriter(ctx,scriptLogger.Send,script.LogSourceID,codersdk.LogLevelError)
255262
defererrW.Close()
256263
cmd.Stdout=io.MultiWriter(fileWriter,infoW)
257264
cmd.Stderr=io.MultiWriter(fileWriter,errW)

‎agent/agentscripts/agentscripts_test.go

Lines changed: 53 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ import (
55
"testing"
66
"time"
77

8+
"github.com/coder/coder/v2/testutil"
9+
10+
"github.com/google/uuid"
11+
812
"github.com/prometheus/client_golang/prometheus"
913
"github.com/spf13/afero"
1014
"github.com/stretchr/testify/require"
@@ -24,10 +28,10 @@ func TestMain(m *testing.M) {
2428

2529
funcTestExecuteBasic(t*testing.T) {
2630
t.Parallel()
27-
logs:=make(chan agentsdk.PatchLogs,1)
28-
runner:=setup(t,func(ctx context.Context,req agentsdk.PatchLogs)error {
29-
logs<-req
30-
returnnil
31+
ctx:=testutil.Context(t,testutil.WaitShort)
32+
fLogger:=newFakeScriptLogger()
33+
runner:=setup(t,func(uuid2 uuid.UUID) agentscripts.ScriptLogger {
34+
returnfLogger
3135
})
3236
deferrunner.Close()
3337
err:=runner.Init([]codersdk.WorkspaceAgentScript{{
@@ -37,8 +41,8 @@ func TestExecuteBasic(t *testing.T) {
3741
require.NoError(t,runner.Execute(context.Background(),func(script codersdk.WorkspaceAgentScript)bool {
3842
returntrue
3943
}))
40-
log:=<-logs
41-
require.Equal(t,"hello",log.Logs[0].Output)
44+
log:=testutil.RequireRecvCtx(ctx,t,fLogger.logs)
45+
require.Equal(t,"hello",log.Output)
4246
}
4347

4448
funcTestTimeout(t*testing.T) {
@@ -62,12 +66,12 @@ func TestCronClose(t *testing.T) {
6266
require.NoError(t,runner.Close(),"close runner")
6367
}
6468

65-
funcsetup(t*testing.T,patchLogsfunc(ctx context.Context,req agentsdk.PatchLogs)error)*agentscripts.Runner {
69+
funcsetup(t*testing.T,getScriptLoggerfunc(logSourceID uuid.UUID) agentscripts.ScriptLogger)*agentscripts.Runner {
6670
t.Helper()
67-
ifpatchLogs==nil {
71+
ifgetScriptLogger==nil {
6872
// noop
69-
patchLogs=func(ctx context.Context,req agentsdk.PatchLogs)error {
70-
returnnil
73+
getScriptLogger=func(uuid uuid.UUID) agentscripts.ScriptLogger {
74+
returnnoopScriptLogger{}
7175
}
7276
}
7377
fs:=afero.NewMemMapFs()
@@ -80,10 +84,44 @@ func setup(t *testing.T, patchLogs func(ctx context.Context, req agentsdk.PatchL
8084
_=s.Close()
8185
})
8286
returnagentscripts.New(agentscripts.Options{
83-
LogDir:t.TempDir(),
84-
Logger:logger,
85-
SSHServer:s,
86-
Filesystem:fs,
87-
PatchLogs:patchLogs,
87+
LogDir:t.TempDir(),
88+
Logger:logger,
89+
SSHServer:s,
90+
Filesystem:fs,
91+
GetScriptLogger:getScriptLogger,
8892
})
8993
}
94+
95+
typenoopScriptLoggerstruct{}
96+
97+
func (noopScriptLogger)Send(context.Context,...agentsdk.Log)error {
98+
returnnil
99+
}
100+
101+
func (noopScriptLogger)Flush(context.Context)error {
102+
returnnil
103+
}
104+
105+
typefakeScriptLoggerstruct {
106+
logschan agentsdk.Log
107+
}
108+
109+
func (f*fakeScriptLogger)Send(ctx context.Context,logs...agentsdk.Log)error {
110+
for_,log:=rangelogs {
111+
select {
112+
case<-ctx.Done():
113+
returnctx.Err()
114+
casef.logs<-log:
115+
// OK!
116+
}
117+
}
118+
returnnil
119+
}
120+
121+
func (*fakeScriptLogger)Flush(context.Context)error {
122+
returnnil
123+
}
124+
125+
funcnewFakeScriptLogger()*fakeScriptLogger {
126+
return&fakeScriptLogger{make(chan agentsdk.Log,100)}
127+
}

‎agent/agenttest/client.go

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ type Client struct {
8585
server*drpcserver.Server
8686
fakeAgentAPI*FakeAgentAPI
8787
LastWorkspaceAgentfunc()
88-
PatchWorkspaceLogsfunc()error
8988

9089
mu sync.Mutex// Protects following.
9190
lifecycleStates []codersdk.WorkspaceAgentLifecycle
@@ -165,17 +164,6 @@ func (c *Client) GetStartupLogs() []agentsdk.Log {
165164
returnc.logs
166165
}
167166

168-
func (c*Client)PatchLogs(ctx context.Context,logs agentsdk.PatchLogs)error {
169-
c.mu.Lock()
170-
deferc.mu.Unlock()
171-
ifc.PatchWorkspaceLogs!=nil {
172-
returnc.PatchWorkspaceLogs()
173-
}
174-
c.logs=append(c.logs,logs.Logs...)
175-
c.logger.Debug(ctx,"patch startup logs",slog.F("req",logs))
176-
returnnil
177-
}
178-
179167
func (c*Client)SetServiceBannerFunc(ffunc() (codersdk.ServiceBannerConfig,error)) {
180168
c.fakeAgentAPI.SetServiceBannerFunc(f)
181169
}
@@ -257,9 +245,9 @@ func (*FakeAgentAPI) BatchUpdateMetadata(context.Context, *agentproto.BatchUpdat
257245
panic("implement me")
258246
}
259247

260-
func (*FakeAgentAPI)BatchCreateLogs(context.Context,*agentproto.BatchCreateLogsRequest) (*agentproto.BatchCreateLogsResponse,error) {
261-
// TODO implement me
262-
panic("implement me")
248+
func (f*FakeAgentAPI)BatchCreateLogs(ctxcontext.Context,req*agentproto.BatchCreateLogsRequest) (*agentproto.BatchCreateLogsResponse,error) {
249+
f.logger.Info(ctx,"batch create logs called",slog.F("req",req))
250+
return&agentproto.BatchCreateLogsResponse{},nil
263251
}
264252

265253
funcNewFakeAgentAPI(t testing.TB,logger slog.Logger,manifest*agentproto.Manifest,statsChchan*agentproto.Stats)*FakeAgentAPI {

‎agent/logs.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"golang.org/x/xerrors"
1010

1111
"cdr.dev/slog"
12+
"github.com/coder/coder/v2/agent/agentscripts"
1213
"github.com/coder/coder/v2/agent/proto"
1314
"github.com/coder/coder/v2/codersdk/agentsdk"
1415
)
@@ -234,3 +235,30 @@ func (l *logSender) getPendingWorkLocked() (src uuid.UUID, q *logQueue) {
234235
}
235236
returnsrc,q
236237
}
238+
239+
func (l*logSender)getScriptLogger(logSourceID uuid.UUID) agentscripts.ScriptLogger {
240+
returnscriptLogger{srcID:logSourceID,sender:l}
241+
}
242+
243+
typescriptLoggerstruct {
244+
sender*logSender
245+
srcID uuid.UUID
246+
}
247+
248+
func (sscriptLogger)Send(ctx context.Context,log...agentsdk.Log)error {
249+
select {
250+
case<-ctx.Done():
251+
returnctx.Err()
252+
default:
253+
returns.sender.enqueue(s.srcID,log...)
254+
}
255+
}
256+
257+
func (sscriptLogger)Flush(ctx context.Context)error {
258+
select {
259+
case<-ctx.Done():
260+
returnctx.Err()
261+
default:
262+
returns.sender.flush(s.srcID)
263+
}
264+
}

‎codersdk/agentsdk/agentsdk.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,8 @@ type PatchLogs struct {
512512

513513
// PatchLogs writes log messages to the agent startup script.
514514
// Log messages are limited to 1MB in total.
515+
//
516+
// Deprecated: use the DRPCAgentClient.BatchCreateLogs instead
515517
func (c*Client)PatchLogs(ctx context.Context,reqPatchLogs)error {
516518
res,err:=c.SDK.Request(ctx,http.MethodPatch,"/api/v2/workspaceagents/me/logs",req)
517519
iferr!=nil {

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp