Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit96b6eea

Browse files
committed
upload files independently
1 parentf3913c8 commit96b6eea

File tree

5 files changed

+107
-48
lines changed

5 files changed

+107
-48
lines changed

‎coderd/provisionerdserver/provisionerdserver.go

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1321,21 +1321,36 @@ func (s *server) prepareForNotifyWorkspaceManualBuildFailed(ctx context.Context,
13211321
returntemplateAdmins,template,templateVersion,workspaceOwner,nil
13221322
}
13231323

1324-
func (s*server)CompleteJobWithFiles(stream proto.DRPCProvisionerDaemon_CompleteJobWithFilesStream)error {
1324+
func (s*server)UploadFile(stream proto.DRPCProvisionerDaemon_UploadFileStream)error {
13251325
varfile*sdkproto.DataBuilder
1326+
deferstream.Close()
13261327

1327-
// stream expects files first
1328+
UploadFileStream:
13281329
for {
13291330
msg,err:=stream.Recv()
13301331
iferr!=nil {
13311332
returnxerrors.Errorf("receive complete job with files: %w",err)
13321333
}
13331334

13341335
switchtyped:=msg.Type.(type) {
1335-
case*proto.CompleteWithFilesRequest_Complete:
1336-
case*proto.CompleteWithFilesRequest_ChunkPiece:
1336+
case*proto.UploadFileRequest_ChunkPiece:
1337+
iffile==nil {
1338+
returnxerrors.New("unexpected chunk piece while waiting for file upload")
1339+
}
1340+
1341+
done,err:=file.Add(&sdkproto.ChunkPiece{
1342+
Data:typed.ChunkPiece.Data,
1343+
FullDataHash:typed.ChunkPiece.FullDataHash,
1344+
PieceIndex:typed.ChunkPiece.PieceIndex,
1345+
})
1346+
iferr!=nil {
1347+
returnxerrors.Errorf("unable to add chunk piece: %w",err)
1348+
}
13371349

1338-
case*proto.CompleteWithFilesRequest_DataUpload:
1350+
ifdone {
1351+
break UploadFileStream
1352+
}
1353+
case*proto.UploadFileRequest_DataUpload:
13391354
iffile!=nil {
13401355
returnxerrors.New("unexpected file upload while waiting for file completion")
13411356
}
@@ -1352,6 +1367,39 @@ func (s *server) CompleteJobWithFiles(stream proto.DRPCProvisionerDaemon_Complet
13521367
}
13531368
}
13541369

1370+
fileData,err:=file.Complete()
1371+
iferr!=nil {
1372+
returnxerrors.Errorf("complete file upload: %w",err)
1373+
}
1374+
1375+
// Just rehash the data to be sure it is correct.
1376+
hashBytes:=sha256.Sum256(fileData)
1377+
hash:=hex.EncodeToString(hashBytes[:])
1378+
1379+
varinsert database.InsertFileParams
1380+
1381+
switchfile.Type {
1382+
casesdkproto.DataUploadType_UPLOAD_TYPE_MODULE_FILES:
1383+
insert= database.InsertFileParams{
1384+
ID:uuid.New(),
1385+
Hash:hash,
1386+
CreatedAt:dbtime.Now(),
1387+
CreatedBy:uuid.Nil,
1388+
Mimetype:tarMimeType,
1389+
Data:fileData,
1390+
}
1391+
default:
1392+
returnxerrors.Errorf("unsupported file upload type: %s",file.Type)
1393+
}
1394+
1395+
_,err=s.Database.InsertFile(s.lifecycleCtx,insert)
1396+
iferr!=nil {
1397+
// Duplicated files already exist in the database, so we can ignore this error.
1398+
if!database.IsUniqueViolation(err,database.UniqueFilesHashCreatedByKey) {
1399+
returnxerrors.Errorf("insert file: %w",err)
1400+
}
1401+
}
1402+
returnnil
13551403
}
13561404

13571405
// CompleteJob is triggered by a provision daemon to mark a provisioner job as completed.

‎provisionerd/proto/provisionerd.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ message CompletedJob {
9292
repeatedprovisioner.Presetpresets=8;
9393
bytesplan=9;
9494
bytesmodule_files=10;
95+
bytesmodule_files_hash=11;
9596
}
9697
messageTemplateDryRun {
9798
repeatedprovisioner.Resourceresources=1;

‎provisionerd/provisionerd.go

Lines changed: 50 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -517,56 +517,66 @@ func (p *Server) FailJob(ctx context.Context, in *proto.FailedJob) error {
517517
returnerr
518518
}
519519

520-
func (p*Server)CompleteJobWithModuleFiles(ctx context.Context,in*proto.CompletedJob,moduleFiles []byte)error {
520+
func (p*Server)UploadModuleFiles(ctx context.Context,moduleFiles []byte)error {
521+
//Send the files separately if the message size is too large.
522+
_,err:=clientDoWithRetries(ctx,p.client,func(ctx context.Context,client proto.DRPCProvisionerDaemonClient) (*proto.Empty,error) {
523+
stream,err:=client.UploadFile(ctx)
524+
iferr!=nil {
525+
returnnil,xerrors.Errorf("failed to start CompleteJobWithFiles stream: %w",err)
526+
}
527+
deferstream.Close()
528+
529+
dataUp,chunks:=sdkproto.BytesToDataUpload(sdkproto.DataUploadType_UPLOAD_TYPE_MODULE_FILES,moduleFiles)
530+
531+
err=stream.Send(&proto.UploadFileRequest{Type:&proto.UploadFileRequest_DataUpload{DataUpload:&proto.DataUpload{
532+
UploadType:proto.DataUploadType(dataUp.UploadType),
533+
DataHash:dataUp.DataHash,
534+
FileSize:dataUp.FileSize,
535+
Chunks:dataUp.Chunks,
536+
}}})
537+
iferr!=nil {
538+
ifretryable(err) {// Do not retry
539+
returnnil,xerrors.Errorf("send data upload: %s",err.Error())
540+
}
541+
returnnil,xerrors.Errorf("send data upload: %w",err)
542+
}
543+
544+
fori,chunk:=rangechunks {
545+
err=stream.Send(&proto.UploadFileRequest{Type:&proto.UploadFileRequest_ChunkPiece{ChunkPiece:&proto.ChunkPiece{
546+
Data:chunk.Data,
547+
FullDataHash:chunk.FullDataHash,
548+
PieceIndex:chunk.PieceIndex,
549+
}}})
550+
iferr!=nil {
551+
ifretryable(err) {// Do not retry
552+
returnnil,xerrors.Errorf("send chunk piece: %s",err.Error())
553+
}
554+
returnnil,xerrors.Errorf("send chunk piece %d: %w",i,err)
555+
}
556+
}
557+
558+
return&proto.Empty{},nil
559+
})
560+
iferr!=nil {
561+
returnxerrors.Errorf("upload module files: %w",err)
562+
}
563+
521564
returnnil
522-
// Send the files separately if the message size is too large.
523-
//_, err := clientDoWithRetries(ctx, p.client, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (*proto.Empty, error) {
524-
//stream, err := client.CompleteJobWithFiles(ctx)
525-
//if err != nil {
526-
//return nil, xerrors.Errorf("failed to start CompleteJobWithFiles stream: %w", err)
527-
//}
528-
//
529-
//dataUp, chunks := sdkproto.BytesToDataUpload(moduleFiles)
530-
//
531-
//err = stream.Send(&proto.CompleteWithFilesRequest{Type: &proto.CompleteWithFilesRequest_DataUpload{DataUpload: &proto.DataUpload{
532-
//UploadType: proto.DataUploadType(dataUp.UploadType),
533-
//DataHash: dataUp.DataHash,
534-
//FileSize: dataUp.FileSize,
535-
//Chunks: dataUp.Chunks,
536-
//}}})
537-
//if err != nil {
538-
//return nil, xerrors.Errorf("send data upload: %w", err)
539-
//}
540-
//
541-
//for i, chunk := range chunks {
542-
//err = stream.Send(&proto.CompleteWithFilesRequest{Type: &proto.CompleteWithFilesRequest_ChunkPiece{ChunkPiece: &proto.ChunkPiece{
543-
//Data: chunk.Data,
544-
//FullDataHash: chunk.FullDataHash,
545-
//PieceIndex: chunk.PieceIndex,
546-
//}}})
547-
//if err != nil {
548-
//return nil, xerrors.Errorf("send chunk piece %d: %w", i, err)
549-
//}
550-
//}
551-
//
552-
//err = stream.Send(&proto.CompleteWithFilesRequest{Type: &proto.CompleteWithFilesRequest_Complete{Complete: in}})
553-
//if err != nil {
554-
//return nil, xerrors.Errorf("send complete job: %w", err)
555-
//}
556-
//
557-
//return &proto.Empty{}, nil
558-
//})
559-
//return err
560565
}
561566

562567
func (p*Server)CompleteJob(ctx context.Context,in*proto.CompletedJob)error {
563568
ifti,ok:=in.Type.(*proto.CompletedJob_TemplateImport_);ok {
564569
messageSize:=protobuf.Size(in)
565570
ifmessageSize>drpcsdk.MaxMessageSize&&
566571
messageSize-len(ti.TemplateImport.ModuleFiles)<drpcsdk.MaxMessageSize {
572+
573+
// Split the module files from the message if it exceeds the max size.
567574
moduleFiles:=ti.TemplateImport.ModuleFiles
568575
ti.TemplateImport.ModuleFiles=nil// Clear the files in the final message
569-
returnp.CompleteJobWithModuleFiles(ctx,in,moduleFiles)
576+
err:=p.UploadModuleFiles(ctx,moduleFiles)
577+
iferr!=nil {
578+
returnerr
579+
}
570580
}
571581
}
572582

‎provisionersdk/proto/dataupload.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more aboutcustomizing how changed files appear on GitHub.

‎provisionersdk/session.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ func (s *Session) handleRequests() error {
168168
ifprotobuf.Size(resp)>drpcsdk.MaxMessageSize {
169169
// Send the modules over as a stream
170170
s.Logger.Info(s.Context(),"plan response too large, sending modules as stream")
171-
dataUp,chunks:=proto.BytesToDataUpload(complete.ModuleFiles)
171+
dataUp,chunks:=proto.BytesToDataUpload(proto.DataUploadType_UPLOAD_TYPE_MODULE_FILES,complete.ModuleFiles)
172172

173173
complete.ModuleFiles=nil// sent over the stream
174174
resp.Type=&proto.Response_Plan{Plan:complete}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp