Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit6f9f887

Browse files
authored
feat: add support for logging events from ReplicaSets (#4)
* feat: add support for logging events from ReplicaSetsFixes#3* Fix replicaset tests event
1 parente67408b commit6f9f887

File tree

4 files changed

+195
-14
lines changed

4 files changed

+195
-14
lines changed

‎.github/workflows/ci.yaml‎

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ name: ci
33
on:
44
push:
55

6-
pull_request:
7-
86
workflow_dispatch:
97

108
permissions:

‎logger.go‎

Lines changed: 100 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/coder/coder/codersdk"
1414
"github.com/coder/coder/codersdk/agentsdk"
1515
"github.com/fatih/color"
16+
appsv1"k8s.io/api/apps/v1"
1617
corev1"k8s.io/api/core/v1"
1718
v1"k8s.io/apimachinery/pkg/apis/meta/v1"
1819
"k8s.io/client-go/informers"
@@ -52,8 +53,9 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve
5253
errChan:make(chanerror,16),
5354
ctx:ctx,
5455
cancelFunc:cancelFunc,
55-
agentTokenToLogger:map[string]agentLogger{},
56+
agentTokenToLogger:map[string]*agentLogger{},
5657
podToAgentTokens:map[string][]string{},
58+
replicaSetToTokens:map[string][]string{},
5759
}
5860
returnreporter,reporter.init()
5961
}
@@ -67,8 +69,9 @@ type podEventLogger struct {
6769
ctx context.Context
6870
cancelFunc context.CancelFunc
6971
mutex sync.RWMutex
70-
agentTokenToLoggermap[string]agentLogger
72+
agentTokenToLoggermap[string]*agentLogger
7173
podToAgentTokensmap[string][]string
74+
replicaSetToTokensmap[string][]string
7275
}
7376

7477
// init starts the informer factory and registers event handlers.
@@ -91,6 +94,7 @@ func (p *podEventLogger) init() error {
9194
// When a Pod is created, it's added to the map of Pods we're
9295
// interested in. When a Pod is deleted, it's removed from the map.
9396
podInformer:=podFactory.Core().V1().Pods().Informer()
97+
replicaInformer:=podFactory.Apps().V1().ReplicaSets().Informer()
9498
eventInformer:=eventFactory.Core().V1().Events().Informer()
9599

96100
_,err:=podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -130,7 +134,7 @@ func (p *podEventLogger) init() error {
130134
}
131135
}
132136
ifregistered {
133-
p.logger.Info(p.ctx,"registered agent pod",slog.F("pod",pod.Name))
137+
p.logger.Info(p.ctx,"registered agent pod",slog.F("name",pod.Name),slog.F("namespace",pod.Namespace))
134138
}
135139
},
136140
DeleteFunc:func(objinterface{}) {
@@ -153,13 +157,92 @@ func (p *podEventLogger) init() error {
153157
Level:codersdk.LogLevelError,
154158
})
155159
}
156-
p.logger.Info(p.ctx,"unregistered agent pod",slog.F("pod",pod.Name))
160+
p.logger.Info(p.ctx,"unregistered agent pod",slog.F("name",pod.Name))
157161
},
158162
})
159163
iferr!=nil {
160164
returnfmt.Errorf("register pod handler: %w",err)
161165
}
162166

