1313# limitations under the License.
1414
1515from __future__import annotations
16+ import asyncio
1617import google_crc32c
1718from google .api_core import exceptions
1819from google_crc32c import Checksum
2930from io import BytesIO
3031from google .cloud import _storage_v2
3132from google .cloud .storage .exceptions import DataCorruption
33+ from google .cloud .storage ._helpers import generate_random_56_bit_integer
3234
3335
3436_MAX_READ_RANGES_PER_BIDI_READ_REQUEST = 100
@@ -78,7 +80,7 @@ class AsyncMultiRangeDownloader:
7880 my_buff2 = BytesIO()
7981 my_buff3 = BytesIO()
8082 my_buff4 = any_object_which_provides_BytesIO_like_interface()
81- results_arr = await mrd.download_ranges(
83+ await mrd.download_ranges(
8284 [
8385 # (start_byte, bytes_to_read, writeable_buffer)
8486 (0, 100, my_buff1),
@@ -88,8 +90,8 @@ class AsyncMultiRangeDownloader:
8890 ]
8991 )
9092
91- for result inresults_arr:
92- print("downloaded bytes", result)
93+ # verify data inbuffers...
94+ assert my_buff2.getbuffer().nbytes == 20
9395
9496
9597 """
@@ -175,6 +177,10 @@ def __init__(
175177self .read_obj_str :Optional [_AsyncReadObjectStream ]= None
176178self ._is_stream_open :bool = False
177179
180+ self ._read_id_to_writable_buffer_dict = {}
181+ self ._read_id_to_download_ranges_id = {}
182+ self ._download_ranges_id_to_pending_read_ids = {}
183+
178184async def open (self )-> None :
179185"""Opens the bidi-gRPC connection to read from the object.
180186
@@ -203,8 +209,8 @@ async def open(self) -> None:
203209return
204210
205211async def download_ranges (
206- self ,read_ranges :List [Tuple [int ,int ,BytesIO ]]
207- )-> List [ Result ] :
212+ self ,read_ranges :List [Tuple [int ,int ,BytesIO ]], lock : asyncio . Lock = None
213+ )-> None :
208214"""Downloads multiple byte ranges from the object into the buffers
209215 provided by user.
210216
@@ -214,9 +220,36 @@ async def download_ranges(
214220 to be provided by the user, and user has to make sure appropriate
215221 memory is available in the application to avoid out-of-memory crash.
216222
217- :rtype: List[:class:`~google.cloud.storage._experimental.asyncio.async_multi_range_downloader.Result`]
218- :returns: A list of ``Result`` objects, where each object corresponds
219- to a requested range.
223+ :type lock: asyncio.Lock
224+ :param lock: (Optional) An asyncio lock to synchronize sends and recvs
225+ on the underlying bidi-GRPC stream. This is required when multiple
226+ coroutines are calling this method concurrently.
227+
228+ i.e. Example usage with multiple coroutines:
229+
230+ ```
231+ lock = asyncio.Lock()
232+ task1 = asyncio.create_task(mrd.download_ranges(ranges1, lock))
233+ task2 = asyncio.create_task(mrd.download_ranges(ranges2, lock))
234+ await asyncio.gather(task1, task2)
235+
236+ ```
237+
238+ If user want to call this method serially from multiple coroutines,
239+ then providing a lock is not necessary.
240+
241+ ```
242+ await mrd.download_ranges(ranges1)
243+ await mrd.download_ranges(ranges2)
244+
245+ # ... some other code code...
246+
247+ ```
248+
249+
250+ :raises ValueError: if the underlying bidi-GRPC stream is not open.
251+ :raises ValueError: if the length of read_ranges is more than 1000.
252+ :raises DataCorruption: if a checksum mismatch is detected while reading data.
220253
221254 """
222255
@@ -228,32 +261,43 @@ async def download_ranges(
228261if not self ._is_stream_open :
229262raise ValueError ("Underlying bidi-gRPC stream is not open" )
230263
231- read_id_to_writable_buffer_dict = {}
232- results = []
264+ if lock is None :
265+ lock = asyncio .Lock ()
266+
267+ _func_id = generate_random_56_bit_integer ()
268+ read_ids_in_current_func = set ()
233269for i in range (0 ,len (read_ranges ),_MAX_READ_RANGES_PER_BIDI_READ_REQUEST ):
234270read_ranges_segment = read_ranges [
235271i :i + _MAX_READ_RANGES_PER_BIDI_READ_REQUEST
236272 ]
237273
238274read_ranges_for_bidi_req = []
239275for j ,read_range in enumerate (read_ranges_segment ):
240- read_id = i + j
241- read_id_to_writable_buffer_dict [read_id ]= read_range [2 ]
276+ read_id = generate_random_56_bit_integer ()
277+ read_ids_in_current_func .add (read_id )
278+ self ._read_id_to_download_ranges_id [read_id ]= _func_id
279+ self ._read_id_to_writable_buffer_dict [read_id ]= read_range [2 ]
242280bytes_requested = read_range [1 ]
243- results .append (Result (bytes_requested ))
244281read_ranges_for_bidi_req .append (
245282_storage_v2 .ReadRange (
246283read_offset = read_range [0 ],
247284read_length = bytes_requested ,
248285read_id = read_id ,
249286 )
250287 )
251- await self .read_obj_str .send (
252- _storage_v2 .BidiReadObjectRequest (read_ranges = read_ranges_for_bidi_req )
253- )
288+ async with lock :
289+ await self .read_obj_str .send (
290+ _storage_v2 .BidiReadObjectRequest (
291+ read_ranges = read_ranges_for_bidi_req
292+ )
293+ )
294+ self ._download_ranges_id_to_pending_read_ids [
295+ _func_id
296+ ]= read_ids_in_current_func
254297
255- while len (read_id_to_writable_buffer_dict )> 0 :
256- response = await self .read_obj_str .recv ()
298+ while len (self ._download_ranges_id_to_pending_read_ids [_func_id ])> 0 :
299+ async with lock :
300+ response = await self .read_obj_str .recv ()
257301
258302if response is None :
259303raise Exception ("None response received, something went wrong." )
@@ -277,16 +321,15 @@ async def download_ranges(
277321 )
278322
279323read_id = object_data_range .read_range .read_id
280- buffer = read_id_to_writable_buffer_dict [read_id ]
324+ buffer = self . _read_id_to_writable_buffer_dict [read_id ]
281325buffer .write (data )
282- results [read_id ].bytes_written += len (data )
283326
284327if object_data_range .range_end :
285- del read_id_to_writable_buffer_dict [
286- object_data_range . read_range . read_id
287- ]
288-
289- return results
328+ tmp_dn_ranges_id = self . _read_id_to_download_ranges_id [ read_id ]
329+ self . _download_ranges_id_to_pending_read_ids [
330+ tmp_dn_ranges_id
331+ ]. remove ( read_id )
332+ del self . _read_id_to_download_ranges_id [ read_id ]
290333
291334async def close (self ):
292335"""