Module: Concurrent

Defined in:
lib/concurrent-ruby/concurrent.rb,
lib/concurrent-ruby/concurrent/map.rb,
lib/concurrent-ruby/concurrent/set.rb,
lib/concurrent-ruby/concurrent/atom.rb,
lib/concurrent-ruby/concurrent/hash.rb,
lib/concurrent-ruby/concurrent/ivar.rb,
lib/concurrent-ruby/concurrent/mvar.rb,
lib/concurrent-ruby/concurrent/tvar.rb,
lib/concurrent-ruby/concurrent/agent.rb,
lib/concurrent-ruby/concurrent/array.rb,
lib/concurrent-ruby/concurrent/async.rb,
lib/concurrent-ruby/concurrent/delay.rb,
lib/concurrent-ruby/concurrent/maybe.rb,
lib/concurrent-ruby/concurrent/tuple.rb,
lib/concurrent-ruby/concurrent/errors.rb,
lib/concurrent-ruby/concurrent/future.rb,
lib/concurrent-ruby/concurrent/options.rb,
lib/concurrent-ruby/concurrent/promise.rb,
lib/concurrent-ruby/concurrent/version.rb,
lib/concurrent-ruby/concurrent/dataflow.rb,
lib/concurrent-ruby/concurrent/promises.rb,
lib/concurrent-ruby/concurrent/constants.rb,
lib/concurrent-ruby/concurrent/exchanger.rb,
lib/concurrent-ruby/concurrent/re_include.rb,
lib/concurrent-ruby/concurrent/timer_task.rb,
lib/concurrent-ruby/concurrent/atomic/event.rb,
lib/concurrent-ruby/concurrent/atomic/locals.rb,
lib/concurrent-ruby/concurrent/configuration.rb,
lib/concurrent-ruby/concurrent/mutable_struct.rb,
lib/concurrent-ruby/concurrent/scheduled_task.rb,
lib/concurrent-ruby/concurrent/utility/engine.rb,
lib/concurrent-ruby/concurrent/concern/logging.rb,
lib/concurrent-ruby/concurrent/settable_struct.rb,
lib/concurrent-ruby/concurrent/synchronization.rb,
lib/concurrent-ruby/concurrent/atomic/semaphore.rb,
lib/concurrent-ruby/concurrent/immutable_struct.rb,
lib/concurrent-ruby/concurrent/thread_safe/util.rb,
lib/concurrent-ruby/concurrent/concern/obligation.rb,
lib/concurrent-ruby/concurrent/concern/observable.rb,
lib/concurrent-ruby/concurrent/executor/timer_set.rb,
lib/concurrent-ruby/concurrent/concern/deprecation.rb,
lib/concurrent-ruby/concurrent/atomic/atomic_fixnum.rb,
lib/concurrent-ruby/concurrent/synchronization/lock.rb,
lib/concurrent-ruby/concurrent/atomic/atomic_boolean.rb,
lib/concurrent-ruby/concurrent/atomic/cyclic_barrier.rb,
lib/concurrent-ruby/concurrent/atomic/lock_local_var.rb,
lib/concurrent-ruby/concurrent/atomic/fiber_local_var.rb,
lib/concurrent-ruby/concurrent/atomic/mutex_semaphore.rb,
lib/concurrent-ruby/concurrent/atomic/read_write_lock.rb,
lib/concurrent-ruby/concurrent/synchronization/object.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/adder.rb,
lib/concurrent-ruby/concurrent/utility/monotonic_time.rb,
lib/concurrent-ruby/concurrent/utility/native_integer.rb,
lib/concurrent-ruby/concurrent/atomic/atomic_reference.rb,
lib/concurrent-ruby/concurrent/atomic/count_down_latch.rb,
lib/concurrent-ruby/concurrent/atomic/thread_local_var.rb,
lib/concurrent-ruby/concurrent/concern/dereferenceable.rb,
lib/concurrent-ruby/concurrent/synchronization/volatile.rb,
lib/concurrent-ruby/concurrent/executor/executor_service.rb,
lib/concurrent-ruby/concurrent/synchronization/condition.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/volatile.rb,
lib/concurrent-ruby/concurrent/utility/processor_counter.rb,
lib/concurrent-ruby/concurrent/atomic/mutex_atomic_fixnum.rb,
lib/concurrent-ruby/concurrent/collection/lock_free_stack.rb,
lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/striped64.rb,
lib/concurrent-ruby/concurrent/atomic/mutex_atomic_boolean.rb,
lib/concurrent-ruby/concurrent/executor/cached_thread_pool.rb,
lib/concurrent-ruby/concurrent/executor/immediate_executor.rb,
lib/concurrent-ruby/concurrent/executor/safe_task_executor.rb,
lib/concurrent-ruby/concurrent/atomic/java_count_down_latch.rb,
lib/concurrent-ruby/concurrent/atomic/mutex_count_down_latch.rb,
lib/concurrent-ruby/concurrent/atomic_reference/mutex_atomic.rb,
lib/concurrent-ruby/concurrent/executor/serialized_execution.rb,
lib/concurrent-ruby/concurrent/executor/thread_pool_executor.rb,
lib/concurrent-ruby/concurrent/collection/map/mri_map_backend.rb,
lib/concurrent-ruby/concurrent/executor/java_executor_service.rb,
lib/concurrent-ruby/concurrent/executor/ruby_executor_service.rb,
lib/concurrent-ruby/concurrent/executor/single_thread_executor.rb,
lib/concurrent-ruby/concurrent/synchronization/abstract_object.rb,
lib/concurrent-ruby/concurrent/synchronization/abstract_struct.rb,
lib/concurrent-ruby/concurrent/synchronization/lockable_object.rb,
lib/concurrent-ruby/concurrent/utility/native_extension_loader.rb,
lib/concurrent-ruby/concurrent/atomic/atomic_markable_reference.rb,
lib/concurrent-ruby/concurrent/atomic/reentrant_read_write_lock.rb,
lib/concurrent-ruby/concurrent/executor/serial_executor_service.rb,
lib/concurrent-ruby/concurrent/executor/simple_executor_service.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/data_structures.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/xor_shift_random.rb,
lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb,
lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb,
lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb,
lib/concurrent-ruby/concurrent/thread_safe/synchronized_delegator.rb,
lib/concurrent-ruby/concurrent/synchronization/full_memory_barrier.rb,
lib/concurrent-ruby/concurrent/synchronization/safe_initialization.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/power_of_two_tuple.rb,
lib/concurrent-ruby/concurrent/atomic_reference/numeric_cas_wrapper.rb,
lib/concurrent-ruby/concurrent/executor/indirect_immediate_executor.rb,
lib/concurrent-ruby/concurrent/executor/java_single_thread_executor.rb,
lib/concurrent-ruby/concurrent/executor/ruby_single_thread_executor.rb,
lib/concurrent-ruby/concurrent/atomic_reference/atomic_direct_update.rb,
lib/concurrent-ruby/concurrent/collection/copy_on_write_observer_set.rb,
lib/concurrent-ruby/concurrent/synchronization/jruby_lockable_object.rb,
lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb,
lib/concurrent-ruby/concurrent/collection/copy_on_notify_observer_set.rb,
lib/concurrent-ruby/concurrent/collection/map/truffleruby_map_backend.rb,
lib/concurrent-ruby/concurrent/collection/map/synchronized_map_backend.rb,
lib/concurrent-ruby/concurrent/executor/serialized_execution_delegator.rb,
lib/concurrent-ruby/concurrent/collection/non_concurrent_priority_queue.rb,
lib/concurrent-ruby/concurrent/synchronization/abstract_lockable_object.rb,
lib/concurrent-ruby/concurrent/collection/map/non_concurrent_map_backend.rb,
lib/concurrent-ruby/concurrent/collection/java_non_concurrent_priority_queue.rb,
lib/concurrent-ruby/concurrent/collection/ruby_non_concurrent_priority_queue.rb,
lib/concurrent-ruby-edge/concurrent/edge.rb,
lib/concurrent-ruby-edge/concurrent/actor.rb,
lib/concurrent-ruby-edge/concurrent/channel.rb,
lib/concurrent-ruby-edge/concurrent/actor/core.rb,
lib/concurrent-ruby-edge/concurrent/actor/root.rb,
lib/concurrent-ruby-edge/concurrent/actor/utils.rb,
lib/concurrent-ruby-edge/concurrent/actor/errors.rb,
lib/concurrent-ruby-edge/concurrent/channel/tick.rb,
lib/concurrent-ruby-edge/concurrent/edge/channel.rb,
lib/concurrent-ruby-edge/concurrent/edge/version.rb,
lib/concurrent-ruby-edge/concurrent/actor/context.rb,
lib/concurrent-ruby-edge/concurrent/edge/promises.rb,
lib/concurrent-ruby-edge/concurrent/edge/throttle.rb,
lib/concurrent-ruby-edge/concurrent/lazy_register.rb,
lib/concurrent-ruby-edge/concurrent/actor/envelope.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb,
lib/concurrent-ruby-edge/concurrent/actor/reference.rb,
lib/concurrent-ruby-edge/concurrent/actor/type_check.rb,
lib/concurrent-ruby-edge/concurrent/actor/utils/pool.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector.rb,
lib/concurrent-ruby-edge/concurrent/edge/cancellation.rb,
lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb,
lib/concurrent-ruby-edge/concurrent/actor/utils/ad_hoc.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb,
lib/concurrent-ruby-edge/concurrent/actor/utils/balancer.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/timer.rb,
lib/concurrent-ruby-edge/concurrent/edge/lock_free_queue.rb,
lib/concurrent-ruby-edge/concurrent/actor/utils/broadcast.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/ticker.rb,
lib/concurrent-ruby-edge/concurrent/edge/processing_actor.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/awaits.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/buffer.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/sliding.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/linking.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/pausing.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/dropping.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/abstract.rb,
lib/concurrent-ruby-edge/concurrent/actor/public_delegations.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb,
lib/concurrent-ruby-edge/concurrent/edge/lock_free_linked_set.rb,
lib/concurrent-ruby-edge/concurrent/actor/internal_delegations.rb,
lib/concurrent-ruby-edge/concurrent/executor/wrapping_executor.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/supervising.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/termination.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/put_clause.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/sets_results.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/take_clause.rb,
lib/concurrent-ruby-edge/concurrent/edge/old_channel_integration.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/removes_child.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/after_clause.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/error_clause.rb,
lib/concurrent-ruby-edge/concurrent/edge/lock_free_linked_set/node.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/default_clause.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/executes_context.rb,
lib/concurrent-ruby-edge/concurrent/edge/lock_free_linked_set/window.rb,
lib/concurrent-ruby-edge/concurrent/actor/default_dead_letter_handler.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/errors_on_unknown_message.rb

