Edge Features are under active development and may change frequently.
concurrent-ruby-edge are expected to movetoconcurrent-ruby when finalised.Modules:Actor,Async,Concern,Edge,ErlangActor,ImmutableStruct,MutableStruct,Promises,SettableStructClasses:Agent,Array,Atom,AtomicBoolean,AtomicFixnum,AtomicMarkableReference,AtomicReference,CachedThreadPool,Cancellation,Channel,ConcurrentUpdateError,CountDownLatch,CyclicBarrier,Delay,Event,Exchanger,FiberLocalVar,FixedThreadPool,Future,Hash,IVar,ImmediateExecutor,IndirectImmediateExecutor,LazyRegister,LockFreeStack,LockLocalVar,MVar,Map,Maybe,MultipleAssignmentError,MultipleErrors,ProcessingActor,Promise,ReadWriteLock,ReentrantReadWriteLock,SafeTaskExecutor,ScheduledTask,Semaphore,SerializedExecution,SerializedExecutionDelegator,Set,SimpleExecutorService,SingleThreadExecutor,TVar,ThreadLocalVar,ThreadPoolExecutor,Throttle,TimerSet,TimerTask,Tuple,WrappingExecutor
Class.new(StandardError)
Raised when errors occur during configuration.
Class.new(Error)
Raised when an asynchronous operation is cancelled before execution.
Class.new(Error)
Raised when a lifecycle method (such asstop) is called in an impropersequence or when the object is in an inappropriate state.
Class.new(Error)
Raised when an attempt is made to violate an immutability guarantee.
Class.new(Error)
Raised when an operation is attempted which is not legal given thereceiver's current state
Class.new(Error)
Raised when an object's methods are called when it has not beenproperly initialized.
Class.new(Error)
Raised when an object with a start/stop lifecycle has been started anexcessive number of times. Often used in conjunction with a restartpolicy or strategy.
Class.new(Error)
Raised by anExecutor when it is unable to process a given task,possibly because of a reject policy or other internal error.
Class.new(Error)
Raised when any finite resource, such as a lock counter, exceeds itsmaximum limit/threshold.
Class.new(Error)
Raised when an operation times out.
Class.new(Error)
Class.new(StandardError)
'1.3.5'Suppresses all output when used for logging.
lambda{|level,progname,=nil,&block|}
'0.7.2'Abort a currently running transaction - seeConcurrent::atomically.
Run a block that reads and writesTVars as a single atomic transaction.
Number of processors cores available for process scheduling.
The maximum number of processors cores available for process scheduling.
The CPU shares requested by the process.
Create a simple logger with provided level and output.
Dataflow allows you to create a task that will be scheduled when all of its data dependencies are available.
Has no effect since it is no longer needed, seehttps://github.com/ruby-concurrency/concurrent-ruby/pull/841.
General access point to global executors.
Global thread pool optimized for short, fastoperations.
Global thread pool optimized for long, blocking (IO)tasks.
Global thread pool user for globaltimers.
Leave a transaction without committing or aborting - seeConcurrent::atomically.
Returns the current time as tracked by the application monotonic clock.
Number of physical processor cores on the current system.
Number of processors seen by the OS and used for process scheduling.
Use logger created by #create_simple_logger to log concurrent-ruby messages.
Waits for another thread to arrive at this exchange point (unless the current thread is interrupted), and then transfers the given object to it, receiving its object in return.
Waits for another thread to arrive at this exchange point (unless the current thread is interrupted), and then transfers the given object to it, receiving its object in return.
Create a new thread pool.
Waits for another thread to arrive at this exchange point (unless the current thread is interrupted), and then transfers the given object to it, receiving its object in return.
Abort a currently running transaction - seeConcurrent::atomically.
Raises:
139140141 | # File 'lib/concurrent-ruby/concurrent/tvar.rb', line 139defabort_transactionraiseTransaction::AbortError.newend |
Run a block that reads and writesTVars as a single atomic transaction.With respect to the value ofTVar objects, the transaction is atomic, inthat it either happens or it does not, consistent, in that theTVarobjects involved will never enter an illegal state, and isolated, in thattransactions never interfere with each other. You may recognise theseproperties from database transactions.
There are some very important and unusual semantics that you must be aware of:
Most importantly, the block that you pass to atomically may be executedmore than once. In most cases your code should be free ofside-effects, except for via TVar.
If an exception escapes an atomically block it will abort the transaction.
It is undefined behaviour to use callcc or Fiber with atomically.
If you create a new thread within an atomically, it will not be part ofthe transaction. Creating a thread counts as a side-effect.
Transactions within transactions are flattened to a single transaction.
a=newTVar(100_000)b=newTVar(100)Concurrent::atomicallydoa.value-=10b.value+=10endRaises:
828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 | # File 'lib/concurrent-ruby/concurrent/tvar.rb', line 82defatomicallyraiseArgumentError.new('no block given')unlessblock_given?# Get the current transactiontransaction=Transaction::current# Are we not already in a transaction (not nested)?iftransaction.nil?# New transactionbegin# Retry looploopdo# Create a new transactiontransaction=Transaction.newTransaction::current=transaction# Run the block, aborting on exceptionsbeginresult=yieldrescueTransaction::AbortError=>etransaction.abortresult=Transaction::ABORTEDrescueTransaction::LeaveError=>etransaction.abortbreakresultrescue=>etransaction.abortraiseeend# If we can commit, break out of the loopifresult!=Transaction::ABORTEDiftransaction.commitbreakresultendendendensure# Clear the current transactionTransaction::current=nilendelse# Nested transaction - flatten it and just run the blockyieldendend |
Number of processors cores available for process scheduling.This method takes in account the CPU quota if the process is inside a cgroup with adedicated CPU quota (typically Docker).Otherwise it returns the same value as #processor_count but as a Float.
For performance reasons the calculated value will be memoized on the firstcall.
Returns:
number of available processors
194195196 | # File 'lib/concurrent-ruby/concurrent/utility/processor_counter.rb', line 194defself.available_processor_countprocessor_counter.available_processor_countend |
Raises:
565758596061626364656667686970717273747576777879 | # File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 56defcall_dataflow(method,executor,*inputs,&block)raiseArgumentError.new('an executor must be provided')ifexecutor.nil?raiseArgumentError.new('no block given')unlessblock_given?unlessinputs.all?{|input|input.is_a?IVar}raiseArgumentError.new("Not all dependencies are IVars.\nDependencies:#{inputs.inspect}")endresult=Future.new(executor:executor)dovalues=inputs.map{|input|input.send(method)}block.call(*values)endifinputs.empty?result.executeelsecounter=DependencyCounter.new(inputs.size){result.execute}inputs.eachdo|input|input.add_observercounterendendresultend |
The maximum number of processors cores available for process scheduling.Returnsnil if there is no enforced limit, or aFloat if theprocess is inside a cgroup with a dedicated CPU quota (typically Docker).
Note that nothing prevents setting a CPU quota higher than the actual number ofcores on the system.
For performance reasons the calculated value will be memoized on the firstcall.
Returns:
Maximum number of available processors as set by a cgroup CPU quota, or nil if none set
209210211 | # File 'lib/concurrent-ruby/concurrent/utility/processor_counter.rb', line 209defself.cpu_quotaprocessor_counter.cpu_quotaend |
The CPU shares requested by the process. For performance reasons the calculatedvalue will be memoized on the first call.
Returns:
CPU shares requested by the process, or nil if not set
217218219 | # File 'lib/concurrent-ruby/concurrent/utility/processor_counter.rb', line 217defself.cpu_sharesprocessor_counter.cpu_sharesend |
Create a simple logger with provided level and output.
3839404142434445464748495051525354555657585960616263 | # File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 38defself.create_simple_logger(level=:FATAL,output=$stderr)level=Concern::Logging.const_get(level)unlesslevel.is_a?(Integer)# TODO (pitr-ch 24-Dec-2016): figure out why it had to be replaced, stdlogger was deadlockinglambdado|severity,progname,=nil,&block|returnfalseifseverity<level=block?block.call:=casewhenStringwhenExceptionformat"%s (%s)\n%s",.,.class,(.backtrace||[]).join("\n")else.inspectendoutput.printformat"[%s] %5s -- %s: %s\n",Time.now.strftime('%Y-%m-%d %H:%M:%S.%L'),Concern::Logging::SEV_LABEL[severity],progname,trueendend |
Create a stdlib logger with provided level and output.If you use this deprecated method you might need to add logger to your Gemfile to avoid warnings from Ruby 3.3.5+.
73747576777879808182838485868788899091929394959697 | # File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 73defself.create_stdlib_logger(level=:FATAL,output=$stderr)require'logger'logger=Logger.new(output)logger.level=levellogger.formatter=lambdado|severity,datetime,progname,msg|=casemsgwhenStringmsgwhenExceptionformat"%s (%s)\n%s",msg.,msg.class,(msg.backtrace||[]).join("\n")elsemsg.inspectendformat"[%s] %5s -- %s: %s\n",datetime.strftime('%Y-%m-%d %H:%M:%S.%L'),severity,progname,endlambdado|loglevel,progname,=nil,&block|logger.addloglevel,,progname,&blockendend |
Dataflow allows you to create a task that will be scheduled when all of its data dependencies are available.Data dependencies areFuture values. The dataflow task itself is also aFuture value, so you can build up a graph of these tasks, each of which is run when all the data and other tasks it depends on are available or completed.
Our syntax is somewhat related to that of Akka'sflow and Habanero Java'sDataDrivenFuture. However unlike Akka we don't schedule a task at all until it is ready to run, and unlike Habanero Java we pass the data values into the task instead of dereferencing them again in the task.
The theory of dataflow goes back to the 70s. In the terminology of the literature, our implementation is coarse-grained, in that each task can be many instructions, and dynamic in that you can create more tasks within other tasks.
A dataflow task is created with thedataflow method, passing in a block.
task=Concurrent::dataflow{14}This produces a simpleFuture value. The task will run immediately, as it has no dependencies. We can also specifyFuture values that must be available before a task will run. When we do this we get the value of those futures passed to our block.
a=Concurrent::dataflow{1}b=Concurrent::dataflow{2}c=Concurrent::dataflow(a,b){|av,bv|av+bv}Using thedataflow method you can build up a directed acyclic graph (DAG) of tasks that depend on each other, and have the tasks run as soon as their dependencies are ready and there is CPU capacity to schedule them. This can help you create a program that uses more of the CPU resources available to you.
This section describes how we could derive dataflow from other primitives in this library.
Consider a naive fibonacci calculator.
deffib(n)ifn<2nelsefib(n-1)+fib(n-2)endendputsfib(14)#=> 377We could modify this to use futures.
deffib(n)ifn<2Concurrent::Future.new{n}elsen1=fib(n-1).executen2=fib(n-2).executeConcurrent::Future.new{n1.value+n2.value}endendf=fib(14)#=> #f.execute#=> #sleep(0.5)putsf.value#=> 377 One of the drawbacks of this approach is that all the futures start, and then most of them immediately block on their dependencies. We know that there's no point executing those futures until their dependencies are ready, so let's not execute each future until all their dependencies are ready.
To do this we'll create an object that counts the number of times it observes a future finishing before it does something - and for us that something will be to execute the next future.
classCountingObserverdefinitialize(count,&block)@count=count@block=blockenddefupdate(time,value,reason)@count-=1if@count<=0@block.call()endendenddeffib(n)ifn<2Concurrent::Future.new{n}.executeelsen1=fib(n-1)n2=fib(n-2)result=Concurrent::Future.new{n1.value+n2.value}=CountingObserver.new(2){result.execute}n1.add_observern2.add_observern1.executen2.executeresultendendWe can wrap this up in a dataflow utility.
f=fib(14)#=> #sleep(0.5)putsf.value#=> 377defdataflow(*inputs,&block)result=Concurrent::Future.new(&block)ifinputs.empty?result.executeelse=CountingObserver.new(inputs.size){result.execute}inputs.eachdo|input|input.add_observerendendresultenddeffib(n)ifn<2dataflow{n}elsen1=fib(n-1)n2=fib(n-2)dataflow(n1,n2){n1.value+n2.value}endendf=fib(14)#=> #sleep(0.5)putsf.value#=> 377 Since we know that the futures the dataflow computation depends on are already going to be available when the future is executed, we might as well pass the values into the block so we don't have to reference the futures inside the block. This allows us to write the dataflow block as straight non-concurrent code without reference to futures.
defdataflow(*inputs,&block)result=Concurrent::Future.newdovalues=inputs.map{|input|input.value}block.call(*values)endifinputs.empty?result.executeelse=CountingObserver.new(inputs.size){result.execute}inputs.eachdo|input|input.add_observerendendresultenddeffib(n)ifn<2Concurrent::dataflow{n}elsen1=fib(n-1)n2=fib(n-2)Concurrent::dataflow(n1,n2){|v1,v2|v1+v2}endendf=fib(14)#=> #sleep(0.5)putsf.value#=> 377 Parameters:
zero or moreFuture operations that this dataflow depends upon
Yields:
The operation to perform once all the dependencies are met
Yield Parameters:
each of theFuture inputs to the dataflow
Yield Returns:
the result of the block operation
Returns:
the result of all the operations
Raises:
if no block is given
if any of the inputs are notIVars
343536 | # File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 34defdataflow(*inputs,&block)dataflow_with(Concurrent.global_io_executor,*inputs,&block)end |
444546 | # File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 44defdataflow!(*inputs,&block)dataflow_with!(Concurrent.global_io_executor,*inputs,&block)end |
394041 | # File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 39defdataflow_with(executor,*inputs,&block)call_dataflow(:value,executor,*inputs,&block)end |
495051 | # File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 49defdataflow_with!(executor,*inputs,&block)call_dataflow(:value!,executor,*inputs,&block)end |
Has no effect since it is no longer needed, seehttps://github.com/ruby-concurrency/concurrent-ruby/pull/841.
this option should be needed only because ofat_exit orderingissues which may arise when running some of the testing frameworks.E.g. Minitest's test-suite runs itself inat_exit callback whichexecutes after the pools are already terminated. Then auto terminationneeds to be disabled and called manually after test-suite ends.
This method shouldnever be calledfrom within a gem. It shouldonly be used from within the mainapplication and even then it should be used only when necessary.
Disables AtExit handlers including pool auto-termination handlers.When disabled it will be the application programmer's responsibilityto ensure that the handlers are shutdown properly prior to applicationexit by callingAtExit.run method.
484950 | # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 48defself.disable_at_exit_handlers!deprecated"Method #disable_at_exit_handlers! has no effect since it is no longer needed, see https://github.com/ruby-concurrency/concurrent-ruby/pull/841."end |
General access point to global executors.
Parameters:
symbols:
Returns:
838485 | # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 83defself.executor(executor_identifier)Options.executor(executor_identifier)end |
Global thread pool optimized for short, fastoperations.
Returns:
the thread pool
555657 | # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 55defself.global_fast_executorGLOBAL_FAST_EXECUTOR.value!end |
666768 | # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 66defself.global_immediate_executorGLOBAL_IMMEDIATE_EXECUTORend |
Global thread pool optimized for long, blocking (IO)tasks.
Returns:
the thread pool
626364 | # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 62defself.global_io_executorGLOBAL_IO_EXECUTOR.value!end |
114115116 | # File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 114defself.global_loggerGLOBAL_LOGGER.valueend |
118119120 | # File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 118defself.global_logger=(value)GLOBAL_LOGGER.value=valueend |
Global thread pool user for globaltimers.
Returns:
the thread pool
737475 | # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 73defself.global_timer_setGLOBAL_TIMER_SET.value!end |
Leave a transaction without committing or aborting - seeConcurrent::atomically.
Raises:
144145146 | # File 'lib/concurrent-ruby/concurrent/tvar.rb', line 144defleave_transactionraiseTransaction::LeaveError.newend |
Time calculations on all platforms and languages are sensitive tochanges to the system clock. To alleviate the potential problemsassociated with changing the system clock while an application is running,most modern operating systems provide a monotonic clock that operatesindependently of the system clock. A monotonic clock cannot be used todetermine human-friendly clock times. A monotonic clock is used exclusivelyfor calculating time intervals. Not all Ruby platforms provide access to anoperating system monotonic clock. On these platforms a pure-Ruby monotonicclock will be used as a fallback. An operating system monotonic clock is bothfaster and more reliable than the pure-Ruby implementation. The pure-Rubyimplementation should be fast and reliable enough for most non-realtimeoperations. At this time the common Ruby platforms that provide access to anoperating system monotonic clock are MRI 2.1 and above and JRuby (all versions).
Returns the current time as tracked by the application monotonic clock.
Parameters:
the time unit to be returned, can be either:float_second, :float_millisecond, :float_microsecond, :second,:millisecond, :microsecond, or :nanosecond default to :float_second.
Returns:
The current monotonic time since some unspecifiedstarting point
See Also:
151617 | # File 'lib/concurrent-ruby/concurrent/utility/monotonic_time.rb', line 15defmonotonic_time(unit=:float_second)Process.clock_gettime(Process::CLOCK_MONOTONIC,unit)end |
87888990919293949596 | # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 87defself.new_fast_executor(opts={})FixedThreadPool.new([2,Concurrent.processor_count].max,auto_terminate:opts.fetch(:auto_terminate,true),idletime:60,# 1 minutemax_queue:0,# unlimitedfallback_policy::abort,# shouldn't matter -- 0 max queuename:"fast")end |
9899100101102103104 | # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 98defself.new_io_executor(opts={})CachedThreadPool.new(auto_terminate:opts.fetch(:auto_terminate,true),fallback_policy::abort,# shouldn't matter -- 0 max queuename:"io")end |
Number of physical processor cores on the current system. For performancereasons the calculated value will be memoized on the first call.
On Windows the Win32 API will be queried for theNumberOfCores fromWin32_Processor. This will return the total number "of cores for thecurrent instance of the processor." On Unix-like operating systems eitherthehwprefs orsysctl utility will be called in a subshell and thereturned value will be used. In the rare case where none of these methodswork or an exception is raised the function will simply return 1.
Returns:
number physical processor cores on the current system
See Also:
181182183 | # File 'lib/concurrent-ruby/concurrent/utility/processor_counter.rb', line 181defself.physical_processor_countprocessor_counter.physical_processor_countend |
Number of processors seen by the OS and used for process scheduling. Forperformance reasons the calculated value will be memoized on the firstcall.
When running under JRuby the Java runtime calljava.lang.Runtime.getRuntime.availableProcessors will be used. Accordingto the Java documentation this "value may change during a particularinvocation of the virtual machine... [applications] should thereforeoccasionally poll this property." We still memoize this value once underJRuby.
Otherwise Ruby's Etc.nprocessors will be used.
Returns:
number of processors seen by the OS or Java runtime
See Also:
160161162 | # File 'lib/concurrent-ruby/concurrent/utility/processor_counter.rb', line 160defself.processor_countprocessor_counter.processor_countend |
Use logger created by #create_simple_logger to log concurrent-ruby messages.
666768 | # File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 66defself.use_simple_logger(level=:FATAL,output=$stderr)Concurrent.global_logger=create_simple_loggerlevel,outputend |
Use logger created by #create_stdlib_logger to log concurrent-ruby messages.
101102103 | # File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 101defself.use_stdlib_logger(level=:FATAL,output=$stderr)Concurrent.global_logger=create_stdlib_loggerlevel,outputend |
Waits for another thread to arrive at this exchange point (unless thecurrent thread is interrupted), and then transfers the given object toit, receiving its object in return. The timeout value indicates theapproximate number of seconds the method should block while waitingfor the exchange. When the timeout value isnil the method willblock indefinitely.
In some edge cases when atimeout is given a return value ofnil may beambiguous. Specifically, ifnil is a valid value in the exchange it willbe impossible to tell whethernil is the actual return value or if itsignifies timeout. Whennil is a valid value in the exchange considerusing#exchange! or#try_exchange instead.
Parameters:
the value to exchange with another thread
in seconds,nil blocks indefinitely
Returns:
the value exchanged by the other thread ornil on timeout
# File 'lib/concurrent-ruby/concurrent/exchanger.rb', line 341 |
Waits for another thread to arrive at this exchange point (unless thecurrent thread is interrupted), and then transfers the given object toit, receiving its object in return. The timeout value indicates theapproximate number of seconds the method should block while waitingfor the exchange. When the timeout value isnil the method willblock indefinitely.
On timeout aTimeoutError exception will be raised.
Parameters:
the value to exchange with another thread
in seconds,nil blocks indefinitely
Returns:
the value exchanged by the other thread
Raises:
on timeout
# File 'lib/concurrent-ruby/concurrent/exchanger.rb', line 345 |
Create a new thread pool.
Options Hash (opts):
the policy for handling newtasks that are received when the queue size has reachedmax_queue or the executor has shut down
Raises:
if:fallback_policy is not one of the values specifiedinFALLBACK_POLICIES
See Also:
# File 'lib/concurrent-ruby/concurrent/exchanger.rb', line 338 |
Waits for another thread to arrive at this exchange point (unless thecurrent thread is interrupted), and then transfers the given object toit, receiving its object in return. The timeout value indicates theapproximate number of seconds the method should block while waitingfor the exchange. When the timeout value isnil the method willblock indefinitely.
The return value will be aMaybe set toJust on success orNothing on timeout.
exchanger=Concurrent::Exchanger.newresult=exchanger.exchange(:foo,0.5)ifresult.just?putsresult.value#=> :barelseputs'timeout'endParameters:
the value to exchange with another thread
in seconds,nil blocks indefinitely
Returns:
on success aJust maybe will be returned withthe item exchanged by the other thread as#value; on timeout aNothing maybe will be returned withTimeoutError as#reason
# File 'lib/concurrent-ruby/concurrent/exchanger.rb', line 349 |