
Flow-Based Programming framework for Elixir


antonmi/flowex


Railway Flow-Based Programming.

The library is no longer supported; see the ALF project.

Flowex is a set of abstractions built on top of Elixir GenStage that allows writing programs in the Flow-Based Programming (FBP) paradigm.

I would say it is a mix of FBP and the so-called Railway Oriented Programming (ROP) approach.

Flowex DSL allows you to easily create "pipelines" of Elixir GenStages.

Dedicated to my lovely girlfriend Chryścina.


Installation

Just add flowex as a dependency in the mix.exs file.

A simple example to get the idea

Let's consider a simple program which receives a number as an input, then adds one, then multiplies the result by two and finally subtracts 3.

    defmodule Functions do
      def add_one(number), do: number + 1
      def mult_by_two(number), do: number * 2
      def minus_three(number), do: number - 3
    end

    defmodule MainModule do
      def run(number) do
        number
        |> Functions.add_one()
        |> Functions.mult_by_two()
        |> Functions.minus_three()
      end
    end

So the program is a pipeline of functions with the same interface. The functions are very simple in the example.

In the real world they can be something like validate_http_request, get_user_from_db, update_db_from_request and render_response. Furthermore, each of the functions can potentially fail. But to get the idea, let's stick to the simplest example.

FBP defines applications as networks of "black box" processes, which exchange data across predefined connections by message passing.

To satisfy the FBP approach we need to place each of the functions into a separate process. The number will then be passed from the 'add_one' process to 'mult_by_two' and then to the 'minus_three' process, which returns the final result.

That, in short, is the idea of Flowex!
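The idea can be sketched with plain Elixir processes, before any Flowex code is involved. This is only an illustration: ProcessChainSketch, start_stage and the {:number, ref, value} message shape are hypothetical names chosen for this example, and Flowex itself builds on GenStage rather than on raw send/receive.

```elixir
defmodule ProcessChainSketch do
  # Hypothetical helper, not part of Flowex: spawns a process that applies
  # `fun` to every number it receives and forwards the result to `next`.
  def start_stage(fun, next) do
    spawn(fn -> loop(fun, next) end)
  end

  defp loop(fun, next) do
    receive do
      {:number, ref, number} ->
        send(next, {:number, ref, fun.(number)})
        loop(fun, next)
    end
  end

  # Builds the chain back to front so that each stage knows its successor.
  # The last stage sends the result back to the calling process.
  def run(number) do
    minus_three = start_stage(&(&1 - 3), self())
    mult_by_two = start_stage(&(&1 * 2), minus_three)
    add_one = start_stage(&(&1 + 1), mult_by_two)

    ref = make_ref()
    send(add_one, {:number, ref, number})

    receive do
      {:number, ^ref, result} -> result
    after
      1_000 -> {:error, :timeout}
    end
  end
end

IO.inspect(ProcessChainSketch.run(2))
```

The stage processes are left running after the call; a real system would supervise and stop them, which is one of the things a pipeline library takes care of.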

More complex example for understanding interface

Let's define a stricter interface for our functions. Each function will receive a predefined struct as the first argument and will return a map:

    def add_one(%{number: number}, opts) do
      %{number: number + 1, a: opts.a}
    end

The function receives a structure with a number field and an options map with the field a, and returns a map with the new number. The second argument is a set of options and will be described later. Let's rewrite the whole Functions module in the following way:

    defmodule Functions do
      defstruct number: nil, a: nil, b: nil, c: nil

      def add_one(%{number: number}, %{a: a}) do
        %{number: number + 1, a: a}
      end

      def mult_by_two(%{number: number}, %{b: b}) do
        %{number: number * 2, b: b}
      end

      def minus_three(%{number: number}, %{c: c}) do
        %{number: number - 3, c: c}
      end
    end

The module defines three functions with a similar interface. We also defined the struct %Functions{}, which describes the data structure passed to the functions.

The main module may look like:

    defmodule MainModule do
      def run(number) do
        opts = %{a: 1, b: 2, c: 3}

        %Functions{number: number}
        |> Functions.add_one(opts)
        |> Functions.mult_by_two(opts)
        |> Functions.minus_three(opts)
      end
    end

Flowex magic!

Let's add a few lines at the beginning.

    defmodule FunPipeline do
      use Flowex.Pipeline

      pipe :add_one
      pipe :mult_by_two
      pipe :minus_three

      defstruct number: nil, a: nil, b: nil, c: nil

      def add_one(%{number: number}, %{a: a}) do
        %{number: number + 1, a: a}
      end

      # mult_by_two and minus_three definitions skipped
    end

We also renamed the module to FunPipeline because we are going to create a "Flowex pipeline". Flowex.Pipeline extends our module, so we have:

  • pipe macro to define which function evaluation should be placed into a separate GenStage;
  • error_pipe macro to define the function which will be called if an error occurs;
  • start, supervised_start and stop functions to create and destroy pipelines;
  • call function to run pipeline computations synchronously;
  • cast function to run pipeline computations asynchronously;
  • overridable init function which, by default, accepts opts and returns them.

Let's start a pipeline:

    opts = %{a: 1, b: 2, c: 3}

    pipeline = FunPipeline.start(opts)

    # returns
    # %Flowex.Pipeline{in_name: :"Flowex.Producer_#Reference<0.0.7.504>",
    #                  module: FunPipeline,
    #                  out_name: :"Flowex.Consumer_#Reference<0.0.7.521>",
    #                  sup_pid: #PID<0.136.0>}

What happened:

  • Three GenStages have been started, one for each function in the pipeline. Each of these GenStages is a :producer_consumer;
  • One additional GenStage for error processing has been started (it is also a :producer_consumer);
  • 'producer' and 'consumer' GenStages for input and output have been added;
  • All the components have been placed under a Supervisor.

The next picture shows what the 'pipeline' is.

[Figure: structure of the pipeline]

The start function returns a %Flowex.Pipeline{} struct with the following fields:

  • module - the name of the module;
  • in_name - unique name of the 'producer';
  • out_name - unique name of the 'consumer';
  • sup_pid - pid of the pipeline supervisor.

Note that we have passed options to the start function. These options will be passed to each function of the pipeline as the second argument. There is also a supervised_start function which allows placing the pipeline under an external supervisor. See details in the Starting strategies section.

Run the pipeline

One can run calculations in pipeline synchronously and asynchronously:

  • call function to run pipeline computations synchronously.
  • cast function to run pipeline computations asynchronously.

The FunPipeline.call/2 function receives a %Flowex.Pipeline{} struct as the first argument and must receive a %FunPipeline{} struct as the second one. The call function returns a %FunPipeline{} struct.

    FunPipeline.call(pipeline, %FunPipeline{number: 2})
    # returns %FunPipeline{a: 1, b: 2, c: 3, number: 3}

As expected, the pipeline returned a %FunPipeline{} struct with number: 3. The fields a, b and c were set from the options.

If you don't care about the result, use the cast/2 function to run and forget.

    FunPipeline.cast(pipeline, %FunPipeline{number: 2})
    # returns :ok

Run via client

Another way is to use the Flowex.Client module, which implements the GenServer behaviour. The Flowex.Client.start/1 function receives a pipeline struct as an argument. Then you can use the call/2 or cast/2 function. See the example below:

    {:ok, client_pid} = Flowex.Client.start(pipeline)

    Flowex.Client.call(client_pid, %FunPipeline{number: 2})
    # returns %FunPipeline{a: 1, b: 2, c: 3, number: 3}

    # or
    Flowex.Client.cast(client_pid, %FunPipeline{number: 2})
    # returns :ok

How it works

The following figure demonstrates the way data flows:

[Figure: data flow through the pipeline]

Note: error_pipe is not shown in the picture in order to save space.

This is what happens when you call Flowex.Client.call (synchronous):

  • the calling process makes a synchronous call to the client GenServer with the %FunPipeline{number: 2} struct;
  • the client makes the synchronous call 'FunPipeline.call(pipeline, %FunPipeline{number: 2})';
  • the struct is wrapped into a %Flowex.IP{} struct and begins its asynchronous journey from one GenStage to another;
  • when the consumer receives the Information Packet (IP), it sends it back to the client, which sends it back to the caller process.

This is what happens when you cast to the pipeline (asynchronous):

  • the calling process makes a cast call to the client and immediately receives :ok;
  • the client makes a cast to the pipeline;
  • the struct is wrapped into a %Flowex.IP{} struct and begins its asynchronous journey from one GenStage to another;
  • the consumer does not send data back, because this is a cast.
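The two flows above can be imitated with a small GenServer client in plain Elixir. This is a rough sketch, not the Flowex.Client implementation: ClientSketch, the single-process "pipeline" that doubles a number, and the {:ip, reply_to, ref, number} message shape are all assumptions made for the illustration.

```elixir
defmodule ClientSketch do
  use GenServer

  # Hypothetical stand-in for a whole pipeline: one process that doubles the
  # number it receives. It replies only when the packet carries a reply address.
  def start_pipeline do
    spawn(fn -> pipeline_loop() end)
  end

  defp pipeline_loop do
    receive do
      {:ip, reply_to, ref, number} ->
        if reply_to, do: send(reply_to, {:result, ref, number * 2})
        pipeline_loop()
    end
  end

  # Synchronous pipeline call: send the packet, then block until it comes back.
  defp pipeline_call(pipeline, number) do
    ref = make_ref()
    send(pipeline, {:ip, self(), ref, number})

    receive do
      {:result, ^ref, result} -> result
    end
  end

  # Asynchronous pipeline cast: no reply address, so nothing is sent back.
  defp pipeline_cast(pipeline, number) do
    send(pipeline, {:ip, nil, make_ref(), number})
    :ok
  end

  def start(pipeline), do: GenServer.start(__MODULE__, pipeline)
  def call(client, number), do: GenServer.call(client, {:call, number})
  def cast(client, number), do: GenServer.cast(client, {:cast, number})

  @impl true
  def init(pipeline), do: {:ok, pipeline}

  @impl true
  def handle_call({:call, number}, _from, pipeline) do
    # The client itself waits for the pipeline, then answers the caller.
    {:reply, pipeline_call(pipeline, number), pipeline}
  end

  @impl true
  def handle_cast({:cast, number}, pipeline) do
    pipeline_cast(pipeline, number)
    {:noreply, pipeline}
  end
end

{:ok, client} = ClientSketch.start(ClientSketch.start_pipeline())
IO.inspect(ClientSketch.call(client, 2))
IO.inspect(ClientSketch.cast(client, 2))
```

Because the client blocks inside handle_call until the packet returns, one client can have only one packet in flight at a time, which is why several clients are needed to keep a pipeline busy.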

Error handling

What happens when an error occurs in some pipe?

The pipeline behaves like the Either monad. If everything is ok, each 'pipe' function will be called one by one and the resulting data will skip the 'error_pipe'. But if an error happens, for example, in the first pipe, the :mult_by_two and :minus_three functions will not be called. The IP will be passed directly to the 'error_pipe'. If you don't specify an 'error_pipe', Flowex will add the default one:

    def handle_error(error, _struct, _opts) do
      raise error
    end

which just raises an exception.

To specify the 'error' function use the error_pipe macro:

    defmodule FunPipeline do
      use Flowex.Pipeline
      # ...
      error_pipe :if_error

      def if_error(error, struct, opts) do
        # error is a %Flowex.PipeError{} structure
        # with :message, :pipe, and :struct fields
        %{number: :oops}
      end
      # ...
    end

You can specify only one error_pipe! Note: the 'error_pipe' function accepts three arguments. The first argument is a %Flowex.PipeError{} structure which has the following fields:

  • :message - error message;
  • :pipe - a {module, function, opts} tuple containing info about the pipe where the error occurred;
  • :struct - the input of the pipe.
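The Either-like behavior described in this section can be approximated in plain Elixir with Enum.reduce_while/3. The sketch below is an assumption-laden illustration, not Flowex code: RailwaySketch and its nested PipeError struct are hypothetical, and the :pipe field here holds just a name rather than the {module, function, opts} tuple.

```elixir
defmodule RailwaySketch do
  # Hypothetical error record, loosely modelled on the fields listed above.
  defmodule PipeError do
    defstruct [:message, :pipe, :struct]
  end

  # Runs the named pipes in order. After the first exception the remaining
  # pipes are skipped and `error_fun` receives the error and the pipe input.
  def run(data, pipes, error_fun) do
    result =
      Enum.reduce_while(pipes, {:ok, data}, fn {name, fun}, {:ok, acc} ->
        try do
          {:cont, {:ok, Map.merge(acc, fun.(acc))}}
        rescue
          e ->
            error = %PipeError{message: Exception.message(e), pipe: name, struct: acc}
            {:halt, {:error, error, acc}}
        end
      end)

    case result do
      {:ok, data} -> data
      {:error, error, data} -> Map.merge(data, error_fun.(error, data))
    end
  end
end

pipes = [
  add_one: fn %{number: n} -> %{number: n + 1} end,
  mult_by_two: fn %{number: n} -> %{number: n * 2} end,
  minus_three: fn %{number: n} -> %{number: n - 3} end
]

if_error = fn _error, _data -> %{number: :oops} end

# Happy path: all three pipes run.
IO.inspect(RailwaySketch.run(%{number: 2}, pipes, if_error))

# Failure in the first pipe: the other two are skipped.
IO.inspect(RailwaySketch.run(%{number: :not_a_number}, pipes, if_error))
```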

Pipeline and pipe options

In addition to specifying options when starting a pipeline, one can pass a component's options to the pipe macro. Also remember the pipeline's init function, which can add or override options.

The flow is the following: the options passed to the start function are available in the pipeline's init function, which can merge additional options. Then the opts passed to the pipe macro are merged. So there are three levels that options pass through before reaching a component:

  • pipeline start function;
  • pipeline init function;
  • pipe opts.

Let's consider an example:

    defmodule InitOptsFunPipeline do
      use Flowex.Pipeline

      defstruct [:from_start, :from_init, :from_opts]

      pipe :component, opts: %{from_opts: 3}

      def init(opts) do
        # opts passed to the start function are available here
        Map.put(opts, :from_init, 2)
      end

      def component(_data, opts) do
        # here all the options are available
        opts
      end
    end

Suppose we've started the pipeline with the options %{from_start: 1}. The init function adds the :from_init option. Then :from_opts is merged.

The test below illustrates what is going on:

    describe "function pipeline" do
      let :pipeline, do: InitOptsFunPipeline.start(%{from_start: 1})
      let :result, do: InitOptsFunPipeline.call(pipeline(), %InitOptsFunPipeline{})

      it "returns values from different init functions" do
        expect(result())
        |> to(eq %InitOptsFunPipeline{from_start: 1, from_init: 2, from_opts: 3})
      end
    end
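The three levels can be mimicked with ordinary maps. This is a minimal sketch under the assumption that the levels are combined with Map.merge/2 in the order described; OptsSketch and component_opts are hypothetical names, not Flowex functions.

```elixir
defmodule OptsSketch do
  # Hypothetical stand-in for a pipeline's overridable init function.
  def init(opts), do: Map.put(opts, :from_init, 2)

  # Assumed merge order: start options, then init, then the pipe's own opts.
  # Later levels win when the same key appears twice.
  def component_opts(start_opts, pipe_opts) do
    start_opts
    |> init()
    |> Map.merge(pipe_opts)
  end
end

IO.inspect(OptsSketch.component_opts(%{from_start: 1}, %{from_opts: 3}))
```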

Synchronous and asynchronous calls

Note that the call function on a pipeline module or on Flowex.Client is synchronous, while communication inside the pipeline is asynchronous:

[Figure: synchronous call, asynchronous communication inside the pipeline]

One might think that there is no way to use the pipeline effectively via the call/2 function.

That's not true!

In order to send a large number of IPs and process them in parallel, one can use several clients connected to the pipeline:

[Figure: several clients connected to one pipeline]

Bottlenecks

Each component of a pipeline takes some time to finish processing an IP. One component does simple work, while another can process data for a long time. So if several clients continuously push data, the IPs will queue up in front of the slowest component, and the data processing speed will be limited by that component.

Flowex has a solution! One can define a number of execution processes for each component.

    defmodule FunPipeline do
      use Flowex.Pipeline

      pipe :add_one, count: 1
      pipe :mult_by_two, count: 3
      pipe :minus_three, count: 2
      error_pipe :if_error, count: 2
      # ...
    end

And the pipeline will look like the figure below:

[Figure: pipeline with several processes per component]
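The effect of running several processes for a slow component can be felt with a small experiment in plain Elixir. This is only an analogy that uses Task.async_stream/3 and its max_concurrency option in place of Flowex's count; BottleneckSketch and the 50 ms delay are made up for the illustration.

```elixir
defmodule BottleneckSketch do
  # Hypothetical slow component: each item takes about 50 ms.
  def slow_double(n) do
    Process.sleep(50)
    n * 2
  end

  # Runs the slow component over `items` with `count` concurrent workers and
  # returns {elapsed_ms, results}. Results keep the input order.
  def run(items, count) do
    {micros, results} =
      :timer.tc(fn ->
        items
        |> Task.async_stream(&slow_double/1, max_concurrency: count)
        |> Enum.map(fn {:ok, value} -> value end)
      end)

    {div(micros, 1000), results}
  end
end

{ms_one, results} = BottleneckSketch.run(1..6, 1)
{ms_three, ^results} = BottleneckSketch.run(1..6, 3)
IO.puts("1 worker: #{ms_one} ms, 3 workers: #{ms_three} ms")
```

With one worker the six items are processed one after another; with three workers they are processed in batches, so the elapsed time should drop roughly in proportion, while the results stay the same.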

Module pipes

One can create a reusable 'pipe': a module which implements the init and call functions. Each module must define the struct it works with. Only fields defined in the struct will be passed to the call function.

    defmodule ModulePipeline do
      use Flowex.Pipeline

      defstruct [:number, :a, :b, :c]

      pipe AddOne, count: 1
      pipe MultByTwo, count: 3
      pipe MinusThree, count: 2
      error_pipe IfError, count: 2
    end

    # pipes

    defmodule AddOne do
      defstruct [:number]

      def init(opts) do
        %{opts | a: :add_one}
      end

      def call(%{number: number}, %{a: a}) do
        %{number: number + 1, a: a}
      end
    end

    defmodule MultByTwo do
      defstruct [:number]

      def init(opts) do
        %{opts | b: :mult_by_two}
      end

      def call(%{number: number}, %{b: b}) do
        %{number: number * 2, b: b}
      end
    end

    defmodule MinusThree do
      defstruct [:number]

      def init(opts) do
        %{opts | c: :minus_three}
      end

      def call(%{number: number}, %{c: c}) do
        %{number: number - 3, c: c}
      end
    end

    defmodule IfError do
      defstruct [:number]

      def init(opts), do: opts

      def call(error, %{number: _number}, _opts) do
        %{number: error}
      end
    end

Of course, one can combine module and functional 'pipes'!

Data available in pipes

If your pipeline consists of function pipes only, each function will receive the pipeline struct as an input. The situation is a little more complex with module pipes. Each module defines its own struct, and the data will be cast to that struct. The map returned from the call function will be merged into the previous data. Let's consider an example:

    defmodule DataAvailable do
      use Flowex.Pipeline

      defstruct [:top, :c1, :foo]

      pipe Component1
      pipe :component2
      pipe Component3

      def component2(%__MODULE__{top: top}, _opts) do
        %{top: top + 2, c3: 2}
      end
    end

    defmodule Component1 do
      defstruct [:top, :c1]

      def init(opts), do: opts

      def call(%__MODULE__{c1: c1, top: top}, _opts) do
        %{top: top + c1, bar: :baz}
      end
    end

    defmodule Component3 do
      defstruct [:c3, :top]

      def init(opts), do: opts

      def call(%__MODULE__{c3: c3, top: top}, _opts) do
        %{top: top + c3, c3: top - c3, foo: :set_foo}
      end
    end

Suppose we passed %DataAvailable{top: 100, c1: 1} to the DataAvailable.call function.

The data in the IP before calling the first pipe is %{c1: 1, foo: nil, top: 100}. Before entering the first pipe the data will be cast to %Component1{c1: 1, top: 100}. The value returned by the first pipe is merged into the IP data, so the data becomes %{bar: :baz, c1: 1, foo: nil, top: 101}.

The function component2 receives the %DataAvailable{c1: 1, foo: nil, top: 101} structure, and the returned value %{c3: 2, top: 103} is merged with the previous data, so the IP data is %{bar: :baz, c1: 1, c3: 2, foo: nil, top: 103}.

The last component receives %Component3{c3: 2, top: 103} and returns %{c3: 101, foo: :set_foo, top: 105}, so the data is %{bar: :baz, c1: 1, c3: 101, foo: :set_foo, top: 105}. Before the data is returned from the pipeline it is cast to the DataAvailable structure, so the final result is %DataAvailable{c1: 1, foo: :set_foo, top: 105}.
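The cast-and-merge steps of this walkthrough can be replayed without Flowex. The sketch below assumes that "casting" behaves like Kernel.struct/2 (keys the struct does not declare are dropped) and that merging behaves like Map.merge/2; DataFlowSketch and its cast helper are hypothetical, and the components are reduced to their call functions.

```elixir
defmodule Component1 do
  defstruct [:top, :c1]

  def call(%__MODULE__{c1: c1, top: top}, _opts) do
    %{top: top + c1, bar: :baz}
  end
end

defmodule Component3 do
  defstruct [:c3, :top]

  def call(%__MODULE__{c3: c3, top: top}, _opts) do
    %{top: top + c3, c3: top - c3, foo: :set_foo}
  end
end

defmodule DataFlowSketch do
  defstruct [:top, :c1, :foo]

  def component2(%__MODULE__{top: top}, _opts), do: %{top: top + 2, c3: 2}

  # Hypothetical helper: keep only the keys the target struct declares.
  def cast(data, module), do: struct(module, data)

  # Returns {accumulated_data, final_struct}.
  def run(%__MODULE__{} = input) do
    data = Map.from_struct(input)
    data = Map.merge(data, Component1.call(cast(data, Component1), %{}))
    data = Map.merge(data, component2(cast(data, __MODULE__), %{}))
    data = Map.merge(data, Component3.call(cast(data, Component3), %{}))
    {data, cast(data, __MODULE__)}
  end
end

{data, result} = DataFlowSketch.run(struct(DataFlowSketch, top: 100, c1: 1))
IO.inspect(data)
IO.inspect(result)
```

The accumulated map keeps bar and c3, but they disappear in the final struct because the pipeline struct does not declare them.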

Starting strategies

Using the start/1 function, one can start pipelines in any process. The pipelines will be alive while the process is alive. The supervised_start function accepts a supervisor pid as the first argument and opts as the second argument, and starts the pipeline's supervisor under that predefined supervisor process.

In general there are three ways to start pipelines in your project:

  1. Start pipelines in an arbitrary supervised process:

    defmodule PipelineGenServer do
      use GenServer

      def init(_opts) do
        pipeline_one = PipelineOne.start()
        pipeline_two = PipelineTwo.start()

        {:ok, %{pipeline_one: pipeline_one, pipeline_two: pipeline_two}}
      end
    end

You can also store the pipeline structure in an Agent or in the Application environment.

  2. Start one pipeline per application. In that case the pipeline supervisor will be the main supervisor in the application:

    defmodule OnePipelinePerApp do
      use Application

      def start(_type, _opts) do
        pipeline = PipelineOne.start()
        Application.put_env(:start_flowex, :pipeline, pipeline)
        {:ok, pipeline.sup_pid}
      end
    end

  3. Start several pipelines inside one application using the supervised_start function. In that case the pipeline supervisors will be placed under the application supervisor:

    defmodule TwoPipelinesPerApp do
      use Application

      def start(_type, _opts) do
        {:ok, supervisor_pid} =
          Supervisor.start_link([], strategy: :one_for_one, name: :multi_flowex_sup)

        pipeline_one = PipelineOne.supervised_start(supervisor_pid)
        pipeline_two = PipelineTwo.supervised_start(supervisor_pid)

        Application.put_env(:start_flowex, :pipeline_one, pipeline_one)
        Application.put_env(:start_flowex, :pipeline_two, pipeline_two)

        {:ok, supervisor_pid}
      end
    end

You can find the examples in the 'Start-Flowex' project.

Debugging with Flowex.Sync.Pipeline

If you are faced with an error that is hard to debug, or an error that causes GenServers to crash, you may find the Flowex.Sync.Pipeline module useful. Adding the single word Sync will completely change the behavior.

    defmodule FunPipeline do
      use Flowex.Sync.Pipeline
      # The same code as before
      # ...
    end

The interface remains the same, but all the code will be evaluated in one simple GenServer. So all your pipes will be evaluated synchronously in a separate process. Use this option only for debugging purposes.

Contributing

Contributions are welcome and appreciated!

Request a new feature by creating an issue.

Create a pull request with new features or fixes.

Flowex is tested using ESpec. So run:

mix espec
