- Notifications
You must be signed in to change notification settings - Fork5
Lwt Translations of the Async Code Examples in Real World OCaml
License
dkim/rwo-lwt
Folders and files
Name | Name | Last commit message | Last commit date | |
---|---|---|---|---|
Repository files navigation
Deokhwan Kim - Version 1.1.0, 2017-06-03
Below are Lwt translations of the code examples in Real World OCaml - Chapter 18. Concurrent Programming with Async. The section titles follow those in the book for easy cross-reference. Here is the version information of the software components that I have used:
$ ocamlc -version
4.04.1
$ opam show --field=version lwt
3.0.0
$ opam show --field=version cohttp
0.22.0
$ utop -version
The universal toplevel for OCaml, version 1.19.3, compiled for OCaml version 4.04.1
The latest version of this document is available at https://github.com/dkim/rwo-lwt/.
# #require "lwt.unix";;
# #require "lwt.ppx";;
# let file_contents filename =
    Lwt_io.with_file ~mode:Lwt_io.input filename
      (fun channel -> Lwt_io.read channel);;
val file_contents : string -> string Lwt.t = <fun>
# let contents = file_contents "test.txt";;
val contents : string Lwt.t = <abstr>
# Lwt.state contents;;  (* if test.txt exists *)
- : string Lwt.state = Lwt.Return "This is only a test.\n"
# Lwt.state contents;;  (* if test.txt does not exist *)
- : string Lwt.state = Lwt.Fail (Unix.Unix_error (Unix.ENOENT, "open", "test.txt"))
# contents;;
- : string = "This is only a test.\n"
# Lwt.bind;;
- : 'a Lwt.t -> ('a -> 'b Lwt.t) -> 'b Lwt.t = <fun>
I will use `let%lwt x = e1 in e2` in preference to `Lwt.bind e1 (fun x -> e2)` and `e1 >>= (fun x -> e2)`. The Lwt manual states that the former will produce better backtraces than the latter[1]:
Backtrace support
In debug mode, the `lwt` and `let%lwt` constructs will properly propagate backtraces.
val bind : 'a t -> ('a -> 'b t) -> 'b t
Note that `bind` will not propagate backtraces correctly.
# let save filename ~contents =
    Lwt_io.with_file ~mode:Lwt_io.output filename
      (fun channel -> Lwt_io.write channel contents);;
val save : string -> contents:string -> unit Lwt.t = <fun>
# let uppercase_file filename =
    let%lwt text = file_contents filename in
    save filename ~contents:(String.uppercase_ascii text);;
val uppercase_file : string -> unit Lwt.t = <fun>
# uppercase_file "test.txt";;
- : unit = ()
# file_contents "test.txt";;
- : string = "THIS IS ONLY A TEST.\n"
# let count_lines filename =
    let%lwt text = file_contents filename in
    String.split_on_char '\n' text |> List.length;;
Error: This expression has type int but an expression was expected of type 'a Lwt.t
# Lwt.return;;
- : 'a -> 'a Lwt.t = <fun>
# let three = Lwt.return 3;;
val three : int Lwt.t = <abstr>
# three;;
- : int = 3
# let count_lines filename =
    let%lwt text = file_contents filename in
    String.split_on_char '\n' text |> List.length |> Lwt.return;;
