Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit8e4eab1

Browse files
committed
feat: use agent v2 API to send agent logs
1 parentf9ca6b2 commit8e4eab1

File tree

6 files changed

+203
-63
lines changed

6 files changed

+203
-63
lines changed

‎agent/agent.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@ type Client interface {
9292
ConnectRPC(ctx context.Context) (drpc.Conn,error)
9393
PostLifecycle(ctx context.Context,state agentsdk.PostLifecycleRequest)error
9494
PostMetadata(ctx context.Context,req agentsdk.PostMetadataRequest)error
95-
PatchLogs(ctx context.Context,req agentsdk.PatchLogs)error
9695
RewriteDERPMap(derpMap*tailcfg.DERPMap)
9796
}
9897

@@ -182,6 +181,7 @@ func New(options Options) Agent {
182181
syscaller:options.Syscaller,
183182
modifiedProcs:options.ModifiedProcesses,
184183
processManagementTick:options.ProcessManagementTick,
184+
logSender:agentsdk.NewLogSender(options.Logger),
185185

186186
prometheusRegistry:prometheusRegistry,
187187
metrics:newAgentMetrics(prometheusRegistry),
@@ -247,6 +247,7 @@ type agent struct {
247247
network*tailnet.Conn
248248
addresses []netip.Prefix
249249
statsReporter*statsReporter
250+
logSender*agentsdk.LogSender
250251

251252
connCountReconnectingPTY atomic.Int64
252253

@@ -285,7 +286,9 @@ func (a *agent) init() {
285286
Logger:a.logger,
286287
SSHServer:sshSrv,
287288
Filesystem:a.filesystem,
288-
PatchLogs:a.client.PatchLogs,
289+
GetScriptLogger:func(logSourceID uuid.UUID) agentscripts.ScriptLogger {
290+
returna.logSender.GetScriptLogger(logSourceID)
291+
},
289292
})
290293
// Register runner metrics. If the prom registry is nil, the metrics
291294
// will not report anywhere.
@@ -765,6 +768,20 @@ func (a *agent) run() (retErr error) {
765768
},
766769
)
767770

771+
// sending logs gets gracefulShutdownBehaviorRemain because we want to send logs generated by
772+
// shutdown scripts.
773+
arm.run("send logs",gracefulShutdownBehaviorRemain,
774+
func(ctx context.Context,conn drpc.Conn)error {
775+
err:=a.logSender.SendLoop(ctx,proto.NewDRPCAgentClient(conn))
776+
ifxerrors.Is(err,agentsdk.LogLimitExceededError) {
777+
// we don't want this error to tear down the API connection and propagate to the
778+
// other routines that use the API. The LogSender has already dropped a warning
779+
// log, so just return nil here.
780+
returnnil
781+
}
782+
returnerr
783+
})
784+
768785
// channels to sync goroutines below
769786
// handle manifest
770787
// |
@@ -1776,6 +1793,12 @@ lifecycleWaitLoop:
17761793
a.logger.Debug(context.Background(),"coordinator RPC disconnected")
17771794
}
17781795

1796+
// Wait for logs to be sent
1797+
err=a.logSender.WaitUntilEmpty(a.hardCtx)
1798+
iferr!=nil {
1799+
a.logger.Warn(context.Background(),"timed out waiting for all logs to be sent",slog.Error(err))
1800+
}
1801+
17791802
close(a.closed)
17801803
a.hardCancel()
17811804
ifa.network!=nil {

‎agent/agent_test.go

Lines changed: 78 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2062,6 +2062,80 @@ func TestAgent_DebugServer(t *testing.T) {
20622062
})
20632063
}
20642064

