Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit fc92ad1

Browse files
authored
feat: add crc32c_checksum argument to download_chunks_concurrently (#1138)
1 parent a455195, commit fc92ad1

File tree

5 files changed

+262
-22
lines changed

5 files changed

+262
-22
lines changed

‎google/cloud/storage/transfer_manager.py‎

Lines changed: 142 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
importwarnings
2323
importpickle
2424
importcopyreg
25+
importstruct
26+
importbase64
2527
importfunctools
2628

2729
fromgoogle.api_coreimportexceptions
@@ -32,9 +34,11 @@
3234
fromgoogle.cloud.storage._helpersimport_api_core_retry_to_resumable_media_retry
3335
fromgoogle.cloud.storage.retryimportDEFAULT_RETRY
3436

37+
importgoogle_crc32c
38+
3539
fromgoogle.resumable_media.requests.uploadimportXMLMPUContainer
3640
fromgoogle.resumable_media.requests.uploadimportXMLMPUPart
37-
41+
fromgoogle.resumable_media.commonimportDataCorruption
3842

3943
warnings.warn(
4044
"The module `transfer_manager` is a preview feature. Functionality and API "
@@ -44,6 +48,7 @@
4448

4549
TM_DEFAULT_CHUNK_SIZE=32*1024*1024
4650
DEFAULT_MAX_WORKERS=8
51+
MAX_CRC32C_ZERO_ARRAY_SIZE=4*1024*1024
4752
METADATA_HEADER_TRANSLATION= {
4853
"cacheControl":"Cache-Control",
4954
"contentDisposition":"Content-Disposition",
@@ -57,6 +62,20 @@
5762
PROCESS="process"
5863
THREAD="thread"
5964

65+
DOWNLOAD_CRC32C_MISMATCH_TEMPLATE="""\
66+
Checksum mismatch while downloading:
67+
68+
{}
69+
70+
The object metadata indicated a crc32c checksum of:
71+
72+
{}
73+
74+
but the actual crc32c checksum of the downloaded contents was:
75+
76+
{}
77+
"""
78+
6079

6180
_cached_clients= {}
6281

@@ -732,6 +751,8 @@ def download_chunks_concurrently(
732751
deadline=None,
733752
worker_type=PROCESS,
734753
max_workers=DEFAULT_MAX_WORKERS,
754+
*,
755+
crc32c_checksum=True,
735756
):
736757
"""Download a single file in chunks, concurrently.
737758
@@ -744,9 +765,6 @@ def download_chunks_concurrently(
744765
performance under normal circumstances due to Python interpreter threading
745766
behavior. The default is therefore to use processes instead of threads.
746767
747-
Checksumming (md5 or crc32c) is not supported for chunked operations. Any
748-
`checksum` parameter passed in to download_kwargs will be ignored.
749-
750768
:param bucket:
751769
The bucket which contains the blobs to be downloaded
752770
@@ -768,10 +786,13 @@ def download_chunks_concurrently(
768786
:param download_kwargs:
769787
A dictionary of keyword arguments to pass to the download method. Refer
770788
to the documentation for blob.download_to_file() or
771-
blob.download_to_filename() for more information. The dict is directly passed into the download methods and is not validated by this function.
789+
blob.download_to_filename() for more information. The dict is directly
790+
passed into the download methods and is not validated by this function.
772791
773792
Keyword arguments "start" and "end" which are not supported and will
774-
cause a ValueError if present.
793+
cause a ValueError if present. The key "checksum" is also not supported
794+
in download_kwargs, but see the argument "crc32c_checksum" (which does
795+
not go in download_kwargs) below.
775796
776797
:type deadline: int
777798
:param deadline:
@@ -811,15 +832,33 @@ def download_chunks_concurrently(
811832
and the default is a conservative number that should work okay in most
812833
cases without consuming excessive resources.
813834
814-
:raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded.
835+
:type crc32c_checksum: bool
836+
:param crc32c_checksum:
837+
Whether to compute a checksum for the resulting object, using the crc32c
838+
algorithm. As the checksums for each chunk must be combined using a
839+
feature of crc32c that is not available for md5, md5 is not supported.
840+
841+
:raises:
842+
:exc:`concurrent.futures.TimeoutError`
843+
if deadline is exceeded.
844+
:exc:`google.resumable_media.common.DataCorruption` if the download's
845+
checksum doesn't agree with server-computed checksum. The
846+
`google.resumable_media` exception is used here for consistency
847+
with other download methods despite the exception originating
848+
elsewhere.
815849
"""
850+
client=blob.client
816851

817852
ifdownload_kwargsisNone:
818853
download_kwargs= {}
819854
if"start"indownload_kwargsor"end"indownload_kwargs:
820855
raiseValueError(
821856
"Download arguments 'start' and 'end' are not supported by download_chunks_concurrently."
822857
)
858+
if"checksum"indownload_kwargs:
859+
raiseValueError(
860+
"'checksum' is in download_kwargs, but is not supported because sliced downloads have a different checksum mechanism from regular downloads. Use the 'crc32c_checksum' argument on download_chunks_concurrently instead."
861+
)
823862

824863
download_kwargs["command"]="tm.download_sharded"
825864

@@ -851,16 +890,42 @@ def download_chunks_concurrently(
851890
start=start,
852891
end=cursor-1,
853892
download_kwargs=download_kwargs,
893+
crc32c_checksum=crc32c_checksum,
854894
)
855895
)
856896

857897
concurrent.futures.wait(
858898
futures,timeout=deadline,return_when=concurrent.futures.ALL_COMPLETED
859899
)
860900

861-
# Raise any exceptions. Successful results can be ignored.
901+
# Raise any exceptions; combine checksums.
902+
results= []
862903
forfutureinfutures:
863-
future.result()
904+
results.append(future.result())
905+
906+
ifcrc32c_checksumandresults:
907+
crc_digest=_digest_ordered_checksum_and_size_pairs(results)
908+
actual_checksum=base64.b64encode(crc_digest).decode("utf-8")
909+
expected_checksum=blob.crc32c
910+
ifactual_checksum!=expected_checksum:
911+
# For consistency with other download methods we will use
912+
# "google.resumable_media.common.DataCorruption" despite the error
913+
# not originating inside google.resumable_media.
914+
download_url=blob._get_download_url(
915+
client,
916+
if_generation_match=download_kwargs.get("if_generation_match"),
917+
if_generation_not_match=download_kwargs.get("if_generation_not_match"),
918+
if_metageneration_match=download_kwargs.get("if_metageneration_match"),
919+
if_metageneration_not_match=download_kwargs.get(
920+
"if_metageneration_not_match"
921+
),
922+
)
923+
raiseDataCorruption(
924+
None,
925+
DOWNLOAD_CRC32C_MISMATCH_TEMPLATE.format(
926+
download_url,expected_checksum,actual_checksum
927+
),
928+
)
864929
returnNone
865930

866931

@@ -1118,23 +1183,58 @@ def _headers_from_metadata(metadata):
11181183

11191184

11201185
def _download_and_write_chunk_in_place(
    maybe_pickled_blob, filename, start, end, download_kwargs, crc32c_checksum
):
    """Download one byte range of a blob into its slot of an existing file.

    Intended to run inside a worker thread or subprocess.

    `maybe_pickled_blob` is either a Blob (for threads) or a specially pickled
    Blob (for processes) because the default pickling mangles Client objects
    which are attached to Blobs.

    The range `start`..`end` (inclusive) is written into `filename` beginning
    at offset `start`; `download_kwargs` is forwarded to the blob's download
    routine unchanged.

    Returns a `(crc, size)` tuple. `crc` is the checksum accumulated by the
    file wrapper over everything written (None when `crc32c_checksum` is
    falsy), and `size` is the length of the requested range,
    `end - start + 1`, not a count of bytes actually written.
    """
    # Threads hand over the Blob itself; processes hand over its pickled form.
    blob = (
        maybe_pickled_blob
        if isinstance(maybe_pickled_blob, Blob)
        else pickle.loads(maybe_pickled_blob)
    )

    with _ChecksummingSparseFileWrapper(
        filename, start, crc32c_checksum
    ) as sparse_file:
        blob._prep_and_do_download(
            sparse_file, start=start, end=end, **download_kwargs
        )
        # Read the checksum while the wrapper is still open, then report it
        # together with the inclusive range length.
        return (sparse_file.crc, (end - start) + 1)
1205+
1206+
1207+
class _ChecksummingSparseFileWrapper:
    """A file wrapper that writes to a sparse file and optionally checksums.

    The target file must already exist: it is opened in "rb+" mode and the
    write position is moved to `start_position`, so data written through this
    wrapper lands in place without truncating or appending.

    This wrapper only implements write() and does not inherit from `io` module
    base classes. It is also a context manager; leaving the `with` block
    closes the underlying file.

    :param filename: Path of the existing file to write into.
    :param start_position: Byte offset at which writing begins.
    :param crc32c_enabled:
        If truthy, a running crc32c of every chunk passed to write() is kept
        and exposed through the `crc` property. Otherwise `crc` stays None.
    """

    def __init__(self, filename, start_position, crc32c_enabled):
        # Open in mixed read/write mode to avoid truncating or appending
        self.f = open(filename, "rb+")
        try:
            self.f.seek(start_position)
        except Exception:
            # The caller never receives the instance if construction fails,
            # so nothing else could close the handle; release it here.
            self.f.close()
            raise
        self._crc = None
        self._crc32c_enabled = crc32c_enabled

    def write(self, chunk):
        """Write `chunk` at the current position, updating the checksum.

        Returns whatever the underlying file's write() returns (the number of
        bytes written), in line with the usual file-object protocol.
        """
        if self._crc32c_enabled:
            if self._crc is None:
                # First chunk seeds the checksum; later chunks extend it.
                self._crc = google_crc32c.value(chunk)
            else:
                self._crc = google_crc32c.extend(self._crc, chunk)
        return self.f.write(chunk)

    @property
    def crc(self):
        """Running crc32c of the data written so far.

        None if checksumming is disabled or nothing has been written yet.
        """
        return self._crc

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.f.close()
11381238

11391239

11401240
def_call_method_on_maybe_pickled_blob(
@@ -1208,6 +1308,32 @@ def _get_pool_class_and_requirements(worker_type):
12081308
)
12091309

12101310

1311+
def _digest_ordered_checksum_and_size_pairs(checksum_and_size_pairs):
    """Combine per-chunk crc32c values into a single packed digest.

    :param checksum_and_size_pairs:
        An iterable of `(part_crc, size)` pairs in the order the chunks appear
        in the object, where `part_crc` is the integer crc32c of a chunk and
        `size` is that chunk's length in bytes. It must yield at least one
        pair; with none there is no checksum to pack and `struct.pack` raises.

    :returns:
        The combined crc32c packed as 4 big-endian bytes (`struct.pack(">L")`).

    Each subsequent part is folded into the running checksum by extending the
    running value over `size` zero bytes and XOR-ing in the part's own crc.
    """
    base_crc = None
    zeroes = bytes(MAX_CRC32C_ZERO_ARRAY_SIZE)
    for part_crc, size in checksum_and_size_pairs:
        # Test for None explicitly: 0 is a legitimate crc32c value, and a
        # truthiness check would discard a running checksum that happens to
        # be 0 and restart from the current part, giving a wrong digest.
        if base_crc is None:
            base_crc = part_crc
            continue

        base_crc ^= 0xFFFFFFFF  # precondition

        # Zero pad base_crc32c. To conserve memory, do so with only
        # MAX_CRC32C_ZERO_ARRAY_SIZE at a time. Reuse the zeroes array where
        # possible.
        padded = 0
        while padded < size:
            desired_zeroes_size = min(size - padded, MAX_CRC32C_ZERO_ARRAY_SIZE)
            base_crc = google_crc32c.extend(base_crc, zeroes[:desired_zeroes_size])
            padded += desired_zeroes_size

        base_crc ^= 0xFFFFFFFF  # postcondition
        base_crc ^= part_crc

    # Big-endian unsigned 32-bit, see
    # https://cloud.google.com/storage/docs/json_api/v1/objects#crc32c
    return struct.pack(">L", base_crc)
1335+
1336+
12111337
class_LazyClient:
12121338
"""An object that will transform into either a cached or a new Client"""
12131339

‎samples/snippets/snippets_test.py‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ def test_list_blobs_with_prefix(test_blob, capsys):
213213
deftest_upload_blob(test_bucket):
214214
withtempfile.NamedTemporaryFile()assource_file:
215215
source_file.write(b"test")
216+
source_file.flush()
216217

217218
storage_upload_file.upload_blob(
218219
test_bucket.name,source_file.name,"test_upload_blob"
@@ -243,6 +244,7 @@ def test_upload_blob_with_kms(test_bucket):
243244
blob_name=f"test_upload_with_kms_{uuid.uuid4().hex}"
244245
withtempfile.NamedTemporaryFile()assource_file:
245246
source_file.write(b"test")
247+
source_file.flush()
246248
storage_upload_with_kms_key.upload_blob_with_kms(
247249
test_bucket.name,
248250
source_file.name,
@@ -779,6 +781,7 @@ def test_transfer_manager_download_chunks_concurrently(test_bucket, capsys):
779781

780782
withtempfile.NamedTemporaryFile()asfile:
781783
file.write(b"test")
784+
file.flush()
782785

783786
storage_upload_file.upload_blob(test_bucket.name,file.name,BLOB_NAME)
784787

‎setup.py‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
"google-cloud-core >= 2.3.0, < 3.0dev",
3434
"google-resumable-media >= 2.6.0",
3535
"requests >= 2.18.0, < 3.0.0dev",
36+
"google-crc32c >= 1.0, < 2.0dev",
3637
]
3738
extras= {"protobuf": ["protobuf<5.0.0dev"]}
3839

‎tests/system/test_transfer_manager.py‎

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,19 @@ def test_download_chunks_concurrently(shared_bucket, file_data):
172172
withopen(trailing_chunk_filename,"rb")asfile_obj:
173173
assert_base64_md5hash(file_obj)==source_file["hash"]
174174

175+
# And for a case where there is only one chunk.
176+
trailing_chunk_filename=os.path.join(tempdir,"chunky_file_3")
177+
transfer_manager.download_chunks_concurrently(
178+
download_blob,
179+
trailing_chunk_filename,
180+
chunk_size=size,
181+
deadline=DEADLINE,
182+
)
183+
withopen(trailing_chunk_filename,"rb")asfile_obj:
184+
assert_base64_md5hash(file_obj)==source_file["hash"]
185+
175186
# Also test threaded mode.
176-
threaded_filename=os.path.join(tempdir,"chunky_file_3")
187+
threaded_filename=os.path.join(tempdir,"chunky_file_4")
177188
transfer_manager.download_chunks_concurrently(
178189
download_blob,
179190
threaded_filename,

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp