Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit249076e

Browse files
committed
feat: use agent v2 API to send agent logs
1 parent2f05031 commit249076e

File tree

6 files changed

+197
-46
lines changed

6 files changed

+197
-46
lines changed

‎agent/agent.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ type Client interface {
9191
ConnectRPC(ctx context.Context) (drpc.Conn,error)
9292
PostLifecycle(ctx context.Context,state agentsdk.PostLifecycleRequest)error
9393
PostMetadata(ctx context.Context,req agentsdk.PostMetadataRequest)error
94-
PatchLogs(ctx context.Context,req agentsdk.PatchLogs)error
9594
RewriteDERPMap(derpMap*tailcfg.DERPMap)
9695
}
9796

@@ -170,6 +169,7 @@ func New(options Options) Agent {
170169
syscaller:options.Syscaller,
171170
modifiedProcs:options.ModifiedProcesses,
172171
processManagementTick:options.ProcessManagementTick,
172+
logSender:agentsdk.NewLogSender(options.Logger),
173173

174174
prometheusRegistry:prometheusRegistry,
175175
metrics:newAgentMetrics(prometheusRegistry),
@@ -234,6 +234,7 @@ type agent struct {
234234
network*tailnet.Conn
235235
addresses []netip.Prefix
236236
statsReporter*statsReporter
237+
logSender*agentsdk.LogSender
237238

238239
connCountReconnectingPTY atomic.Int64
239240

@@ -271,7 +272,9 @@ func (a *agent) init() {
271272
Logger:a.logger,
272273
SSHServer:sshSrv,
273274
Filesystem:a.filesystem,
274-
PatchLogs:a.client.PatchLogs,
275+
GetScriptLogger:func(logSourceID uuid.UUID) agentscripts.ScriptLogger {
276+
returna.logSender.GetScriptLogger(logSourceID)
277+
},
275278
})
276279
// Register runner metrics. If the prom registry is nil, the metrics
277280
// will not report anywhere.
@@ -751,6 +754,20 @@ func (a *agent) run() (retErr error) {
751754
},
752755
)
753756

757+
// sending logs gets gracefulShutdownBehaviorRemain because we want to send logs generated by
758+
// shutdown scripts.
759+
arm.run("send logs",gracefulShutdownBehaviorRemain,
760+
func(ctx context.Context,conn drpc.Conn)error {
761+
err:=a.logSender.SendLoop(ctx,proto.NewDRPCAgentClient(conn))
762+
ifxerrors.Is(err,agentsdk.LogLimitExceededError) {
763+
// we don't want this error to tear down the API connection and propagate to the
764+
// other routines that use the API. The LogSender has already dropped a warning
765+
// log, so just return nil here.
766+
returnnil
767+
}
768+
returnerr
769+
})
770+
754771
// channels to sync goroutines below
755772
// handle manifest
756773
// |
@@ -1747,6 +1764,12 @@ lifecycleWaitLoop:
17471764
a.logger.Debug(context.Background(),"coordinator RPC disconnected")
17481765
}
17491766

1767+
// Wait for logs to be sent
1768+
err=a.logSender.WaitUntilEmpty(a.hardCtx)
1769+
iferr!=nil {
1770+
a.logger.Warn(context.Background(),"timed out waiting for all logs to be sent",slog.Error(err))
1771+
}
1772+
17501773
close(a.closed)
17511774
a.hardCancel()
17521775
_=a.sshServer.Close()

‎agent/agent_test.go

Lines changed: 78 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2054,6 +2054,80 @@ func TestAgent_DebugServer(t *testing.T) {
20542054
})
20552055
}
20562056

