Ουρές

Πηγαίος κώδικας:Lib/asyncio/queues.py


Οι ουρές asyncio έχουν σχεδιαστεί ώστε να μοιάζουν με τις κλάσεις του modulequeue. Αν και οι ουρές asyncio δεν είναι ασφαλείς για χρήση με νήματα (thread-safe), έχουν σχεδιαστεί για να χρησιμοποιούνται συγκεκριμένα σε κώδικα async/await.

Σημειώστε ότι οι μέθοδοι των ουρών asyncio δεν διαθέτουν παράμετροtimeout. Χρησιμοποιήστε την συνάρτησηasyncio.wait_for() για να εκτελέσετε λειτουργίες ουράς με χρονικό όριο.

Δείτε επίσης την ενότηταΠαραδείγματα παρακάτω.

Ουρά

classasyncio.Queue(maxsize=0)

Μια ουρά τύπου πρώτος που εισέρχεται, πρώτος που εξέρχεται (FIFO).

Αν η τιμή τουmaxsize είναι λιγότερη ή ίση με το μηδέν, το μέγεθος της ουράς είναι άπειρο. Αν είναι ένας ακέραιος μεγαλύτερος από το0, τότε η εντολήawaitput() μπλοκάρει, όταν η ουρά φτάσει τοmaxsize μέχρι να αφαιρεθεί ένα στοιχείο μέσω της μεθόδουget().

Σε αντίθεση με την ουρά τουqueue στην βιβλιοθήκη threading, το μέγεθος της ουράς είναι πάντα γνωστό και μπορεί να επιστραφεί καλώντας τη μέθοδοqsize().

Άλλαξε στην έκδοση 3.10:Αφαιρέθηκε η παράμετροςloop.

Αυτή η κλάση είναιnot thread safe.

maxsize

Αριθμός στοιχείων που επιτρέπονται στην ουρά.

empty()

ΕπιστρέφειTrue αν η ουρά είναι άδεια, διαφορετικάFalse.

full()

ΕπιστρέφειTrue αν υπάρχουνmaxsize αντικείμενα στην ουρά.

Αν η ουρά αρχικοποιήθηκε μεmaxsize=0 (προεπιλογή), τότε ηfull() δεν επιστρέφει ποτέTrue.

asyncget()

Αφαίρεση και επιστροφή ενός αντικειμένου από την ουρά. Αν η ουρά είναι κενή, περιμένετε μέχρι να είναι διαθέσιμο ένα αντικείμενο.

get_nowait()

Επιστρέφει ένα αντικείμενο, αν είναι άμεσα διαθέσιμο, αλλιώς κάνε raise τηνQueueEmpty.

asyncjoin()

Αποκλείει μέχρι να ληφθούν και να υποβληθούν σε επεξεργασία όλα τα στοιχεία στην ουρά.

Ο αριθμός των ημιτελών εργασιών αυξάνεται κάθε φορά που προστίθεται ένα αντικείμενο στην ουρά. Ο αριθμός μειώνεται όταν μια καταναλωτική coroutine καλεί τη μέθοδοtask_done() για να υποδείξει ότι το αντικείμενο λήφθηκε και η εργασία πάνω του έχει ολοκληρωθεί. Όταν ο αριθμός των ατελείωτων εργασιών μειωθεί στο μηδέν, η μέθοδοςjoin() αποδεσμεύεται.

asyncput(item)

Τοποθετεί ένα αντικείμενο στην ουρά. Αν η ουρά είναι γεμάτη, περιμένετε μέχρι να είναι διαθέσιμη μια ελεύθερη θέση, πριν προσθέσετε το αντικείμενο.

put_nowait(item)

Τοποθετεί ένα αντικείμενο στην ουρά χωρίς να μπλοκάρει.

Αν δεν είναι διαθέσιμη μια ελεύθερη θέση αμέσως, γίνεται raise ηQueueFull.

qsize()

Επιστρέφει τον αριθμό των αντικειμένων στην ουρά.

task_done()

Υποδεικνύει ότι μια εργασία που είχε προστεθεί στην ουρά έχει ολοκληρωθεί.

Χρησιμοποιείται από τους καταναλωτές της ουράς. Για κάθε κλήση τηςget() για να ανακτηθεί μια εργασία, μια επακόλουθη κλήση τηςtask_done() ενημερώνει την ουρά ότι η επεξεργασία της εργασίας έχει ολοκληρωθεί.

Εάν μια κλήση τηςjoin() μπλοκάρει αυτή την στιγμή, θα συνεχιστεί όταν όλα τα αντικείμενα έχουν επεξεργαστεί (σημαίνει ότι λήφθηκε μια κλήση τηςtask_done() για κάθε αντικείμενο που είχε προστεθεί μεput() στην ουρά).

Κάνει raise τηνValueError εάν κληθεί περισσότερες φορές από όσες τα αντικείμενα που είχαν τοποθετηθεί στην ουρά.

Σειρά Προτεραιότητας

classasyncio.PriorityQueue

Μια παραλλαγή τηςQueue; η οποία ανακτά τις καταχωρήσεις με σειρά προτεραιότητας (οι χαμηλότερες πρώτες).

Οι καταχωρήσεις είναι συνήθως της μορφής(priority_number,data).

Ουρά LIFO

classasyncio.LifoQueue

Μια παραλλαγή της κλάσηςQueue που ανακτά τις πιο πρόσφατα προστιθέμενες καταχωρίσεις πρώτες (με τη λογική τελευταίος μέσα, πρώτος έξω).

Εξαιρέσεις

exceptionasyncio.QueueEmpty

Αυτή η εξαίρεση γίνεται raise όταν η μέθοδοςget_nowait() καλείται σε μια άδεια ουρά.

exceptionasyncio.QueueFull

Εξαίρεση που γίνεται raise όταν η μέθοδοςput_nowait() καλείται σε μια ουρά που έχει φτάσει στοmaxsize της.

Παραδείγματα

Οι ουρές μπορούν να χρησιμοποιηθούν για τη διανομή εργασίας μεταξύ αρκετών παράλληλων εργασιών:

importasyncioimportrandomimporttimeasyncdefworker(name,queue):whileTrue:# Get a "work item" out of the queue.sleep_for=awaitqueue.get()# Sleep for the "sleep_for" seconds.awaitasyncio.sleep(sleep_for)# Notify the queue that the "work item" has been processed.queue.task_done()print(f'{name} has slept for{sleep_for:.2f} seconds')asyncdefmain():# Create a queue that we will use to store our "workload".queue=asyncio.Queue()# Generate random timings and put them into the queue.total_sleep_time=0for_inrange(20):sleep_for=random.uniform(0.05,1.0)total_sleep_time+=sleep_forqueue.put_nowait(sleep_for)# Create three worker tasks to process the queue concurrently.tasks=[]foriinrange(3):task=asyncio.create_task(worker(f'worker-{i}',queue))tasks.append(task)# Wait until the queue is fully processed.started_at=time.monotonic()awaitqueue.join()total_slept_for=time.monotonic()-started_at# Cancel our worker tasks.fortaskintasks:task.cancel()# Wait until all worker tasks are cancelled.awaitasyncio.gather(*tasks,return_exceptions=True)print('====')print(f'3 workers slept in parallel for{total_slept_for:.2f} seconds')print(f'total expected sleep time:{total_sleep_time:.2f} seconds')asyncio.run(main())