Plugin for Apache Airflow to execute serverless tasks using Lithops
This repository contains an Apache Airflow plugin that implements new operators to easily deploy serverless function tasks using Lithops.
Lithops is a Python multicloud library for running serverless jobs. Lithops transparently runs local sequential code over thousands of serverless functions. This plugin allows Airflow to benefit from serverless functions to achieve higher performance on highly parallelizable tasks, such as big data analysis workflows, without consuming all the resources of the cluster where Airflow is running and without having to provision Airflow workers through the Celery executor.
- Apache Airflow: https://github.com/apache/airflow
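
For context, this is roughly what plain Lithops looks like outside of Airflow; the `double` function and the data are illustrative, but the sketch shows the map pattern that the operators below wrap:

```python
import lithops

def double(x):
    return x * 2

# Each element of the iterable is processed by its own serverless function
fexec = lithops.FunctionExecutor()
fexec.map(double, [1, 2, 3, 4])
print(fexec.get_result())  # -> [2, 4, 6, 8]
```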
This plugin provides three new operators.
Important note: due to the way Airflow manages DAGs, the callables passed to the Lithops operators cannot be declared in the DAG definition script. Instead, they must be declared in a separate file or module. To access the functions from the DAG file, import them as regular modules.
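For example, a minimal layout (file and module names are illustrative) keeps the callables in their own module, importable from the DAG definition script:

```python
# my_functions.py -- callables passed to the Lithops operators live here,
# not in the DAG definition script
def add(x, y):
    return x + y
```

The examples below then do `from my_functions import add` inside the DAG file.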
LithopsCallAsyncOperator
It invokes a single function.
| Parameter | Description | Default |
|---|---|---|
| func | Python callable | mandatory |
| data | Keyword arguments | {} |
| data_from_task | Gets the output from another task as an input parameter for this function | None |

Example:
```python
def add(x, y):
    return x + y
```
```python
from my_functions import add

my_task = LithopsCallAsyncOperator(
    task_id='add_task',
    func=add,
    data={'x': 1, 'y': 3},
    dag=dag,
)
# Returns: 4
```
```python
from my_functions import add

basic_task = LithopsCallAsyncOperator(
    task_id='add_task_2',
    func=add,
    data={'x': 4},
    data_from_task={'y': 'add_task'},
    dag=dag,
)
# Returns: 8
```
LithopsMapOperator
It invokes multiple parallel tasks, one for each element of the map_iterdata parameter. It applies the function map_function to every element in map_iterdata:

| Parameter | Description | Default | Type |
|---|---|---|---|
| map_function | Python callable | mandatory | callable |
| map_iterdata | Iterable. The function is invoked once for every element in iterdata | mandatory | iterable |
| iterdata_form_task | Gets the input iterdata from another function's output | None | iterable |
| extra_params | Adds extra keyword arguments to the map function's signature | None | dict |
| chunk_size | Splits the object in chunks, and every chunk gets this many bytes as input data (one invocation per chunk) | None | int |
| chunk_n | Splits the object in N chunks (one invocation per chunk) | None | int |
| remote_invocation | Activates Lithops' remote invocation functionality | False | bool |
| invoke_pool_threads | Number of threads used to invoke the functions | 500 | int |

Example:
```python
def add(x, y):
    return x + y
```
```python
from my_functions import add

map_task = LithopsMapOperator(
    task_id='map_task',
    map_function=add,
    map_iterdata=[1, 2, 3],
    extra_params={'y': 1},
    dag=dag,
)
# Returns: [2, 3, 4]
```
LithopsMapReduceOperator
It invokes multiple parallel tasks, one for each element of the map_iterdata parameter. It applies the function map_function to every element in iterdata. Finally, a single reduce_function is invoked to gather all the map results.

| Parameter | Description | Default | Type |
|---|---|---|---|
| map_function | Python callable | mandatory | callable |
| map_iterdata | Iterable. The function is invoked once for every element in iterdata | mandatory | iterable |
| reduce_function | Python callable | mandatory | callable |
| iterdata_form_task | Gets the input iterdata from another function's output | None | iterable |
| extra_params | Adds extra keyword arguments to the map function's signature | None | dict |
| map_runtime_memory | Memory to use to run the map functions | loaded from config | int |
| reduce_runtime_memory | Memory to use to run the reduce function | loaded from config | int |
| chunk_size | Splits the object in chunks, and every chunk gets this many bytes as input data (one invocation per chunk). None to process the whole file in one function activation | None | int |
| chunk_n | Splits the object in N chunks (one invocation per chunk). None to process the whole file in one function activation | None | int |
| remote_invocation | Activates Lithops' remote invocation functionality | False | bool |
| invoke_pool_threads | Number of threads used to invoke the functions | 500 | int |
| reducer_one_per_object | Set one reducer per object after running the partitioner | False | bool |
| reducer_wait_local | Wait for results locally | False | bool |

Example:
```python
def add(x, y):
    return x + y


def mult_array(results):
    result = 1
    for n in results:
        result *= 2
    return result
```
```python
from my_functions import add, mult_array

mapreduce_task = LithopsMapReduceOperator(
    task_id='mapreduce_task',
    map_function=add,
    reduce_function=mult_array,
    map_iterdata=[1, 2, 3],
    extra_params={'y': 1},
    dag=dag,
)
# Returns: 8
```
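
As a hedged sketch of chaining Lithops tasks: the task ids below are illustrative, and the assumption that iterdata_form_task takes the upstream task's id (by analogy with data_from_task in the first example) is not confirmed by this README.

```python
from my_functions import add

# Illustrative only: a map task whose iterdata comes from the output of a
# previous Lithops task, referenced by its task id (assumed behaviour).
first_map = LithopsMapOperator(
    task_id='first_map',
    map_function=add,
    map_iterdata=[1, 2, 3],
    extra_params={'y': 1},
    dag=dag,
)

second_map = LithopsMapOperator(
    task_id='second_map',
    map_function=add,
    iterdata_form_task='first_map',  # assumed to reference the upstream task id
    extra_params={'y': 10},
    dag=dag,
)

# Regular Airflow dependency so the upstream result exists before it is consumed
first_map >> second_map
```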
All operators inherit from a common base Lithops operator that accepts the following parameters:
| Parameter | Description | Default | Type |
|---|---|---|---|
| lithops_config | Lithops config, as a dictionary | {} | dict |
| async_invoke | Invokes functions asynchronously, does not wait for function completion | False | bool |
| get_result | Downloads results upon completion | True | bool |
| clean_data | Deletes Lithops metadata from COS | False | bool |
| extra_env | Adds environ variables to function's runtime | None | dict |
| runtime_memory | Runtime memory, in MB | 256 | int |
| timeout | Maximum time the functions are allowed to run before a timeout is raised | Default from config | int |
| include_modules | Explicitly pickle these dependencies | [] | list |
| exclude_modules | Explicitly exclude these modules from the pickled dependencies | [] | list |
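
As a hedged illustration of these common parameters (the lithops_config values below are placeholders, not a recommendation, and the exact Lithops config keys depend on the backend you use):

```python
from my_functions import add

# Illustrative only: the config contents are placeholder values
config = {
    'lithops': {'backend': 'aws_lambda', 'storage': 'aws_s3'},
}

tuned_task = LithopsCallAsyncOperator(
    task_id='tuned_add_task',
    func=add,
    data={'x': 1, 'y': 3},
    lithops_config=config,             # per-task Lithops configuration
    runtime_memory=512,                # MB allocated to the function runtime
    extra_env={'LOG_LEVEL': 'DEBUG'},  # extra environment variables for the runtime
    get_result=True,                   # download the result when the task completes
    dag=dag,
)
```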