2057+
funcTestAgent_ScriptLogging(t*testing.T) {
2058+
ifruntime.GOOS=="windows" {
2059+
t.Skip("bash scripts only")
2060+
}
2061+
t.Parallel()
2062+
ctx:=testutil.Context(t,testutil.WaitMedium)
2063+
2064+
derpMap,_:=tailnettest.RunDERPAndSTUN(t)
2065+
logsCh:=make(chan*proto.BatchCreateLogsRequest,100)
2066+
lsStart:= uuid.UUID{0x11}
2067+
lsStop:= uuid.UUID{0x22}
2068+
//nolint:dogsled
2069+
_,_,_,_,agnt:=setupAgent(
2070+
t,
2071+
agentsdk.Manifest{
2072+
DERPMap:derpMap,
2073+
Scripts: []codersdk.WorkspaceAgentScript{
2074+
{
2075+
LogSourceID:lsStart,
2076+
RunOnStart:true,
2077+
Script:`#!/bin/sh
2078+
i=0
2079+
while [ $i -ne 5 ]
2080+
do
2081+
i=$(($i+1))
2082+
echo "start $i"
2083+
done
2084+
`,
2085+
},
2086+
{
2087+
LogSourceID:lsStop,
2088+
RunOnStop:true,
2089+
Script:`#!/bin/sh
2090+
i=0
2091+
while [ $i -ne 3000 ]
2092+
do
2093+
i=$(($i+1))
2094+
echo "stop $i"
2095+
done
2096+
`,// send a lot of stop logs to make sure we don't truncate shutdown logs before closing the API conn
2097+
},
2098+
},
2099+
},
2100+
0,
2101+
func(cl*agenttest.Client,_*agent.Options) {
2102+
cl.SetLogsChannel(logsCh)
2103+
},
2104+
)
2105+
2106+
n:=1
2107+
forn<=5 {
2108+
logs:=testutil.RequireRecvCtx(ctx,t,logsCh)
2109+
require.NotNil(t,logs)
2110+
for_,l:=rangelogs.GetLogs() {
2111+
require.Equal(t,fmt.Sprintf("start %d",n),l.GetOutput())
2112+
n++
2113+
}
2114+
}
2115+
2116+
err:=agnt.Close()
2117+
require.NoError(t,err)
2118+
2119+
n=1
2120+
forn<=3000 {
2121+
logs:=testutil.RequireRecvCtx(ctx,t,logsCh)
2122+
require.NotNil(t,logs)
2123+
for_,l:=rangelogs.GetLogs() {
2124+
require.Equal(t,fmt.Sprintf("stop %d",n),l.GetOutput())
2125+
n++
2126+
}
2127+
t.Logf("got %d stop logs",n-1)
2128+
}
2129+
}
2130+
20572131
// setupAgentSSHClient creates an agent, dials it, and sets up an ssh.Client for it
20582132
funcsetupAgentSSHClient(ctx context.Context,t*testing.T)*ssh.Client {
20592133
//nolint: dogsled
@@ -2129,7 +2203,7 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
21292203
})
21302204
statsCh:=make(chan*proto.Stats,50)
21312205
fs:=afero.NewMemMapFs()
2132-
c:=agenttest.NewClient(t,logger.Named("agent"),metadata.AgentID,metadata,statsCh,coordinator)
2206+
c:=agenttest.NewClient(t,logger.Named("agenttest"),metadata.AgentID,metadata,statsCh,coordinator)
21332207
t.Cleanup(c.Close)
21342208

21352209
options:= agent.Options{
@@ -2144,9 +2218,9 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
21442218
opt(c,&options)
21452219
}
21462220

2147-
closer:=agent.New(options)
2221+
agnt:=agent.New(options)
21482222
t.Cleanup(func() {
2149-
_=closer.Close()
2223+
_=agnt.Close()
21502224
})
21512225
conn,err:=tailnet.NewConn(&tailnet.Options{
21522226
Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IP(),128)},
@@ -2183,7 +2257,7 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Durati
21832257
if!agentConn.AwaitReachable(ctx) {
21842258
t.Fatal("agent not reachable")
21852259
}
2186-
returnagentConn,c,statsCh,fs,closer
2260+
returnagentConn,c,statsCh,fs,agnt
21872261
}
21882262

21892263
vardialTestPayload= []byte("dean-was-here123")

‎agent/agentscripts/agentscripts.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"sync/atomic"
1414
"time"
1515

16+
"github.com/google/uuid"
1617
"github.com/prometheus/client_golang/prometheus"
1718
"github.com/robfig/cron/v3"
1819
"github.com/spf13/afero"
@@ -41,13 +42,18 @@ var (
4142
parser=cron.NewParser(cron.Second|cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.DowOptional)
4243
)
4344

45+
typeScriptLoggerinterface {
46+
Send(ctx context.Context,log...agentsdk.Log)error
47+
Flush(context.Context)error
48+
}
49+
4450
// Options are a set of options for the runner.
4551
typeOptionsstruct {
46-
LogDirstring
47-
Logger slog.Logger
48-
SSHServer*agentssh.Server
49-
Filesystem afero.Fs
50-
PatchLogsfunc(ctx context.Context,req agentsdk.PatchLogs)error
52+
LogDirstring
53+
Loggerslog.Logger
54+
SSHServer*agentssh.Server
55+
Filesystemafero.Fs
56+
GetScriptLoggerfunc(logSourceID uuid.UUID)ScriptLogger
5157
}
5258

5359
// New creates a runner for the provided scripts.
@@ -238,20 +244,20 @@ func (r *Runner) run(ctx context.Context, script codersdk.WorkspaceAgentScript)
238244
cmd.WaitDelay=10*time.Second
239245
cmd.Cancel=cmdCancel(cmd)
240246