2065+
funcTestAgent_ScriptLogging(t*testing.T) {
2066+
ifruntime.GOOS=="windows" {
2067+
t.Skip("bash scripts only")
2068+
}
2069+
t.Parallel()
2070+
ctx:=testutil.Context(t,testutil.WaitMedium)
2071+
2072+
derpMap,_:=tailnettest.RunDERPAndSTUN(t)
2073+
logsCh:=make(chan*proto.BatchCreateLogsRequest,100)
2074+
lsStart:= uuid.UUID{0x11}
2075+
lsStop:= uuid.UUID{0x22}
2076+
//nolint:dogsled
2077+
_,_,_,_,agnt:=setupAgent(
2078+
t,
2079+
agentsdk.Manifest{
2080+
DERPMap:derpMap,
2081+
Scripts: []codersdk.WorkspaceAgentScript{
2082+
{
2083+
LogSourceID:lsStart,
2084+
RunOnStart:true,
2085+
Script:`#!/bin/sh
2086+
i=0
2087+
while [ $i -ne 5 ]
2088+
do
2089+
i=$(($i+1))
2090+
echo "start $i"
2091+
done
2092+
`,
2093+
},
2094+
{
2095+
LogSourceID:lsStop,
2096+
RunOnStop:true,
2097+
Script:`#!/bin/sh
2098+
i=0
2099+
while [ $i -ne 3000 ]
2100+
do
2101+
i=$(($i+1))
2102+
echo "stop $i"
2103+
done
2104+
`,// send a lot of stop logs to make sure we don't truncate shutdown logs before closing the API conn
2105+
},
2106+
},
2107+
},
2108+
0,
2109+
func(cl*agenttest.Client,_*agent.Options) {
2110+
cl.SetLogsChannel(logsCh)
2111+
},
2112+
)
2113+
2114+
n:=1
2115+
forn<=5 {
2116+
logs:=testutil.RequireRecvCtx(ctx,t,logsCh)
2117+
require.NotNil(t,logs)
2118+
for_,l:=rangelogs.GetLogs() {
2119+
require.Equal(t,fmt.Sprintf("start %d",n),l.GetOutput())
2120+
n++
2121+
}
2122+
}
2123+
2124+
err:=agnt.Close()
2125+
require.NoError(t,err)
2126+
2127+
n=1
2128+
forn<=3000 {
2129+
logs:=testutil.RequireRecvCtx(ctx,t,logsCh)
2130+
require.NotNil(t,logs)
2131+
for_,l:=rangelogs.GetLogs() {
2132+
require.Equal(t,fmt.Sprintf("stop %d",n),l.GetOutput())
2133+
n++
2134+
}
2135+
t.Logf("got %d stop logs",n-1)
2136+
}
2137+
}
2138+
20652139
// setupAgentSSHClient creates an agent, dials it, and sets up an ssh.Client for it
20662140
funcsetupAgentSSHClient(ctx context.Context,t*testing.T)*ssh.Client {
20672141
//nolint: dogsled
@@ -2137,7 +2211,7 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
21372211
})
21382212
statsCh:=make(chan*proto.Stats,50)
21392213
fs:=afero.NewMemMapFs()
2140-
c:=agenttest.NewClient(t,logger.Named("agent"),metadata.AgentID,metadata,statsCh,coordinator)
2214+
c:=agenttest.NewClient(t,logger.Named("agenttest"),metadata.AgentID,metadata,statsCh,coordinator)
21412215
t.Cleanup(c.Close)
21422216

21432217
options:= agent.Options{
@@ -2152,9 +2226,9 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
21522226
opt(c,&options)
21532227
}
21542228

2155-
closer:=agent.New(options)
2229+
agnt:=agent.New(options)
21562230
t.Cleanup(func() {
2157-
_=closer.Close()
2231+
_=agnt.Close()
21582232
})
21592233
conn,err:=tailnet.NewConn(&tailnet.Options{
21602234
Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IP(),128)},
@@ -2191,7 +2265,7 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
21912265
if!agentConn.AwaitReachable(ctx) {
21922266
t.Fatal("agent not reachable")
21932267
}
2194-
returnagentConn,c,statsCh,fs,closer
2268+
returnagentConn,c,statsCh,fs,agnt
21952269
}
21962270

21972271
vardialTestPayload= []byte("dean-was-here123")

‎agent/agentscripts/agentscripts.go

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"sync/atomic"
1414
"time"
1515

16+
"github.com/google/uuid"
1617
"github.com/prometheus/client_golang/prometheus"
1718
"github.com/robfig/cron/v3"
1819
"github.com/spf13/afero"
@@ -41,14 +42,19 @@ var (
4142
parser=cron.NewParser(cron.Second|cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.DowOptional)
4243
)
4344

45+
typeScriptLoggerinterface {
46+
Send(ctx context.Context,log...agentsdk.Log)error
47+
Flush(context.Context)error
48+
}
49+
4450
// Options are a set of options for the runner.
4551
typeOptionsstruct {
46-
DataDirBasestring
47-
LogDirstring
48-
Logger slog.Logger
49-
SSHServer*agentssh.Server
50-
Filesystem afero.Fs
51-
PatchLogsfunc(ctx context.Context,req agentsdk.PatchLogs)error
52+
DataDirBasestring
53+
LogDirstring
54+
Loggerslog.Logger
55+
SSHServer*agentssh.Server
56+
Filesystemafero.Fs
57+
GetScriptLoggerfunc(logSourceID uuid.UUID)ScriptLogger
5258
}
5359

5460
// New creates a runner for the provided scripts.
@@ -275,20 +281,20 @@ func (r *Runner) run(ctx context.Context, script codersdk.WorkspaceAgentScript)
275281
cmd.Env=append(cmd.Env,"CODER_SCRIPT_DATA_DIR="+scriptDataDir)
276282
cmd.Env=append(cmd.Env,"CODER_SCRIPT_BIN_DIR="+r.ScriptBinDir())
277283

278-
send,flushAndClose:=agentsdk.LogsSender(script.LogSourceID,r.PatchLogs,logger)
284+
scriptLogger:=r.GetScriptLogger(script.LogSourceID)
279285
// If ctx is canceled here (or in a writer below), we may be
280286
// discarding logs, but that's okay because we're shutting down
281287
// anyway. We could consider creating a new context here if we
282288
// want better control over flush during shutdown.
283289
deferfunc() {
284-
iferr:=flushAndClose(ctx);err!=nil {
290+
iferr:=scriptLogger.Flush(ctx);err!=nil {
285291
logger.Warn(ctx,"flush startup logs failed",slog.Error(err))
286292
}
287293
}()
288294

289-
infoW:=agentsdk.LogsWriter(ctx,send,script.LogSourceID,codersdk.LogLevelInfo)
295+
infoW:=agentsdk.LogsWriter(ctx,scriptLogger.Send,script.LogSourceID,codersdk.LogLevelInfo)
290296
deferinfoW.Close()
291-
errW:=agentsdk.LogsWriter(ctx,send,script.LogSourceID,codersdk.LogLevelError)
297+
errW:=agentsdk.LogsWriter(ctx,scriptLogger.Send,script.LogSourceID,codersdk.LogLevelError)
292298
defererrW.Close()
293299
cmd.Stdout=io.MultiWriter(fileWriter,infoW)
294300
cmd.Stderr=io.MultiWriter(fileWriter,errW)

‎agent/agentscripts/agentscripts_test.go

Lines changed: 56 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,10 @@ func TestMain(m *testing.M) {
2828

2929
funcTestExecuteBasic(t*testing.T) {
3030
t.Parallel()
31-
logs:=make(chan agentsdk.PatchLogs,1)
32-
runner:=setup(t,func(ctx context.Context,req agentsdk.PatchLogs)error {
33-
select {
34-
case<-ctx.Done():
35-
caselogs<-req:
36-
}
37-
returnnil
31+
ctx:=testutil.Context(t,testutil.WaitShort)
32+
fLogger:=newFakeScriptLogger()
33+
runner:=setup(t,func(uuid2 uuid.UUID) agentscripts.ScriptLogger {
34+
returnfLogger
3835
})
3936
deferrunner.Close()
4037
err:=runner.Init([]codersdk.WorkspaceAgentScript{{
@@ -45,19 +42,15 @@ func TestExecuteBasic(t *testing.T) {
4542
require.NoError(t,runner.Execute(context.Background(),func(script codersdk.WorkspaceAgentScript)bool {
4643
returntrue
4744
}))
48-
log:=<-logs
49-
require.Equal(t,"hello",log.Logs[0].Output)
45+
log:=testutil.RequireRecvCtx(ctx,t,fLogger.logs)
46+
require.Equal(t,"hello",log.Output)
5047
}
5148

5249
funcTestEnv(t*testing.T) {
5350
t.Parallel()
54-
logs:=make(chan agentsdk.PatchLogs,2)
55-
runner:=setup(t,func(ctx context.Context,req agentsdk.PatchLogs)error {
56-
select {
57-
case<-ctx.Done():
58-
caselogs<-req:
59-
}
60-
returnnil
51+
fLogger:=newFakeScriptLogger()
52+
runner:=setup(t,func(uuid2 uuid.UUID) agentscripts.ScriptLogger {
53+
returnfLogger
6154
})
6255
deferrunner.Close()
6356
id:=uuid.New()
@@ -88,11 +81,9 @@ func TestEnv(t *testing.T) {
8881
select {
8982
case<-ctx.Done():
9083
require.Fail(t,"timed out waiting for logs")
91-
casel:=<-logs:
92-
for_,l:=rangel.Logs {
93-
t.Logf("log: %s",l.Output)
94-
}
95-
log=append(log,l.Logs...)
84+
casel:=<-fLogger.logs:
85+
t.Logf("log: %s",l.Output)
86+
log=append(log,l)
9687
}
9788
iflen(log)>=2 {
9889
break
@@ -124,12 +115,12 @@ func TestCronClose(t *testing.T) {
124115
require.NoError(t,runner.Close(),"close runner")
125116
}
126117

127-
funcsetup(t*testing.T,patchLogsfunc(ctx context.Context,req agentsdk.PatchLogs)error)*agentscripts.Runner {
118+
funcsetup(t*testing.T,getScriptLoggerfunc(logSourceID uuid.UUID) agentscripts.ScriptLogger)*agentscripts.Runner {
128119
t.Helper()
129-
ifpatchLogs==nil {
120+
ifgetScriptLogger==nil {
130121
// noop
131-
patchLogs=func(ctx context.Context,req agentsdk.PatchLogs)error {
132-
returnnil
122+
getScriptLogger=func(uuid uuid.UUID) agentscripts.ScriptLogger {
123+
returnnoopScriptLogger{}
133124
}
134125
}
135126
fs:=afero.NewMemMapFs()
@@ -140,11 +131,45 @@ func setup(t *testing.T, patchLogs func(ctx context.Context, req agentsdk.PatchL
140131
_=s.Close()
141132
})
142133
returnagentscripts.New(agentscripts.Options{
143-
LogDir:t.TempDir(),
144-
DataDirBase:t.TempDir(),
145-
Logger:logger,
146-
SSHServer:s,
147-
Filesystem:fs,
148-
PatchLogs:patchLogs,
134+
LogDir:t.TempDir(),
135+
DataDirBase:t.TempDir(),
136+
Logger:logger,
137+
SSHServer:s,
138+
Filesystem:fs,
139+
GetScriptLogger:getScriptLogger,
149140
})
150141
}
142+
143+
typenoopScriptLoggerstruct{}
144+
145+
func (noopScriptLogger)Send(context.Context,...agentsdk.Log)error {
146+
returnnil
147+
}
148+
149+
func (noopScriptLogger)Flush(context.Context)error {
150+
returnnil
151+
}
152+
153+
typefakeScriptLoggerstruct {
154+
logschan agentsdk.Log
155+
}
156+
157+
func (f*fakeScriptLogger)Send(ctx context.Context,logs...agentsdk.Log)error {
158+
for_,log:=rangelogs {
159+
select {
160+
case<-ctx.Done():
161+
returnctx.Err()
162+
casef.logs<-log:
163+
// OK!
164+
}
165+
}
166+
returnnil
167+
}
168+
169+
func (*fakeScriptLogger)Flush(context.Context)error {
170+
returnnil
171+
}
172+
173+
funcnewFakeScriptLogger()*fakeScriptLogger {
174+
return&fakeScriptLogger{make(chan agentsdk.Log,100)}
175+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp