 9 |  9 | 
10 | 10 | 	"github.com/google/uuid"
11 | 11 | 	"github.com/stretchr/testify/require"
   | 12 | +	protobuf "google.golang.org/protobuf/proto"
12 | 13 | 
13 | 14 | 	"cdr.dev/slog"
14 | 15 | 	"cdr.dev/slog/sloggers/slogtest"
@@ -165,6 +166,102 @@ func TestLogSender_LogLimitExceeded(t *testing.T) {
165 | 166 | 	require.Len(t, uut.queues, 0)
166 | 167 | }
167 | 168 | 
    | 169 | +func TestLogSender_SkipHugeLog(t *testing.T) {
    | 170 | +	t.Parallel()
    | 171 | +	testCtx := testutil.Context(t, testutil.WaitShort)
    | 172 | +	ctx, cancel := context.WithCancel(testCtx)
    | 173 | +	logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
    | 174 | +	fDest := newFakeLogDest()
    | 175 | +	uut := newLogSender(logger)
    | 176 | +
    | 177 | +	t0 := dbtime.Now()
    | 178 | +	ls1 := uuid.UUID{0x11}
    | 179 | +	hugeLog := make([]byte, logOutputMaxBytes+1)
    | 180 | +	for i := range hugeLog {
    | 181 | +		hugeLog[i] = 'q'
    | 182 | +	}
    | 183 | +	err := uut.enqueue(ls1,
    | 184 | +		agentsdk.Log{
    | 185 | +			CreatedAt: t0,
    | 186 | +			Output:    string(hugeLog),
    | 187 | +			Level:     codersdk.LogLevelInfo,
    | 188 | +		},
    | 189 | +		agentsdk.Log{
    | 190 | +			CreatedAt: t0,
    | 191 | +			Output:    "test log 1, src 1",
    | 192 | +			Level:     codersdk.LogLevelInfo,
    | 193 | +		})
    | 194 | +	require.NoError(t, err)
    | 195 | +
    | 196 | +	loopErr := make(chan error, 1)
    | 197 | +	go func() {
    | 198 | +		err := uut.sendLoop(ctx, fDest)
    | 199 | +		loopErr <- err
    | 200 | +	}()
    | 201 | +
    | 202 | +	req := testutil.RequireRecvCtx(ctx, t, fDest.reqs)
    | 203 | +	require.NotNil(t, req)
    | 204 | +	require.Len(t, req.Logs, 1, "it should skip the huge log")
    | 205 | +	require.Equal(t, "test log 1, src 1", req.Logs[0].GetOutput())
    | 206 | +	require.Equal(t, proto.Log_INFO, req.Logs[0].GetLevel())
    | 207 | +	testutil.RequireSendCtx(ctx, t, fDest.resps, &proto.BatchCreateLogsResponse{})
    | 208 | +
    | 209 | +	cancel()
    | 210 | +	err = testutil.RequireRecvCtx(testCtx, t, loopErr)
    | 211 | +	require.NoError(t, err)
    | 212 | +}
| 213 | + |
| 214 | +funcTestLogSender_Batch(t*testing.T) { |
| 215 | +t.Parallel() |
| 216 | +testCtx:=testutil.Context(t,testutil.WaitShort) |
| 217 | +ctx,cancel:=context.WithCancel(testCtx) |
| 218 | +logger:=slogtest.Make(t,nil).Leveled(slog.LevelDebug) |
| 219 | +fDest:=newFakeLogDest() |
| 220 | +uut:=newLogSender(logger) |
| 221 | + |
| 222 | +t0:=dbtime.Now() |
| 223 | +ls1:= uuid.UUID{0x11} |
| 224 | +varlogs []agentsdk.Log |
| 225 | +fori:=0;i<60000;i++ { |
| 226 | +logs=append(logs, agentsdk.Log{ |
| 227 | +CreatedAt:t0, |
| 228 | +Output:"r", |
| 229 | +Level:codersdk.LogLevelInfo, |
| 230 | +}) |
| 231 | +} |
| 232 | +err:=uut.enqueue(ls1,logs...) |
| 233 | +require.NoError(t,err) |
| 234 | + |
| 235 | +loopErr:=make(chanerror,1) |
| 236 | +gofunc() { |
| 237 | +err:=uut.sendLoop(ctx,fDest) |
| 238 | +loopErr<-err |
| 239 | +}() |
| 240 | + |
| 241 | +// with 60k logs, we should split into two updates to avoid going over 1MiB, since each log |
| 242 | +// is about 21 bytes. |
| 243 | +gotLogs:=0 |
| 244 | +req:=testutil.RequireRecvCtx(ctx,t,fDest.reqs) |
| 245 | +require.NotNil(t,req) |
| 246 | +gotLogs+=len(req.Logs) |
| 247 | +wire,err:=protobuf.Marshal(req) |
| 248 | +require.NoError(t,err) |
| 249 | +require.Less(t,len(wire),logOutputMaxBytes,"wire should not exceed 1MiB") |
| 250 | +testutil.RequireSendCtx(ctx,t,fDest.resps,&proto.BatchCreateLogsResponse{}) |
| 251 | +req=testutil.RequireRecvCtx(ctx,t,fDest.reqs) |
| 252 | +require.NotNil(t,req) |
| 253 | +gotLogs+=len(req.Logs) |
| 254 | +wire,err=protobuf.Marshal(req) |
| 255 | +require.NoError(t,err) |
| 256 | +require.Less(t,len(wire),logOutputMaxBytes,"wire should not exceed 1MiB") |
| 257 | +require.Equal(t,60000,gotLogs) |
| 258 | +testutil.RequireSendCtx(ctx,t,fDest.resps,&proto.BatchCreateLogsResponse{}) |
| 259 | + |
| 260 | +cancel() |
| 261 | +err=testutil.RequireRecvCtx(testCtx,t,loopErr) |
| 262 | +require.NoError(t,err) |
| 263 | +} |
| 264 | + |
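Note on the batching comment at lines 241-242: at roughly 21 wire bytes per log, 60,000 logs come to about 1.26 MB, which is over the 1 MiB (1,048,576-byte) limit but well under 2 MiB, so the sender is expected to split the queue into exactly two BatchCreateLogsRequests. That matches the two receive/ack rounds and the final require.Equal(t, 60000, gotLogs) assertion in the test.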
168 | 265 | type fakeLogDest struct {
169 | 266 | 	reqs  chan *proto.BatchCreateLogsRequest
170 | 267 | 	resps chan *proto.BatchCreateLogsResponse
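For context on how these tests synchronize with sendLoop: both tests receive a batch from fDest.reqs and then send an acknowledgement on fDest.resps, which implies the fake destination bridges the sender's destination interface to those two channels. A minimal sketch of such a channel-backed fake is shown below; the method name, signature, and error handling are assumptions for illustration and may not match the actual fakeLogDest in this package (it also assumes the same imports as the test file).

	// Hypothetical sketch of a channel-backed fake destination.
	func (f *fakeLogDest) BatchCreateLogs(ctx context.Context, req *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error) {
		// Hand the request to the test body, then block until the test sends a
		// response, so every batch is observed and acknowledged explicitly.
		select {
		case f.reqs <- req:
		case <-ctx.Done():
			return nil, ctx.Err()
		}
		select {
		case resp := <-f.resps:
			return resp, nil
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}

Blocking on both channels is what lets testutil.RequireRecvCtx and testutil.RequireSendCtx pace the send loop one batch at a time, and the ctx.Done cases are what allow cancel() at the end of each test to unwind sendLoop cleanly.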