@@ -49,7 +49,7 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
49
49
// if we are following logs, start the subscription before we query the database, so that we don't miss any logs
50
50
// between the end of our query and the start of the subscription. We might get duplicates, so we'll keep track
51
51
// of processed IDs.
52
- var bufferedLogs <- chan database.ProvisionerJobLog
52
+ var bufferedLogs <- chan * database.ProvisionerJobLog
53
53
if follow {
54
54
bl ,closeFollow ,err := api .followLogs (actor ,job .ID )
55
55
if err != nil {
@@ -173,8 +173,9 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
173
173
logger .Debug (context .Background (),"job logs context canceled" )
174
174
return
175
175
case log ,ok := <- bufferedLogs :
176
- if ! ok {
177
- logger .Debug (context .Background (),"done with published logs" )
176
+ // A nil log is sent when complete!
177
+ if ! ok || log == nil {
178
+ logger .Debug (context .Background (),"reached the end of published logs" )
178
179
return
179
180
}
180
181
if logIdsDone [log .ID ] {
@@ -183,7 +184,7 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
183
184
}else {
184
185
logger .Debug (ctx ,"subscribe encoding log" ,
185
186
slog .F ("stage" ,log .Stage ))
186
- err = encoder .Encode (convertProvisionerJobLog (log ))
187
+ err = encoder .Encode (convertProvisionerJobLog (* log ))
187
188
if err != nil {
188
189
return
189
190
}
@@ -369,12 +370,12 @@ type provisionerJobLogsMessage struct {
369
370
EndOfLogs bool `json:"end_of_logs,omitempty"`
370
371
}
371
372
372
- func (api * API )followLogs (actor rbac.Subject ,jobID uuid.UUID ) (<- chan database.ProvisionerJobLog ,func (),error ) {
373
+ func (api * API )followLogs (actor rbac.Subject ,jobID uuid.UUID ) (<- chan * database.ProvisionerJobLog ,func (),error ) {
373
374
logger := api .Logger .With (slog .F ("job_id" ,jobID ))
374
375
375
376
var (
376
377
closed = make (chan struct {})
377
- bufferedLogs = make (chan database.ProvisionerJobLog ,128 )
378
+ bufferedLogs = make (chan * database.ProvisionerJobLog ,128 )
378
379
logMut = & sync.Mutex {}
379
380
)
380
381
closeSubscribe ,err := api .Pubsub .Subscribe (
@@ -415,9 +416,9 @@ func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan database
415
416
return
416
417
default :
417
418
}
418
-
419
+ log := log
419
420
select {
420
- case bufferedLogs <- log :
421
+ case bufferedLogs <- & log :
421
422
logger .Debug (ctx ,"subscribe buffered log" ,slog .F ("stage" ,log .Stage ))
422
423
default :
423
424
// If this overflows users could miss logs streaming. This can happen
@@ -439,15 +440,17 @@ func (api *API) followLogs(actor rbac.Subject, jobID uuid.UUID) (<-chan database
439
440
default :
440
441
}
441
442
logger .Debug (ctx ,"got End of Logs" )
442
-
443
- close (closed )
444
- close (bufferedLogs )
443
+ bufferedLogs <- nil
445
444
logMut .Unlock ()
446
445
}
447
446
},
448
447
)
449
448
if err != nil {
450
449
return nil ,nil ,err
451
450
}
452
- return bufferedLogs ,closeSubscribe ,nil
451
+ return bufferedLogs ,func () {
452
+ closeSubscribe ()
453
+ close (closed )
454
+ close (bufferedLogs )
455
+ },nil
453
456
}