Ουρές¶
Πηγαίος κώδικας:Lib/asyncio/queues.py
Οι ουρές asyncio έχουν σχεδιαστεί ώστε να μοιάζουν με τις κλάσεις του modulequeue
. Αν και οι ουρές asyncio δεν είναι ασφαλείς για χρήση με νήματα (thread-safe), έχουν σχεδιαστεί για να χρησιμοποιούνται συγκεκριμένα σε κώδικα async/await.
Σημειώστε ότι οι μέθοδοι των ουρών asyncio δεν διαθέτουν παράμετροtimeout. Χρησιμοποιήστε την συνάρτησηasyncio.wait_for()
για να εκτελέσετε λειτουργίες ουράς με χρονικό όριο.
Δείτε επίσης την ενότηταΠαραδείγματα παρακάτω.
Ουρά¶
- classasyncio.Queue(maxsize=0)¶
Μια ουρά τύπου πρώτος που εισέρχεται, πρώτος που εξέρχεται (FIFO).
Αν η τιμή τουmaxsize είναι λιγότερη ή ίση με το μηδέν, το μέγεθος της ουράς είναι άπειρο. Αν είναι ένας ακέραιος μεγαλύτερος από το
0
, τότε η εντολήawaitput()
μπλοκάρει, όταν η ουρά φτάσει τοmaxsize μέχρι να αφαιρεθεί ένα στοιχείο μέσω της μεθόδουget()
.Σε αντίθεση με την ουρά του
queue
στην βιβλιοθήκη threading, το μέγεθος της ουράς είναι πάντα γνωστό και μπορεί να επιστραφεί καλώντας τη μέθοδοqsize()
.Άλλαξε στην έκδοση 3.10:Αφαιρέθηκε η παράμετροςloop.
Αυτή η κλάση είναιnot thread safe.
- maxsize¶
Αριθμός στοιχείων που επιτρέπονται στην ουρά.
- empty()¶
Επιστρέφει
True
αν η ουρά είναι άδεια, διαφορετικάFalse
.
- full()¶
Επιστρέφει
True
αν υπάρχουνmaxsize
αντικείμενα στην ουρά.Αν η ουρά αρχικοποιήθηκε με
maxsize=0
(προεπιλογή), τότε ηfull()
δεν επιστρέφει ποτέTrue
.
- asyncget()¶
Αφαίρεση και επιστροφή ενός αντικειμένου από την ουρά. Αν η ουρά είναι κενή, περιμένετε μέχρι να είναι διαθέσιμο ένα αντικείμενο.
- get_nowait()¶
Επιστρέφει ένα αντικείμενο, αν είναι άμεσα διαθέσιμο, αλλιώς κάνε raise την
QueueEmpty
.
- asyncjoin()¶
Αποκλείει μέχρι να ληφθούν και να υποβληθούν σε επεξεργασία όλα τα στοιχεία στην ουρά.
Ο αριθμός των ημιτελών εργασιών αυξάνεται κάθε φορά που προστίθεται ένα αντικείμενο στην ουρά. Ο αριθμός μειώνεται όταν μια καταναλωτική coroutine καλεί τη μέθοδο
task_done()
για να υποδείξει ότι το αντικείμενο λήφθηκε και η εργασία πάνω του έχει ολοκληρωθεί. Όταν ο αριθμός των ατελείωτων εργασιών μειωθεί στο μηδέν, η μέθοδοςjoin()
αποδεσμεύεται.
- asyncput(item)¶
Τοποθετεί ένα αντικείμενο στην ουρά. Αν η ουρά είναι γεμάτη, περιμένετε μέχρι να είναι διαθέσιμη μια ελεύθερη θέση, πριν προσθέσετε το αντικείμενο.
- put_nowait(item)¶
Τοποθετεί ένα αντικείμενο στην ουρά χωρίς να μπλοκάρει.
Αν δεν είναι διαθέσιμη μια ελεύθερη θέση αμέσως, γίνεται raise η
QueueFull
.
- qsize()¶
Επιστρέφει τον αριθμό των αντικειμένων στην ουρά.
- task_done()¶
Υποδεικνύει ότι μια εργασία που είχε προστεθεί στην ουρά έχει ολοκληρωθεί.
Χρησιμοποιείται από τους καταναλωτές της ουράς. Για κάθε κλήση της
get()
για να ανακτηθεί μια εργασία, μια επακόλουθη κλήση τηςtask_done()
ενημερώνει την ουρά ότι η επεξεργασία της εργασίας έχει ολοκληρωθεί.Εάν μια κλήση της
join()
μπλοκάρει αυτή την στιγμή, θα συνεχιστεί όταν όλα τα αντικείμενα έχουν επεξεργαστεί (σημαίνει ότι λήφθηκε μια κλήση τηςtask_done()
για κάθε αντικείμενο που είχε προστεθεί μεput()
στην ουρά).Κάνει raise την
ValueError
εάν κληθεί περισσότερες φορές από όσες τα αντικείμενα που είχαν τοποθετηθεί στην ουρά.
Σειρά Προτεραιότητας¶
Ουρά LIFO¶
Εξαιρέσεις¶
- exceptionasyncio.QueueEmpty¶
Αυτή η εξαίρεση γίνεται raise όταν η μέθοδος
get_nowait()
καλείται σε μια άδεια ουρά.
- exceptionasyncio.QueueFull¶
Εξαίρεση που γίνεται raise όταν η μέθοδος
put_nowait()
καλείται σε μια ουρά που έχει φτάσει στοmaxsize της.
Παραδείγματα¶
Οι ουρές μπορούν να χρησιμοποιηθούν για τη διανομή εργασίας μεταξύ αρκετών παράλληλων εργασιών:
importasyncioimportrandomimporttimeasyncdefworker(name,queue):whileTrue:# Get a "work item" out of the queue.sleep_for=awaitqueue.get()# Sleep for the "sleep_for" seconds.awaitasyncio.sleep(sleep_for)# Notify the queue that the "work item" has been processed.queue.task_done()print(f'{name} has slept for{sleep_for:.2f} seconds')asyncdefmain():# Create a queue that we will use to store our "workload".queue=asyncio.Queue()# Generate random timings and put them into the queue.total_sleep_time=0for_inrange(20):sleep_for=random.uniform(0.05,1.0)total_sleep_time+=sleep_forqueue.put_nowait(sleep_for)# Create three worker tasks to process the queue concurrently.tasks=[]foriinrange(3):task=asyncio.create_task(worker(f'worker-{i}',queue))tasks.append(task)# Wait until the queue is fully processed.started_at=time.monotonic()awaitqueue.join()total_slept_for=time.monotonic()-started_at# Cancel our worker tasks.fortaskintasks:task.cancel()# Wait until all worker tasks are cancelled.awaitasyncio.gather(*tasks,return_exceptions=True)print('====')print(f'3 workers slept in parallel for{total_slept_for:.2f} seconds')print(f'total expected sleep time:{total_sleep_time:.2f} seconds')asyncio.run(main())