# feat: switch agent to use v2 API for sending logs (#12068)
**`agent/agent.go`**
```diff
@@ -92,7 +92,6 @@ type Client interface {
 	ConnectRPC(ctx context.Context) (drpc.Conn, error)
 	PostLifecycle(ctx context.Context, state agentsdk.PostLifecycleRequest) error
 	PostMetadata(ctx context.Context, req agentsdk.PostMetadataRequest) error
 	RewriteDERPMap(derpMap *tailcfg.DERPMap)
 }
```

```diff
@@ -181,6 +180,7 @@ func New(options Options) Agent {
 		syscaller:             options.Syscaller,
 		modifiedProcs:         options.ModifiedProcesses,
 		processManagementTick: options.ProcessManagementTick,
+		logSender:             agentsdk.NewLogSender(options.Logger),
 		prometheusRegistry:    prometheusRegistry,
 		metrics:               newAgentMetrics(prometheusRegistry),
```

```diff
@@ -245,6 +245,7 @@ type agent struct {
 	network       *tailnet.Conn
 	addresses     []netip.Prefix
 	statsReporter *statsReporter
+	logSender     *agentsdk.LogSender

 	connCountReconnectingPTY atomic.Int64
```
```diff
@@ -283,7 +284,9 @@ func (a *agent) init() {
 		Logger:     a.logger,
 		SSHServer:  sshSrv,
 		Filesystem: a.filesystem,
+		GetScriptLogger: func(logSourceID uuid.UUID) agentscripts.ScriptLogger {
+			return a.logSender.GetScriptLogger(logSourceID)
+		},
 	})
 	// Register runner metrics. If the prom registry is nil, the metrics
 	// will not report anywhere.
```
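The `agentscripts.ScriptLogger` consumer side is not shown in this diff; below is a sketch of how a script runner might use the `GetScriptLogger` hook. The `Send`/`Flush` method set and the `runScriptLine` helper are assumptions for illustration, not taken from the PR.

```go
package example

import (
	"context"

	"github.com/google/uuid"

	"github.com/coder/coder/v2/codersdk"
	"github.com/coder/coder/v2/codersdk/agentsdk"
)

// ScriptLogger is a hypothetical stand-in for agentscripts.ScriptLogger;
// the Send/Flush method set is an assumption, not taken from this diff.
type ScriptLogger interface {
	Send(ctx context.Context, logs ...agentsdk.Log) error
	Flush(ctx context.Context) error
}

// runScriptLine sketches the intended flow: the runner resolves a logger
// keyed by its script's log source ID, queues output, then flushes so the
// sender can batch it to the API.
func runScriptLine(ctx context.Context, getLogger func(uuid.UUID) ScriptLogger, sourceID uuid.UUID, line string) error {
	logger := getLogger(sourceID)
	if err := logger.Send(ctx, agentsdk.Log{Output: line, Level: codersdk.LogLevelInfo}); err != nil {
		return err
	}
	return logger.Flush(ctx)
}
```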
```diff
@@ -763,6 +766,20 @@ func (a *agent) run() (retErr error) {
 		},
 	)
+	// sending logs gets gracefulShutdownBehaviorRemain because we want to send logs generated by
+	// shutdown scripts.
+	connMan.start("send logs", gracefulShutdownBehaviorRemain,
+		func(ctx context.Context, conn drpc.Conn) error {
+			err := a.logSender.SendLoop(ctx, proto.NewDRPCAgentClient(conn))
+			if xerrors.Is(err, agentsdk.LogLimitExceededError) {
+				// we don't want this error to tear down the API connection and propagate to the
+				// other routines that use the API. The LogSender has already dropped a warning
+				// log, so just return nil here.
+				return nil
+			}
+			return err
+		})

 	// channels to sync goroutines below
 	//  handle manifest
 	//    |
```

> **Review** (on `proto.NewDRPCAgentClient(conn)`): Is it good practice to use multiple clients over the same conn, or should we define the client in the parent scope?
>
> **Author:** The client contains no state other than the conn itself; it just maps RPC methods onto the conn. So I think it's totally reasonable to make a new one per routine. That allows me to keep the […]
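A minimal sketch of the per-routine pattern under discussion, assuming the generated `proto.NewDRPCAgentClient` constructor used in the hunk above; the specific RPCs called are illustrative:

```go
package example

import (
	"context"

	"storj.io/drpc"

	"github.com/coder/coder/v2/agent/proto"
)

// startRoutines wraps one shared drpc.Conn in a fresh generated client per
// routine. The client is a thin, stateless adapter over the conn, so this
// costs nothing and keeps each routine self-contained.
func startRoutines(ctx context.Context, conn drpc.Conn) {
	go func() {
		client := proto.NewDRPCAgentClient(conn)
		// Illustrative RPC; errors ignored for brevity.
		_, _ = client.BatchCreateLogs(ctx, &proto.BatchCreateLogsRequest{})
	}()
	go func() {
		client := proto.NewDRPCAgentClient(conn)
		_, _ = client.UpdateLifecycle(ctx, &proto.UpdateLifecycleRequest{})
	}()
}
```

Because the generated client only forwards method calls to the underlying conn, sharing the conn is what matters; the wrapper itself is disposable.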
```diff
@@ -1769,6 +1786,12 @@ lifecycleWaitLoop:
 		a.logger.Debug(context.Background(), "coordinator RPC disconnected")
 	}

+	// Wait for logs to be sent
+	err = a.logSender.WaitUntilEmpty(a.hardCtx)
+	if err != nil {
+		a.logger.Warn(context.Background(), "timed out waiting for all logs to be sent", slog.Error(err))
+	}
+
 	a.hardCancel()
 	if a.network != nil {
 		_ = a.network.Close()
```
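`WaitUntilEmpty` bounds shutdown: the agent waits for queued logs to drain, but gives up when `hardCtx` expires rather than hanging forever. The `agentsdk.LogSender` internals are not part of this diff; below is a minimal sketch of the drain-wait pattern it implies, with hypothetical `queue` and `cond` fields.

```go
package main

import (
	"context"
	"sync"
	"time"
)

// logSender is a hypothetical stand-in for agentsdk.LogSender; the real
// implementation is not shown in this PR.
type logSender struct {
	mu    sync.Mutex
	cond  *sync.Cond // cond.L is &mu
	queue []string   // pending log lines
}

func newLogSender() *logSender {
	l := &logSender{}
	l.cond = sync.NewCond(&l.mu)
	return l
}

// WaitUntilEmpty blocks until the queue drains or ctx expires.
func (l *logSender) WaitUntilEmpty(ctx context.Context) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	// Wake any waiter when the context is done so Wait cannot block forever.
	// Taking the lock before broadcasting avoids a missed-wakeup race.
	go func() {
		<-ctx.Done()
		l.mu.Lock()
		l.cond.Broadcast()
		l.mu.Unlock()
	}()
	l.mu.Lock()
	defer l.mu.Unlock()
	for len(l.queue) > 0 {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		l.cond.Wait()
	}
	return nil
}

func main() {
	l := newLogSender()
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	// The queue is empty, so this returns immediately with nil.
	_ = l.WaitUntilEmpty(ctx)
}
```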
**`agent/agent_test.go`**
```diff
@@ -2062,6 +2062,80 @@ func TestAgent_DebugServer(t *testing.T) {
 	})
 }
```

The new test, added in full:

```go
func TestAgent_ScriptLogging(t *testing.T) {
	if runtime.GOOS == "windows" {
		t.Skip("bash scripts only")
	}
	t.Parallel()
	ctx := testutil.Context(t, testutil.WaitMedium)

	derpMap, _ := tailnettest.RunDERPAndSTUN(t)
	logsCh := make(chan *proto.BatchCreateLogsRequest, 100)
	lsStart := uuid.UUID{0x11}
	lsStop := uuid.UUID{0x22}
	//nolint:dogsled
	_, _, _, _, agnt := setupAgent(
		t,
		agentsdk.Manifest{
			DERPMap: derpMap,
			Scripts: []codersdk.WorkspaceAgentScript{
				{
					LogSourceID: lsStart,
					RunOnStart:  true,
					Script: `#!/bin/sh
i=0
while [ $i -ne 5 ]
do
	i=$(($i+1))
	echo "start $i"
done
`,
				},
				{
					LogSourceID: lsStop,
					RunOnStop:   true,
					Script: `#!/bin/sh
i=0
while [ $i -ne 3000 ]
do
	i=$(($i+1))
	echo "stop $i"
done
`, // send a lot of stop logs to make sure we don't truncate shutdown logs before closing the API conn
				},
			},
		},
		0,
		func(cl *agenttest.Client, _ *agent.Options) {
			cl.SetLogsChannel(logsCh)
		},
	)

	n := 1
	for n <= 5 {
		logs := testutil.RequireRecvCtx(ctx, t, logsCh)
		require.NotNil(t, logs)
		for _, l := range logs.GetLogs() {
			require.Equal(t, fmt.Sprintf("start %d", n), l.GetOutput())
			n++
		}
	}

	err := agnt.Close()
	require.NoError(t, err)

	n = 1
	for n <= 3000 {
		logs := testutil.RequireRecvCtx(ctx, t, logsCh)
		require.NotNil(t, logs)
		for _, l := range logs.GetLogs() {
			require.Equal(t, fmt.Sprintf("stop %d", n), l.GetOutput())
			n++
		}
		t.Logf("got %d stop logs", n-1)
	}
}
```

> **Review** (on `TestAgent_ScriptLogging`): ❤️

> **Review** (on `while [ $i -ne 3000 ]`): I wonder if 3000 is flake-risky, considering we're using WaitMedium?
>
> **Author:** Running on its own, the script part completes in ~100 ms, including queueing all the logs, sending them, and asserting them in the test. If it flakes, something else is very wrong.
>
> **Review:** Sure 👍🏻. I was mostly just worried about Windows, it can be unfathomably slow 😄
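The test leans on `testutil.RequireRecvCtx` to receive each batch with a deadline. Its implementation is not part of this diff; a minimal generic sketch of the pattern might look like:

```go
package testutil // hypothetical location; coder's real helper may differ

import (
	"context"
	"testing"
)

// RequireRecvCtx receives a value from ch, or fails the test if ctx
// expires first.
func RequireRecvCtx[T any](ctx context.Context, t *testing.T, ch <-chan T) T {
	t.Helper()
	var zero T
	select {
	case <-ctx.Done():
		t.Fatal("timed out waiting for channel receive")
		return zero // unreachable; t.Fatal stops the test
	case v := <-ch:
		return v
	}
}
```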
```diff
 // setupAgentSSHClient creates an agent, dials it, and sets up an ssh.Client for it
 func setupAgentSSHClient(ctx context.Context, t *testing.T) *ssh.Client {
 	//nolint: dogsled
@@ -2137,7 +2211,7 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Duration
 	})
 	statsCh := make(chan *proto.Stats, 50)
 	fs := afero.NewMemMapFs()
+	c := agenttest.NewClient(t, logger.Named("agenttest"), metadata.AgentID, metadata, statsCh, coordinator)
 	t.Cleanup(c.Close)
 	options := agent.Options{
@@ -2152,9 +2226,9 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Duration
 		opt(c, &options)
 	}
+	agnt := agent.New(options)
 	t.Cleanup(func() {
+		_ = agnt.Close()
 	})
 	conn, err := tailnet.NewConn(&tailnet.Options{
 		Addresses: []netip.Prefix{netip.PrefixFrom(tailnet.IP(), 128)},
@@ -2191,7 +2265,7 @@ func setupAgent(t *testing.T, metadata agentsdk.Manifest, ptyTimeout time.Duration
 	if !agentConn.AwaitReachable(ctx) {
 		t.Fatal("agent not reachable")
 	}
+	return agentConn, c, statsCh, fs, agnt
 }

 var dialTestPayload = []byte("dean-was-here123")
```