Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Concurrent Programming with Effect Handlers

License

NotificationsYou must be signed in to change notification settings

ocaml-multicore/ocaml-effects-tutorial

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

64 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Build Status

Originally written as materials for theCUFP 17 tutorial.

Setting up

Install a compatible OCaml compiler

Up to date instructions can be found athttps://github.com/ocaml-multicore/awesome-multicore-ocaml#installation

Install required tools

$opam install ocamlbuild ocamlfind

Outline

The tutorial is structured as follows:

  1. Algebraic Effects and Handlers.
    1.1.Recovering from errors
    1.2.Basics
  2. Shallow vs Deep Handlers.
  3. Delimited Continuations: A deep dive.
    3.1.Examining effect handlers through GDB
  4. Generators & Streams.
    4.1.Message passing
    4.2.Generators from iterators
    4.3.Using the generator
    4.4.Streams
  5. Cooperative Concurrency.
    5.1.Coroutines
    5.2.Async/Await
  6. Asynchronous I/O.
    6.1.Blocking echo server
    6.2.Asynchronous echo server
  7. Conclusion.

The tutorial also includes the following exercises:

  1. Implement exceptions from effects ★☆☆☆☆
  2. Implement state put and history ★★☆☆☆
  3. Derive generator for an arbitrary iterator ★★★★☆
  4. Same fringe problem ★☆☆☆☆
  5. Implement async/await functionality ★★★☆☆
  6. Implement asynchronous accept and send ★☆☆☆☆

1. Algebraic Effects and Handlers

An algebraic effect handler is a programming abstraction for manipulatingcontrol-flow in a first-class fashion. They generalise common abstractions suchas exceptions, generators, asynchronous I/O, or concurrency, as well as otherseemingly esoteric programming abstractions such as transactional memory andprobabilistic computations.

Operationally, effect handlers offer a form of first-class, restartable exceptionmechanism. In this tutorial, we shall introduce gently algebraic effect andhandlers with gentle examples and then continue on to more involved examples.

1.1. Recovering from errors

Lets start with an example. Consider a program which reads a list of numbersfrom standard input and prints the sum of the numbers:

letrecsum_upacc=let l= input_line stdinin    acc:=!acc+ int_of_string l;    sum_up acclet _=let r=ref0intry sum_up rwith|End_of_file ->Printf.printf"Sum is %d\n"!r

The above program is available insources/input_line_exn.ml. You can run thisprogram as:

$cd sources$ocaml input_line_exn.ml1020(* ctrl+d *)Sum is 30

Theinput_line function returns a string for the input line and raisesEnd_of_file if it encounters end of file character. We useint_of_string toconvert the input string to a number. This works as long as the input is anumber. If not,int_of_string raisesFailure and this program blows up:

$ocaml input_line_exn.ml1020MMXVIIFatal error: exception Failure("int_of_string")

We could print a better error message (sources/input_line_exn2.ml):

exceptionConversion_failure ofstringletint_of_stringl=try int_of_string lwith|Failure_ -> raise (Conversion_failure l)letrecsum_upacc=let l= input_line stdinin    acc:=!acc+ int_of_string l;    sum_up acclet _=let r=ref0intry sum_up rwith|End_of_file ->Printf.printf"Sum is %d\n"!r|Conversion_failures ->Printf.fprintf stderr"Conversion failure\"%s\"\n%!" s

The program now prints a friendlier error message:

$ocaml input_line_exn2.ml1020MMXVIIConversion failure "MMXVII"

and, unfortunately, the programterminates. We really wish the program keptgoing:

let _=let r=ref0intry sum_up rwith|End_of_file ->Printf.printf"Sum is %d\n"!r|Conversion_failures ->Printf.fprintf stderr"Conversion failure\"%s\"\n%!" s(* Wish it kept going: continue with 0*)

We could change the code, but ifsum_up function was from a third-partylibrary, changing code is generally not an acceptable option. The issue here isthat the library determines whether the error is fatal or not. What we wouldlike is forthe client of a library determining whether an error is fatal ornot.

1.2. Basics

Algebraic effect handlers allow you to recover from errors. The following codeis available insources/input_line_eff.ml