241-
send,flushAndClose:=agentsdk.LogsSender(script.LogSourceID,r.PatchLogs,logger)
247+
scriptLogger:=r.GetScriptLogger(script.LogSourceID)
242248
// If ctx is canceled here (or in a writer below), we may be
243249
// discarding logs, but that's okay because we're shutting down
244250
// anyway. We could consider creating a new context here if we
245251
// want better control over flush during shutdown.
246252
deferfunc() {
247-
iferr:=flushAndClose(ctx);err!=nil {
253+
iferr:=scriptLogger.Flush(ctx);err!=nil {
248254
logger.Warn(ctx,"flush startup logs failed",slog.Error(err))
249255
}
250256
}()
251257

252-
infoW:=agentsdk.LogsWriter(ctx,send,script.LogSourceID,codersdk.LogLevelInfo)
258+
infoW:=agentsdk.LogsWriter(ctx,scriptLogger.Send,script.LogSourceID,codersdk.LogLevelInfo)
253259
deferinfoW.Close()
254-
errW:=agentsdk.LogsWriter(ctx,send,script.LogSourceID,codersdk.LogLevelError)
260+
errW:=agentsdk.LogsWriter(ctx,scriptLogger.Send,script.LogSourceID,codersdk.LogLevelError)
255261
defererrW.Close()
256262
cmd.Stdout=io.MultiWriter(fileWriter,infoW)
257263
cmd.Stderr=io.MultiWriter(fileWriter,errW)

‎agent/agentscripts/agentscripts_test.go

Lines changed: 51 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"testing"
66
"time"
77

8+
"github.com/google/uuid"
89
"github.com/prometheus/client_golang/prometheus"
910
"github.com/spf13/afero"
1011
"github.com/stretchr/testify/require"
@@ -15,6 +16,7 @@ import (
1516
"github.com/coder/coder/v2/agent/agentssh"
1617
"github.com/coder/coder/v2/codersdk"
1718
"github.com/coder/coder/v2/codersdk/agentsdk"
19+
"github.com/coder/coder/v2/testutil"
1820
)
1921

2022
funcTestMain(m*testing.M) {
@@ -23,10 +25,10 @@ func TestMain(m *testing.M) {
2325

2426
funcTestExecuteBasic(t*testing.T) {
2527
t.Parallel()
26-
logs:=make(chan agentsdk.PatchLogs,1)
27-
runner:=setup(t,func(ctx context.Context,req agentsdk.PatchLogs)error {
28-
logs<-req
29-
returnnil
28+
ctx:=testutil.Context(t,testutil.WaitShort)
29+
fLogger:=newFakeScriptLogger()
30+
runner:=setup(t,func(uuid2 uuid.UUID) agentscripts.ScriptLogger {
31+
returnfLogger
3032
})
3133
deferrunner.Close()
3234
err:=runner.Init([]codersdk.WorkspaceAgentScript{{
@@ -36,8 +38,8 @@ func TestExecuteBasic(t *testing.T) {
3638
require.NoError(t,runner.Execute(context.Background(),func(script codersdk.WorkspaceAgentScript)bool {
3739
returntrue
3840
}))
39-
log:=<-logs
40-
require.Equal(t,"hello",log.Logs[0].Output)
41+
log:=testutil.RequireRecvCtx(ctx,t,fLogger.logs)
42+
require.Equal(t,"hello",log.Output)
4143
}
4244

4345
funcTestTimeout(t*testing.T) {
@@ -61,12 +63,12 @@ func TestCronClose(t *testing.T) {
6163
require.NoError(t,runner.Close(),"close runner")
6264
}
6365

64-
funcsetup(t*testing.T,patchLogsfunc(ctx context.Context,req agentsdk.PatchLogs)error)*agentscripts.Runner {
66+
funcsetup(t*testing.T,getScriptLoggerfunc(logSourceID uuid.UUID) agentscripts.ScriptLogger)*agentscripts.Runner {
6567
t.Helper()
66-
ifpatchLogs==nil {
68+
ifgetScriptLogger==nil {
6769
// noop
68-
patchLogs=func(ctx context.Context,req agentsdk.PatchLogs)error {
69-
returnnil
70+
getScriptLogger=func(uuid uuid.UUID) agentscripts.ScriptLogger {
71+
returnnoopScriptLogger{}
7072
}
7173
}
7274
fs:=afero.NewMemMapFs()
@@ -77,10 +79,44 @@ func setup(t *testing.T, patchLogs func(ctx context.Context, req agentsdk.PatchL
7779
_=s.Close()
7880
})
7981
returnagentscripts.New(agentscripts.Options{
80-
LogDir:t.TempDir(),
81-
Logger:logger,
82-
SSHServer:s,
83-
Filesystem:fs,
84-
PatchLogs:patchLogs,
82+
LogDir:t.TempDir(),
83+
Logger:logger,
84+
SSHServer:s,
85+
Filesystem:fs,
86+
GetScriptLogger:getScriptLogger,
8587
})
8688
}
89+
90+
typenoopScriptLoggerstruct{}
91+
92+
func (noopScriptLogger)Send(context.Context,...agentsdk.Log)error {
93+
returnnil
94+
}
95+
96+
func (noopScriptLogger)Flush(context.Context)error {
97+
returnnil
98+
}
99+
100+
typefakeScriptLoggerstruct {
101+
logschan agentsdk.Log
102+
}
103+
104+
func (f*fakeScriptLogger)Send(ctx context.Context,logs...agentsdk.Log)error {
105+
for_,log:=rangelogs {
106+
select {
107+
case<-ctx.Done():
108+
returnctx.Err()
109+
casef.logs<-log:
110+
// OK!
111+
}
112+
}
113+
returnnil
114+
}
115+
116+
func (*fakeScriptLogger)Flush(context.Context)error {
117+
returnnil
118+
}
119+
120+
funcnewFakeScriptLogger()*fakeScriptLogger {
121+
return&fakeScriptLogger{make(chan agentsdk.Log,100)}
122+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp