
Pattern: Using nested tasks to achieve nested parallelism#

In this pattern, a remote task can dynamically call other remote tasks (including itself) for nested parallelism. This is useful when sub-tasks can be parallelized.

Keep in mind, though, that nested tasks come with their own cost: extra worker processes, scheduling overhead, bookkeeping overhead, etc. To achieve speedup with nested parallelism, make sure each of your nested tasks does significant work. See Anti-pattern: Over-parallelizing with too fine-grained tasks harms speedup for more details.

Example use case#

You want to quick-sort a large list of numbers. By using nested tasks, you can sort the list in a distributed and parallel fashion.

[Figure: Tree of tasks (../../_images/tree-of-tasks.svg)]

Code example#

import ray
import time
from numpy import random


def partition(collection):
    # Use the last element as the pivot
    pivot = collection.pop()
    greater, lesser = [], []
    for element in collection:
        if element > pivot:
            greater.append(element)
        else:
            lesser.append(element)
    return lesser, pivot, greater


def quick_sort(collection):
    if len(collection) <= 200000:  # magic number
        return sorted(collection)
    else:
        lesser, pivot, greater = partition(collection)
        lesser = quick_sort(lesser)
        greater = quick_sort(greater)
        return lesser + [pivot] + greater


@ray.remote
def quick_sort_distributed(collection):
    # Tiny tasks are an antipattern.
    # Thus, in our example we have a "magic number" to
    # toggle when distributed recursion should be used vs
    # when the sorting should be done in place. The rule
    # of thumb is that the duration of an individual task
    # should be at least 1 second.
    if len(collection) <= 200000:  # magic number
        return sorted(collection)
    else:
        lesser, pivot, greater = partition(collection)
        lesser = quick_sort_distributed.remote(lesser)
        greater = quick_sort_distributed.remote(greater)
        return ray.get(lesser) + [pivot] + ray.get(greater)


for size in [200000, 4000000, 8000000]:
    print(f"Array size: {size}")
    unsorted = random.randint(1000000, size=(size)).tolist()

    s = time.time()
    quick_sort(unsorted)
    print(f"Sequential execution: {(time.time() - s):.3f}")

    s = time.time()
    ray.get(quick_sort_distributed.remote(unsorted))
    print(f"Distributed execution: {(time.time() - s):.3f}")
    print("--" * 10)

# Outputs:
# Array size: 200000
# Sequential execution: 0.040
# Distributed execution: 0.152
# --------------------
# Array size: 4000000
# Sequential execution: 6.161
# Distributed execution: 5.779
# --------------------
# Array size: 8000000
# Sequential execution: 15.459
# Distributed execution: 11.282
# --------------------

We call ray.get() only after both quick_sort_distributed invocations have been launched. This allows you to maximize parallelism in the workload. See Anti-pattern: Calling ray.get in a loop harms parallelism for more details.

Notice in the execution times above that with smaller tasks, the non-distributed version is faster. However, as the task execution time increases, i.e. because the lists to sort are larger, the distributed version is faster.