openEffectopenEffect.Deeptype_ Effect.t +=Conversion_failure :string ->intEffect.tletint_of_stringl=try int_of_string lwith|Failure_ -> perform (Conversion_failure l)letrecsum_upacc=let l= input_line stdinin    acc:=!acc+ int_of_string l;    sum_up acclet _=Printf.printf"Starting up. Please input:\n%!";let r=ref0in  match_with sum_up r  { effc= (fun (typec) (eff: c Effect.t) ->match effwith|Conversion_failures ->Some (fun (k: (c,_) continuation) ->Printf.fprintf stderr"Conversion failure\"%s\"\n%!" s;              continue k0)|_ ->None    );    exnc= (function|End_of_file ->Printf.printf"Sum is %d\n"!r|e -> raise e    );(* Shouldn't reach here, means sum_up returned a value*)    retc=fun_ -> failwith"Impossible, sum_up shouldn't return"  }

First, let’s run this program:

$ocaml input_line_eff.ml1020MMXVIIConversion failure "MMXVII"30(* ctrl+d *)Sum is 60

We've recovered from the conversion error and kept going. Algebraic effects andhandlers are similar to exceptions in that we can declare new effects:

type_ Effect.t +=Conversion_failure :string ->intEffect.t(* c.f. [exception Conversion_failure of string]*)

Effects are declared by adding constructors to anextensible variant typedefined in theEffect module.

Unlike exceptions, performing an effect returns a value. The declaration heresays thatConversion_failure is an algebraic effect that takes a stringparameter, which when performed, returns an integer.

Just like exceptions, effects are values. The type ofConversion_failure "MMXVII" isint Effect.t, whereint is the result of performing the effect.We perform the effect withperform : 'a Effect.t -> 'a primitive (c.f.raise : exn -> 'a (* bottom *)).

Effect handlers are defined in the modulesEffect.Deep andEffect.Shallow.We'll discuss the differences between the two later.

moduleDeep :sig(** Some contents omitted*)type('a,'b) handler=    {retc:'a ->'b;exnc:exn ->'b;effc:'c.'ct -> (('c,'b)continuation ->'b)option }(** [('a,'b) handler] is a handler record with three fields -- [retc]      is the value handler, [exnc] handles exceptions, and [effc] handles the      effects performed by the computation enclosed by the handler.*)valmatch_with: ('c ->'a) ->'c -> ('a,'b)handler ->'b(** [match_with f v h] runs the computation [f v] in the handler [h].*)type'a effect_handler=    {effc:'b.'bt -> (('b,'a)continuation ->'a)option }(** ['a effect_handler] is a deep handler with an identity value handler      [fun x -> x] and an exception handler that raises any exception      [fun e -> raise e].*)valtry_with: ('b ->'a) ->'b ->'aeffect_handler ->'a(** [try_with f v h] runs the computation [f v] under the handler [h].*)endmoduleShallow :sig(** Some contents omitted*)type('a,'b) handler=    {retc:'a ->'b;exnc:exn ->'b;effc:'c.'ct -> (('c,'a)continuation ->'b)option }(** [('a,'b) handler] is a handler record with three fields -- [retc]      is the value handler, [exnc] handles exceptions, and [effc] handles the      effects performed by the computation enclosed by the handler.*)valcontinue_with : ('c,'a)continuation ->'c -> ('a,'b)handler ->'b(** [continue_with k v h] resumes the continuation [k] with value [v] with      the handler [h].      @raise Continuation_already_resumed if the continuation has already been      resumed.*)end

The handlers are records with three fields and are called in the context ofmatch_with,try_with, orcontinue_with:

retc is the function that is called when the computation returns a value -i.e. no effects or exceptions were performed/raised in the computation. Thefunction has one parameter: the value of the computation

exnc is called when the computation throws an exception. It takes the exception as a parameter.

effc is the function that handles the effects. It has type'c. 'c Effect.t -> ('c, 'a) continuation -> 'b) option

Effects are strongly typed, but the handler function can handle multipleeffects and has to be generic over every possible type (which is potentiallyall of them since the effects variant can always be extended further), hencethe'c existential type.effc returns anoption where a None value meansignore the effect (and crash the program if not handled somewhere else). A Somevalue holds a function that takes a parameter commonly calledk

  { effc= (fun (typec) (eff: c Effect.t) ->match effwith|Conversion_failures ->Some (fun (k: (c,_) continuation) ->Printf.fprintf stderr"Conversion failure\"%s\"\n%!" s;              continue k0)|_ ->None    )   }

We need to declare alocally abstract typec in order totell the compiler thateff andk are constrained on the same type.

The parameterk, is thedelimited continuationbetween the point of performing the effect and the effect handler. The delimitedcontinuation is like a dynamically defined function, that can be called andreturns a value. The type ofk in this case is(int, int) continuation,which says that the continuation expects an integer to continue (the first typeparameter), and returns with an integer (the second type parameter).

The delimited continuation is resumed withEffect.Deep'scontinue : ('a,'b) continuation -> 'a -> 'b. In this example,continue k 0 resumes the continuationwith0, and the correspondingperform (Conversion_failure l) returns with0.

If we do want to consider the error to be fatal (sources/input_line_eff2.ml),then we candiscontinue : ('a,'b) continuation -> exn -> 'b the continuationso that it raises an exception at the perform point.

  match_with sum_up r  { effc= (fun (typea) (e: a t) ->match ewith|Conversion_failures ->Some (fun (k: (a,_) continuation) ->Printf.fprintf stderr"Conversion failure\"%s\"\n%!" s;          discontinue k (Failure"int_of_string"))|_ ->None    );    exnc= (function|End_of_file ->Printf.printf"Sum is %d\n"!r|e -> raise e    );(* Shouldn't reach here, means sum_up returned a value*)    retc=funv -> v  }

Now,

$ocaml input_line_eff2.ml1020MMXVIIConversion failure "MMXVII"Fatal error: exception Failure("int_of_string")

1.2.1. Effects are unchecked

UnlikeEff,Koka,Links, and other languages thatsupport effect handlers, effects in Multicore OCaml are uncheckedcurrently. A program that does not handle a performed effect failswith a runtime error.

Let's fire up the OCaml top-level:

$ ocamlOCaml version5.0.0~beta1#openEffect;;#type _Effect.t+=E :unitEffect.t;;type_ Stdlib.Effect.t +=E :unitEffect.t#letf()= performE;;valf :unit ->unit=<fun>#f ();;Exception:Stdlib.Effect.Unhandled(E)#openEffect.Deep;;#try_withf () {effc= (fun (typec) (eff: c Effect.t) ->match effwith|E ->Some (fun (k: (c,_) continuation) -> continue k())|_ ->None  )};;- :unit= ()

Exercise 1: Implement exceptions from effects ★☆☆☆☆

As mentioned before, effects generalise exceptions. Exceptions handlers areeffect handlers that ignore the continuation. Your task is to implementexceptions in terms of effects. The source file issources/exceptions.ml.

2. Shallow vs Deep Handlers

The OCaml standard library provides two different modules for handling effects:Effect.Deep andEffect.Shallow. When a deep handler returns a continuation, the continuation also includes the handler. This means that, when the continuation is resumed, the effect handler is automatically re-installed, and will handle the effect(s) that the computation may perform in the future.

Shallow handlers on the other hand, allow us to change the handlers every time an effect is performed. Let's use them to implement state without refs. The implementation is available insources/state1.ml.

openPrintfopenEffectopenEffect.Shallowmodule typeSTATE=sigtypetvalget :unit ->tvalrun : (unit ->unit) ->init:t->unitendmoduleState (S : sig type t end) :STATEwithtype t=S.t=structtypet=S.ttype_ Effect.t +=Get :tEffect.tletget()= performGetletrunf~init=letrecloop :type a r. t -> (a, r) continuation -> a -> r=funstatekx ->        continue_with k x        { retc= (funresult -> result);          exnc= (fune -> raise e);          effc= (fun (typeb) (eff: b Effect.t) ->match effwith|Get ->Some (fun (k: (b,r) continuation) ->                    loop state k state)|_ ->None)        }in    loop init (fiber f)()end

We useEffect.Shallow by wrapping calculations withcontinue_with : ('c,'a) continuation -> 'c -> ('a,'b) handler -> 'b and getting an initial continuation withval fiber : ('a -> 'b) -> ('a, 'b) continuation

In this example, we define an effectGet that returns avalue of typet when performed.

moduleIS=State (structtype t=intend)moduleSS=State (structtype t=stringend)letfoo() :unit=  printf"%d\n" (IS.get());  printf"%d\n" (IS.get());  printf"%d\n" (IS.get());  printf"%s\n" (SS.get());  printf"%s\n" (SS.get())let _=IS.run (fun() ->SS.run foo"forty two")42

We instantiate two state instances, one with an integer type andanother with string type. Running the program returns:

$ocaml state1.ml424242forty twoforty two

Exercise 2: Implement state put and history ★★☆☆☆

Your task it to implementput : t -> unit that updates the state andhistory : unit -> t list that returns the list of values put. Do not use references.The source file issources/state2.ml.

3. Delimited Continuations: A deep dive

EDITOR'S NOTE: The implementation has changed since this section was written. Results in gdb will differ, but the concepts of the implementation remain mostly the same.

Algebraic effect handlers in Multicore OCaml are very efficient due to severalchoices we make in their implementation. Understanding the implementation ofdelimited continuations also helps to develop a mental model for reasoning aboutprograms that use effect handlers.

Delimited continuations that appear in the effect handler are implemented on topoffibers -- small, heap allocated stack chunks, that grow and shrink ondemand. The execution stack is really a stack of fibers.

Execution stack---------------+----+   +----+|    |   |    || f1 |<--| f2 ||    |   |    |<- stack_pointer+----+   +----+

An effect handler instantiates a new fiber for evaluating the expression.

try ex with| effect e k -> ....Execution stack---------------+----+   +----+    +----+|    |   |    |    |    || f1 |<--| f2 | <--| ex ||    |   |    |    |    |<- stack_pointer+----+   +----+    +----+

Performing an effect may pop one or more of the fibers based on which handlerhandles the effect. The popped sequence of fibers becomes the delimitedcontinuation.

effect E : unit try perform E with| effect E k -> ....Execution stack---------------+----+   +----+                                 +----+|    |   |    |---k (delimited continuation)--->|    || f1 |<--| f2 |                                 | ex ||    |   |    |<- stack_pointer                 |    |+----+   +----+                                 +----+

When you resume the delimited continuation (withcontinue ordiscontinue)the fiber sequence that represents the delimited continuation are push on top ofthe execution stack. Importantly, our continuations areone-shot -- they canonly be resumed once. One shotness means that we never have to copy ourcontinuations in the case that we may need it for a future invocation. For thisreason, context switching between fibers is really fast and is completely inuserland code and the kernel is not involved.

3.1 Examining effect handlers through GDB

The filesources/gdb.ml:

openEffectopenEffect.Deeptype_ Effect.t +=Peek :intEffect.t                 |Poke :unitEffect.tletrecai= performPeek+Random.int iletrecbi= a i+Random.int iletrecci= b i+Random.int iletrecdi=Random.int i+  try_with c i  { effc=fun (typea) (e: a t) ->match ewith|Poke ->Some (fun (k: (a,_) continuation) -> continue k())|_ ->None  }letrecei=Random.int i+  try_with d i  { effc=fun (typea) (e: a t) ->match ewith|Peek ->Some (fun (k: (a,_) continuation) ->Printexc.(print_raw_backtrace stdout (Effect.Deep.get_callstack k100));          flush stdout;          continue k42        )|_ ->None  }let _=Printf.printf"%d\n" (e100)

illustrates the effect handler stack. Let us compile and examine the file underGDB:

$make gdb.native$gdb ./gdb.native

caml_resume is the native stub function through which a fiber is attached tothe top of the execution stack and control switches to it. This happens when anew handler is installed, a continuation is resumed withcontinue ordiscontinue. Similarlycaml_perform is the native function which implementsperform primitive. We set breakpoints on these two functions to observe theprogram as it executes.

(gdb) break caml_performBreakpoint 1 at 0xaeca8(gdb) break caml_resumeBreakpoint 2 at 0xaed38(gdb) rStarting program: /home/sudha/ocaml/temp/ocaml-effects-tutorial/sources/gdb.native [Thread debugging using libthread_db enabled]Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".Breakpoint 1, 0x0000555555602ca8 in caml_perform ()(gdb) bt#0  0x0000555555602ca8 in caml_perform ()#1  0x00005555555a3c08 in camlGdb__b_311 () at gdb.ml:7#2  0x00005555555a3c69 in camlGdb__c_313 () at gdb.ml:9#3  <signal handler called>#4  0x00005555555a3cd8 in camlGdb__d_315 () at gdb.ml:13#5  <signal handler called>#6  0x00005555555a3db8 in camlGdb__e_329 () at gdb.ml:22#7  0x00005555555a4034 in camlGdb__entry () at gdb.ml:33#8  0x00005555555a13ab in caml_program ()#9  <signal handler called>#10 0x000055555560252f in caml_startup_common (argv=0x7fffffffda68, pooling=<optimized out>) at runtime/startup_nat.c:129#11 0x000055555560257b in caml_startup_exn (argv=<optimized out>) at runtime/startup_nat.c:136#12 caml_startup (argv=<optimized out>) at runtime/startup_nat.c:141#13 0x00005555555a108c in main (argc=<optimized out>, argv=<optimized out>) at runtime/main.c:37

Enter effect handler ine. The<signal handler called> frames correspond tothe transition between C frames to OCaml frames, and between OCaml frames of twodifferent fibers. These signal handler frames have nothing to do with signals,but are just a hack to let GDB know that the execution stack is a linked list ofcontiguous stack chunks.

(gdb) cContinuing.Raised by primitive operation at Gdb.a in file "gdb.ml" (inlined), line 7, characters 14-26Called from Gdb.b in file "gdb.ml", line 8, characters 14-17Called from Gdb.c in file "gdb.ml", line 9, characters 14-17Called from Gdb.d in file "gdb.ml", line 13, characters 2-159Breakpoint 2, 0x0000555555602d38 in caml_resume ()(gdb) bt#0  0x0000555555602d38 in caml_resume ()#1  0x00005555555a3db8 in camlGdb__e_329 () at gdb.ml:22#2  0x00005555555a4034 in camlGdb__entry () at gdb.ml:33#3  0x00005555555a13ab in caml_program ()#4  <signal handler called>#5  0x000055555560252f in caml_startup_common (argv=0x7fffffffda68, pooling=<optimized out>) at runtime/startup_nat.c:129#6  0x000055555560257b in caml_startup_exn (argv=<optimized out>) at runtime/startup_nat.c:136#7  caml_startup (argv=<optimized out>) at runtime/startup_nat.c:141#8  0x00005555555a108c in main (argc=<optimized out>, argv=<optimized out>) at runtime/main.c:37

The control switches to the effect handler. In the effect handler forPeek ine, we get the backtrace of the continuation and print it.

This break point corresponds tocontinue k 42 ine.

The program terminates normally.

Continuing.329[Inferior 1 (process 8464) exited normally]

4. Generators & streams.

So far we've seen examples where the handler discards the continuation(exceptions) and immediately resumes the computation (state). Since thecontinuations are first-class values, we can also keep them around and resumethem later, while executing some other code in the mean time. This functionalityallows us to implement programming idioms such as generators, async/await, etc.

4.1. Message passing

Let us being with a simple example that illustrates control switching betweentwo tasks. The two tasks runcooperatively, sending messagesbetween each other. The source code is available insources/msg_passing.ml.

We define an effectXchg : int -> int for exchanging integer messages with theother task. During an exchange, the task sends as well as receives an integer.

type_ Effect.t +=Xchg :int ->intEffect.t

Since the task may suspend, we need a way to represent the status of the task:

typestatus=Done|Pausedofint* (int,status)continuation

The task may either have beenDone or isPaused with the message to send aswell as the continuation, which expects the message to receive. The continuationresults in another status when resumed. We define astep function that runsthe functionf for one step with argumentv.

letstepfv()=  match_with f v  { retc= (fun_ ->Done);    exnc= (fune -> raise e);    effc= (fun (typeb) (eff: b t) ->match effwith|Xchgm ->Some (fun (k: (b,_) continuation) ->Paused (m, k))|_ ->None    )}

The task may perform anXchg in which case we return itsPaused state. Wenow define arun_both function for running two tasks concurrently.

letrecrun_bothab=match a(), b()with|Done,Done ->()|Paused (v1,k1),Paused (v2,k2) ->      run_both (fun() -> continue k1 v2) (fun() -> continue k2 v1)|_ -> failwith"improper synchronization"

Both of the tasks may run to completion, or both may offer to exchange amessage. We consider the other cases to be incorrect programs. In the lattercase, we resume both of the computations with the value from the other.

letrecfname=function|0 ->()|n ->Printf.printf"%s: sending %d\n%!" name n;let v= perform (Xchg n)inPrintf.printf"%s: received %d\n%!" name v;      f name (n-1)let _= run_both (step (f"a")3) (step (f"b")3)

Finally, we test the program with a simple test.

$ocaml msg_passing.mla: sending 3b: sending 3a: received 3a: sending 2b: received 3b: sending 2a: received 2a: sending 1b: received 2b: sending 1a: received 1b: received 1

4.2. Generators from iterators

Iterator is a mechanism to traverse a data structure that retains the control ofthe traversal on the library side. An example isList.iter : ('a -> unit) -> 'a list -> unit that applies the given function to every element in the list.We can provide the following general type for iterators:

type('elt,'container) iterator= ('elt ->unit) ->'container->unit

where'elt is the type of element and'container is the type of the containerover which the function iterates.

On the other hand, a generator is a function where the client of the library hascontrol over the traversal. We can imagine aList.generator : 'a list -> (unit -> 'a option) that returns a function, which when called returns the nextelement in the list. The function returnsNone if there are no more elements.We can provide the following general type for generator:

type'elt generator=unit ->'eltoption

Several languages, including Python and JavaScript, provide generators as aprimitive mechanism, usually through anyield primitive. Typically, thefunctions that can yield require special annotations (such asfunction*) inJavaScript, and only yield values to the immediate caller.

As we've seen in the earlier example, algebraic effect handlers provide amechanism to suspendarbitrary computation and capture it in thecontinuation. Hence, we can derive the generator for an arbitrary iteratorfunction.

Exercise 3: Derive generator for an arbitrary iterator ★★★★☆

Your task is to implement the functiongenerate : ('elt, 'container) iterator -> 'elt generator which derives the generator for any iterator function.

Hint: Since calling the generator function is an effectful operation, you mightthink about saving the state of the traversal in a reference.

4.3. Using the generator

4.3.1. Traversal

You can use thegenerator to traverse a data structure on demand.

$ ocaml##use"generator.ml";;#let gl= generateList.iter [1;2;3];;valgl :intgenerator=<fun>#gl();;- :intoption=Some1#gl();;- :intoption=Some2#gl();;- :intoption=Some3#gl();;- :intoption=None#letga=generateArray.iter [|1.0;2.0;3.0 |];;#ga();;- :floatoption=Some1.#ga();;- :floatoption=Some2.#ga();;- :floatoption=Some3.#ga();;- :floatoption=None

Exercise 4: Same fringe problem ★☆☆☆☆

Two binary trees have the same fringe if they have exactly the same leavesreading from left to right. Given two binary trees decide whether they have thesame fringe. The source file issources/fringe.ml.

4.4. Streams

The iterator need not necessarily be defined on finite data structure. Here isan iterator that iterates over infinite list of integers.

letrecnats :int (* init *) -> (int, unit) iterator=funvf() ->    f v; nats (v+1) f()

Since the iteration is not over any particular container, the container type isunit. We can make a generator over this iterator, which yields an infinitesequence of integers.

letgen_nats :int generator= generate (nats0)()

We know that this generator does not terminate. Hence, the optional return typeof generator is unnecessary. Hence, we define a type'a stream for infinitestreams:

type'a stream=unit ->'a

We can convert a generator to a stream easily:

letinf :'a generator -> 'a stream=fung() ->match g()with|Somen -> n|_ ->assertfalse

Now, an infinite stream of integers starting from 0 is:

letgen_nats :int stream= inf (generate (nats0)());;assert (0= gen_nats());;assert (1= gen_nats());;assert (2= gen_nats());;assert (3= gen_nats());;(* and so on*)

We can define operators over the stream such asmap andfilter:

letrecfilter :'a stream -> ('a -> bool) -> 'a stream=fungp() ->let v= g()inif p vthen velse filter g p()letmap :'a stream -> ('a -> 'b) -> 'b stream=fungf() -> f (g())

We can manipulate the streams using these operators. For example,

(* Even stream*)letgen_even :int stream=let nat_stream= inf (generate (nats0)())in  filter nat_stream (funn -> nmod2=0);;assert (0= gen_even());;assert (2= gen_even());;assert (4= gen_even());;assert (6= gen_even());;(* Odd stream*)letgen_odd :int stream=let nat_stream= inf (generate (nats1)())in  filter nat_stream (funn -> nmod2==1);;assert (1= gen_odd());;assert (3= gen_odd());;assert (5= gen_odd());;assert (7= gen_odd());;(* Primes using sieve of Eratosthenes*)let gen_primes=let s= inf (generate (nats2)())inlet rs=ref sinfun() ->let s=!rsinlet prime= s()in    rs:= filter s (funn -> nmod prime!=0);    primeassert (2= gen_primes());;assert (3= gen_primes());;assert (5= gen_primes());;assert (7= gen_primes());;assert (11= gen_primes());;assert (13= gen_primes());;assert (17= gen_primes());;assert (19= gen_primes());;assert (23= gen_primes());;assert (29= gen_primes());;assert (31= gen_primes());;

5. Cooperative Concurrency

OCaml has two popular libraries for cooperative concurrency: Lwt and Async. Bothlibraries achieve concurrency through aconcurrencymonad. As aresult, the programs that wish to use these libraries have to be written inmonadic style. With effect handlers, the code could be written in direct stylebut also retain the benefit of asynchronous I/O. While the resultant systemclosely resemblesGoroutines in Go,with effect handlers, all of this is implemented in OCaml as a library.

5.1. Coroutines

Let us begin with a simple cooperative scheduler. The source code is availableinsources/cooperative.ml. The interface we'll implement first is:

module typeScheduler=sigvalasync : (unit ->'a) ->unit(** [async f] runs [f] concurrently*)valyield :unit ->unit(** yields control to another task*)valrun   : (unit ->'a) ->unit(** Runs the scheduler*)end

We declare effects forasync andyield:

type_ Effect.t +=Async : (unit ->'a) ->unitEffect.t               |Yield :unitEffect.tletasyncf= perform (Async f)letyield()= performYield

We use a queue for the tasks that are ready to run:

let q=Queue.create()letenqueuet=Queue.push t qletdequeue()=ifQueue.is_empty qthen()elseQueue.pop q()

And finally, the main function is:

letrecrun :'a. (unit -> 'a) -> unit=fun main ->  match_with main()  { retc= (fun_ -> dequeue());    exnc= (fune -> raise e);    effc= (fun (typeb) (eff: b Effect.t) ->match effwith|Asyncf ->Some (fun (k: (b, _) continuation) ->                enqueue (continue k);                run f        )|Yield ->Some (funk ->                enqueue (continue k);                dequeue()        )|_ ->None    )}

If the task runs to completion (value case), then we dequeue and run the nexttask from the scheduler. In the case of anAsync f effect, the current task isenqueued and the new taskf is run. If the scheduler yields, then the currenttask is enqueued and some other task is dequeued and run from the scheduler. Wecan now write a cooperative concurrent program:

letmain()=letmk_taskname()=    printf"starting %s\n%!" name;    yield();    printf"ending %s\n%!" namein  async (mk_task"a");  async (mk_task"b")let _= run main
$ocaml cooperative.mlstarting astarting bending aending b

5.2. Async/await

We can extend the scheduler to implement async/await idiom. The interface wewill implement is:

module typeScheduler=sigtype'a promise(** Type of promises*)valasync : (unit ->'a) ->'apromise(** [async f] runs [f] concurrently*)valawait :'apromise ->'a(** [await p] returns the result of the promise.*)valyield :unit ->unit(** yields control to another task*)valrun   : (unit ->'a) ->unit(** Runs the scheduler*)end

We model a promise as a mutable reference that either is the list of taskswaiting on this promise to resolve (Waiting) or a resolved promise with thevalue (Done).

type'a _promise=Waitingof ('a,unit)continuationlist|Doneof'atype'a promise='a_promiseref

