Tasks

Core.Task - Type
Task(func)

Create a Task (i.e. coroutine) to execute the given function func (which must be callable with no arguments). The task exits when this function returns. The task will run in the "world age" from the parent at construction when scheduled.

Warning

By default tasks will have the sticky bit (t.sticky) set to true. This models the historic default for @async. Sticky tasks can only be run on the worker thread they are first scheduled on, and when scheduled will make the task that they were scheduled from sticky. To obtain the behavior of Threads.@spawn, set the sticky bit manually to false.

Examples

julia> a() = sum(i for i in 1:1000);

julia> b = Task(a);

In this example, b is a runnable Task that hasn't started yet.

source
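As a minimal sketch of the life cycle described above, the following creates a task, clears the sticky bit as the warning suggests, schedules it, and reads the result with fetch. The names work and t are illustrative only.

```julia
# Illustrative names: `work` and `t` are not part of the API.
work() = sum(i for i in 1:1000)

t = Task(work)                     # created, but not yet started
started_before = istaskstarted(t)  # false at this point
t.sticky = false                   # allow migration across threads, as with Threads.@spawn
schedule(t)                        # hand the task to the scheduler
result = fetch(t)                  # block until the task finishes, then return its value
```

Calling fetch rather than polling istaskdone avoids depending on when the scheduler actually runs the task.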
Base.@task - Macro
@task

Wrap an expression in a Task without executing it, and return the Task. This only creates a task, and does not run it.

Warning

By default tasks will have the sticky bit (t.sticky) set to true. This models the historic default for @async. Sticky tasks can only be run on the worker thread they are first scheduled on, and when scheduled will make the task that they were scheduled from sticky. To obtain the behavior of Threads.@spawn, set the sticky bit manually to false.

Examples

julia> a1() = sum(i for i in 1:1000);

julia> b = @task a1();

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
source
Base.@async - Macro
@async

Wrap an expression in a Task and add it to the local machine's scheduler queue.

Values can be interpolated into @async via $, which copies the value directly into the constructed underlying closure. This allows you to insert the value of a variable, isolating the asynchronous code from changes to the variable's value in the current task.

Warning

It is strongly encouraged to always favor Threads.@spawn over @async, even when no parallelism is required, especially in publicly distributed libraries. This is because a use of @async disables the migration of the parent task across worker threads in the current implementation of Julia. Thus, seemingly innocent use of @async in a library function can have a large impact on the performance of very different parts of user applications.

Julia 1.4

Interpolating values via $ is available as of Julia 1.4.

source
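The effect of $ interpolation can be sketched as follows. The function name interpolation_demo is hypothetical; the sketch uses @async only because this entry documents it, and the same $ syntax is accepted by Threads.@spawn.

```julia
# Hypothetical helper illustrating `$` interpolation into a task expression.
function interpolation_demo()
    x = 1
    captured = @async x     # the closure refers to the variable `x`
    copied   = @async $x    # the value of `x` is copied when the task is created
    x = 2                   # runs before either task, since this task has not yielded yet
    return fetch(captured), fetch(copied)
end

interpolation_demo()  # returns (2, 1)
```

The first task observes the later assignment because it shares the variable with its parent, while the second task keeps the value it was created with.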
Base.asyncmap - Function
asyncmap(f, c...; ntasks=0, batch_size=nothing)

Uses multiple concurrent tasks to map f over a collection (or multiple equal length collections). For multiple collection arguments, f is applied elementwise.

ntasks specifies the number of tasks to run concurrently. Depending on the length of the collections, if ntasks is unspecified, up to 100 tasks will be used for concurrent mapping.

ntasks can also be specified as a zero-arg function. In this case, the number of tasks to run in parallel is checked before processing every element, and a new task is started if the value of ntasks_func is greater than the current number of tasks.

If batch_size is specified, the collection is processed in batch mode. f must then accept a Vector of argument tuples and return a vector of results. The input vector will have a length of batch_size or less.

The following examples highlight execution in different tasks by returning the objectid of the tasks in which the mapping function is executed.

First, with ntasks undefined, each element is processed in a different task.

julia> tskoid() = objectid(current_task());

julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
 0x6e15e66c75c75853
 0x440f8819a1baa682
 0x9fb3eeadd0c83985
 0xebd3e35fe90d4050
 0x29efc93edce2b961

julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5

With ntasks=2 all elements are processed in 2 tasks.

julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94

julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2

With batch_size defined, the mapping function needs to be changed to accept an array of argument tuples and return an array of results. map is used in the modified mapping function to achieve this.

julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)

julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
 "args_tuple: (1,), element_val: 1, task: 9118321258196414413"
 "args_tuple: (2,), element_val: 2, task: 4904288162898683522"
 "args_tuple: (3,), element_val: 3, task: 9118321258196414413"
 "args_tuple: (4,), element_val: 4, task: 4904288162898683522"
 "args_tuple: (5,), element_val: 5, task: 9118321258196414413"
source
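A more typical use maps a function that blocks (here simulated with sleep) while limiting concurrency. This is a small sketch; slow_square and batch_square are hypothetical helper names, and the batch variant assumes the argument-tuple layout shown in the batch_func example above.

```julia
# Hypothetical helpers: `slow_square` stands in for a blocking operation such as I/O.
slow_square(x) = (sleep(0.01); x^2)

# At most two elements are in flight at any time; results keep the input order.
squares = asyncmap(slow_square, 1:5; ntasks=2)

# Batch mode: the function receives a vector of argument tuples such as (1,), (2,).
batch_square(batch) = [first(args)^2 for args in batch]
batched = asyncmap(batch_square, 1:5; ntasks=2, batch_size=2)
```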
Base.asyncmap! - Function
asyncmap!(f, results, c...; ntasks=0, batch_size=nothing)

Like asyncmap, but stores output in results rather than returning a collection.

Warning

Behavior can be unexpected when any mutated argument shares memory with any other argument.

source
Base.current_task - Function
current_task()

Get the currently running Task.

source
Base.istaskdone - Function
istaskdone(t::Task) -> Bool

Determine whether a task has exited.

Examples

julia> a2() = sum(i for i in 1:1000);

julia> b = Task(a2);

julia> istaskdone(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
source
Base.istaskstarted - Function
istaskstarted(t::Task) -> Bool

Determine whether a task has started executing.

Examples

julia> a3() = sum(i for i in 1:1000);

julia> b = Task(a3);

julia> istaskstarted(b)
false
source
Base.istaskfailed - Function
istaskfailed(t::Task) -> Bool

Determine whether a task has exited because an exception was thrown.

Examples

julia> a4() = error("task failed");

julia> b = Task(a4);

julia> istaskfailed(b)
false

julia> schedule(b);

julia> yield();

julia> istaskfailed(b)
true

Julia 1.3

This function requires at least Julia 1.3.

source
Base.task_local_storage - Method
task_local_storage(key)

Look up the value of a key in the current task's task-local storage.

source
Base.task_local_storage - Method
task_local_storage(key, value)

Assign a value to a key in the current task's task-local storage.

source
Base.task_local_storage - Method
task_local_storage(body, key, value)

Call the function body with a modified task-local storage, in which value is assigned to key; the previous value of key, or lack thereof, is restored afterwards. Useful for emulating dynamic scoping.

source
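The three methods can be combined as in the sketch below, which temporarily overrides a key and shows that the previous value is restored. The key :level and the variable names are arbitrary choices for illustration.

```julia
# `:level`, `inner`, and `after` are illustrative names.
task_local_storage(:level, "outer")          # assign in the current task

inner = task_local_storage(:level, "inner") do
    task_local_storage(:level)               # sees the temporary value
end

after = task_local_storage(:level)           # the previous value is restored
```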

Scheduling

Base.yield - Function
yield()

Switch to the scheduler to allow another scheduled task to run. A task that calls this function is still runnable, and will be restarted immediately if there are no other runnable tasks.

source
yield(t::Task, arg = nothing)

A fast, unfair-scheduling version of schedule(t, arg); yield() which immediately yields to t before calling the scheduler.

source
Base.yieldto - Function
yieldto(t::Task, arg = nothing)

Switch to the given task. The first time a task is switched to, the task's function is called with no arguments. On subsequent switches, arg is returned from the task's last call to yieldto. This is a low-level call that only switches tasks, not considering states or scheduling in any way. Its use is discouraged.

source
Base.sleep - Function
sleep(seconds)

Block the current task for a specified number of seconds. The minimum sleep time is 1 millisecond, i.e. an input of 0.001.

source
Base.schedule - Function
schedule(t::Task, [val]; error=false)

Add a Task to the scheduler's queue. This causes the task to run constantly when the system is otherwise idle, unless the task performs a blocking operation such as wait.

If a second argument val is provided, it will be passed to the task (via the return value of yieldto) when it runs again. If error is true, the value is raised as an exception in the woken task.

Warning

It is incorrect to use schedule on an arbitrary Task that has already been started. See the API reference for more information.

Warning

By default tasks will have the sticky bit (t.sticky) set to true. This models the historic default for @async. Sticky tasks can only be run on the worker thread they are first scheduled on, and when scheduled will make the task that they were scheduled from sticky. To obtain the behavior of Threads.@spawn, set the sticky bit manually to false.

Examples

julia> a5() = sum(i for i in 1:1000);

julia> b = Task(a5);

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskstarted(b)
true

julia> istaskdone(b)
true
source

Synchronization

Base.errormonitor - Function
errormonitor(t::Task)

Print an error log to stderr if task t fails.

Examples

julia> Base._wait(errormonitor(Threads.@spawn error("task failed")))
Unhandled Task ERROR: task failed
Stacktrace:
[...]
source
Base.@sync - Macro
@sync

Wait until all lexically-enclosed uses of @async, @spawn, Distributed.@spawnat and Distributed.@distributed are complete. All exceptions thrown by enclosed async operations are collected and thrown as a CompositeException.

Examples

julia> Threads.nthreads()
4

julia> @sync begin
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
       end;
Thread-id 3, task 1
Thread-id 1, task 2
source
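Two further aspects of @sync can be sketched: waiting for spawned work that fills a shared array, and catching the CompositeException raised when an enclosed task fails. The function names sync_demo and sync_error_demo are hypothetical.

```julia
# Hypothetical helper: each task writes to its own slot, so no lock is needed.
function sync_demo()
    results = zeros(Int, 4)
    @sync for i in 1:4
        Threads.@spawn results[i] = i^2
    end
    return results            # all tasks have finished here
end

# Hypothetical helper: returns the exception thrown by @sync.
function sync_error_demo()
    try
        @sync begin
            Threads.@spawn error("first failure")
            Threads.@spawn nothing
        end
    catch err
        return err            # a CompositeException wrapping the task failure
    end
end
```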
Base.wait - Function

Special note for Threads.Condition:

The caller must be holding the lock that owns a Threads.Condition before calling this method. The calling task will be blocked until some other task wakes it, usually by calling notify on the same Threads.Condition object. The lock will be atomically released when blocking (even if it was locked recursively), and will be reacquired before returning.

source
wait(r::Future)

Wait for a value to become available for the specified Future.

wait(r::RemoteChannel, args...)

Wait for a value to become available on the specified RemoteChannel.

wait([x])

Block the current task until some event occurs, depending on the type of the argument:

  • Channel: Wait for a value to be appended to the channel.
  • Condition: Wait for notify on a condition and return the val parameter passed to notify. Waiting on a condition additionally allows passing first=true, which results in the waiter being put first in line to wake up on notify instead of the usual first-in-first-out behavior.
  • Process: Wait for a process or process chain to exit. The exitcode field of a process can be used to determine success or failure.
  • Task: Wait for a Task to finish. If the task fails with an exception, a TaskFailedException (which wraps the failed task) is thrown.
  • RawFD: Wait for changes on a file descriptor (see the FileWatching package).

If no argument is passed, the task blocks for an undefined period. A task can only be restarted by an explicit call to schedule or yieldto.

Often wait is called within a while loop to ensure a waited-for condition is met before proceeding.

source
wait(c::Channel)

Blocks until the Channel isready (that is, until isready(c) returns true).

julia> c = Channel(1);

julia> isready(c)
false

julia> task = Task(() -> wait(c));

julia> schedule(task);

julia> istaskdone(task)  # task is blocked because channel is not ready
false

julia> put!(c, 1);

julia> istaskdone(task)  # task is now unblocked
true
source
Base.fetch - Method
fetch(t::Task)

Wait for a Task to finish, then return its result value. If the task fails with an exception, a TaskFailedException (which wraps the failed task) is thrown.

source
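Both outcomes of fetch on a task can be illustrated with a short sketch. The function name fetch_demo is hypothetical; the task field used below is the failed task wrapped by TaskFailedException.

```julia
# Hypothetical helper showing a successful and a failing fetch.
function fetch_demo()
    ok  = Threads.@spawn 1 + 1
    bad = Threads.@spawn error("boom")
    value = fetch(ok)                 # waits, then returns the task's result
    err = try
        fetch(bad)                    # the failure surfaces in the fetching task
    catch e
        e
    end
    return value, err isa TaskFailedException, err.task === bad
end
```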
Base.fetch - Method
fetch(x::Any)

Return x.

source
Base.timedwait - Function
timedwait(testcb, timeout::Real; pollint::Real=0.1)

Wait until testcb() returns true or timeout seconds have passed, whichever is earlier. The test function is polled every pollint seconds. The minimum value for pollint is 0.001 seconds, that is, 1 millisecond.

Return :ok or :timed_out.

Examples

julia> cb() = (sleep(5); return);

julia> t = @async cb();

julia> timedwait(()->istaskdone(t), 1)
:timed_out

julia> timedwait(()->istaskdone(t), 6.5)
:ok
source
Base.Condition - Type
Condition()

Create an edge-triggered event source that tasks can wait for. Tasks that call wait on a Condition are suspended and queued. Tasks are woken up when notify is later called on the Condition. Waiting on a condition can return a value or raise an error if the optional arguments of notify are used. Edge triggering means that only tasks waiting at the time notify is called can be woken up. For level-triggered notifications, you must keep extra state to keep track of whether a notification has happened. The Channel and Threads.Event types do this, and can be used for level-triggered events.

This object is NOT thread-safe. See Threads.Condition for a thread-safe version.

source
Base.Threads.Condition - Type
Threads.Condition([lock])

A thread-safe version of Base.Condition.

To call wait or notify on a Threads.Condition, you must first call lock on it. When wait is called, the lock is atomically released during blocking, and will be reacquired before wait returns. Therefore idiomatic use of a Threads.Condition c looks like the following:

lock(c)
try
    while !thing_we_are_waiting_for
        wait(c)
    end
finally
    unlock(c)
end

Julia 1.2

This functionality requires at least Julia 1.2.

source
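A self-contained version of that idiom, with a notifier side added, might look like the sketch below. The function name condition_demo and the ready flag are assumptions made for illustration; the flag plays the role of thing_we_are_waiting_for.

```julia
# Hypothetical helper: one task waits on a flag guarded by a Threads.Condition.
function condition_demo()
    cond = Threads.Condition()
    ready = Ref(false)                # the state being waited for; guarded by `cond`

    waiter = Threads.@spawn begin
        lock(cond)
        try
            while !ready[]            # re-check the state after every wakeup
                wait(cond)            # releases the lock while blocked
            end
            :proceeded
        finally
            unlock(cond)
        end
    end

    lock(cond)
    try
        ready[] = true                # change the state while holding the lock
        notify(cond)                  # wake any task blocked in wait(cond)
    finally
        unlock(cond)
    end
    return fetch(waiter)
end
```

Because the flag is checked under the lock, the waiter proceeds correctly whether it starts before or after the notifier runs.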
Base.Event - Type
Event([autoreset=false])

Create a level-triggered event source. Tasks that call wait on an Event are suspended and queued until notify is called on the Event. After notify is called, the Event remains in a signaled state and tasks will no longer block when waiting for it, until reset is called.

If autoreset is true, at most one task will be released from wait for each call to notify.

This provides an acquire & release memory ordering on notify/wait.

Julia 1.1

This functionality requires at least Julia 1.1.

Julia 1.8

The autoreset functionality and memory ordering guarantee require at least Julia 1.8.

source
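The level-triggered behavior and the effect of reset can be sketched as follows. The function name event_demo is hypothetical, and the constructor is written as Base.Event so that the sketch does not depend on the name being exported.

```julia
# Hypothetical helper walking through notify, a non-blocking wait, and reset.
function event_demo()
    ev = Base.Event()
    waiter = Threads.@spawn begin
        wait(ev)
        :released
    end
    notify(ev)
    first_result = fetch(waiter)

    wait(ev)                          # returns immediately: the event is still signaled
    reset(ev)                         # back to the un-set state

    late = Threads.@spawn (wait(ev); :released_again)
    blocked = !istaskdone(late)       # `late` cannot finish before the next notify
    notify(ev)
    return first_result, blocked, fetch(late)
end
```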
Base.notify - Function
notify(condition, val=nothing; all=true, error=false)

Wake up tasks waiting for a condition, passing them val. If all is true (the default), all waiting tasks are woken, otherwise only one is. If error is true, the passed value is raised as an exception in the woken tasks.

Return the count of tasks woken up. Return 0 if no tasks are waiting on condition.

source
Base.reset - Method
reset(::Event)

Reset an Event back into an un-set state. Then any future calls to wait will block until notify is called again.

source
Base.Semaphore - Type
Semaphore(sem_size)

Create a counting semaphore that allows at most sem_size acquires to be in use at any time. Each acquire must be matched with a release.

This provides an acquire & release memory ordering on acquire/release calls.

source
Base.acquire - Function
acquire(s::Semaphore)

Wait for one of the sem_size permits to be available, blocking until one can be acquired.

source
acquire(f, s::Semaphore)

Execute f after acquiring from Semaphore s, and release on completion or error.

For example, a do-block form that ensures only 2 calls of foo will be active at the same time:

s = Base.Semaphore(2)
@sync for _ in 1:100
    Threads.@spawn begin
        Base.acquire(s) do
            foo()
        end
    end
end

Julia 1.8

This method requires at least Julia 1.8.

source
Base.release - Function
release(s::Semaphore)

Return one permit to the pool, possibly allowing another task to acquire it and resume execution.

source
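To check that a semaphore really bounds concurrency, the sketch below records the peak number of tasks inside the guarded region. The function name semaphore_demo and its keyword arguments are hypothetical, and the do-block method of acquire requires Julia 1.8 as noted above.

```julia
# Hypothetical helper: returns the highest number of tasks observed inside the
# guarded region at the same time.
function semaphore_demo(; limit=2, njobs=8)
    sem = Base.Semaphore(limit)
    active = Threads.Atomic{Int}(0)
    peak = Threads.Atomic{Int}(0)
    @sync for _ in 1:njobs
        Threads.@spawn begin
            Base.acquire(sem) do
                current = Threads.atomic_add!(active, 1) + 1  # atomic_add! returns the old value
                Threads.atomic_max!(peak, current)
                sleep(0.01)                                   # stand-in for real work
                Threads.atomic_sub!(active, 1)
            end
        end
    end
    return peak[]
end
```

With limit=2 the returned peak should never exceed 2, regardless of how many threads Julia was started with.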
Base.AbstractLock - Type
AbstractLock

Abstract supertype describing types that implement the synchronization primitives: lock, trylock, unlock, and islocked.

source
Base.lock - Function
lock(lock)

Acquire the lock when it becomes available. If the lock is already locked by a different task/thread, wait for it to become available.

Each lock must be matched by an unlock.

source
lock(f::Function, lock)

Acquire the lock, execute f with the lock held, and release the lock when f returns. If the lock is already locked by a different task/thread, wait for it to become available.

When this function returns, the lock has been released, so the caller should not attempt to unlock it.

See also: @lock.

Julia 1.7

Using a Channel as the second argument requires Julia 1.7 or later.

source
lock(f::Function, l::Lockable)

Acquire the lock associated with l, execute f with the lock held, and release the lock when f returns. f will receive one positional argument: the value wrapped by l. If the lock is already locked by a different task/thread, wait for it to become available. When this function returns, the lock has been released, so the caller should not attempt to unlock it.

Julia 1.11

Requires at least Julia 1.11.

source
Base.unlock - Function
unlock(lock)

Releases ownership of the lock.

If this is a recursive lock which has been acquired before, decrement an internal counter and return immediately.

source
Base.trylock - Function
trylock(lock) -> Success (Boolean)

Acquire the lock if it is available, and return true if successful. If the lock is already locked by a different task/thread, return false.

Each successful trylock must be matched by an unlock.

Function trylock combined with islocked can be used for writing the test-and-test-and-set or exponential backoff algorithms if it is supported by the typeof(lock) (read its documentation).

source
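The two return values of trylock can be seen in a short sketch using a ReentrantLock. The function name trylock_demo is hypothetical.

```julia
# Hypothetical helper: tries the lock once while it is held and once while it is free.
function trylock_demo()
    lk = ReentrantLock()
    lock(lk)
    contended = fetch(Threads.@spawn trylock(lk))  # a different task cannot take a held lock
    unlock(lk)

    uncontended = trylock(lk)                      # the lock is free again
    uncontended && unlock(lk)                      # match the successful trylock
    return contended, uncontended
end
```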
Base.islocked - Function
islocked(lock) -> Status (Boolean)

Check whether the lock is held by any task/thread. This function alone should not be used for synchronization. However, islocked combined with trylock can be used for writing the test-and-test-and-set or exponential backoff algorithms if it is supported by the typeof(lock) (read its documentation).

Extended help

For example, an exponential backoff can be implemented as follows if the lock implementation satisfies the properties documented below.

nspins = 0
while true
    while islocked(lock)
        GC.safepoint()
        nspins += 1
        nspins > LIMIT && error("timeout")
    end
    trylock(lock) && break
    backoff()
end

Implementation

A lock implementation is advised to define islocked with the following properties and note it in its docstring.

  • islocked(lock) is data-race-free.
  • If islocked(lock) returns false, an immediate invocation of trylock(lock) must succeed (return true) if there is no interference from other tasks.
source
Base.ReentrantLock - Type
ReentrantLock()

Creates a re-entrant lock for synchronizing Tasks. The same task can acquire the lock as many times as required (this is what the "Reentrant" part of the name means). Each lock must be matched with an unlock.

This provides an acquire/release memory ordering on lock/unlock calls.

Calling lock will also inhibit running of finalizers on that thread until the corresponding unlock. Use of the standard lock pattern illustrated below should naturally be supported, but beware of inverting the try/lock order or missing the try block entirely (e.g. attempting to return with the lock still held):

lock(l)
try
    <atomic work>
finally
    unlock(l)
end

If !islocked(lck::ReentrantLock) holds, trylock(lck) succeeds unless there are other tasks attempting to hold the lock "at the same time."

source
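As a concrete instance of the standard lock pattern, the sketch below protects a shared counter that many tasks increment. The function name locked_counter is hypothetical.

```julia
# Hypothetical helper: `n` tasks each increment a shared counter under a lock.
function locked_counter(n)
    lk = ReentrantLock()
    total = Ref(0)
    @sync for _ in 1:n
        Threads.@spawn begin
            lock(lk)
            try
                total[] += 1          # the read-modify-write happens under the lock
            finally
                unlock(lk)            # always released, even if the body throws
            end
        end
    end
    return total[]
end
```

Without the lock, the unsynchronized increment could lose updates when Julia runs with more than one thread.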
Base.@lock - Macro
@lock l expr

Macro version of lock(f, l::AbstractLock) but with expr instead of the function f. Expands to:

lock(l)
try
    expr
finally
    unlock(l)
end

This is similar to using lock with a do block, but avoids creating a closure and thus can improve performance.

Compat

@lock was added in Julia 1.3, and exported in Julia 1.10.

source
Base.Lockable - Type
Lockable(value, lock = ReentrantLock())

Creates a Lockable object that wraps value and associates it with the provided lock. This object supports @lock, lock, trylock, unlock. To access the value, index the lockable object while holding the lock.

Julia 1.11

Requires at least Julia 1.11.

Example

julia> locked_list = Base.Lockable(Int[]);

julia> @lock(locked_list, push!(locked_list[], 1)) # must hold the lock to access the value
1-element Vector{Int64}:
 1

julia> lock(summary, locked_list)
"1-element Vector{Int64}"
source

Channels

Base.AbstractChannel - Type
AbstractChannel{T}

Representation of a channel passing objects of type T.

source
Base.Channel - Type
Channel{T=Any}(size::Int=0)

Constructs a Channel with an internal buffer that can hold a maximum of size objects of type T. put! calls on a full channel block until an object is removed with take!.

Channel(0) constructs an unbuffered channel. put! blocks until a matching take! is called, and vice versa.

Other constructors:

  • Channel(): default constructor, equivalent to Channel{Any}(0)
  • Channel(Inf): equivalent to Channel{Any}(typemax(Int))
  • Channel(sz): equivalent to Channel{Any}(sz)

Julia 1.3

The default constructor Channel() and default size=0 were added in Julia 1.3.

source
Base.Channel - Method
Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)

Create a new task from func, bind it to a new channel of type T and size size, and schedule the task, all in a single call. The channel is automatically closed when the task terminates.

func must accept the bound channel as its only argument.

If you need a reference to the created task, pass a Ref{Task} object via the keyword argument taskref.

If spawn=true, the Task created for func may be scheduled on another thread in parallel, equivalent to creating a task via Threads.@spawn.

If spawn=true and the threadpool argument is not set, it defaults to :default.

If the threadpool argument is set (to :default or :interactive), this implies that spawn=true and the new Task is spawned to the specified threadpool.

Return a Channel.

Examples

julia> chnl = Channel() do ch
           foreach(i -> put!(ch, i), 1:4)
       end;

julia> typeof(chnl)
Channel{Any}

julia> for i in chnl
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

Referencing the created task:

julia> taskref = Ref{Task}();

julia> chnl = Channel(taskref=taskref) do ch
           println(take!(ch))
       end;

julia> istaskdone(taskref[])
false

julia> put!(chnl, "Hello");
Hello

julia> istaskdone(taskref[])
true

Julia 1.3

The spawn= parameter was added in Julia 1.3. This constructor was added in Julia 1.3. In earlier versions of Julia, Channel used keyword arguments to set size and T, but those constructors are deprecated.

Julia 1.9

The threadpool= argument was added in Julia 1.9.

julia> chnl = Channel{Char}(1, spawn=true) do ch
           for c in "hello world"
               put!(ch, c)
           end
       end
Channel{Char}(1) (2 items available)

julia> String(collect(chnl))
"hello world"
source
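A typed, buffered producer can be consumed in one step, as in the sketch below. The function name channel_sum_demo and the buffer size of 4 are arbitrary illustrative choices.

```julia
# Hypothetical helper: a producer task feeds a typed, buffered channel.
function channel_sum_demo()
    ch = Channel{Int}(4) do c
        for i in 1:10
            put!(c, i)        # blocks whenever the 4-slot buffer is full
        end
    end                       # the channel closes when the producer task returns
    items = collect(ch)       # iterates until the channel is closed and drained
    return sum(items), isopen(ch)
end
```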
Base.put! - Method
put!(c::Channel, v)

Append an item v to the channel c. Blocks if the channel is full.

For unbuffered channels, blocks until a take! is performed by a different task.

Julia 1.1

v now gets converted to the channel's type with convert as put! is called.

source
Base.take! - Method
take!(c::Channel)

Removes and returns a value from a Channel in order. Blocks until data is available. For unbuffered channels, blocks until a put! is performed by a different task.

Examples

Buffered channel:

julia> c = Channel(1);

julia> put!(c, 1);

julia> take!(c)
1

Unbuffered channel:

julia> c = Channel(0);

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);

julia> take!(c)
1
source
Base.isready - Method
isready(c::Channel)

Determines whether a Channel has a value stored in it. Returns immediately, does not block.

For unbuffered channels, returns true if there are tasks waiting on a put!.

Examples

Buffered channel:

julia> c = Channel(1);

julia> isready(c)
false

julia> put!(c, 1);

julia> isready(c)
true

Unbuffered channel:

julia> c = Channel();

julia> isready(c)  # no tasks waiting to put!
false

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);  # schedule a put! task

julia> isready(c)
true
source
Base.fetch - Method
fetch(c::Channel)

Waits for and returns (without removing) the first available item from the Channel. Note: fetch is unsupported on an unbuffered (0-size) Channel.

Examples

Buffered channel:

julia> c = Channel(3) do ch
           foreach(i -> put!(ch, i), 1:3)
       end;

julia> fetch(c)
1

julia> collect(c)  # item is not removed
3-element Vector{Any}:
 1
 2
 3
source
Base.close - Method
close(c::Channel[, excp::Exception])

Close a channel. An exception (optionally given by excp) is thrown by:

  • put! on a closed channel.
  • take! and fetch on an empty, closed channel.

source
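The behavior of a closed channel can be sketched as follows, assuming no custom excp is supplied so that the default InvalidStateException is raised. The function name close_demo is hypothetical.

```julia
# Hypothetical helper: exercises put! and take! after close.
function close_demo()
    c = Channel{Int}(2)
    put!(c, 1)
    close(c)
    leftover = take!(c)       # an item buffered before close can still be taken
    put_error = try
        put!(c, 2)            # put! on a closed channel throws
    catch err
        err
    end
    take_error = try
        take!(c)              # take! on an empty, closed channel throws
    catch err
        err
    end
    return leftover, put_error, take_error
end
```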
Base.bind - Method
bind(chnl::Channel, task::Task)

Associate the lifetime of chnl with a task. Channel chnl is automatically closed when the task terminates. Any uncaught exception in the task is propagated to all waiters on chnl.

The chnl object can be explicitly closed independent of task termination. Terminating tasks have no effect on already closed Channel objects.

When a channel is bound to multiple tasks, the first task to terminate will close the channel. When multiple channels are bound to the same task, termination of the task will close all of the bound channels.

Examples

julia> c = Channel(0);

julia> task = @async foreach(i->put!(c, i), 1:4);

julia> bind(c,task);

julia> for i in c
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

julia> isopen(c)
false

julia> c = Channel(0);

julia> task = @async (put!(c, 1); error("foo"));

julia> bind(c, task);

julia> take!(c)
1

julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
    nested task error: foo
[...]
source

Low-level synchronization using schedule and wait

The easiest correct use of schedule is on a Task that is not started (scheduled) yet. However, it is possible to use schedule and wait as a very low-level building block for constructing synchronization interfaces. A crucial pre-condition of calling schedule(task) is that the caller must "own" the task; i.e., it must know that the call to wait in the given task is happening at the locations known to the code calling schedule(task). One strategy for ensuring such a pre-condition is to use atomics, as demonstrated in the following example:

@enum OWEState begin
    OWE_EMPTY
    OWE_WAITING
    OWE_NOTIFYING
end

mutable struct OneWayEvent
    @atomic state::OWEState
    task::Task
    OneWayEvent() = new(OWE_EMPTY)
end

function Base.notify(ev::OneWayEvent)
    state = @atomic ev.state
    while state !== OWE_NOTIFYING
        # Spin until we successfully update the state to OWE_NOTIFYING:
        state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
        if ok
            if state == OWE_WAITING
                # OWE_WAITING -> OWE_NOTIFYING transition means that the waiter task is
                # already waiting or about to call `wait`. The notifier task must wake up
                # the waiter task.
                schedule(ev.task)
            else
                @assert state == OWE_EMPTY
                # Since we are assuming that there is only one notifier task (for
                # simplicity), we know that the other possible case here is OWE_EMPTY.
                # We do not need to do anything because we know that the waiter task has
                # not called `wait(ev::OneWayEvent)` yet.
            end
            break
        end
    end
    return
end

function Base.wait(ev::OneWayEvent)
    ev.task = current_task()
    state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
    if ok
        # OWE_EMPTY -> OWE_WAITING transition means that the notifier task is guaranteed to
        # invoke OWE_WAITING -> OWE_NOTIFYING transition.  The waiter task must call
        # `wait()` immediately.  In particular, it MUST NOT invoke any function that may
        # yield to the scheduler at this point in code.
        wait()
    else
        @assert state == OWE_NOTIFYING
        # Otherwise, the `state` must have already been moved to OWE_NOTIFYING by the
        # notifier task.
    end
    return
end

ev = OneWayEvent()
@sync begin
    @async begin
        wait(ev)
        println("done")
    end
    println("notifying...")
    notify(ev)
end

# output
notifying...
done

OneWayEvent lets one task wait for another task's notify. It is a limited communication interface since wait can only be used once from a single task (note the non-atomic assignment of ev.task).

In this example, notify(ev::OneWayEvent) is allowed to call schedule(ev.task) if and only if it modifies the state from OWE_WAITING to OWE_NOTIFYING. This lets us know that the task executing wait(ev::OneWayEvent) is now in the ok branch and that there cannot be other tasks that try to schedule(ev.task), since their @atomicreplace(ev.state, state => OWE_NOTIFYING) will fail.


This document was generated with Documenter.jl version 1.8.0 on Wednesday 9 July 2025. Using Julia version 1.11.6.