Overview

Note:

Edge Features are under active development and may change frequently.

  • Deprecations are not added before incompatible changes.
  • Edge version: major is always 0, a minor bump means an incompatible change, and a patch bump means a compatible change.
  • Edge features may also lack tests and documentation.
  • Features developed in concurrent-ruby-edge are expected to move to concurrent-ruby when finalised.

Defined Under Namespace

Modules: Actor, Async, Concern, Edge, ErlangActor, ImmutableStruct, MutableStruct, Promises, SettableStruct

Classes: Agent, Array, Atom, AtomicBoolean, AtomicFixnum, AtomicMarkableReference, AtomicReference, CachedThreadPool, Cancellation, Channel, ConcurrentUpdateError, CountDownLatch, CyclicBarrier, Delay, Event, Exchanger, FiberLocalVar, FixedThreadPool, Future, Hash, IVar, ImmediateExecutor, IndirectImmediateExecutor, LazyRegister, LockFreeStack, LockLocalVar, MVar, Map, Maybe, MultipleAssignmentError, MultipleErrors, ProcessingActor, Promise, ReadWriteLock, ReentrantReadWriteLock, SafeTaskExecutor, ScheduledTask, Semaphore, SerializedExecution, SerializedExecutionDelegator, Set, SimpleExecutorService, SingleThreadExecutor, TVar, ThreadLocalVar, ThreadPoolExecutor, Throttle, TimerSet, TimerTask, Tuple, WrappingExecutor