167+
_,err=replicaInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
168+
AddFunc:func(objinterface{}) {
169+
replica,ok:=obj.(*appsv1.ReplicaSet)
170+
if!ok {
171+
p.errChan<-fmt.Errorf("unexpected replica object type: %T",obj)
172+
return
173+
}
174+
175+
// We don't want to add logs to workspaces that are already started!
176+
if!replica.CreationTimestamp.After(startTime) {
177+
return
178+
}
179+
180+
p.mutex.Lock()
181+
deferp.mutex.Unlock()
182+
183+
varregisteredbool
184+
for_,container:=rangereplica.Spec.Template.Spec.Containers {
185+
for_,env:=rangecontainer.Env {
186+
ifenv.Name!="CODER_AGENT_TOKEN" {
187+
continue
188+
}
189+
registered=true
190+
tokens,ok:=p.replicaSetToTokens[replica.Name]
191+
if!ok {
192+
tokens=make([]string,0)
193+
}
194+
tokens=append(tokens,env.Value)
195+
p.replicaSetToTokens[replica.Name]=tokens
196+
197+
p.sendLog(replica.Name,env.Value, agentsdk.StartupLog{
198+
CreatedAt:time.Now(),
199+
Output:fmt.Sprintf("🐳 %s: %s",newColor(color.Bold).Sprint("Queued pod from ReplicaSet"),replica.Name),
200+
Level:codersdk.LogLevelInfo,
201+
})
202+
}
203+
}
204+
ifregistered {
205+
p.logger.Info(p.ctx,"registered agent pod from ReplicaSet",slog.F("name",replica.Name))
206+
}
207+
},
208+
DeleteFunc:func(objinterface{}) {
209+
replicaSet,ok:=obj.(*appsv1.ReplicaSet)
210+
if!ok {
211+
p.errChan<-fmt.Errorf("unexpected replica set delete object type: %T",obj)
212+
return
213+
}
214+
p.mutex.Lock()
215+
deferp.mutex.Unlock()
216+
_,ok=p.replicaSetToTokens[replicaSet.Name]
217+
if!ok {
218+
return
219+
}
220+
delete(p.replicaSetToTokens,replicaSet.Name)
221+
for_,pod:=rangereplicaSet.Spec.Template.Spec.Containers {
222+
name:=pod.Name
223+
ifname=="" {
224+
name=replicaSet.Spec.Template.Name
225+
}
226+
tokens,ok:=p.podToAgentTokens[name]
227+
if!ok {
228+
continue
229+
}
230+
delete(p.podToAgentTokens,name)
231+
for_,token:=rangetokens {
232+
p.sendLog(pod.Name,token, agentsdk.StartupLog{
233+
CreatedAt:time.Now(),
234+
Output:fmt.Sprintf("🗑️ %s: %s",newColor(color.Bold).Sprint("Deleted ReplicaSet"),replicaSet.Name),
235+
Level:codersdk.LogLevelError,
236+
})
237+
}
238+
}
239+
p.logger.Info(p.ctx,"unregistered ReplicaSet",slog.F("name",replicaSet.Name))
240+
},
241+
})
242+
iferr!=nil {
243+
returnfmt.Errorf("register replicaset handler: %w",err)
244+
}
245+
163246
_,err=eventInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
164247
AddFunc:func(objinterface{}) {
165248
event,ok:=obj.(*corev1.Event)
@@ -175,8 +258,14 @@ func (p *podEventLogger) init() error {
175258

176259
p.mutex.Lock()
177260
deferp.mutex.Unlock()
178-
tokens,ok:=p.podToAgentTokens[event.InvolvedObject.Name]
179-
if!ok {
261+
vartokens []string
262+
switchevent.InvolvedObject.Kind {
263+
case"Pod":
264+
tokens,ok=p.podToAgentTokens[event.InvolvedObject.Name]
265+
case"ReplicaSet":
266+
tokens,ok=p.replicaSetToTokens[event.InvolvedObject.Name]
267+
}
268+
iftokens==nil||!ok {
180269
return
181270
}
182271

@@ -210,23 +299,23 @@ func (p *podEventLogger) init() error {
210299
// loggerForToken returns a logger for the given pod name and agent token.
211300
// If a logger already exists for the token, it's returned. Otherwise a new
212301
// logger is created and returned.
213-
func (p*podEventLogger)sendLog(podName,tokenstring,log agentsdk.StartupLog) {
302+
func (p*podEventLogger)sendLog(resourceName,tokenstring,log agentsdk.StartupLog) {
214303
logger,ok:=p.agentTokenToLogger[token]
215304
if!ok {
216305
client:=agentsdk.New(p.coderURL)
217306
client.SetSessionToken(token)
218-
client.SDK.Logger=p.logger.Named(podName)
307+
client.SDK.Logger=p.logger.Named(resourceName)
219308
sendLog,closer:=client.QueueStartupLogs(p.ctx,p.logDebounce)
220309

221-
logger=agentLogger{
310+
logger=&agentLogger{
222311
sendLog:sendLog,
223312
closer:closer,
224313
closeTimer:time.AfterFunc(p.logDebounce*5,func() {
225314
logger.closed.Store(true)
226315
// We want to have two close cycles for loggers!
227316
err:=closer.Close()
228317
iferr!=nil {
229-
p.logger.Error(p.ctx,"close agent logger",slog.Error(err),slog.F("pod",podName))
318+
p.logger.Error(p.ctx,"close agent logger",slog.Error(err),slog.F("pod",resourceName))
230319
}
231320
p.mutex.Lock()
232321
delete(p.agentTokenToLogger,token)
@@ -239,7 +328,7 @@ func (p *podEventLogger) sendLog(podName, token string, log agentsdk.StartupLog)
239328
// If the logger was already closed, we await the close before
240329
// creating a new logger. This is to ensure all loggers get sent in order!
241330
_=logger.closer.Close()
242-
p.sendLog(podName,token,log)
331+
p.sendLog(resourceName,token,log)
243332
return
244333
}
245334
// We make this 5x the debounce because it's low-cost to persist a few

‎logger_test.go‎

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,103 @@ import (
1313
"github.com/coder/coder/codersdk/agentsdk"
1414
"github.com/stretchr/testify/require"
1515
"github.com/zeebo/assert"
16+
appsv1"k8s.io/api/apps/v1"
1617
corev1"k8s.io/api/core/v1"
1718
v1"k8s.io/apimachinery/pkg/apis/meta/v1"
1819
"k8s.io/client-go/kubernetes/fake"
1920
)
2021

21-
funcTestPodEventLogger(t*testing.T) {
22+
funcTestReplicaSetEvents(t*testing.T) {
23+
t.Parallel()
24+
25+
queued:=make(chan agentsdk.PatchStartupLogs,1)
26+
agent:=httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter,r*http.Request) {
27+
varreq agentsdk.PatchStartupLogs
28+
err:=json.NewDecoder(r.Body).Decode(&req)
29+
assert.NoError(t,err)
30+
queued<-req
31+
}))
32+
agentURL,err:=url.Parse(agent.URL)
33+
require.NoError(t,err)
34+
namespace:="test-namespace"
35+
client:=fake.NewSimpleClientset()
36+
ctx:=context.Background()
37+
reporter,err:=newPodEventLogger(ctx,podEventLoggerOptions{
38+
client:client,
39+
coderURL:agentURL,
40+
namespace:namespace,
41+
logger:slogtest.Make(t,nil),
42+
logDebounce:time.Millisecond,
43+
})
44+
require.NoError(t,err)
45+
46+
rs:=&appsv1.ReplicaSet{
47+
ObjectMeta: v1.ObjectMeta{
48+
Name:"test-rs",
49+
CreationTimestamp: v1.Time{
50+
Time:time.Now().Add(time.Hour),
51+
},
52+
},
53+
Spec: appsv1.ReplicaSetSpec{
54+
Template: corev1.PodTemplateSpec{
55+
ObjectMeta: v1.ObjectMeta{
56+
Name:"test-pod",
57+
},
58+
Spec: corev1.PodSpec{
59+
Containers: []corev1.Container{{
60+
Env: []corev1.EnvVar{
61+
{
62+
Name:"CODER_AGENT_TOKEN",
63+
Value:"test-token",
64+
},
65+
},
66+
}},
67+
},
68+
},
69+
},
70+
}
71+
_,err=client.AppsV1().ReplicaSets(namespace).Create(ctx,rs, v1.CreateOptions{})
72+
require.NoError(t,err)
73+
74+
log:=<-queued
75+
require.Len(t,log.Logs,1)
76+
require.Contains(t,log.Logs[0].Output,"Queued pod from ReplicaSet")
77+
78+
event:=&corev1.Event{
79+
ObjectMeta: v1.ObjectMeta{
80+
Name:"test-event",
81+
CreationTimestamp: v1.Time{
82+
Time:time.Now().Add(time.Hour),
83+
},
84+
},
85+
InvolvedObject: corev1.ObjectReference{
86+
Kind:"ReplicaSet",
87+
Name:"test-rs",
88+
},
89+
Reason:"Test",
90+
Message:"Test event",
91+
}
92+
_,err=client.CoreV1().Events(namespace).Create(ctx,event, v1.CreateOptions{})
93+
require.NoError(t,err)
94+
95+
log=<-queued
96+
require.Len(t,log.Logs,1)
97+
require.Contains(t,log.Logs[0].Output,event.Message)
98+
99+
err=client.AppsV1().ReplicaSets(namespace).Delete(ctx,rs.Name, v1.DeleteOptions{})
100+
require.NoError(t,err)
101+
102+
require.Eventually(t,func()bool {
103+
reporter.mutex.Lock()
104+
deferreporter.mutex.Unlock()
105+
returnlen(reporter.podToAgentTokens)==0&&len(reporter.replicaSetToTokens)==0
106+
},time.Second,time.Millisecond)
107+
108+
err=reporter.Close()
109+
require.NoError(t,err)
110+
}
111+
112+
funcTestPodEvents(t*testing.T) {
22113
t.Parallel()
23114

24115
queued:=make(chan agentsdk.PatchStartupLogs,1)

‎templates/service.yaml‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ rules:
66
-apiGroups:[""]
77
resources:["pods", "events"]
88
verbs:["get", "watch", "list"]
9+
-apiGroups:["apps"]
10+
resources:["replicasets", "events"]
11+
verbs:["get", "watch", "list"]
912
---
1013
apiVersion:v1
1114
kind:ServiceAccount

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp