class Fiber::Scheduler
This is not an existing class, but documentation of the interface that a scheduler object should comply with in order to be used as the argument to Fiber.set_scheduler and handle non-blocking fibers. See also the “Non-blocking fibers” section in the Fiber class docs for explanations of some concepts.
Scheduler’s behavior and usage are expected to be as follows:
When the execution in a non-blocking Fiber reaches some blocking operation (like sleep, wait for a process, or a non-ready I/O), it calls some of the scheduler’s hook methods, listed below.
The scheduler somehow registers what the current fiber is waiting on, and yields control to other fibers with Fiber.yield (so the fiber is suspended while expecting its wait to end, and other fibers in the same thread can run).
At the end of the current thread’s execution, the scheduler’s method scheduler_close is called.
The scheduler runs a wait loop, checking all the blocked fibers (which it has registered on hook calls) and resuming them when the awaited resource is ready (e.g. I/O ready or sleep time elapsed).
This way concurrent execution will be achieved transparently for every individual Fiber’s code.
Scheduler implementations are provided by gems, like Async.
Hook methods are:
kernel_sleep
timeout_after
address_resolve
process_wait
block and unblock
io_wait, io_read, io_write, io_pread, io_pwrite, io_select, and io_close
(the list is expanded as Ruby developers make more methods having non-blocking calls)
When not specified otherwise, the hook implementations are mandatory: if they are not implemented, methods that try to call the hook will fail. To provide backward compatibility, in the future hooks will be optional (if a hook is not implemented because the scheduler was created for an older Ruby version, the code which needs this hook will not fail, and will just behave in a blocking fashion).
It is also strongly recommended that the scheduler implement the fiber method, to which Fiber.schedule delegates.
A sample toy implementation of the scheduler can be found in Ruby’s code, in test/fiber/scheduler.rb.
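The behavior described above can be sketched with a scheduler that only knows how to sleep. This is a minimal illustration rather than the implementation from test/fiber/scheduler.rb: the class name SleepOnlyScheduler and the @waiting hash are assumptions made for this example, the wait loop lives in close, and the hooks the sketch does not need simply raise.

```ruby
# Hypothetical toy scheduler: it supports Kernel#sleep and nothing else.
class SleepOnlyScheduler
  def initialize
    @waiting = {} # fiber => monotonic time at which it should wake up
  end

  # Backs Fiber.schedule: start the block at once in a non-blocking fiber.
  def fiber(&block)
    fiber = Fiber.new(blocking: false, &block)
    fiber.resume
    fiber
  end

  # Hook for Kernel#sleep: record the wake-up time, then give up control.
  # Assumption of this sketch: sleep without a duration wakes up at once.
  def kernel_sleep(duration = nil)
    @waiting[Fiber.current] = now + duration.to_f
    Fiber.yield
  end

  # Called when the thread exits: the wait loop lives here.
  def close
    until @waiting.empty?
      fiber, wake_at = @waiting.min_by { |_, time| time }
      delay = wake_at - now
      # This runs in the thread's blocking root fiber, so it is a real sleep.
      sleep(delay) if delay > 0
      @waiting.delete(fiber)
      fiber.resume
    end
  end

  # Required by Fiber.set_scheduler, but not needed by this sketch.
  def block(blocker, timeout = nil)
    raise NotImplementedError
  end

  def unblock(blocker, fiber)
    raise NotImplementedError
  end

  def io_wait(io, events, timeout)
    raise NotImplementedError
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

order = []
Thread.new do
  Fiber.set_scheduler(SleepOnlyScheduler.new)
  Fiber.schedule { sleep(0.2); order << :slow }
  Fiber.schedule { sleep(0.1); order << :fast }
end.join
```

Both fibers sleep at the same time, so order ends up as [:fast, :slow] even though the slow fiber was scheduled first.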
Public Instance Methods
Source
    VALUE
    rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname)
    {
        VALUE arguments[] = {
            hostname
        };

        return rb_check_funcall(scheduler, id_address_resolve, 1, arguments);
    }

Invoked by any method that performs a non-reverse DNS lookup. The most notable method is Addrinfo.getaddrinfo, but there are many others.
The method is expected to return an array of strings corresponding to the IP addresses the hostname is resolved to, or nil if it cannot be resolved.
Fairly exhaustive list of all possible call-sites:
Addrinfo.marshal_load
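One way to satisfy this contract, shown here only as a sketch, is to run the blocking lookup in a separate thread and return the unique IP address strings. The module name ThreadedResolver is hypothetical, and mapping SocketError to nil is an assumption about how “cannot be resolved” should be reported.

```ruby
require "socket"

# Hypothetical mixin; only the address_resolve method name comes from the
# scheduler interface.
module ThreadedResolver
  def address_resolve(hostname)
    Thread.new do
      # The new thread has no scheduler, so this lookup is an ordinary
      # blocking call that does not re-enter the hook.
      Addrinfo.getaddrinfo(hostname, nil).map(&:ip_address).uniq
    rescue SocketError
      nil # assumption: report an unresolvable name as nil
    end.value
  end
end

resolver = Object.new.extend(ThreadedResolver)
addresses = resolver.address_resolve("127.0.0.1")
```

A numeric address is used so the example does not depend on a DNS server being reachable.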
Source
    VALUE
    rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
    {
        return rb_funcall(scheduler, id_block, 2, blocker, timeout);
    }

Invoked by methods like Thread.join, and by Mutex, to signify that the current Fiber is blocked until further notice (e.g. unblock) or until timeout has elapsed.
blocker is what we are waiting on, informational only (for debugging and logging). There is no guarantee about its value.
Expected to return a boolean, specifying whether the blocking operation was successful or not.
Source
    VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
    {
        // Check if scheduler supports blocking_operation_wait before creating the object
        if (!rb_respond_to(scheduler, id_blocking_operation_wait)) {
            return Qundef;
        }

        // Create a new BlockingOperation with the blocking operation
        VALUE blocking_operation = rb_fiber_scheduler_blocking_operation_new(function, data, unblock_function, data2, flags, state);

        VALUE result = rb_funcall(scheduler, id_blocking_operation_wait, 1, blocking_operation);

        // Get the operation data to check if it was executed
        rb_fiber_scheduler_blocking_operation_t *operation = get_blocking_operation(blocking_operation);
        rb_atomic_t current_status = RUBY_ATOMIC_LOAD(operation->status);

        // Invalidate the operation now that we're done with it
        operation->function = NULL;
        operation->state = NULL;
        operation->data = NULL;
        operation->data2 = NULL;
        operation->unblock_function = NULL;

        // If the blocking operation was never executed, return Qundef to signal the caller to use rb_nogvl instead
        if (current_status == RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED) {
            return Qundef;
        }

        return result;
    }

Invoked by Ruby’s core methods to run a blocking operation in a non-blocking way. The blocking_operation is a Fiber::Scheduler::BlockingOperation that encapsulates the blocking operation.
If the scheduler doesn’t implement this method, or if the scheduler doesn’t execute the blocking operation, Ruby will fall back to the non-scheduler implementation.
Minimal suggested implementation is:
    def blocking_operation_wait(blocking_operation)
      Thread.new { blocking_operation.call }.join
    end
Source
    VALUE
    rb_fiber_scheduler_close(VALUE scheduler)
    {
        RUBY_ASSERT(ruby_thread_has_gvl_p());

        VALUE result;

        // The reason for calling `scheduler_close` before calling `close` is for
        // legacy schedulers which implement `close` and expect the user to call
        // it. Subsequently, that method would call `Fiber.set_scheduler(nil)`
        // which should call `scheduler_close`. If it were to call `close`, it
        // would create an infinite loop.

        result = rb_check_funcall(scheduler, id_scheduler_close, 0, NULL);
        if (!UNDEF_P(result)) return result;

        result = rb_check_funcall(scheduler, id_close, 0, NULL);
        if (!UNDEF_P(result)) return result;

        return Qnil;
    }

Called when the current thread exits. The scheduler is expected to implement this method in order to allow all waiting fibers to finalize their execution.
The suggested pattern is to implement the main event loop in the close method.
Source
    VALUE
    rb_fiber_scheduler_fiber(VALUE scheduler, int argc, VALUE *argv, int kw_splat)
    {
        return rb_funcall_passing_block_kw(scheduler, id_fiber_schedule, argc, argv, kw_splat);
    }

Implementation of Fiber.schedule. The method is expected to immediately run the given block of code in a separate non-blocking fiber, and to return that Fiber.
Minimal suggested implementation is:
    def fiber(&block)
      fiber = Fiber.new(blocking: false, &block)
      fiber.resume
      fiber
    end
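To see what “immediately run” and “return that Fiber” mean from the caller’s side, the sketch below plugs the suggested implementation into a scheduler whose other hooks are empty stubs. The class name ImmediateScheduler is hypothetical, and the stubs exist only because Fiber.set_scheduler checks for them; none of them is called in this example.

```ruby
# Hypothetical scheduler that implements only the fiber hook for real.
class ImmediateScheduler
  def fiber(&block)
    fiber = Fiber.new(blocking: false, &block)
    fiber.resume
    fiber
  end

  # Stubs required by Fiber.set_scheduler; never called in this example.
  def block(blocker, timeout = nil); end
  def unblock(blocker, fiber); end
  def kernel_sleep(duration = nil); end
  def io_wait(io, events, timeout); end
  def close; end
end

events = []
returned = nil
Thread.new do
  Fiber.set_scheduler(ImmediateScheduler.new)
  returned = Fiber.schedule do
    # Fiber.blocking? is false inside a non-blocking fiber.
    events << [:inside, Fiber.blocking?]
  end
  events << :after_schedule
end.join
```

The block has already run by the time Fiber.schedule returns, so events is [[:inside, false], :after_schedule] and returned is the Fiber created by the hook.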
Source
    VALUE
    rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
    {
        if (!rb_respond_to(scheduler, id_io_pread)) {
            return RUBY_Qundef;
        }

        VALUE arguments[] = {
            scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
        };

        if (rb_respond_to(scheduler, id_fiber_interrupt)) {
            return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pread, (VALUE)&arguments);
        } else {
            return fiber_scheduler_io_pread((VALUE)&arguments);
        }
    }

Invoked by IO#pread or IO::Buffer#pread to read length bytes from io at offset from into a specified buffer (see IO::Buffer) at the given offset.
This method is semantically the same as io_read, but it allows specifying the offset to read from and is often better for asynchronous IO on the same file.
The method should be considered experimental.
Source
    VALUE
    rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
    {
        if (!rb_respond_to(scheduler, id_io_pwrite)) {
            return RUBY_Qundef;
        }

        VALUE arguments[] = {
            scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
        };

        if (rb_respond_to(scheduler, id_fiber_interrupt)) {
            return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pwrite, (VALUE)&arguments);
        } else {
            return fiber_scheduler_io_pwrite((VALUE)&arguments);
        }
    }

Invoked by IO#pwrite or IO::Buffer#pwrite to write length bytes to io at offset from, taken from a specified buffer (see IO::Buffer) at the given offset.
This method is semantically the same as io_write, but it allows specifying the offset to write to and is often better for asynchronous IO on the same file.
The method should be considered experimental.
Source
    VALUE
    rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
    {
        if (!rb_respond_to(scheduler, id_io_read)) {
            return RUBY_Qundef;
        }

        VALUE arguments[] = {
            scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
        };

        if (rb_respond_to(scheduler, id_fiber_interrupt)) {
            return rb_thread_io_blocking_operation(io, fiber_scheduler_io_read, (VALUE)&arguments);
        } else {
            return fiber_scheduler_io_read((VALUE)&arguments);
        }
    }

Invoked by IO#read or IO::Buffer#read to read length bytes from io into a specified buffer (see IO::Buffer) at the given offset.
The length argument is the “minimum length to be read”. If the IO buffer size is 8KiB, but the length is 1024 (1KiB), up to 8KiB might be read, but at least 1KiB will be. Generally, the only case where less data than length will be read is if there is an error reading the data.
Specifying a length of 0 is valid and means try reading at least once and return any available data.
A suggested implementation should try to read from io in a non-blocking manner and call io_wait if the io is not ready (which will yield control to other fibers).
See IO::Buffer for an interface available to return data.
Expected to return the number of bytes read, or, in case of an error, -errno (the negated number corresponding to the system’s error code).
The method should be considered experimental.
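The -errno return convention can be illustrated in isolation. The helper below is hypothetical (negated_errno is not part of the scheduler interface); it only shows one way a hook body might turn a SystemCallError into the negated error number described above, while passing a successful byte count through unchanged.

```ruby
# Hypothetical helper: run the block and return its result, or the negated
# system error number if the block raises a SystemCallError.
def negated_errno
  yield
rescue SystemCallError => error
  -error.errno
end

bytes_read = negated_errno { 5 }                 # success: count passes through
failure    = negated_errno { raise Errno::EPIPE } # error: becomes -errno
```

Here failure equals -Errno::EPIPE::Errno, whose numeric value depends on the platform.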
Source
    VALUE rb_fiber_scheduler_io_select(VALUE scheduler, VALUE readables, VALUE writables, VALUE exceptables, VALUE timeout)
    {
        VALUE arguments[] = {
            readables, writables, exceptables, timeout
        };

        return rb_fiber_scheduler_io_selectv(scheduler, 4, arguments);
    }

Invoked by IO.select to ask whether the specified descriptors are ready for specified events within the specified timeout.
Expected to return a 3-tuple of Arrays of IOs that are ready.
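A simple way to provide this hook, sketched here under the assumption that delegating to a helper thread is acceptable, is to run the ordinary IO.select in that thread and hand back its result. The module name ThreadedSelect is hypothetical.

```ruby
# Hypothetical mixin; only the io_select method name and its arguments come
# from the scheduler interface.
module ThreadedSelect
  def io_select(readables, writables, exceptables, timeout)
    Thread.new do
      # The helper thread has no scheduler, so this is a plain IO.select.
      IO.select(readables, writables, exceptables, timeout)
    end.value
  end
end

reader, writer = IO.pipe
writer.write("x") # make the read end ready

selector = Object.new.extend(ThreadedSelect)
ready = selector.io_select([reader], [], [], 1)
```

With one byte waiting in the pipe, ready is [[reader], [], []]. As with IO.select itself, the result would be nil if the timeout expired first.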
Source
    VALUE
    rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
    {
        VALUE arguments[] = {
            scheduler, io, events, timeout
        };

        if (rb_respond_to(scheduler, id_fiber_interrupt)) {
            return rb_thread_io_blocking_operation(io, fiber_scheduler_io_wait, (VALUE)&arguments);
        } else {
            return fiber_scheduler_io_wait((VALUE)&arguments);
        }
    }

Invoked by IO#wait, IO#wait_readable, IO#wait_writable to ask whether the specified descriptor is ready for specified events within the specified timeout.
events is a bit mask of IO::READABLE, IO::WRITABLE, and IO::PRIORITY.
A suggested implementation should register which Fiber is waiting for which resources and immediately call Fiber.yield to pass control to other fibers. Then, in the close method, the scheduler might dispatch all the I/O resources to the fibers waiting for them.
Expected to return the subset of events that are ready immediately.
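The events bit mask can be split apart and reassembled with ordinary bitwise operators. The helper below is a hypothetical sketch, not the hook itself: ready_events asks IO.select about the requested events and folds the answer back into a mask, returning 0 when nothing is ready (that return value is an assumption of this example).

```ruby
# Hypothetical helper: which of the requested events are ready for io?
def ready_events(io, events, timeout)
  readers  = (events & IO::READABLE).zero? ? [] : [io]
  writers  = (events & IO::WRITABLE).zero? ? [] : [io]
  priority = (events & IO::PRIORITY).zero? ? [] : [io]

  readable, writable, exceptional = IO.select(readers, writers, priority, timeout)
  return 0 if readable.nil? # IO.select returns nil when the timeout expires

  ready = 0
  ready |= IO::READABLE if readable.include?(io)
  ready |= IO::WRITABLE if writable.include?(io)
  ready |= IO::PRIORITY if exceptional.include?(io)
  ready
end

reader, writer = IO.pipe
nothing_yet  = ready_events(reader, IO::READABLE, 0)
writer.write("x")
readable_now = ready_events(reader, IO::READABLE | IO::PRIORITY, 1)
writable_now = ready_events(writer, IO::WRITABLE, 1)
```

A real io_wait would register the fiber and yield instead of calling IO.select directly; the wait loop could then use a mask like this one as the value to resume the fiber with.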
Source
    VALUE
    rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
    {
        if (!rb_respond_to(scheduler, id_io_write)) {
            return RUBY_Qundef;
        }

        VALUE arguments[] = {
            scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
        };

        if (rb_respond_to(scheduler, id_fiber_interrupt)) {
            return rb_thread_io_blocking_operation(io, fiber_scheduler_io_write, (VALUE)&arguments);
        } else {
            return fiber_scheduler_io_write((VALUE)&arguments);
        }
    }

Invoked by IO#write or IO::Buffer#write to write length bytes to io from a specified buffer (see IO::Buffer) at the given offset.
The length argument is the “minimum length to be written”. If the IO buffer size is 8KiB, but the length specified is 1024 (1KiB), at most 8KiB will be written, but at least 1KiB will be. Generally, the only case where less data than length will be written is if there is an error writing the data.
Specifying a length of 0 is valid and means try writing at least once, as much data as possible.
A suggested implementation should try to write to io in a non-blocking manner and call io_wait if the io is not ready (which will yield control to other fibers).
See IO::Buffer for an interface available to get data from the buffer efficiently.
Expected to return the number of bytes written, or, in case of an error, -errno (the negated number corresponding to the system’s error code).
The method should be considered experimental.
Source
    VALUE
    rb_fiber_scheduler_kernel_sleep(VALUE scheduler, VALUE timeout)
    {
        return rb_funcall(scheduler, id_kernel_sleep, 1, timeout);
    }

Invoked by Kernel#sleep and Mutex#sleep and is expected to provide an implementation of sleeping in a non-blocking way. An implementation might register the current fiber in some list of “which fiber waits until what moment”, call Fiber.yield to pass control, and then in close resume the fibers whose wait period has elapsed.
Source
    VALUE
    rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags)
    {
        VALUE arguments[] = {
            PIDT2NUM(pid), RB_INT2NUM(flags)
        };

        return rb_check_funcall(scheduler, id_process_wait, 2, arguments);
    }

Invoked by Process::Status.wait in order to wait for a specified process. See that method’s description for details of the arguments.
Suggested minimal implementation:
    Thread.new do
      Process::Status.wait(pid, flags)
    end.value

This hook is optional: if it is not present in the current scheduler, Process::Status.wait will behave as a blocking method.
Expected to return a Process::Status instance.
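Wrapped in a method and exercised directly, the suggested implementation might look like the sketch below. The module name ThreadedProcessWait is hypothetical, and the child process is a throwaway Ruby interpreter started only to have something to wait for.

```ruby
require "rbconfig"

# Hypothetical mixin; only the process_wait method name and its arguments
# come from the scheduler interface.
module ThreadedProcessWait
  def process_wait(pid, flags)
    Thread.new do
      # Blocks only the helper thread, not the fiber's thread.
      Process::Status.wait(pid, flags)
    end.value
  end
end

waiter = Object.new.extend(ThreadedProcessWait)
pid = Process.spawn(RbConfig.ruby, "-e", "exit 3")
status = waiter.process_wait(pid, 0)
```

The returned status is a Process::Status for the child, so status.exitstatus is 3 here.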
Source
    VALUE
    rb_fiber_scheduler_timeout_after(VALUE scheduler, VALUE timeout, VALUE exception, VALUE message)
    {
        VALUE arguments[] = {
            timeout, exception, message
        };

        return rb_check_funcall(scheduler, id_timeout_after, 3, arguments);
    }

Invoked by Timeout.timeout to execute the given block within the given duration. It can also be invoked directly by the scheduler or user code.
Attempt to limit the execution time of a given block to the given duration if possible. When a non-blocking operation causes the block’s execution time to exceed the specified duration, that non-blocking operation should be interrupted by raising the specified exception_class constructed with the given exception_arguments.
General execution timeouts are often considered risky. This implementation will only interrupt non-blocking operations. This is by design because it’s expected that non-blocking operations can fail for a variety of unpredictable reasons, so applications should already be robust in handling these conditions and by implication timeouts.
However, as a result of this design, if the block does not invoke any non-blocking operations, it will be impossible to interrupt it. If you desire to provide predictable points for timeouts, consider adding sleep(0).
If the block is executed successfully, its result will be returned.
The exception will typically be raised using Fiber#raise.
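The interaction between timeout_after, a non-blocking sleep and Fiber#raise can be sketched as follows. This is an illustration under stated assumptions, not a reference implementation: the class name TimeoutScheduler and the @timers list are invented for the example, only sleep is supported as a non-blocking operation, and the hook is called directly through Fiber.scheduler so the sketch does not depend on the timeout library.

```ruby
# Hypothetical scheduler: one timer list serves both sleeps and timeouts.
class TimeoutScheduler
  def initialize
    @timers = [] # each entry: [wake_at, fiber, exception or nil]
  end

  def fiber(&block)
    fiber = Fiber.new(blocking: false, &block)
    fiber.resume
    fiber
  end

  def kernel_sleep(duration = nil)
    timer = [now + duration.to_f, Fiber.current, nil]
    @timers << timer
    Fiber.yield
  ensure
    remove(timer) # also runs when a timeout interrupts the sleep
  end

  def timeout_after(duration, exception_class, *exception_arguments, &block)
    timer = [now + duration, Fiber.current, exception_class.new(*exception_arguments)]
    @timers << timer
    yield duration
  ensure
    remove(timer) # the block finished (or failed) before the deadline
  end

  # Wait loop: wake sleepers, or raise in fibers whose deadline passed.
  def close
    until @timers.empty?
      timer = @timers.min_by(&:first)
      wake_at, fiber, exception = timer
      delay = wake_at - now
      sleep(delay) if delay > 0
      remove(timer)
      next unless fiber.alive?
      exception ? fiber.raise(exception) : fiber.resume
    end
  end

  # Required by Fiber.set_scheduler, unused in this sketch.
  def block(blocker, timeout = nil); raise NotImplementedError; end
  def unblock(blocker, fiber); raise NotImplementedError; end
  def io_wait(io, events, timeout); raise NotImplementedError; end

  private

  def remove(timer)
    @timers.delete_if { |entry| entry.equal?(timer) }
  end

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

results = []
Thread.new do
  Fiber.set_scheduler(TimeoutScheduler.new)
  Fiber.schedule do
    results << Fiber.scheduler.timeout_after(1, RuntimeError, "too slow") { sleep(0.05); :finished }
  end
  Fiber.schedule do
    begin
      Fiber.scheduler.timeout_after(0.1, RuntimeError, "too slow") { sleep(5) }
    rescue RuntimeError => error
      results << error.message
    end
  end
end.join
```

The first block finishes in time and its value is returned; the second is interrupted inside its sleep, so results ends up as [:finished, "too slow"].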
Source
    VALUE
    rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
    {
        RUBY_ASSERT(rb_obj_is_fiber(fiber));

        VALUE result;
        enum ruby_tag_type state;

        // `rb_fiber_scheduler_unblock` can be called from points where `errno` is expected to be preserved. Therefore, we should save and restore it. For example `io_binwrite` calls `rb_fiber_scheduler_unblock` and if `errno` is reset to 0 by user code, it will break the error handling in `io_write`.
        //
        // If we explicitly preserve `errno` in `io_binwrite` and other similar functions (e.g. by returning it), this code is no longer needed. I hope in the future we will be able to remove it.
        int saved_errno = errno;

        // We must prevent interrupts while invoking the unblock method, because otherwise fibers can be left permanently blocked if an interrupt occurs during the execution of user code. See also `rb_fiber_scheduler_fiber_interrupt`.
        rb_execution_context_t *ec = GET_EC();
        int saved_interrupt_mask = ec->interrupt_mask;
        ec->interrupt_mask |= PENDING_INTERRUPT_MASK;

        EC_PUSH_TAG(ec);
        if ((state = EC_EXEC_TAG()) == TAG_NONE) {
            result = rb_funcall(scheduler, id_unblock, 2, blocker, fiber);
        }
        EC_POP_TAG();

        ec->interrupt_mask = saved_interrupt_mask;

        if (state) {
            EC_JUMP_TAG(ec, state);
        }

        RUBY_VM_CHECK_INTS(ec);

        errno = saved_errno;

        return result;
    }

Invoked to wake up a Fiber previously blocked with block (for example, Mutex#lock calls block and Mutex#unlock calls unblock). The scheduler should use the fiber parameter to understand which fiber is unblocked.
blocker is what was awaited, but it is informational only (for debugging and logging), and it is not guaranteed to be the same value as the blocker for block.
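How block and unblock pair up can be sketched with two fibers exchanging a value through a Queue. The class name HandoffScheduler is hypothetical, and the sketch makes two simplifying assumptions that a real scheduler could not: the timeout passed to block is ignored, and unblock is only ever called from the scheduler’s own thread.

```ruby
# Hypothetical scheduler that supports only block and unblock.
class HandoffScheduler
  def initialize
    @ready = [] # fibers that were unblocked and can be resumed
  end

  def fiber(&block)
    fiber = Fiber.new(blocking: false, &block)
    fiber.resume
    fiber
  end

  # The current fiber cannot proceed: suspend it until unblock is called.
  # Assumption of this sketch: the timeout is ignored.
  def block(blocker, timeout = nil)
    Fiber.yield
    true
  end

  # The fiber parameter identifies which fiber to wake up; the blocker is
  # informational only.
  def unblock(blocker, fiber)
    @ready << fiber
  end

  # Wait loop: resume every fiber that has been unblocked.
  def close
    while (fiber = @ready.shift)
      fiber.resume if fiber.alive?
    end
  end

  # Required by Fiber.set_scheduler, unused in this sketch.
  def kernel_sleep(duration = nil); raise NotImplementedError; end
  def io_wait(io, events, timeout); raise NotImplementedError; end
end

received = []
Thread.new do
  queue = Queue.new
  Fiber.set_scheduler(HandoffScheduler.new)
  Fiber.schedule { received << queue.pop } # empty queue: calls block
  Fiber.schedule { queue.push(:hello) }    # wakes the waiter: calls unblock
end.join
```

The first fiber is suspended in block until the second fiber’s push triggers unblock; the wait loop in close then resumes it, and received becomes [:hello].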