Par_incr - Parallel Self Adjusting Computations
A simple library for parallel incremental computations. Based onEfficient Parallel Self-Adjusting Computation. The documentation existshere
How it works
with certain values.Perform
operation onVar.t
and change it toincremental
signifies a computation in itself.Use different combinators provided by the library on the
s and make even biggerincremental
s.Obtain value of a certain
by running it (arun
operation is provided by the library).To run an
, we must pass anexecutor
to therun
function. The executor is the thing that allows for parallelism.executor
is a record with two fields:run
. More on this can be found in thedocumentationRunning an
'a incremental
returns a'a computation
.When we change some
(done withVar.set
operation), it marks all dependent computations dirty.Running
operation on a dirtycomputation
updates its value efficiently.Destroy (with
when its no more required.
Add 1
This is a simple incremental computation which increments value by 1. It can be easily done by using themap
function provided by the library.
# #require "par_incr"# open Par_incr
We'll start off creating aVar.t
with value 10
# let x = Var.create 10val x : int Var.t = <abstr>
We can usemap
to make a computation depending onx
, which basically returnsx+1
# let x_plus_1 = ~fn:(fun x -> x+1) ( x)val x_plus_1 : int t = <abstr>
As mentioned inHow It Works at the start, we need to create anexecutor
to pass to therun
function. Here we are creating a simple executor which doesn't do anything parallely(hence named seq_executor). You would implement thisexecutor
differently if you wanted parallelism.
# let seq_executor = { run = (fun f -> f()); par_do=(fun l r -> let lres = l() in (lres, r()) )}val seq_executor : executor = {run = <fun>; par_do = <fun>}
Now werun
to get it's value and record all the computations involved in getting the value as well.
# let x_plus_1_comp = ~executor:seq_executor x_plus_1val x_plus_1_comp : int computation = <abstr>
To get the value(type'a
) out of'a computation
, we just callvalue
on it.
# Par_incr.value x_plus_1_comp- : int = 11
We'll change the input value(in our casex
) and see what happens. On changing the input to any computation, we must runpropagate
to update the output.propagate
is clever enough to not do any work if there's no need to(i.e in case some inputs change but the final output doesn't change,propagate
will make sure to not do any extra work and stop as soon as possible).
# Var.set x 20 (* Change value of x to 20*)- : unit = ()# Par_incr.propagate x_plus_1_comp (* Propagate changes*)- : unit = ()# Par_incr.value x_plus_1_comp- : int = 21# Par_incr.propagate x_plus_1_comp (* Propagating when there's no changes will do nothing*)- : unit = ()# Par_incr.value x_plus_1_comp- : int = 21
Since we are done with the computation now, we should destroy it (withdestroy_comp
# Par_incr.destroy_comp x_plus_1_comp- : unit = ()
Running propagate on a destroyed computation will raise an exception
# Par_incr.propagate x_plus_1_compException: Failure "Cannot propagate destroyed/ill-formed computation".
Adding 3 numbers
This example demonstratesmap2
and shows the usage of some of the convenient syntax/operators exposed by theSyntax
module of the library.
# let a = Var.create 10val a : int Var.t = <abstr># let b = Var.create 20val b : int Var.t = <abstr># let c = Var.create 30val c : int Var.t = <abstr>
We can usemap2
provided by the library to adda
together. There's a convenient syntax to achieve the same thing which can be used by opening theSyntax
module. You can see that they give the same results.
# let abc_sum = Par_incr.map2 ~fn:(Int.add) ( c) (Par_incr.map2 ~fn:(Int.add) ( a) ( b))val abc_sum : int t = <abstr># let abc_sum' =(*If we open Syntax module, we can write this more cleanly*) let open Par_incr.Syntax in let+ a = a and+ b = b and+ c = c in a + b + cval abc_sum' : int t = <abstr># let abc_sum_comp = ~executor:seq_executor abc_sumval abc_sum_comp : int computation = <abstr># let abc_sum'_comp = ~executor:seq_executor abc_sum'val abc_sum'_comp : int computation = <abstr># assert (Par_incr.value abc_sum_comp = Par_incr.value abc_sum'_comp)- : unit = ()
On changing the inputs, we can see that the output is updated accordingly. The below code snippet shows off the operators exposed byVar.Syntax
module as well.
# Var.set a 40- : unit = ()# Par_incr.propagate abc_sum'_comp- : unit = ()# Par_incr.value abc_sum'_comp- : int = 90# let () = (*Var.Syntax module provides some convenient operators*) let open Var.Syntax in (*Equivalent to Var.set b (Var.value b + Var.value b) *) b := (!b) + (!b);# Par_incr.propagate abc_sum'_comp; Par_incr.propagate abc_sum_comp- : unit = ()# Par_incr.value abc_sum'_comp- : int = 110# assert (Par_incr.value abc_sum'_comp = Par_incr.value abc_sum_comp )- : unit = ()# Par_incr.destroy_comp abc_sum_comp; Par_incr.destroy_comp abc_sum'_comp- : unit = ()
Exploiting Parallelism
This examples shows usage ofDomainslib to parallelize incremental computations. In this example as well, there's use of the nice syntax/operators that theSyntax
module provides.
# #require "domainslib"# module T = Domainslib.Taskmodule T = Domainslib.Task
We start off by creating a parallelexecutor
since we want to actually run things in parallel this time. This does require somedomainslib
knowledge but other than that this is pretty easy to understand.
# let get_par_executor ~num_domains () = (* A useful function to give us a parallel executor*) let pool = T.setup_pool ~num_domains () in let par_runner f = pool f in let par_do l r = let lres = T.async pool l in let rres = r () in (T.await pool lres, rres) in (pool, {run = par_runner; par_do})val get_par_executor : num_domains:int -> unit -> T.pool * executor = <fun># let pool, par_executor = get_par_executor ~num_domains:4 ()val pool : T.pool = <abstr>val par_executor : executor = {run = <fun>; par_do = <fun>}
In thesum_range
function, we're dividing the array in half at each level and computing the sum of both halves in parallel. There's multiple ways to write this function and alternative ways are shown as part of the comments.
# let rec sum_range ~lo ~hi xs = Par_incr.delay @@ fun () -> if hi - lo = 1 then begin xs.(lo) end else let mid = lo + ((hi - lo) asr 1) in let open Par_incr.Syntax in (*Using let+ and and+ instead of let& and and& would make this sequential*) let& lhalf = sum_range ~lo ~hi:mid xs and& rhalf = sum_range ~lo:mid ~hi xs in lhalf + rhalf (*Can be written alternatively as: Par_incr.map2 ~mode:`Par ~fn:(Int.add) (sum_range ~lo ~hi:mid xs) (sum_range ~lo:mid ~hi xs) or let res = par ~left:(sum_range ~lo ~hi:mid xs) ~right:(sum_range ~lo:mid ~hi xs) in map ~fn:(fun (x,y) -> Int.add x y) *)val sum_range : lo:int -> hi:int -> int t array -> int t = <fun>
We'll define the array we want to sum here and run the computation withpar_executor
defined above. The example demonstrates input change and propagation as well.
# let arr = Var.create [|1;2;3;4;5;6;7;8;9;10|]val arr : int Var.t array = [|<abstr>; <abstr>; <abstr>; <abstr>; <abstr>; <abstr>; <abstr>; <abstr>; <abstr>; <abstr>|]# let t_arr = arrval t_arr : int t array = [|<abstr>; <abstr>; <abstr>; <abstr>; <abstr>; <abstr>; <abstr>; <abstr>; <abstr>; <abstr>|]# let arr_sum = sum_range ~lo:0 ~hi:(Array.length t_arr) t_arrval arr_sum : int t = <abstr># let arr_sum_comp = run ~executor:par_executor arr_sumval arr_sum_comp : int computation = <abstr># Par_incr.value arr_sum_comp- : int = 55# Var.set arr.(0) 11- : unit = ()# Par_incr.propagate arr_sum_comp; Par_incr.value arr_sum_comp- : int = 65# Par_incr.destroy_comp arr_sum_comp- : unit = ()
Filtering a Cons List
Although this example is very inefficient, it helps us realize why we need thebind
operation. All other operations only builds static computation trees, but for writing complex computations, like an incremental filter which adapts to changes in the list, we need dynamism. We get that withbind
. In the example, we uselet*
operator which is just a syntactic sugar forbind
provided bySyntax
First off, we define some helper functions.
# let rec to_var_list xs = (*Helper function*) Var.create ( match xs with | [] -> `Nil | x :: xs -> `Cons (x, to_var_list xs) )val to_var_list : 'a list -> ([> `Cons of 'a * 'b | `Nil ] Var.t as 'b) = <fun># let rec to_incr_list xs = (*Helper function*) let open Par_incr.Syntax in let+ l = xs in (*map operation*) match l with | `Nil -> `Nil | `Cons(x,xs) -> `Cons(x, to_incr_list xs)val to_incr_list : ([< `Cons of 'b * 'a | `Nil ] Var.t as 'a) -> ([> `Cons of 'b * 'c | `Nil ] t as 'c) = <fun>
function is then defined. It bindsxs
and returns another incremental based on whatxs
was. The reason we need to usebind
here instead of something likemap
is because the tail of the list is also fully dynamic and we want the tail to be computed incrementally as well. If we're using map, the~fn
passed to map is expected to be pure (pure, in our case, can be defined to be something doesn't use anything that isincremental
in its body). The behaviour is not well-defined if~fn
is not pure.
# let rec filter predicate xs = let open Par_incr.Syntax in let* l= xs in (*Syntactic sugar for bind*) match l with | `Nil -> Par_incr.return [] | `Cons (x, xs) -> let xs = filter predicate xs in if predicate x then (let* xs = xs in Par_incr.return (x::xs)) (*Can also be written as: bind xs (fun xs -> return (x::xs)) *) else xsval filter : ('a -> bool) -> ([< `Cons of 'a * 'b | `Nil ] t as 'b) -> 'a list t = <fun>
We can seefilter
does indeed work as expected.
# let var_list = to_var_list [2;3;5]val var_list : _[> `Cons of int * 'a | `Nil ] Var.t as 'a = <abstr># let incr_list = to_incr_list var_listval incr_list : _[> `Cons of int * 'a | `Nil ] t as 'a = <abstr># let res_list = filter (fun x -> x mod 2 = 1) incr_listval res_list : int list t = <abstr># let filter_comp = ~executor:seq_executor res_listval filter_comp : int list computation = <abstr># Par_incr.value filter_comp- : int list = [3; 5]# let () = let open Var.Syntax in (*Let's change first element to 5 and see what happens*) match !var_list with | `Nil -> failwith "Impossible" | `Cons(x,xs) -> var_list:= `Cons(5, xs)# Par_incr.propagate filter_comp; Par_incr.value filter_comp- : int list = [5; 3; 5]# Par_incr.destroy_comp filter_comp- : unit = ()# T.teardown_pool pool (*Teardown the domainslib Task pool we created before*)- : unit = ()
Dev Dependencies (6)
- odoc
- incremental
>= "v0.15.0" & with-test
- current_incr
>= "0.6.1" & with-test
- mdx
>= "2.3.0" & with-test
- alcotest
>= "1.7.0" & with-test
- domainslib
>= "0.5.0" & with-test