Constant Summary

Error =
Class.new(StandardError)
ConfigurationError =

Raised when errors occur during configuration.

Class.new(Error)
CancelledOperationError =

Raised when an asynchronous operation is cancelled before execution.

Class.new(Error)
LifecycleError =

Raised when a lifecycle method (such as stop) is called in an improper sequence or when the object is in an inappropriate state.

Class.new(Error)
ImmutabilityError =

Raised when an attempt is made to violate an immutability guarantee.

Class.new(Error)
IllegalOperationError =

Raised when an operation is attempted which is not legal given the receiver's current state.

Class.new(Error)
InitializationError =

Raised when an object's methods are called when it has not been properly initialized.

Class.new(Error)
MaxRestartFrequencyError =

Raised when an object with a start/stop lifecycle has been started an excessive number of times. Often used in conjunction with a restart policy or strategy.

Class.new(Error)
RejectedExecutionError =

Raised by an Executor when it is unable to process a given task, possibly because of a reject policy or other internal error.

Class.new(Error)
ResourceLimitError =

Raised when any finite resource, such as a lock counter, exceeds its maximum limit/threshold.

Class.new(Error)
TimeoutError =

Raised when an operation times out.

Class.new(Error)
PromiseExecutionError =
Class.new(StandardError)
VERSION =
'1.3.5'
NULL_LOGGER =

Suppresses all output when used for logging.

lambda { |level, progname, message = nil, &block| }
EDGE_VERSION =
'0.7.2'

Class Method Details

.abort_transaction ⇒ undocumented

Abort a currently running transaction - see Concurrent::atomically.

Raises:

  • (Transaction::AbortError)

    # File 'lib/concurrent-ruby/concurrent/tvar.rb', line 139
    def abort_transaction
      raise Transaction::AbortError.new
    end

.atomically ⇒ undocumented

Run a block that reads and writes TVars as a single atomic transaction. With respect to the value of TVar objects, the transaction is atomic, in that it either happens or it does not, consistent, in that the TVar objects involved will never enter an illegal state, and isolated, in that transactions never interfere with each other. You may recognise these properties from database transactions.

There are some very important and unusual semantics that you must be aware of:

  • Most importantly, the block that you pass to atomically may be executed more than once. In most cases your code should be free of side-effects, except for via TVar.

  • If an exception escapes an atomically block it will abort the transaction.

  • It is undefined behaviour to use callcc or Fiber with atomically.

  • If you create a new thread within an atomically, it will not be part of the transaction. Creating a thread counts as a side-effect.

Transactions within transactions are flattened to a single transaction.

Examples:

    a = Concurrent::TVar.new(100_000)
    b = Concurrent::TVar.new(100)

    Concurrent::atomically do
      a.value -= 10
      b.value += 10
    end

Raises:

  • (ArgumentError)

    # File 'lib/concurrent-ruby/concurrent/tvar.rb', line 82
    def atomically
      raise ArgumentError.new('no block given') unless block_given?

      # Get the current transaction
      transaction = Transaction::current

      # Are we not already in a transaction (not nested)?
      if transaction.nil?
        # New transaction
        begin
          # Retry loop
          loop do
            # Create a new transaction
            transaction = Transaction.new
            Transaction::current = transaction

            # Run the block, aborting on exceptions
            begin
              result = yield
            rescue Transaction::AbortError => e
              transaction.abort
              result = Transaction::ABORTED
            rescue Transaction::LeaveError => e
              transaction.abort
              break result
            rescue => e
              transaction.abort
              raise e
            end

            # If we can commit, break out of the loop
            if result != Transaction::ABORTED
              if transaction.commit
                break result
              end
            end
          end
        ensure
          # Clear the current transaction
          Transaction::current = nil
        end
      else
        # Nested transaction - flatten it and just run the block
        yield
      end
    end

.available_processor_count ⇒ Float

Number of processor cores available for process scheduling. This method takes into account the CPU quota if the process is inside a cgroup with a dedicated CPU quota (typically Docker). Otherwise it returns the same value as #processor_count but as a Float.

For performance reasons the calculated value will be memoized on the first call.

Returns:

  • (Float)

    number of available processors

    # File 'lib/concurrent-ruby/concurrent/utility/processor_counter.rb', line 194
    def self.available_processor_count
      processor_counter.available_processor_count
    end

.call_dataflow(method, executor, *inputs, &block) ⇒ undocumented

Raises:

  • (ArgumentError)

    # File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 56
    def call_dataflow(method, executor, *inputs, &block)
      raise ArgumentError.new('an executor must be provided') if executor.nil?
      raise ArgumentError.new('no block given') unless block_given?
      unless inputs.all? { |input| input.is_a? IVar }
        raise ArgumentError.new("Not all dependencies are IVars.\nDependencies:#{inputs.inspect}")
      end

      result = Future.new(executor: executor) do
        values = inputs.map { |input| input.send(method) }
        block.call(*values)
      end

      if inputs.empty?
        result.execute
      else
        counter = DependencyCounter.new(inputs.size) { result.execute }

        inputs.each do |input|
          input.add_observer counter
        end
      end

      result
    end

.cpu_quota ⇒ nil, Float

The maximum number of processor cores available for process scheduling. Returns nil if there is no enforced limit, or a Float if the process is inside a cgroup with a dedicated CPU quota (typically Docker).

Note that nothing prevents setting a CPU quota higher than the actual number of cores on the system.

For performance reasons the calculated value will be memoized on the first call.

Returns:

  • (nil, Float)

    Maximum number of available processors as set by a cgroup CPU quota, or nil if none set

    # File 'lib/concurrent-ruby/concurrent/utility/processor_counter.rb', line 209
    def self.cpu_quota
      processor_counter.cpu_quota
    end

.cpu_shares ⇒ Float, nil

The CPU shares requested by the process. For performance reasons the calculated value will be memoized on the first call.

Returns:

  • (Float, nil)

    CPU shares requested by the process, or nil if not set

    # File 'lib/concurrent-ruby/concurrent/utility/processor_counter.rb', line 217
    def self.cpu_shares
      processor_counter.cpu_shares
    end

.create_simple_logger(level = :FATAL, output = $stderr) ⇒ undocumented

Create a simple logger with provided level and output.

    # File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 38
    def self.create_simple_logger(level = :FATAL, output = $stderr)
      level = Concern::Logging.const_get(level) unless level.is_a?(Integer)

      # TODO (pitr-ch 24-Dec-2016): figure out why it had to be replaced, stdlogger was deadlocking
      lambda do |severity, progname, message = nil, &block|
        return false if severity < level

        message = block ? block.call : message
        formatted_message = case message
                            when String
                              message
                            when Exception
                              format "%s (%s)\n%s",
                                     message.message, message.class, (message.backtrace || []).join("\n")
                            else
                              message.inspect
                            end

        output.print format "[%s] %5s -- %s: %s\n",
                            Time.now.strftime('%Y-%m-%d %H:%M:%S.%L'),
                            Concern::Logging::SEV_LABEL[severity],
                            progname,
                            formatted_message
        true
      end
    end

.create_stdlib_logger(level = :FATAL, output = $stderr) ⇒ undocumented

Deprecated.

Create a stdlib logger with provided level and output. If you use this deprecated method you might need to add logger to your Gemfile to avoid warnings from Ruby 3.3.5+.

    # File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 73
    def self.create_stdlib_logger(level = :FATAL, output = $stderr)
      require 'logger'
      logger = Logger.new(output)
      logger.level = level
      logger.formatter = lambda do |severity, datetime, progname, msg|
        formatted_message = case msg
                            when String
                              msg
                            when Exception
                              format "%s (%s)\n%s",
                                     msg.message, msg.class, (msg.backtrace || []).join("\n")
                            else
                              msg.inspect
                            end
        format "[%s] %5s -- %s: %s\n",
               datetime.strftime('%Y-%m-%d %H:%M:%S.%L'),
               severity,
               progname,
               formatted_message
      end

      lambda do |loglevel, progname, message = nil, &block|
        logger.add loglevel, message, progname, &block
      end
    end

.dataflow(*inputs) {|inputs| ... } ⇒ Object

Dataflow allows you to create a task that will be scheduled when all of its data dependencies are available. Data dependencies are Future values. The dataflow task itself is also a Future value, so you can build up a graph of these tasks, each of which is run when all the data and other tasks it depends on are available or completed.

Our syntax is somewhat related to that of Akka's flow and Habanero Java's DataDrivenFuture. However unlike Akka we don't schedule a task at all until it is ready to run, and unlike Habanero Java we pass the data values into the task instead of dereferencing them again in the task.

The theory of dataflow goes back to the 70s. In the terminology of the literature, our implementation is coarse-grained, in that each task can be many instructions, and dynamic in that you can create more tasks within other tasks.

Example

A dataflow task is created with the dataflow method, passing in a block.

    task = Concurrent::dataflow { 14 }

This produces a simple Future value. The task will run immediately, as it has no dependencies. We can also specify Future values that must be available before a task will run. When we do this we get the value of those futures passed to our block.

    a = Concurrent::dataflow { 1 }
    b = Concurrent::dataflow { 2 }
    c = Concurrent::dataflow(a, b) { |av, bv| av + bv }

Using the dataflow method you can build up a directed acyclic graph (DAG) of tasks that depend on each other, and have the tasks run as soon as their dependencies are ready and there is CPU capacity to schedule them. This can help you create a program that uses more of the CPU resources available to you.

Derivation

This section describes how we could derive dataflow from other primitives in this library.

Consider a naive fibonacci calculator.

    def fib(n)
      if n < 2
        n
      else
        fib(n - 1) + fib(n - 2)
      end
    end

    puts fib(14) #=> 377

We could modify this to use futures.

    def fib(n)
      if n < 2
        Concurrent::Future.new { n }
      else
        n1 = fib(n - 1).execute
        n2 = fib(n - 2).execute
        Concurrent::Future.new { n1.value + n2.value }
      end
    end

    f = fib(14) #=> #<Concurrent::Future ...>
    f.execute   #=> #<Concurrent::Future ...>

    sleep(0.5)

    puts f.value #=> 377

One of the drawbacks of this approach is that all the futures start, and then most of them immediately block on their dependencies. We know that there's no point executing those futures until their dependencies are ready, so let's not execute each future until all their dependencies are ready.

To do this we'll create an object that counts the number of times it observes a future finishing before it does something - and for us that something will be to execute the next future.

    class CountingObserver
      def initialize(count, &block)
        @count = count
        @block = block
      end

      def update(time, value, reason)
        @count -= 1

        if @count <= 0
          @block.call()
        end
      end
    end

    def fib(n)
      if n < 2
        Concurrent::Future.new { n }.execute
      else
        n1 = fib(n - 1)
        n2 = fib(n - 2)

        result = Concurrent::Future.new { n1.value + n2.value }

        barrier = CountingObserver.new(2) { result.execute }
        n1.add_observer barrier
        n2.add_observer barrier

        n1.execute
        n2.execute

        result
      end
    end

We can wrap this up in a dataflow utility.

    f = fib(14) #=> #<Concurrent::Future ...>
    sleep(0.5)

    puts f.value #=> 377

    def dataflow(*inputs, &block)
      result = Concurrent::Future.new(&block)

      if inputs.empty?
        result.execute
      else
        barrier = CountingObserver.new(inputs.size) { result.execute }

        inputs.each do |input|
          input.add_observer barrier
        end
      end

      result
    end

    def fib(n)
      if n < 2
        dataflow { n }
      else
        n1 = fib(n - 1)
        n2 = fib(n - 2)
        dataflow(n1, n2) { n1.value + n2.value }
      end
    end

    f = fib(14) #=> #<Concurrent::Future ...>
    sleep(0.5)

    puts f.value #=> 377

Since we know that the futures the dataflow computation depends on are already going to be available when the future is executed, we might as well pass the values into the block so we don't have to reference the futures inside the block. This allows us to write the dataflow block as straight non-concurrent code without reference to futures.

    def dataflow(*inputs, &block)
      result = Concurrent::Future.new do
        values = inputs.map { |input| input.value }
        block.call(*values)
      end

      if inputs.empty?
        result.execute
      else
        barrier = CountingObserver.new(inputs.size) { result.execute }

        inputs.each do |input|
          input.add_observer barrier
        end
      end

      result
    end

    def fib(n)
      if n < 2
        Concurrent::dataflow { n }
      else
        n1 = fib(n - 1)
        n2 = fib(n - 2)
        Concurrent::dataflow(n1, n2) { |v1, v2| v1 + v2 }
      end
    end

    f = fib(14) #=> #<Concurrent::Future ...>
    sleep(0.5)

    puts f.value #=> 377

Parameters:

  • inputs (Future)

    zero or more Future operations that this dataflow depends upon

Yields:

  • The operation to perform once all the dependencies are met

Yield Parameters:

  • inputs (Future)

    each of the Future inputs to the dataflow

Yield Returns:

  • (Object)

    the result of the block operation

Returns:

  • (Object)

    the result of all the operations

Raises:

  • (ArgumentError)

    if no block is given

  • (ArgumentError)

    if any of the inputs are not IVars

    # File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 34
    def dataflow(*inputs, &block)
      dataflow_with(Concurrent.global_io_executor, *inputs, &block)
    end

.dataflow!(*inputs, &block) ⇒ undocumented

    # File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 44
    def dataflow!(*inputs, &block)
      dataflow_with!(Concurrent.global_io_executor, *inputs, &block)
    end

.dataflow_with(executor, *inputs, &block) ⇒ undocumented

    # File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 39
    def dataflow_with(executor, *inputs, &block)
      call_dataflow(:value, executor, *inputs, &block)
    end

.dataflow_with!(executor, *inputs, &block) ⇒ undocumented

    # File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 49
    def dataflow_with!(executor, *inputs, &block)
      call_dataflow(:value!, executor, *inputs, &block)
    end

.disable_at_exit_handlers! ⇒ undocumented

Deprecated.

Has no effect since it is no longer needed, see https://github.com/ruby-concurrency/concurrent-ruby/pull/841.

Note:

This option should be needed only because of at_exit ordering issues which may arise when running some of the testing frameworks. E.g. Minitest's test-suite runs itself in an at_exit callback which executes after the pools are already terminated. Then auto termination needs to be disabled and called manually after the test-suite ends.

Note:

This method should never be called from within a gem. It should only be used from within the main application and even then it should be used only when necessary.

Disables AtExit handlers including pool auto-termination handlers. When disabled it will be the application programmer's responsibility to ensure that the handlers are shut down properly prior to application exit by calling the AtExit.run method.

    # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 48
    def self.disable_at_exit_handlers!
      deprecated "Method #disable_at_exit_handlers! has no effect since it is no longer needed, see https://github.com/ruby-concurrency/concurrent-ruby/pull/841."
    end

.executor(executor_identifier) ⇒ Executor

General access point to global executors.

Parameters:

Returns:

  • (Executor)

    # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 83
    def self.executor(executor_identifier)
      Options.executor(executor_identifier)
    end

.global_fast_executor ⇒ ThreadPoolExecutor

Global thread pool optimized for short, fast operations.

Returns:

  • (ThreadPoolExecutor)

    # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 55
    def self.global_fast_executor
      GLOBAL_FAST_EXECUTOR.value!
    end

.global_immediate_executor ⇒ undocumented

    # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 66
    def self.global_immediate_executor
      GLOBAL_IMMEDIATE_EXECUTOR
    end

.global_io_executor ⇒ ThreadPoolExecutor

Global thread pool optimized for long, blocking (IO) tasks.

Returns:

  • (ThreadPoolExecutor)

    # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 62
    def self.global_io_executor
      GLOBAL_IO_EXECUTOR.value!
    end

.global_logger ⇒ undocumented

    # File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 114
    def self.global_logger
      GLOBAL_LOGGER.value
    end

.global_logger=(value) ⇒ undocumented

    # File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 118
    def self.global_logger=(value)
      GLOBAL_LOGGER.value = value
    end

.global_timer_set ⇒ Concurrent::TimerSet

Global thread pool used for global timers.

Returns:

  • (Concurrent::TimerSet)

    # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 73
    def self.global_timer_set
      GLOBAL_TIMER_SET.value!
    end

.leave_transaction ⇒ undocumented

Leave a transaction without committing or aborting - see Concurrent::atomically.

Raises:

  • (Transaction::LeaveError)

    # File 'lib/concurrent-ruby/concurrent/tvar.rb', line 144
    def leave_transaction
      raise Transaction::LeaveError.new
    end

.monotonic_time(unit = :float_second) ⇒ Float

Note:

Time calculations on all platforms and languages are sensitive to changes to the system clock. To alleviate the potential problems associated with changing the system clock while an application is running, most modern operating systems provide a monotonic clock that operates independently of the system clock. A monotonic clock cannot be used to determine human-friendly clock times. A monotonic clock is used exclusively for calculating time intervals. Not all Ruby platforms provide access to an operating system monotonic clock. On these platforms a pure-Ruby monotonic clock will be used as a fallback. An operating system monotonic clock is both faster and more reliable than the pure-Ruby implementation. The pure-Ruby implementation should be fast and reliable enough for most non-realtime operations. At this time the common Ruby platforms that provide access to an operating system monotonic clock are MRI 2.1 and above and JRuby (all versions).

Returns the current time as tracked by the application monotonic clock.

Parameters:

  • unit (Symbol) (defaults to: :float_second)

    the time unit to be returned; can be one of :float_second, :float_millisecond, :float_microsecond, :second, :millisecond, :microsecond, or :nanosecond. Defaults to :float_second.

Returns:

  • (Float)

    The current monotonic time since some unspecified starting point

See Also:

    # File 'lib/concurrent-ruby/concurrent/utility/monotonic_time.rb', line 15
    def monotonic_time(unit = :float_second)
      Process.clock_gettime(Process::CLOCK_MONOTONIC, unit)
    end

.new_fast_executor(opts = {}) ⇒ undocumented

    # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 87
    def self.new_fast_executor(opts = {})
      FixedThreadPool.new(
        [2, Concurrent.processor_count].max,
        auto_terminate: opts.fetch(:auto_terminate, true),
        idletime: 60, # 1 minute
        max_queue: 0, # unlimited
        fallback_policy: :abort, # shouldn't matter -- 0 max queue
        name: "fast"
      )
    end

.new_io_executor(opts = {}) ⇒ undocumented

    # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 98
    def self.new_io_executor(opts = {})
      CachedThreadPool.new(
        auto_terminate: opts.fetch(:auto_terminate, true),
        fallback_policy: :abort, # shouldn't matter -- 0 max queue
        name: "io"
      )
    end

.physical_processor_count ⇒ Integer

Number of physical processor cores on the current system. For performance reasons the calculated value will be memoized on the first call.

On Windows the Win32 API will be queried for the NumberOfCores from Win32_Processor. This will return the total number "of cores for the current instance of the processor." On Unix-like operating systems either the hwprefs or sysctl utility will be called in a subshell and the returned value will be used. In the rare case where none of these methods work or an exception is raised the function will simply return 1.

Returns:

  • (Integer)

    number of physical processor cores on the current system

See Also:

    # File 'lib/concurrent-ruby/concurrent/utility/processor_counter.rb', line 181
    def self.physical_processor_count
      processor_counter.physical_processor_count
    end

.processor_count ⇒ Integer