Exercise 5: Implement async/await functionality ★★★☆☆

In this task, you will implement the core async/await functionality. Unlike theprevious scheduler, additional work has to be done at theAsync handler caseto create the promise, and at task termination (value case) to update thepromise and resume the waiting threads. In addition, theAwait case needs tobe implemented. The source file issources/async_await.ml.

6. Asynchronous I/O

Effect handlers let us write asynchronous I/O libraries in direct-style.

6.1. Blocking echo server

As an example,sources/echo.ml is a implementation of an echo server thataccepts client messages and echoes them back. Observe that all of the code iswritten in direct, function-calling, and apparently blocking style. We will seethat the same code can be used to implement a blocking as well as non-blockingserver. A non-blocking server can concurrently host multiple client sessionsunlike the blocking server which serialises client sessions.

The echo server is functorized over the network interface:

module typeAio=sigvalaccept :Unix.file_descr ->Unix.file_descr*Unix.sockaddrvalrecv   :Unix.file_descr ->bytes ->int ->int ->Unix.msg_flaglist ->intvalsend   :Unix.file_descr ->bytes ->int ->int ->Unix.msg_flaglist ->intvalfork   : (unit ->unit) ->unitvalrun    : (unit ->unit) ->unitvalnon_blocking_mode :bool(* Are the sockets non-blocking*)end

We can satisfy this interface with functions from theUnix module:

structlet accept=Unix.acceptlet recv=Unix.recvlet send=Unix.sendletforkf= f()letrunf= f()let non_blocking_mode=falseend

You can test this echo server as follows:

$make echo_unix.native$./echo_unix.nativeEcho server listening on 127.0.0.1:9301

In another terminal, establish a client connection:

(* first client *)$telnet localhost 9301Trying 127.0.0.1...Connected to localhost.Escape character is '^]'.helloserver says: helloworldserver says: world

The server echoes whatever message that is sent. In another terminal, establisha second concurrent client connection:

(* second client *)$telnet localhost 9301Trying 127.0.0.1...Connected to localhost.Escape character is '^]'.helloworld

The server does not echo the messages since it is blocked serving the firstclient. Now, switch to the first client terminal, and terminate the connection:

(* first client *)^]telnet> (* ctrl+d *)$

At this point, you should see that all of the messages sent by the second clienthas been echoed:

(* second client *)server says: helloserver says: world

and further messages from the second client are immediately echoed.

6.2. Asynchronous echo server

We will extend our async/await implementation to support asynchronous I/Ooperations. The source file issources/echo_async.ml. As usual, we declare theeffects and functions to perform the effects:

typefile_descr=Unix.file_descrtypesockaddr=Unix.sockaddrtypemsg_flag=Unix.msg_flagtype_ Effect.t +=Accept :file_descr -> (file_descr*sockaddr)Effect.tletacceptfd= perform (Accept fd)type_ Effect.t +=Recv :file_descr*bytes*int*int*msg_flaglist ->intEffect.tletrecvfdbufposlenmode= perform (Recv (fd, buf, pos, len, mode))type_ Effect.t +=Send :file_descr*bytes*int*int*msg_flaglist ->intEffect.tletsendfdbusposlenmode= perform (Send (fd, bus, pos, len, mode))

We define functions to poll whether a file descriptor is ready to read or write:

letready_to_readfd=matchUnix.select [fd][][]0.with|[],_,_ ->false|_ ->trueletready_to_writefd=matchUnix.select[] [fd][]0.with|_,[],_ ->false|_ ->true

We define a type for tasks blocked on I/O, and a pair of hash tables to hold thecontinuations blocked on reads and writes:

typeblocked=Blocked :'aeff* ('a,unit)continuation ->blocked(* tasks blocked on reads*)let br=Hashtbl.create13(* tasks blocked on writes*)let bw=Hashtbl.create13

Now, the handler forRecv is:

|effect (Recv (fd,buf,pos,len,mode)ase)k ->if ready_to_read fdthen      continue k (Unix.recv fd buf pos len mode)elsebeginHashtbl.add br fd (Blocked (e, k));      schedule()end

If the file descriptor is ready to be read, then we perform the read immediatelywith the blocking read formUnix module knowing that the read would not block.If not, we add the task to the blocked-on-read hash tablebr, and schedule thenext task. The main schedule loop is:

letrecschedule()=ifnot (Queue.is_empty q)then(* runnable tasks available*)Queue.pop q()elseifHashtbl.length br=0&&Hashtbl.length bw=0then(* no runnable tasks, and no blocked tasks => we're done.*)()elsebegin(* no runnable tasks, but blocked tasks available*)let rd_fds=Hashtbl.fold (funfd_acc -> fd::acc) br[]inlet wr_fds=Hashtbl.fold (funfd_acc -> fd::acc) bw[]inlet rdy_rd_fds, rdy_wr_fds, _=Unix.select rd_fds wr_fds[] (-1.)inletrecresumeht=function|[] ->()|x::xs ->beginmatchHashtbl.find ht xwith|Blocked (Recv (fd,buf,pos,len,mode),k) ->                enqueue (fun() -> continue k (Unix.recv fd buf pos len mode))|Blocked (Acceptfd,k) -> failwith"not implemented"|Blocked (Send (fd,buf,pos,len,mode),k) -> failwith"not implemented"|Blocked_ -> failwith"impossible"end;Hashtbl.remove ht xin      resume br rdy_rd_fds;      resume bw rdy_wr_fds;      schedule()end

The interesting case is when runnable tasks are not available and there areblocked tasks. In this case, we run an iteration of the event loop. Thismay unblock further tasks and we continue running them.

Exercise 6: Implement asynchronous accept and send ★☆☆☆☆

In the file,sources/echo_async.ml, some of the functionality for handlingAccept andSend event are not implemented. Your task is to implement these.Once you implement these primitives, you can runecho_async.native to startthe asynchronous echo server. This server is able to respond to multipleconcurrent clients.

7. Conclusion

Hopefully you've enjoyed the tutorial on algebraic effect handlers in MulticoreOCaml. You should be familiar with:

  • What algebraic effects and handlers are.
  • Programming with algebraic effect handlers in Multicore OCaml.
  • Implementation of algebraic effect handlers in Multicore OCaml.
  • Developing control-flow abstractions such as restartable exceptions,generators, streams, coroutines, and asynchronous I/O.

7.1 Other resources

About

Concurrent Programming with Effect Handlers

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

[8]ページ先頭

©2009-2025 Movatter.jp