Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up

Concurrent iterators and pipeline operations.

License

NotificationsYou must be signed in to change notification settings

amphp/pipeline

Repository files navigation

AMPHP is a collection of event-driven libraries for PHP designed with fibers and concurrency in mind.amphp/pipeline provides concurrent iterators and collection operators.

Installation

This package can be installed as aComposer dependency.

composer require amphp/pipeline

Requirements

This package requires PHP 8.1 or later.

Usage

Using fiber-based coroutines, asynchronous sets can now be created and consumed within a single fiber using PHP's built-inIterator. Attempting to consume anIterator instance from multiple fibers is problematic though, as one fiber may modify the state of the iterator while another is suspended.

This library provides aConcurrentIterator interface which provides a fiber-safe iterator that may be consumed by multiple fibers concurrently, as well as tools for creating asynchronous sets.

Concurrent Iterators

AConcurrentIterator may be used in place of anIterator, meaning it can be used withforeach,yield from,iterator_to_array(), argument unpacking, and more!

Like anIterator, aConcurrentIterator may also be iterated manually, with separate methods for advancing and retrieving the current value.

useAmp\Pipeline\Pipeline;// Pipeline::getIterator() returns a ConcurrentIterator$concurrentIterator = Pipeline::fromIterable([1,2,3])->getIterator();while ($concurrentIterator->continue()) {$position =$concurrentIterator->getPosition();$value =$concurrentIterator->getValue();// ...}// Equivalently, multiple fibers may consume a single ConcurrentIterator// instance using foreach.$concurrentIterator = Pipeline::fromIterable([1,2,3])->getIterator();foreach ($concurrentIteratoras$position =>$value) {// ...}

continue() suspends the current fiber until a value becomes available or the iterator completes, returningtrue orfalse respectively. An exception is thrown fromcontinue() the source of the iterator throws an exception while generating the next value.

getValue() returns the last value emitted on the iterator within the current fiber. The return value of this function will not change within the current fiber untilcontinue() is called again.continue() must be invoked and return before this method can be called.

getPosition() returns the current 0-indexed position within the current iterator. If consuming from multiple fibers, this value may not be sequential within a single fiber. Similar togetValue(),continue() must be invoked and return before this method can be called.

NoteIn general, it is not necessary to call these methods directly within application code. Concurrent iterators typically should be used withforeach.

Queue

AQueue is used to create an asynchronous set of values with the ability to await consumption of the values produced, providing back-pressure to the production of more values, so consumption and production can be synchronized.

Values may be added to aQueue in two ways.

  • push() adds the value to the queue, only returning once the value has been consumed from the queue.
  • pushAsync() adds the value to the queue, returning aFuture immediately which is completed only once the value has been consumed from the queue.
useAmp\Pipeline\Queue;usefunctionAmp\async;usefunctionAmp\delay;$queue =newQueue();$start =\microtime(true);$elapsed =fn () =>\microtime(true) -$start;// Generate values in a separate fiberasync(function ()use ($queue,$elapsed):void {printf("Starting production loop at %.3fs\n",$elapsed());foreach (range(1,10)as$value) {delay(0.1);// Production of a value takes between 100ms$queue->push($value);    }printf("Completing production loop at %.3fs\n",$elapsed());// Queue must be completed, otherwise foreach loop below will never exit!$queue->complete();});foreach ($queue->iterate()as$value) {printf("Iterator yielded %d at %.3fs\n",$value,$elapsed());delay(0.5);// Listener consumption takes 500 ms}

Once all values have been pushed into aQueue, the producer must callcomplete() to end the concurrent iterator. Failure to do so will leave the consumer suspended indefinitely. Alternatively to indicate an error, the producer may useerror() to throw an exception to the concurrent iterator consumer and end the concurrent iterator.

DisposedException

If the consumer of the concurrent iterator generated by theQueue is destroyed,push() will throw aDisposedException (or the future returned frompushAsync() will error with aDisposedException). This indicates that no additional values need to be generated since consumption of those values has ended. If for some reason the producer wishes to continue (e.g., to consume bytes from a buffer), either catch the exception or ignore the future. (TheDisposedException instance is created only once for each queue.)

Pipeline

APipeline represents an asynchronous set and provides operations which can be applied over the set.

useAmp\Pipeline\Pipeline;usefunctionAmp\delay;$pipeline = Pipeline::fromIterable(function ():\Generator {for ($i =0;$i <100; ++$i) {yield$i;    }});$pipeline =$pipeline    ->concurrent(10)// Process up to 10 items concurrently    ->unordered()// Results may be consumed eagerly and out of order    ->tap(fn () =>delay(random_int(1,10) /10))// Observe each value with a delay for 0.1 to 1 seconds, simulating I/O    ->map(fn (int$input) =>$input *10)// Apply an operation to each value    ->filter(fn (int$input) =>$input %3 ===0);// Filter only values divisible by 3foreach ($pipelineas$value) {echo$value,"\n";}

Alternatively,Pipeline also has methods which consume the set, such asforEach() orreduce(), which return only once the set is complete or throws an exception.

useAmp\Pipeline\Pipeline;Pipeline::generate(function ():int {static$v =0;return ++$v; })    ->take(10)// Take only 10 values from the generation function.    ->concurrent(3)// Process 3 values concurrently    ->delay(1)// Delay for 1 second to simulate I/O    ->forEach(function (int$value):void {echo$value,"\n";    });

Versioning

amphp/pipeline follows thesemver semantic versioning specification like all otheramphp packages.

Security

If you discover any security related issues, please emailcontact@amphp.org instead of using the issue tracker.

License

The MIT License (MIT). Please seeLICENSE for more information.

About

Concurrent iterators and pipeline operations.

Resources

License

Security policy

Stars

Watchers

Forks

Sponsor this project

 

Languages


[8]ページ先頭

©2009-2025 Movatter.jp