Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitc1341cc

Browse files
authored
feat: use proto streams to increase maximum module files payload (#18268)
This PR implements protobuf streaming to handle large module files by:1. **Streaming large payloads**: When module files exceed the 4MB limit,they're streamed in chunks using a new UploadFile RPC method2. **Database storage**: Streamed files are stored in the database andreferenced by hash for deduplication3. **Backward compatibility**: Small module files continue using theexisting direct payload method
1 parent8e29ee5 commitc1341cc

22 files changed

+1885
-493
lines changed

‎cli/testdata/coder_provisioner_list_--output_json.golden

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
"last_seen_at": "====[timestamp]=====",
88
"name": "test-daemon",
99
"version": "v0.0.0-devel",
10-
"api_version": "1.6",
10+
"api_version": "1.7",
1111
"provisioners": [
1212
"echo"
1313
],

‎coderd/database/dbauthz/dbauthz.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ var (
171171
DisplayName:"Provisioner Daemon",
172172
Site:rbac.Permissions(map[string][]policy.Action{
173173
rbac.ResourceProvisionerJobs.Type: {policy.ActionRead,policy.ActionUpdate,policy.ActionCreate},
174-
rbac.ResourceFile.Type: {policy.ActionRead},
174+
rbac.ResourceFile.Type: {policy.ActionCreate,policy.ActionRead},
175175
rbac.ResourceSystem.Type: {policy.WildcardSymbol},
176176
rbac.ResourceTemplate.Type: {policy.ActionRead,policy.ActionUpdate},
177177
// Unsure why provisionerd needs update and read personal

‎coderd/database/dbmem/dbmem.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8743,6 +8743,12 @@ func (q *FakeQuerier) InsertFile(_ context.Context, arg database.InsertFileParam
87438743
q.mutex.Lock()
87448744
deferq.mutex.Unlock()
87458745

8746+
ifslices.ContainsFunc(q.files,func(file database.File)bool {
8747+
returnfile.CreatedBy==arg.CreatedBy&&file.Hash==arg.Hash
8748+
}) {
8749+
return database.File{},newUniqueConstraintError(database.UniqueFilesHashCreatedByKey)
8750+
}
8751+
87468752
//nolint:gosimple
87478753
file:= database.File{
87488754
ID:arg.ID,

‎coderd/provisionerdserver/provisionerdserver.go

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -773,7 +773,7 @@ func (s *server) acquireProtoJob(ctx context.Context, job database.ProvisionerJo
773773
casedatabase.ProvisionerStorageMethodFile:
774774
file,err:=s.Database.GetFileByID(ctx,job.FileID)
775775
iferr!=nil {
776-
returnnil,failJob(fmt.Sprintf("get file byhash: %s",err))
776+
returnnil,failJob(fmt.Sprintf("get file byid: %s",err))
777777
}
778778
protoJob.TemplateSourceArchive=file.Data
779779
default:
@@ -1321,6 +1321,104 @@ func (s *server) prepareForNotifyWorkspaceManualBuildFailed(ctx context.Context,
13211321
returntemplateAdmins,template,templateVersion,workspaceOwner,nil
13221322
}
13231323

1324+
func (s*server)UploadFile(stream proto.DRPCProvisionerDaemon_UploadFileStream)error {
1325+
varfile*sdkproto.DataBuilder
1326+
// Always terminate the stream with an empty response.
1327+
deferstream.SendAndClose(&proto.Empty{})
1328+
1329+
UploadFileStream:
1330+
for {
1331+
msg,err:=stream.Recv()
1332+
iferr!=nil {
1333+
returnxerrors.Errorf("receive complete job with files: %w",err)
1334+
}
1335+
1336+
switchtyped:=msg.Type.(type) {
1337+
case*proto.UploadFileRequest_DataUpload:
1338+
iffile!=nil {
1339+
returnxerrors.New("unexpected file upload while waiting for file completion")
1340+
}
1341+
1342+
file,err=sdkproto.NewDataBuilder(&sdkproto.DataUpload{
1343+
UploadType:typed.DataUpload.UploadType,
1344+
DataHash:typed.DataUpload.DataHash,
1345+
FileSize:typed.DataUpload.FileSize,
1346+
Chunks:typed.DataUpload.Chunks,
1347+
})
1348+
iferr!=nil {
1349+
returnxerrors.Errorf("unable to create file upload: %w",err)
1350+
}
1351+
1352+
iffile.IsDone() {
1353+
// If a file is 0 bytes, we can consider it done immediately.
1354+
// This should never really happen in practice, but we handle it gracefully.
1355+
break UploadFileStream
1356+
}
1357+
case*proto.UploadFileRequest_ChunkPiece:
1358+
iffile==nil {
1359+
returnxerrors.New("unexpected chunk piece while waiting for file upload")
1360+
}
1361+
1362+
done,err:=file.Add(&sdkproto.ChunkPiece{
1363+
Data:typed.ChunkPiece.Data,
1364+
FullDataHash:typed.ChunkPiece.FullDataHash,
1365+
PieceIndex:typed.ChunkPiece.PieceIndex,
1366+
})
1367+
iferr!=nil {
1368+
returnxerrors.Errorf("unable to add chunk piece: %w",err)
1369+
}
1370+
1371+
ifdone {
1372+
break UploadFileStream
1373+
}
1374+
}
1375+
}
1376+
1377+
fileData,err:=file.Complete()
1378+
iferr!=nil {
1379+
returnxerrors.Errorf("complete file upload: %w",err)
1380+
}
1381+
1382+
// Just rehash the data to be sure it is correct.
1383+
hashBytes:=sha256.Sum256(fileData)
1384+
hash:=hex.EncodeToString(hashBytes[:])
1385+
1386+
varinsert database.InsertFileParams
1387+
1388+
switchfile.Type {
1389+
casesdkproto.DataUploadType_UPLOAD_TYPE_MODULE_FILES:
1390+
insert= database.InsertFileParams{
1391+
ID:uuid.New(),
1392+
Hash:hash,
1393+
CreatedAt:dbtime.Now(),
1394+
CreatedBy:uuid.Nil,
1395+
Mimetype:tarMimeType,
1396+
Data:fileData,
1397+
}
1398+
default:
1399+
returnxerrors.Errorf("unsupported file upload type: %s",file.Type)
1400+
}
1401+
1402+
//nolint:gocritic // Provisionerd actor
1403+
_,err=s.Database.InsertFile(dbauthz.AsProvisionerd(s.lifecycleCtx),insert)
1404+
iferr!=nil {
1405+
// Duplicated files already exist in the database, so we can ignore this error.
1406+
if!database.IsUniqueViolation(err,database.UniqueFilesHashCreatedByKey) {
1407+
returnxerrors.Errorf("insert file: %w",err)
1408+
}
1409+
}
1410+
1411+
s.Logger.Info(s.lifecycleCtx,"file uploaded to database",
1412+
slog.F("type",file.Type.String()),
1413+
slog.F("hash",hash),
1414+
slog.F("size",len(fileData)),
1415+
// new_insert indicates whether the file was newly inserted or already existed.
1416+
slog.F("new_insert",err==nil),
1417+
)
1418+
1419+
returnnil
1420+
}
1421+
13241422
// CompleteJob is triggered by a provision daemon to mark a provisioner job as completed.
13251423
func (s*server)CompleteJob(ctx context.Context,completed*proto.CompletedJob) (*proto.Empty,error) {
13261424
ctx,span:=s.startTrace(ctx,tracing.FuncName())
@@ -1606,6 +1704,20 @@ func (s *server) completeTemplateImportJob(ctx context.Context, job database.Pro
16061704
}
16071705
}
16081706

1707+
iflen(jobType.TemplateImport.ModuleFilesHash)>0 {
1708+
hashString:=hex.EncodeToString(jobType.TemplateImport.ModuleFilesHash)
1709+
//nolint:gocritic // Acting as provisioner
1710+
file,err:=db.GetFileByHashAndCreator(dbauthz.AsProvisionerd(ctx), database.GetFileByHashAndCreatorParams{Hash:hashString,CreatedBy:uuid.Nil})
1711+
iferr!=nil {
1712+
returnxerrors.Errorf("get file by hash, it should have been uploaded: %w",err)
1713+
}
1714+
1715+
fileID= uuid.NullUUID{
1716+
Valid:true,
1717+
UUID:file.ID,
1718+
}
1719+
}
1720+
16091721
err=db.InsertTemplateVersionTerraformValuesByJobID(ctx, database.InsertTemplateVersionTerraformValuesByJobIDParams{
16101722
JobID:jobID,
16111723
UpdatedAt:now,
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
package provisionerdserver_test
2+
3+
import (
4+
"context"
5+
crand"crypto/rand"
6+
"fmt"
7+
"testing"
8+
9+
"github.com/google/uuid"
10+
"github.com/stretchr/testify/require"
11+
"golang.org/x/xerrors"
12+
"storj.io/drpc"
13+
14+
"github.com/coder/coder/v2/coderd/database"
15+
"github.com/coder/coder/v2/coderd/externalauth"
16+
"github.com/coder/coder/v2/codersdk/drpcsdk"
17+
proto"github.com/coder/coder/v2/provisionerd/proto"
18+
sdkproto"github.com/coder/coder/v2/provisionersdk/proto"
19+
"github.com/coder/coder/v2/testutil"
20+
)
21+
22+
// TestUploadFileLargeModuleFiles tests the UploadFile RPC with large module files
23+
funcTestUploadFileLargeModuleFiles(t*testing.T) {
24+
t.Parallel()
25+
26+
ctx:=testutil.Context(t,testutil.WaitMedium)
27+
28+
// Create server
29+
server,db,_,_:=setup(t,false,&overrides{
30+
externalAuthConfigs: []*externalauth.Config{{}},
31+
})
32+
33+
testSizes:= []int{
34+
0,// Empty file
35+
512,// A small file
36+
drpcsdk.MaxMessageSize+1024,// Just over the limit
37+
drpcsdk.MaxMessageSize*2,// 2x the limit
38+
sdkproto.ChunkSize*3+512,// Multiple chunks with partial last
39+
}
40+
41+
for_,size:=rangetestSizes {
42+
t.Run(fmt.Sprintf("size_%d_bytes",size),func(t*testing.T) {
43+
t.Parallel()
44+
45+
// Generate test module files data
46+
moduleData:=make([]byte,size)
47+
_,err:=crand.Read(moduleData)
48+
require.NoError(t,err)
49+
50+
// Convert to upload format
51+
upload,chunks:=sdkproto.BytesToDataUpload(sdkproto.DataUploadType_UPLOAD_TYPE_MODULE_FILES,moduleData)
52+
53+
stream:=newMockUploadStream(upload,chunks...)
54+
55+
// Execute upload
56+
err=server.UploadFile(stream)
57+
require.NoError(t,err)
58+
59+
// Upload should be done
60+
require.True(t,stream.isDone(),"stream should be done after upload")
61+
62+
// Verify file was stored in database
63+
hashString:=fmt.Sprintf("%x",upload.DataHash)
64+
file,err:=db.GetFileByHashAndCreator(ctx, database.GetFileByHashAndCreatorParams{
65+
Hash:hashString,
66+
CreatedBy:uuid.Nil,// Provisionerd creates with Nil UUID
67+
})
68+
require.NoError(t,err)
69+
require.Equal(t,hashString,file.Hash)
70+
require.Equal(t,moduleData,file.Data)
71+
require.Equal(t,"application/x-tar",file.Mimetype)
72+
73+
// Try to upload it again, and it should still be successful
74+
stream=newMockUploadStream(upload,chunks...)
75+
err=server.UploadFile(stream)
76+
require.NoError(t,err,"re-upload should succeed without error")
77+
require.True(t,stream.isDone(),"stream should be done after re-upload")
78+
})
79+
}
80+
}
81+
82+
// TestUploadFileErrorScenarios tests various error conditions in file upload
83+
funcTestUploadFileErrorScenarios(t*testing.T) {
84+
t.Parallel()
85+
86+
//nolint:dogsled
87+
server,_,_,_:=setup(t,false,&overrides{
88+
externalAuthConfigs: []*externalauth.Config{{}},
89+
})
90+
91+
// Generate test data
92+
moduleData:=make([]byte,sdkproto.ChunkSize*2)
93+
_,err:=crand.Read(moduleData)
94+
require.NoError(t,err)
95+
96+
upload,chunks:=sdkproto.BytesToDataUpload(sdkproto.DataUploadType_UPLOAD_TYPE_MODULE_FILES,moduleData)
97+
98+
t.Run("chunk_before_upload",func(t*testing.T) {
99+
t.Parallel()
100+
101+
stream:=newMockUploadStream(nil,chunks[0])
102+
103+
err:=server.UploadFile(stream)
104+
require.ErrorContains(t,err,"unexpected chunk piece while waiting for file upload")
105+
require.True(t,stream.isDone(),"stream should be done after error")
106+
})
107+
108+
t.Run("duplicate_upload",func(t*testing.T) {
109+
t.Parallel()
110+
111+
stream:=&mockUploadStream{
112+
done:make(chanstruct{}),
113+
messages:make(chan*proto.UploadFileRequest,2),
114+
}
115+
116+
up:=&proto.UploadFileRequest{Type:&proto.UploadFileRequest_DataUpload{DataUpload:upload}}
117+
118+
// Send it twice
119+
stream.messages<-up
120+
stream.messages<-up
121+
122+
err:=server.UploadFile(stream)
123+
require.ErrorContains(t,err,"unexpected file upload while waiting for file completion")
124+
require.True(t,stream.isDone(),"stream should be done after error")
125+
})
126+
127+
t.Run("unsupported_upload_type",func(t*testing.T) {
128+
t.Parallel()
129+
130+
//nolint:govet // Ignore lock copy
131+
cpy:=*upload
132+
cpy.UploadType=sdkproto.DataUploadType_UPLOAD_TYPE_UNKNOWN// Set to an unsupported type
133+
stream:=newMockUploadStream(&cpy,chunks...)
134+
135+
err:=server.UploadFile(stream)
136+
require.ErrorContains(t,err,"unsupported file upload type")
137+
require.True(t,stream.isDone(),"stream should be done after error")
138+
})
139+
}
140+
141+
typemockUploadStreamstruct {
142+
donechanstruct{}
143+
messageschan*proto.UploadFileRequest
144+
}
145+
146+
func (mmockUploadStream)SendAndClose(empty*proto.Empty)error {
147+
close(m.done)
148+
returnnil
149+
}
150+
151+
func (mmockUploadStream)Recv() (*proto.UploadFileRequest,error) {
152+
msg,ok:=<-m.messages
153+
if!ok {
154+
returnnil,xerrors.New("no more messages to receive")
155+
}
156+
returnmsg,nil
157+
}
158+
func (*mockUploadStream)Context() context.Context {panic(errUnimplemented) }
159+
func (*mockUploadStream)MsgSend(msg drpc.Message,enc drpc.Encoding)error {
160+
panic(errUnimplemented)
161+
}
162+
163+
func (*mockUploadStream)MsgRecv(msg drpc.Message,enc drpc.Encoding)error {
164+
panic(errUnimplemented)
165+
}
166+
func (*mockUploadStream)CloseSend()error {panic(errUnimplemented) }
167+
func (*mockUploadStream)Close()error {panic(errUnimplemented) }
168+
func (m*mockUploadStream)isDone()bool {
169+
select {
170+
case<-m.done:
171+
returntrue
172+
default:
173+
returnfalse
174+
}
175+
}
176+
177+
funcnewMockUploadStream(up*sdkproto.DataUpload,chunks...*sdkproto.ChunkPiece)*mockUploadStream {
178+
stream:=&mockUploadStream{
179+
done:make(chanstruct{}),
180+
messages:make(chan*proto.UploadFileRequest,1+len(chunks)),
181+
}
182+
ifup!=nil {
183+
stream.messages<-&proto.UploadFileRequest{Type:&proto.UploadFileRequest_DataUpload{DataUpload:up}}
184+
}
185+
186+
for_,chunk:=rangechunks {
187+
stream.messages<-&proto.UploadFileRequest{Type:&proto.UploadFileRequest_ChunkPiece{ChunkPiece:chunk}}
188+
}
189+
close(stream.messages)
190+
returnstream
191+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp