Fiber

Fibers provide a mechanism for cooperative concurrency.

Context Switching

Fibers execute a user-provided block. During the execution, the block may call Fiber.yield or Fiber.transfer to switch to another fiber. Fiber#resume is used to continue execution from the point where Fiber.yield was called.

    #!/usr/bin/env ruby

    puts "1: Start program."

    f = Fiber.new do
      puts "3: Entered fiber."
      Fiber.yield
      puts "5: Resumed fiber."
    end

    puts "2: Resume fiber first time."
    f.resume

    puts "4: Resume fiber second time."
    f.resume

    puts "6: Finished."

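This program demonstrates the flow control of fibers: the messages are printed in numerical order, from 1 to 6, as control passes back and forth between the main program and the fiber.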
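Fiber.yield and Fiber#resume also carry values across the switch: the argument given to Fiber.yield becomes the return value of the matching resume, and the block's final value is returned by the last resume. The following is a minimal sketch of that behaviour; the helper name counter_fiber is hypothetical and not part of Ruby.

```ruby
# Hypothetical helper: builds a fiber that hands back 0...limit, one value per resume.
def counter_fiber(limit)
  Fiber.new do
    limit.times { |index| Fiber.yield(index) }
    :done # Returned by the final resume, once the block has finished.
  end
end

fiber = counter_fiber(3)
values = Array.new(4) { fiber.resume }

p values       # [0, 1, 2, :done]
p fiber.alive? # false
```

Once the block has run to completion the fiber is no longer alive, and resuming it again raises an error.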

Scheduler

The scheduler interface is used to intercept blocking operations. A typical implementation would be a wrapper for a gem like EventMachine or Async. This design provides separation of concerns between the event loop implementation and application code. It also allows for layered schedulers which can perform instrumentation.

To set the scheduler for the current thread:

Fiber.set_scheduler(MyScheduler.new)

When the thread exits, there is an implicit call to set_scheduler:

Fiber.set_scheduler(nil)
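The implicit call can be observed with a scheduler whose close hook records that it ran. This is only a sketch: ClosingScheduler is a hypothetical class whose other hooks are empty stubs, present because Ruby checks for them when a scheduler is installed.

```ruby
# Hypothetical scheduler: every hook is an empty stub, and only #close does anything.
class ClosingScheduler
  attr_reader :closed

  def initialize
    @closed = false
  end

  # Stubs for the hooks Ruby checks for when the scheduler is set.
  def block(blocker, timeout = nil); end
  def unblock(blocker, fiber); end
  def kernel_sleep(duration = nil); end
  def io_wait(io, events, timeout); end

  # Invoked when the scheduler is replaced or the thread exits.
  def close
    @closed = true
  end
end

scheduler = ClosingScheduler.new
seen_in_thread = nil

Thread.new do
  Fiber.set_scheduler(scheduler)
  seen_in_thread = Fiber.scheduler
  # No explicit Fiber.set_scheduler(nil) here: it happens when the thread exits.
end.join

p seen_in_thread.equal?(scheduler) # true
p scheduler.closed                 # true
```

The scheduler is set per thread, so the thread that created the worker above still has no scheduler of its own.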

Design

The scheduler interface is designed to be an unopinionated, lightweight layer between user code and blocking operations. The scheduler hooks should avoid translating or converting arguments or return values. Ideally, the exact same arguments from the user code are provided directly to the scheduler hook with no changes.
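Because arguments and return values pass through unchanged, a layered scheduler can be little more than a forwarding wrapper. The sketch below counts hook invocations and delegates to an inner scheduler. Both class names are hypothetical, and RecordingScheduler is only a stand-in for a real event loop so that the forwarding can be observed.

```ruby
# Hypothetical wrapper: counts hook invocations, then forwards the call untouched.
class InstrumentedScheduler
  HOOKS = %i[block unblock kernel_sleep io_wait fiber close].freeze

  attr_reader :counts

  def initialize(inner)
    @inner = inner
    @counts = Hash.new(0)
  end

  HOOKS.each do |hook|
    define_method(hook) do |*arguments, &callback|
      @counts[hook] += 1
      # Same arguments in, same return value out: no translation or conversion.
      @inner.public_send(hook, *arguments, &callback)
    end
  end
end

# Hypothetical stand-in for a real event loop; it records what it receives.
class RecordingScheduler
  attr_reader :calls

  def initialize
    @calls = []
  end

  def kernel_sleep(duration = nil)
    @calls << [:kernel_sleep, duration]
    true
  end

  def io_wait(io, events, timeout)
    @calls << [:io_wait, io, events, timeout]
    events
  end
end

inner = RecordingScheduler.new
outer = InstrumentedScheduler.new(inner)

outer.kernel_sleep(0.5)
ready = outer.io_wait($stdin, IO::READABLE, nil)

p outer.counts           # {kernel_sleep: 1, io_wait: 1} (hash formatting varies by Ruby version)
p ready == IO::READABLE  # true
```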

Interface

This is the interface you need to implement.

    class Scheduler
      # Wait for the specified process ID to exit.
      # This hook is optional.
      # @parameter pid [Integer] The process ID to wait for.
      # @parameter flags [Integer] A bit-mask of flags suitable for `Process::Status.wait`.
      # @returns [Process::Status] A process status instance.
      def process_wait(pid, flags)
        Thread.new do
          Process::Status.wait(pid, flags)
        end.value
      end

      # Wait for the given io readiness to match the specified events within
      # the specified timeout.
      # @parameter events [Integer] A bit mask of `IO::READABLE`,
      #   `IO::WRITABLE` and `IO::PRIORITY`.
      # @parameter timeout [Numeric] The amount of time to wait for the event in seconds.
      # @returns [Integer] The subset of events that are ready.
      def io_wait(io, events, timeout)
      end

      # Read from the given io into the specified buffer.
      # WARNING: Experimental hook! Do not use in production code!
      # @parameter io [IO] The io to read from.
      # @parameter buffer [IO::Buffer] The buffer to read into.
      # @parameter length [Integer] The minimum amount to read.
      def io_read(io, buffer, length)
      end

      # Write from the given buffer into the specified IO.
      # WARNING: Experimental hook! Do not use in production code!
      # @parameter io [IO] The io to write to.
      # @parameter buffer [IO::Buffer] The buffer to write from.
      # @parameter length [Integer] The minimum amount to write.
      def io_write(io, buffer, length)
      end

      # Sleep the current task for the specified duration, or forever if not
      # specified.
      # @parameter duration [Numeric] The amount of time to sleep in seconds.
      def kernel_sleep(duration = nil)
      end

      # Execute the given block. If the block execution exceeds the given timeout,
      # the specified exception `klass` will be raised. Typically, only non-blocking
      # methods which enter the scheduler will raise such exceptions.
      # @parameter duration [Integer] The amount of time to wait, after which an exception will be raised.
      # @parameter klass [Class] The exception class to raise.
      # @parameter *arguments [Array] The arguments to send to the constructor of the exception.
      # @yields {...} The user code to execute.
      def timeout_after(duration, klass, *arguments, &block)
      end

      # Resolve hostname to an array of IP addresses.
      # This hook is optional.
      # @parameter hostname [String] Example: "www.ruby-lang.org".
      # @returns [Array] An array of IPv4 and/or IPv6 address strings that the hostname resolves to.
      def address_resolve(hostname)
      end

      # Block the calling fiber.
      # @parameter blocker [Object] What we are waiting on, informational only.
      # @parameter timeout [Numeric | Nil] The amount of time to wait for in seconds.
      # @returns [Boolean] Whether the blocking operation was successful or not.
      def block(blocker, timeout = nil)
      end

      # Unblock the specified fiber.
      # @parameter blocker [Object] What we are waiting on, informational only.
      # @parameter fiber [Fiber] The fiber to unblock.
      # @reentrant Thread safe.
      def unblock(blocker, fiber)
      end

      # Intercept the creation of a non-blocking fiber.
      # @returns [Fiber]
      def fiber(&block)
        Fiber.new(blocking: false, &block)
      end

      # Invoked when the thread exits.
      def close
        self.run
      end

      def run
        # Implement event loop here.
      end
    end
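To make the interface more concrete, here is a deliberately small sketch of a scheduler that implements only kernel_sleep with a real event loop. SleepScheduler is a hypothetical name. The sketch assumes that fibers are started immediately and switched with Fiber.yield and Fiber#resume, that sleeping forever is not needed, and that the remaining hooks can stay as empty stubs. It is not suitable for production use.

```ruby
# Hypothetical scheduler that only knows how to sleep.
class SleepScheduler
  def initialize
    @waiting = {} # fiber => monotonic time at which it should wake
  end

  # Start the fiber immediately; a real scheduler may choose another policy.
  def fiber(&block)
    fiber = Fiber.new(blocking: false, &block)
    fiber.resume
    fiber
  end

  # Record a deadline for the calling fiber, then switch back to whoever resumed it.
  def kernel_sleep(duration = nil)
    raise NotImplementedError, "sleeping forever is not supported" unless duration
    @waiting[Fiber.current] = now + duration
    Fiber.yield
  end

  # Event loop: wake sleeping fibers in deadline order.
  def run
    until @waiting.empty?
      fiber, wake_at = @waiting.min_by { |_fiber, time| time }
      delay = wake_at - now
      sleep(delay) if delay > 0 # A real sleep: run executes in the thread's blocking root fiber.
      @waiting.delete(fiber)
      fiber.resume
    end
  end

  # Invoked when the thread exits.
  def close
    run
  end

  # Present because Ruby checks for them, but unused in this sketch.
  def io_wait(io, events, timeout); end
  def block(blocker, timeout = nil); end
  def unblock(blocker, fiber); end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

events = []

Thread.new do
  Fiber.set_scheduler(SleepScheduler.new)
  Fiber.schedule { sleep(0.2); events << :slow }
  Fiber.schedule { sleep(0.05); events << :fast }
  events << :scheduled
  # The event loop runs when the thread exits and the scheduler is closed.
end.join

p events # [:scheduled, :fast, :slow]
```

Both fibers run up to their sleep call as soon as they are scheduled, so :scheduled is recorded first; the event loop then wakes them in deadline order.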

Additional hooks may be introduced in the future. Feature detection will be used to enable these hooks.
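In Ruby code, feature detection of this kind is commonly written with respond_to?. The sketch below shows the idea for the optional address_resolve hook. The helper invoke_optional_hook and both scheduler classes are hypothetical, and the address is a placeholder from the documentation range rather than a real lookup result.

```ruby
# Hypothetical helper: call an optional hook if the scheduler provides it,
# otherwise fall back to the given block.
def invoke_optional_hook(scheduler, hook, *arguments)
  if scheduler.respond_to?(hook)
    scheduler.public_send(hook, *arguments)
  else
    yield(*arguments)
  end
end

# Hypothetical scheduler that implements the optional hook.
class ResolvingScheduler
  def address_resolve(hostname)
    ["192.0.2.1"] # Placeholder address, not a real lookup.
  end
end

# Hypothetical scheduler that does not implement it.
class PlainScheduler
end

with_hook = invoke_optional_hook(ResolvingScheduler.new, :address_resolve, "www.ruby-lang.org") do |_hostname|
  :fallback
end

without_hook = invoke_optional_hook(PlainScheduler.new, :address_resolve, "www.ruby-lang.org") do |_hostname|
  :fallback
end

p with_hook    # ["192.0.2.1"]
p without_hook # :fallback
```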

Non-blocking Execution

The scheduler hooks will only be used in special non-blocking execution contexts. Non-blocking execution contexts introduce non-determinism because the execution of scheduler hooks may introduce context switching points into your program.

Fibers

Fibers can be used to create non-blocking execution contexts.

    Fiber.new do
      puts Fiber.current.blocking? # false

      # May invoke `Fiber.scheduler&.io_wait`.
      io.read(...)

      # May invoke `Fiber.scheduler&.io_wait`.
      io.write(...)

      # Will invoke `Fiber.scheduler&.kernel_sleep`.
      sleep(n)
    end.resume

We also introduce a new method which simplifies the creation of these non-blocking fibers:

    Fiber.schedule do
      puts Fiber.current.blocking? # false
    end

The purpose of this method is to allow the scheduler to internally decide the policy for when to start the fiber, and whether to use symmetric or asymmetric fibers.

You can also create blocking execution contexts:

    Fiber.new(blocking: true) do
      # Won't use the scheduler:
      sleep(n)
    end

However, you should generally avoid this unless you are implementing a scheduler.
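The difference between the two kinds of context can be checked with Fiber#blocking?, without involving a scheduler at all. A short sketch (the variable names are only illustrative):

```ruby
# Fibers created with Fiber.new are non-blocking unless asked otherwise.
default_fiber = Fiber.new { Fiber.current.blocking? }.resume

# An explicitly blocking fiber never enters the scheduler hooks.
blocking_fiber = Fiber.new(blocking: true) { Fiber.current.blocking? }.resume

# The root fiber of a thread is itself a blocking context.
root_fiber = Thread.new { Fiber.current.blocking? }.value

p default_fiber  # false
p blocking_fiber # true
p root_fiber     # true
```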

IO

By default, I/O is non-blocking. Not all operating systems support non-blocking I/O. Windows is a notable example where socket I/O can be non-blocking but pipe I/O is blocking. Provided that there is a scheduler and the current thread is non-blocking, the operation will invoke the scheduler.

IO#close

Closing an IO interrupts all blocking operations on that IO. When a thread calls IO#close, it first attempts to interrupt any threads or fibers that are blocked on that IO. The closing thread waits until all blocked threads and fibers have been properly interrupted and removed from the IO’s blocking list. Each interrupted thread or fiber receives an IOError and is cleanly removed from the blocking operation. Only after all blocking operations have been interrupted and cleaned up will the actual file descriptor be closed, ensuring proper resource cleanup and preventing potential race conditions.
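The thread case can be reproduced without a scheduler. In the sketch below, one thread blocks reading from a pipe that never receives data, and the main thread then closes the read end. The variable names are only illustrative.

```ruby
reader, writer = IO.pipe

blocked = Thread.new do
  reader.read # Blocks: nothing is ever written to the pipe.
rescue IOError => error
  error # Hand the exception back as the thread's value.
end

# Wait until the thread is parked inside the blocking read.
sleep(0.01) until blocked.status == "sleep"

reader.close # Interrupts the blocked read before the descriptor is closed.
result = blocked.value
writer.close

p result.class # IOError
```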

For fibers managed by a scheduler, the interruption process involves calling rb_fiber_scheduler_fiber_interrupt on the scheduler. This allows the scheduler to handle the interruption in a way that’s appropriate for its event loop implementation. The scheduler can then notify the fiber, which will receive an IOError and be removed from the blocking operation. This mechanism ensures that fiber-based concurrency works correctly with IO operations, even when those operations are interrupted by IO#close.

    sequenceDiagram
        participant ThreadB
        participant ThreadA
        participant Scheduler
        participant IO
        participant Fiber1
        participant Fiber2

        Note over ThreadA: Thread A has a fiber scheduler
        activate Scheduler
        ThreadA->>Fiber1: Schedule Fiber 1
        activate Fiber1
        Fiber1->>IO: IO.read
        IO->>Scheduler: rb_thread_io_blocking_region
        deactivate Fiber1

        ThreadA->>Fiber2: Schedule Fiber 2
        activate Fiber2
        Fiber2->>IO: IO.read
        IO->>Scheduler: rb_thread_io_blocking_region
        deactivate Fiber2

        Note over Fiber1,Fiber2: Both fibers blocked on same IO

        Note over ThreadB: IO.close
        activate ThreadB
        ThreadB->>IO: thread_io_close_notify_all
        Note over ThreadB: rb_mutex_sleep

        IO->>Scheduler: rb_fiber_scheduler_fiber_interrupt(Fiber1)
        Scheduler->>Fiber1: fiber_interrupt with IOError
        activate Fiber1
        Note over IO: fiber_interrupt causes removal from blocking list
        Fiber1->>IO: rb_io_blocking_operation_exit()
        IO-->>ThreadB: Wakeup thread
        deactivate Fiber1

        IO->>Scheduler: rb_fiber_scheduler_fiber_interrupt(Fiber2)
        Scheduler->>Fiber2: fiber_interrupt with IOError
        activate Fiber2
        Note over IO: fiber_interrupt causes removal from blocking list
        Fiber2->>IO: rb_io_blocking_operation_exit()
        IO-->>ThreadB: Wakeup thread
        deactivate Fiber2
        deactivate Scheduler

        Note over ThreadB: Blocking operations list empty
        ThreadB->>IO: close(fd)
        deactivate ThreadB

Mutex

The Mutex class can be used in a non-blocking context and is fiber-specific.
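Fiber-specific here means that a lock belongs to the fiber that acquired it, not to the whole thread. The sketch below, which assumes Ruby 3.0 or later, locks a mutex in the root fiber and inspects it from a second fiber on the same thread:

```ruby
mutex = Mutex.new
mutex.lock # Acquired by the current (root) fiber.

# [locked?, owned?, try_lock] as seen from a different fiber on the same thread.
seen_from_other_fiber = Fiber.new do
  [mutex.locked?, mutex.owned?, mutex.try_lock]
end.resume

owned_by_locker = mutex.owned?
mutex.unlock

p seen_from_other_fiber # [true, false, false]
p owned_by_locker       # true
```

The second fiber sees the mutex as locked, but does not own it and cannot acquire it until the root fiber unlocks it.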

ConditionVariable

The ConditionVariable class can be used in a non-blocking context and is fiber-specific.

Queue / SizedQueue

The Queue and SizedQueue classes can be used in a non-blocking context and are fiber-specific.
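In a non-blocking context, popping from an empty Queue goes through the scheduler's block hook, and a later push goes through unblock. The sketch below illustrates this hand-off with a hypothetical HandoffScheduler. It is single-threaded only, ignores timeouts, and leaves the other hooks as empty stubs, so treat it as an illustration under those assumptions and not as a usable scheduler.

```ruby
# Hypothetical single-threaded scheduler that supports only block and unblock.
class HandoffScheduler
  def initialize
    @ready = [] # Fibers that have been unblocked and are waiting to run again.
  end

  def fiber(&block)
    fiber = Fiber.new(blocking: false, &block)
    fiber.resume
    fiber
  end

  # Called when a fiber would block, for example Queue#pop on an empty queue.
  # The timeout is ignored in this sketch.
  def block(blocker, timeout = nil)
    Fiber.yield
    true
  end

  # Called when the blocker becomes available, for example from Queue#push.
  # A real implementation must make this thread-safe.
  def unblock(blocker, fiber)
    @ready << fiber
  end

  # Event loop: resume fibers that have been unblocked.
  def run
    @ready.shift.resume until @ready.empty?
  end

  def close
    run
  end

  # Present because Ruby checks for them, but unused in this sketch.
  def kernel_sleep(duration = nil); end
  def io_wait(io, events, timeout); end
end

log = []

Thread.new do
  Fiber.set_scheduler(HandoffScheduler.new)
  queue = Queue.new

  Fiber.schedule do
    log << :waiting
    log << queue.pop # Queue is empty: the fiber is parked via the block hook.
  end

  Fiber.schedule do
    log << :pushing
    queue.push(:item) # Wakes the consumer via the unblock hook.
  end
end.join

p log # [:waiting, :pushing, :item]
```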

Thread

The Thread#join operation can be used in a non-blocking context and is fiber-specific.