val count_lines : string -> int Lwt.t = <fun>
# Lwt.map;;
- : ('a -> 'b) -> 'a Lwt.t -> 'b Lwt.t = <fun>
As with `Lwt.bind`, I will use the combination of the `let%lwt` construct and the `Lwt.return` function rather than `Lwt.map`.[1]
# let waiter, wakener = Lwt.wait ();;
val waiter : '_a Lwt.t = <abstr>
val wakener : '_a Lwt.u = <abstr>
# Lwt.state waiter;;
- : '_a Lwt.state = Lwt.Sleep
# Lwt.wakeup wakener "Hello";;
- : unit = ()
# Lwt.state waiter;;
- : string Lwt.state = Lwt.Return "Hello"
# module type Delayer_intf = sig
    type t
    val create : float -> t
    val schedule : t -> (unit -> 'a Lwt.t) -> 'a Lwt.t
  end;;
module type Delayer_intf =
  sig
    type t
    val create : float -> t
    val schedule : t -> (unit -> 'a Lwt.t) -> 'a Lwt.t
  end
# Lwt.on_success;;
- : 'a Lwt.t -> ('a -> unit) -> unit = <fun>
# Lwt.on_failure;;
- : 'a Lwt.t -> (exn -> unit) -> unit = <fun>
# Lwt.on_termination;;
- : 'a Lwt.t -> (unit -> unit) -> unit = <fun>
# Lwt.on_any;;
- : 'a Lwt.t -> ('a -> unit) -> (exn -> unit) -> unit = <fun>
# module Delayer : Delayer_intf = struct
    type t = {delay: float; jobs: (unit -> unit) Queue.t}

    let create delay = {delay; jobs = Queue.create ()}

    let schedule t thunk =
      let waiter, wakener = Lwt.wait () in
      Queue.add
        (fun () ->
           Lwt.on_any (thunk ()) (Lwt.wakeup wakener) (Lwt.wakeup_exn wakener))
        t.jobs;
      Lwt.on_termination (Lwt_unix.sleep t.delay) (Queue.take t.jobs);
      waiter
  end;;
module Delayer : Delayer_intf
(** [copy_blocks buffer r w] copies data from input channel [r] to output
    channel [w], using [buffer] as scratch space.

    Each iteration reads at most [Bytes.length buffer] bytes into [buffer]
    and writes exactly the bytes that were read to [w]. The loop stops, and
    the returned promise resolves with [()], as soon as a read returns 0
    bytes. *)
let rec copy_blocks buffer r w =
  match%lwt Lwt_io.read_into r buffer 0 (Bytes.length buffer) with
  | 0 -> Lwt.return_unit
  | bytes_read ->
    let%lwt () = Lwt_io.write_from_exactly w buffer 0 bytes_read in
    copy_blocks buffer r w
`let%lwt () = e1 in e2` can be shortened to `e1 >> e2`, but `>>` will get deprecated in the near future.
(** [run ()] starts an echo server listening on port 8765 of every local
    address.

    Each connection gets its own 16 KiB buffer and is served by
    [copy_blocks], which copies everything read from the connection back to
    it. The promise for the server handle is deliberately discarded; the
    type annotation documents exactly what is being ignored. *)
let run () =
  ((let%lwt server =
      Lwt_io.establish_server
        (Lwt_unix.ADDR_INET (Unix.inet_addr_any, 8765))
        (fun (r, w) ->
           let buffer = Bytes.create (16 * 1024) in
           copy_blocks buffer r w)
    in
    Lwt.return server) : Lwt_io.server Lwt.t)
  |> ignore
(** A promise that is never resolved: the resolver returned by [Lwt.wait]
    is dropped, so nothing can ever wake this promise up. Running it under
    [Lwt_main.run] keeps the event loop alive indefinitely. *)
let never_terminate = fst (Lwt.wait ())

(* Program entry point. *)
let () =
  (* Ignore SIGPIPE so it does not terminate the process. *)
  Sys.set_signal Sys.sigpipe Sys.Signal_ignore;
  (* Switch to the libev engine; if Lwt reports it as not available, keep
     whatever engine is already installed. *)
  (try Lwt_engine.set (new Lwt_engine.libev ())
   with Lwt_sys.Not_available _ -> ());
  run ();
  Lwt_main.run never_terminate
(** [run uppercase port] starts an echo server on [port] of every local
    address and returns a promise that never resolves.

    Characters read from each connection are streamed back to it; when
    [uppercase] is [true] each character is first mapped through
    [Char.uppercase_ascii]. *)
let run uppercase port =
  let%lwt server =
    Lwt_io.establish_server
      (Lwt_unix.ADDR_INET (Unix.inet_addr_any, port))
      (fun (r, w) ->
         Lwt_io.read_chars r
         |> (if uppercase then Lwt_stream.map Char.uppercase_ascii
             else fun x -> x)
         |> Lwt_io.write_chars w)
  in
  (* The server handle is not needed; the annotation pins what is ignored. *)
  (server : Lwt_io.server) |> ignore;
  never_terminate

(* Program entry point: parse [-uppercase] and [-port], then serve forever. *)
let () =
  let uppercase = ref false
  and port = ref 8765 in
  let options =
    Arg.align [
      ("-uppercase", Arg.Set uppercase,
       " Convert to uppercase before echoing back");
      ("-port", Arg.Set_int port,
       "num Port to listen on (default 8765)");
    ]
  in
  (* Keep a space after "Usage:" so the program name does not run into it. *)
  let usage = "Usage: " ^ Sys.executable_name ^ " [-uppercase] [-port num]" in
  (* Positional arguments are not accepted. *)
  Arg.parse options
    (fun arg ->
       raise (Arg.Bad (Printf.sprintf "invalid argument -- '%s'" arg)))
    usage;
  Sys.set_signal Sys.sigpipe Sys.Signal_ignore;
  (try Lwt_engine.set (new Lwt_engine.libev ())
   with Lwt_sys.Not_available _ -> ());
  Lwt_main.run (run !uppercase !port)
The Lwt manual states that the `Lwt_stream` module may get deprecated or redesigned, and suggests considering alternatives, such as Simon Cruanes's lwt-pipe. Below is an equivalent version of the code above that uses lwt-pipe.
$ opam pin add -k git lwt-pipe https://github.com/c-cube/lwt-pipe.git
$ opam install lwt-pipe
(** [run uppercase port] is the lwt-pipe flavour of the echo server: it
    listens on [port] of every local address and returns a promise that
    never resolves.

    For each connection a reader pipe is built over the input channel and a
    writer pipe over the output channel; when [uppercase] is [true] the
    writer upper-cases every string it receives. The two pipes are connected
    and the handler waits on the writer. *)
let run uppercase port =
  let%lwt server =
    Lwt_io.establish_server
      (Lwt_unix.ADDR_INET (Unix.inet_addr_any, port))
      (fun (r, w) ->
         let reader = Lwt_pipe.IO.read r in
         let writer =
           Lwt_pipe.IO.write w
           |> (if uppercase
               then Lwt_pipe.Writer.map ~f:String.uppercase_ascii
               else fun x -> x)
         in
         Lwt_pipe.connect ~ownership:`OutOwnsIn reader writer;
         Lwt_pipe.wait writer)
  in
  (* The server handle is not needed; the annotation pins what is ignored. *)
  (server : Lwt_io.server) |> ignore;
  never_terminate
$ opam install tls cohttp  # Or opam install lwt_ssl cohttp
(** [query_uri query] is the DuckDuckGo API URI for [query]: the base URI
    with a [q] query parameter set to [query]. The base URI is parsed once,
    when this definition is evaluated, and shared by every call. *)
let query_uri =
  let base_uri = Uri.of_string "https://api.duckduckgo.com/?format=json" in
  (fun query -> Uri.add_query_param base_uri ("q", [query]))
(** [get_definition_from_json json] parses [json] and extracts a definition.

    If the document is a JSON object, the ["Abstract"] field is tried first
    and the ["Definition"] field second. A field that is missing or is the
    empty string counts as absent; any other value is returned re-serialised
    with [Yojson.Safe.to_string]. Returns [None] when neither field yields a
    value or when the document is not an object. *)
let get_definition_from_json json =
  match Yojson.Safe.from_string json with
  | `Assoc kv_list ->
    let find key =
      match List.assoc key kv_list with
      | exception Not_found -> None
      | `String "" -> None
      | s -> Some (Yojson.Safe.to_string s)
    in
    begin match find "Abstract" with
      | Some _ as x -> x
      | None -> find "Definition"
    end
  | _ -> None
(** [get_definition word] issues an HTTP GET for [query_uri word], reads the
    whole response body as a string, and resolves with [word] paired with
    the result of [get_definition_from_json] on that body. The response
    metadata is ignored. *)
let get_definition word =
  let%lwt _resp, body = Cohttp_lwt_unix.Client.get (query_uri word) in
  let%lwt body' = Cohttp_lwt_body.to_string body in
  Lwt.return (word, get_definition_from_json body')
# #require "cohttp.lwt";;
# Cohttp_lwt_unix.Client.get;;
- : ?ctx:Cohttp_lwt_unix.Client.ctx -> ?headers:Cohttp.Header.t -> Uri.t -> (Cohttp_lwt.Response.t * Cohttp_lwt_body.t) Lwt.t = <fun>
(** [print_result (word, definition)] prints [word], a line of dashes as
    long as [word], and then the definition followed by a blank line.

    A present definition is laid out with [Format.pp_print_text] on
    [Format.str_formatter] after setting its margin to 70; [None] is
    rendered as ["No definition found"]. *)
let print_result (word, definition) =
  Lwt_io.printf "%s\n%s\n\n%s\n\n"
    word
    (String.init (String.length word) (fun _ -> '-'))
    (match definition with
     | None -> "No definition found"
     | Some def ->
       Format.pp_set_margin Format.str_formatter 70;
       Format.pp_print_text Format.str_formatter def;
       Format.flush_str_formatter ())
(** [search_and_print words] looks up every word with [get_definition]
    using [Lwt_list.map_p], waits for all results, and then prints them one
    after another, in the order of [words], with [Lwt_list.iter_s]. *)
let search_and_print words =
  let%lwt results = Lwt_list.map_p get_definition words in
  Lwt_list.iter_s print_result results
# Lwt_list.map_p;;
- : ('a -> 'b Lwt.t) -> 'a list -> 'b list Lwt.t = <fun>
(** [search_and_print words] looks up and prints each word with
    [Lwt_list.iter_p]: every word's result is printed as soon as its own
    lookup completes, without waiting for the other lookups. *)
let search_and_print words =
  Lwt_list.iter_p
    (fun word ->
       let%lwt result = get_definition word in
       print_result result)
    words
# Lwt_list.iter_p;;
- : ('a -> unit Lwt.t) -> 'a list -> unit Lwt.t = <fun>
(* Program entry point: every positional argument is a word to look up. *)
let () =
  let words = ref [] in
  (* Keep a space after "Usage:" so the program name does not run into it. *)
  let usage = "Usage: " ^ Sys.executable_name ^ " [word ...]" in
  (* Words are consed on as they are parsed, so reverse to restore the
     command-line order. *)
  Arg.parse [] (fun w -> words := w :: !words) usage;
  words := List.rev !words;
  (* Switch to the libev engine; if Lwt reports it as not available, keep
     whatever engine is already installed. *)
  (try Lwt_engine.set (new Lwt_engine.libev ())
   with Lwt_sys.Not_available _ -> ());
  Lwt_main.run (search_and_print !words)
# let maybe_raise =
    let should_fail = ref false in
    fun () ->
      let will_fail = !should_fail in
      should_fail := not will_fail;
      let%lwt () = Lwt_unix.sleep 0.5 in
      if will_fail then [%lwt raise Exit] else Lwt.return_unit;;
val maybe_raise : unit -> unit Lwt.t = <fun>
# maybe_raise ();;
- : unit = ()
# maybe_raise ();;
Exception: Pervasives.Exit.
Raised at file "src/core/lwt.ml", line 805, characters 22-23
Called from file "src/unix/lwt_main.ml", line 34, characters 8-18
Called from file "toplevel/toploop.ml", line 180, characters 17-56
Note that I wrote `[%lwt raise Exit]` rather than `Lwt.fail Exit`. The Lwt manual states that the former will produce better backtraces than the latter[1]:
It allows to encode the old `raise_lwt <e>` as `[%lwt raise <e>]`, ...
`raise_lwt exn` which is the same as Lwt.fail exn but with backtrace support.
# let handle_error () =
    try
      let%lwt () = maybe_raise () in
      Lwt.return "success"
    with _ -> Lwt.return "failure";;
val handle_error : unit -> string Lwt.t = <fun>
# handle_error ();;
- : string = "success"
# handle_error ();;
Exception: Pervasives.Exit.
Raised at file "src/core/lwt.ml", line 805, characters 22-23
Called from file "src/unix/lwt_main.ml", line 34, characters 8-18
Called from file "toplevel/toploop.ml", line 180, characters 17-56
# let handle_error () =
    try%lwt
      let%lwt () = maybe_raise () in
      Lwt.return "success"
    with _ -> Lwt.return "failure";;
val handle_error : unit -> string Lwt.t = <fun>
# handle_error ();;
- : string = "success"
# handle_error ();;
- : string = "failure"
Although the manual does not state it explicitly, `try%lwt ... with ...` appears to be intended to provide a better backtrace than `Lwt.catch`.[1] For instance, the `handle_error` function is expanded to:
(* Expanded form of [handle_error]: the failure handler is installed with
   [Lwt.backtrace_catch] and the inner bind is performed with
   [Lwt.backtrace_bind]. Each is given a function that re-raises and
   immediately re-catches the exception it receives. *)
let handle_error () =
  Lwt.backtrace_catch
    (fun exn -> try raise exn with | exn -> exn)
    (fun () ->
       Lwt.backtrace_bind
         (fun exn -> try raise exn with | exn -> exn)
         (maybe_raise ())
         (fun () -> Lwt.return "success"))
    (function | _ -> Lwt.return "failure")
Lwt does not have a concept corresponding to a monitor.
(** [query_uri ~server query] is the URI [https://<server>/?format=json]
    with a [q] query parameter set to [query]. *)
let query_uri ~server query =
  let base_uri =
    Uri.of_string (String.concat "" ["https://"; server; "/?format=json"])
  in
  Uri.add_query_param base_uri ("q", [query])
(** [get_definition ~server word] queries [server] for [word] and resolves
    with [(word, Ok definition)], where [definition] is the result of
    [get_definition_from_json] on the response body.

    Any exception raised while fetching or decoding is caught and reported
    as [(word, Error "Unexpected failure")], so the returned promise does
    not fail because of them. *)
let get_definition ~server word =
  try%lwt
    let%lwt _resp, body =
      Cohttp_lwt_unix.Client.get (query_uri ~server word)
    in
    let%lwt body' = Cohttp_lwt_body.to_string body in
    Lwt.return (word, Ok (get_definition_from_json body'))
  with _ -> Lwt.return (word, Error "Unexpected failure")
(** [print_result (word, definition)] prints [word], a line of dashes as
    long as [word], and then a description of the outcome followed by a
    blank line.

    - [Error s] prints the failure message [s] after a fixed prefix.
    - [Ok None] prints ["No definition found"].
    - [Ok (Some def)] prints [def] laid out with [Format.pp_print_text] on
      [Format.str_formatter] after setting its margin to 70. *)
let print_result (word, definition) =
  Lwt_io.printf "%s\n%s\n\n%s\n\n"
    word
    (String.init (String.length word) (fun _ -> '-'))
    (match definition with
     (* Keep a space after the colon so the reason does not run into it. *)
     | Error s -> "DuckDuckGo query failed: " ^ s
     | Ok None -> "No definition found"
     | Ok (Some def) ->
       Format.pp_set_margin Format.str_formatter 70;
       Format.pp_print_text Format.str_formatter def;
       Format.flush_str_formatter ())
(** [search_and_print ~servers words] looks up every word with
    [Lwt_list.mapi_p], assigning servers round-robin (the word at index [i]
    goes to server [i mod n], where [n] is the number of servers), and then
    prints the results one after another in the order of [words].

    [servers] must be non-empty whenever [words] is non-empty: the index
    computation divides by the number of servers. *)
let search_and_print ~servers words =
  let servers = Array.of_list servers in
  let%lwt results =
    Lwt_list.mapi_p
      (fun i word ->
         let server = servers.(i mod Array.length servers) in
         get_definition ~server word)
      words
  in
  Lwt_list.iter_s print_result results

(* Program entry point: [-servers] takes a comma-separated list; every
   positional argument is a word to look up. *)
let () =
  let servers = ref ["api.duckduckgo.com"]
  and words = ref [] in
  let options =
    Arg.align [
      ("-servers",
       Arg.String (fun s -> servers := String.split_on_char ',' s),
       "s1,...,sn Specify servers to connect to");
    ]
  in
  (* Keep a space after "Usage:" so the program name does not run into it. *)
  let usage =
    "Usage: " ^ Sys.executable_name ^ " [-servers s1,...,sn] [word ...]"
  in
  (* Words are consed on as they are parsed, so reverse to restore the
     command-line order. *)
  Arg.parse options (fun w -> words := w :: !words) usage;
  words := List.rev !words;
  (try Lwt_engine.set (new Lwt_engine.libev ())
   with Lwt_sys.Not_available _ -> ());
  Lwt_main.run (search_and_print ~servers:!servers !words)
# let both x y =
    let%lwt x' = x
    and y' = y in
    Lwt.return (x', y');;
val both : 'a Lwt.t -> 'b Lwt.t -> ('a * 'b) Lwt.t = <fun>
# let string_and_float =
    both
      (let%lwt () = Lwt_unix.sleep 0.5 in Lwt.return "A")
      (let%lwt () = Lwt_unix.sleep 0.25 in Lwt.return 32.33);;
val string_and_float : (string * float) Lwt.t = <abstr>
# string_and_float;;
- : string * float = ("A", 32.33)
# Lwt.choose [
    (let%lwt () = Lwt_unix.sleep 0.5 in Lwt.return "half a second");
    (let%lwt () = Lwt_unix.sleep 10. in Lwt.return "ten seconds");
  ];;
- : string = "half a second"
# Lwt.pick;;
- : 'a Lwt.t list -> 'a Lwt.t = <fun>
(** [get_definition ~server word] queries [server] for [word] and resolves
    with [(word, Ok definition)], or with [(word, Error exn)] carrying the
    exception raised while fetching or decoding. *)
let get_definition ~server word =
  try%lwt
    let%lwt _resp, body =
      Cohttp_lwt_unix.Client.get (query_uri ~server word)
    in
    let%lwt body' = Cohttp_lwt_body.to_string body in
    Lwt.return (word, Ok (get_definition_from_json body'))
  with exn -> Lwt.return (word, Error exn)

(** [get_definition_with_timeout ~server timeout word] races the lookup
    against a [timeout]-second sleep using [Lwt.pick].

    If the sleep finishes first the result is [(word, Error "Timed out")].
    Otherwise the lookup's result is returned, with any exception replaced
    by the message ["Unexpected failure"] so that both branches carry a
    string error. *)
let get_definition_with_timeout ~server timeout word =
  Lwt.pick [
    (let%lwt () = Lwt_unix.sleep timeout in
     Lwt.return (word, Error "Timed out"));
    (let%lwt word, result = get_definition ~server word in
     let result' =
       match result with
       | Ok _ as x -> x
       | Error _ -> Error "Unexpected failure"
     in
     Lwt.return (word, result'));
  ]

(** [search_and_print ~servers timeout words] looks up every word with
    [Lwt_list.mapi_p] and a per-word [timeout], assigning servers
    round-robin (the word at index [i] goes to server [i mod n]), and then
    prints the results one after another in the order of [words].

    [servers] must be non-empty whenever [words] is non-empty: the index
    computation divides by the number of servers. *)
let search_and_print ~servers timeout words =
  let servers = Array.of_list servers in
  let%lwt results =
    Lwt_list.mapi_p
      (fun i word ->
         let server = servers.(i mod Array.length servers) in
         get_definition_with_timeout ~server timeout word)
      words
  in
  Lwt_list.iter_s print_result results

(* Program entry point: [-servers] takes a comma-separated list, [-timeout]
   a number of seconds; every positional argument is a word to look up. *)
let () =
  let servers = ref ["api.duckduckgo.com"]
  and timeout = ref 5.0
  and words = ref [] in
  let options =
    Arg.align [
      ("-servers",
       Arg.String (fun s -> servers := String.split_on_char ',' s),
       "s1,...,sn Specify servers to connect to");
      ("-timeout",
       Arg.Set_float timeout,
       "secs Abandon queries that take longer than this time");
    ]
  in
  (* Keep a space after "Usage:" so the program name does not run into it. *)
  let usage =
    "Usage: " ^ Sys.executable_name
    ^ " [-servers s1,...,sn] [-timeout secs] [word ...]"
  in
  (* Words are consed on as they are parsed, so reverse to restore the
     command-line order. *)
  Arg.parse options (fun w -> words := w :: !words) usage;
  words := List.rev !words;
  (try Lwt_engine.set (new Lwt_engine.libev ())
   with Lwt_sys.Not_available _ -> ());
  Lwt_main.run (search_and_print ~servers:!servers !timeout !words)
`Cohttp_lwt_unix.Client.get` does not take the labeled `~interrupt` argument, unlike `Cohttp_async.Client.get`. However, the thread that `Cohttp_lwt_unix.Client.get` returns is cancelable and can be naturally used with `Lwt.pick`.
# let rec range ?(acc = []) start stop =
    if start >= stop then List.rev acc
    else range ~acc:(start :: acc) (start + 1) stop;;
val range : ?acc:int list -> int -> int -> int list = <fun>
# let def = Lwt_preemptive.detach (fun () -> range 1 10) ();;
val def : int list Lwt.t = <abstr>
# def;;
- : int list = [1; 2; 3; 4; 5; 6; 7; 8; 9]
# let rec every ?(stop = never_terminate) span (f : unit -> unit Lwt.t) : unit Lwt.t =
    if Lwt.is_sleeping stop then
      let%lwt () = f () in
      let%lwt () = Lwt.pick [Lwt_unix.sleep span; Lwt.protected stop] in
      every ~stop span f
    else
      Lwt.return_unit;;
val every : ?stop:unit Lwt.t -> float -> (unit -> unit Lwt.t) -> unit Lwt.t = <fun>
# let log_delays thunk =
    let start = Unix.gettimeofday () in
    let print_time () =
      let diff = Unix.gettimeofday () -. start in
      Lwt_io.printf "%f, " diff
    in
    let d = thunk () in
    let%lwt () = every 0.1 ~stop:d print_time in
    let%lwt () = d in
    let%lwt () = print_time () in
    Lwt_io.print "\n";;
val log_delays : (unit -> unit Lwt.t) -> unit Lwt.t = <fun>
# log_delays (fun () -> Lwt_unix.sleep 0.5);;
0.000006, 0.101822, 0.201969, 0.306260, 0.411472, 0.505199,
# let busy_loop () =
    let x = ref None in
    for i = 1 to 500_000_000 do x := Some i done;;
val busy_loop : unit -> unit = <fun>
# log_delays (fun () -> Lwt.return (busy_loop ()));;
6.890156,
- : unit = ()
# log_delays (fun() ->Lwt_preemptive.detach busy_loop());;0.000033,0.158420,0.264950,0.370093,0.475191,0.585002,0.685192,0.786619,0.894304,0.997954,1.103635,1.213693,1.316856,1.426929,1.583395,1.686367,1.786517,1.894609,1.998529,2.103606,2.208725,2.363542,2.571035,2.680959,2.945979,3.056136,3.161278,3.430440,3.531169,3.742274,3.847282,3.951309,4.114742,4.215642,4.315771,4.421812,4.530823,4.741970,4.848297,5.008062,5.114670,5.430785,5.535985,5.644637,5.802193,6.015593,6.226784,6.330944,6.546150,6.703104,6.806751,6.912780,6.992610,- :unit=()
#letnoallc_busy_loop()=for _i=0to500_000_000do()done;;valnoallc_busy_loop :unit ->unit=<fun>#log_delays (fun () ->Lwt_preemptive.detachnoallc_busy_loop ());;0.000010,0.137578,0.240112,0.345218,0.450686,0.555763,0.660168,0.766587,0.872521,0.977615,1.078819,1.184021,1.289587,1.394786,1.552426,1.657563,1.764036,1.922921,2.078783,2.287458,2.501932,2.663988,2.768908,2.978174,3.188819,3.297128,3.460475,3.568800,3.670217,3.803641,3.803730,- :unit= ()
1. It has been reported that the backtrace mechanism appears not to work well with the recent versions of OCaml. For the present, the choice between the Ppx constructs and the regular functions (or operators) may be more a matter of style.
About
Lwt Translations of the Async Code Examples in Real World OCaml