Number of processors seen by the OS and used for process scheduling. For performance reasons the calculated value will be memoized on the first call.

When running under JRuby the Java runtime call java.lang.Runtime.getRuntime.availableProcessors will be used. According to the Java documentation this "value may change during a particular invocation of the virtual machine... [applications] should therefore occasionally poll this property." We still memoize this value once under JRuby.

Otherwise Ruby's Etc.nprocessors will be used.

Returns:

  • (Integer)

    number of processors seen by the OS or Java runtime

See Also:

    # File 'lib/concurrent-ruby/concurrent/utility/processor_counter.rb', line 160
    def self.processor_count
      processor_counter.processor_count
    end

.use_simple_logger(level = :FATAL, output = $stderr) ⇒ undocumented

Use logger created by #create_simple_logger to log concurrent-ruby messages.

# File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 66

def self.use_simple_logger(level = :FATAL, output = $stderr)
  Concurrent.global_logger = create_simple_logger level, output
end

.use_stdlib_logger(level = :FATAL, output = $stderr) ⇒ undocumented

Deprecated.

Use logger created by #create_stdlib_logger to log concurrent-ruby messages.

# File 'lib/concurrent-ruby/concurrent/concern/logging.rb', line 101

def self.use_stdlib_logger(level = :FATAL, output = $stderr)
  Concurrent.global_logger = create_stdlib_logger level, output
end

Instance Method Details

#exchange(value, timeout = nil) ⇒ Object

Waits for another thread to arrive at this exchange point (unless the current thread is interrupted), and then transfers the given object to it, receiving its object in return. The timeout value indicates the approximate number of seconds the method should block while waiting for the exchange. When the timeout value is nil the method will block indefinitely.

In some edge cases when a timeout is given, a return value of nil may be ambiguous. Specifically, if nil is a valid value in the exchange, it will be impossible to tell whether nil is the actual return value or if it signifies timeout. When nil is a valid value in the exchange, consider using #exchange! or #try_exchange instead.

Parameters:

  • value (Object)

    the value to exchange with another thread

  • timeout (Numeric, nil) (defaults to: nil)

    in seconds, nil blocks indefinitely

Returns:

  • (Object)

    the value exchanged by the other thread or nil on timeout

# File 'lib/concurrent-ruby/concurrent/exchanger.rb', line 341

#exchange!(value, timeout = nil) ⇒ Object

Waits for another thread to arrive at this exchange point (unless the current thread is interrupted), and then transfers the given object to it, receiving its object in return. The timeout value indicates the approximate number of seconds the method should block while waiting for the exchange. When the timeout value is nil the method will block indefinitely.

On timeout a TimeoutError exception will be raised.

Parameters:

  • value (Object)

    the value to exchange with another thread

  • timeout (Numeric, nil) (defaults to: nil)

    in seconds, nil blocks indefinitely

Returns:

  • (Object)

    the value exchanged by the other thread

Raises:

  • (Concurrent::TimeoutError)

    on timeout

# File 'lib/concurrent-ruby/concurrent/exchanger.rb', line 345

#initialize(opts = {}) ⇒ undocumented

Create a new thread pool.

Options Hash (opts):

  • :fallback_policy (Symbol) (defaults to: :discard)

    the policy for handling new tasks that are received when the queue size has reached max_queue or the executor has shut down

Raises:

  • (ArgumentError)

    if :fallback_policy is not one of the values specified in FALLBACK_POLICIES

See Also:

# File 'lib/concurrent-ruby/concurrent/exchanger.rb', line 338

#try_exchange(value, timeout = nil) ⇒ Concurrent::Maybe

Waits for another thread to arrive at this exchange point (unless the current thread is interrupted), and then transfers the given object to it, receiving its object in return. The timeout value indicates the approximate number of seconds the method should block while waiting for the exchange. When the timeout value is nil the method will block indefinitely.

The return value will be a Maybe set to Just on success or Nothing on timeout.

Examples:

exchanger = Concurrent::Exchanger.new

# try_exchange (not exchange) returns the Maybe that responds to just?
result = exchanger.try_exchange(:foo, 0.5)

if result.just?
  puts result.value #=> :bar
else
  puts 'timeout'
end

Parameters:

  • value (Object)

    the value to exchange with another thread

  • timeout (Numeric, nil) (defaults to: nil)

    in seconds, nil blocks indefinitely

Returns:

  • (Concurrent::Maybe)

    on success a Just maybe will be returned with the item exchanged by the other thread as #value; on timeout a Nothing maybe will be returned with TimeoutError as #reason

# File 'lib/concurrent-ruby/concurrent/exchanger.rb', line 349
Generated by yard.
