
lithops-cloud/airflow-plugin

Plugin for Apache Airflow to execute serverless tasks using Lithops.


This repository contains an Apache Airflow plugin that implements new operators to easily deploy serverless function tasks using Lithops.

Lithops is a Python multicloud library for running serverless jobs. Lithops transparently runs local sequential code over thousands of serverless functions. This plugin allows Airflow to benefit from serverless functions to achieve higher performance on highly parallelizable tasks, such as big data analysis workflows, without consuming all the resources of the cluster where Airflow is running and without having to provision Airflow workers through the Celery executor.

Contents

  1. Installation
  2. Usage
  3. Examples

Usage

Operators

This plugin provides three new operators.


Important note: Due to the way Airflow manages DAGs, the callables passed to the Lithops operators cannot be declared in the DAG definition script. Instead, they must be declared in a separate file or module. To access the functions from the DAG file, import them as regular modules.
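For instance, a minimal functions module might look like the sketch below. The file name `my_functions.py` and the function bodies are illustrative; any importable module works:

```python
# my_functions.py -- callables passed to Lithops operators live here,
# NOT in the DAG definition script.

def add(x, y):
    """Callable used as func / map_function in the examples below."""
    return x + y

def mult(results):
    """Reduce callable: multiplies all map results together."""
    result = 1
    for n in results:
        result *= n
    return result
```

The DAG file then imports them as regular modules, e.g. `from my_functions import add, mult`.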


  • LithopsCallAsyncOperator

    It invokes a single function.

    | Parameter | Description | Default |
    | --- | --- | --- |
    | `func` | Python callable | mandatory |
    | `data` | Keyword arguments | `{}` |
    | `data_from_task` | Get the output of another task as an input parameter for this function | `None` |

    Example:

    ```python
    # my_functions.py
    def add(x, y):
        return x + y
    ```

    ```python
    from my_functions import add

    my_task = LithopsCallAsyncOperator(
        task_id='add_task',
        func=add,
        data={'x': 1, 'y': 3},
        dag=dag,
    )
    # Returns: 4

    basic_task = LithopsCallAsyncOperator(
        task_id='add_task_2',
        func=add,
        data={'x': 4},
        data_from_task={'y': 'add_task'},
        dag=dag,
    )
    # Returns: 8
    ```
  • LithopsMapOperator

    It invokes multiple parallel tasks, one for every element in `map_iterdata`. It applies `map_function` to every element of `map_iterdata`:

    | Parameter | Description | Default | Type |
    | --- | --- | --- | --- |
    | `map_function` | Python callable | mandatory | callable |
    | `map_iterdata` | Iterable. Invokes a function for every element in `map_iterdata` | mandatory | iterable |
    | `iterdata_form_task` | Gets the input iterdata from another task's output | `None` | iterable |
    | `extra_params` | Adds extra keyword arguments to the map function's signature | `None` | dict |
    | `chunk_size` | Splits the object in chunks, and every chunk gets this many bytes as input data (one invocation per chunk) | `None` | int |
    | `chunk_n` | Splits the object in N chunks (one invocation per chunk) | `None` | int |
    | `remote_invocation` | Activates Lithops' remote invocation functionality | `False` | bool |
    | `invoke_pool_threads` | Number of threads to use for invocation | `500` | int |

    Example:

    ```python
    # my_functions.py
    def add(x, y):
        return x + y
    ```

    ```python
    from my_functions import add

    map_task = LithopsMapOperator(
        task_id='map_task',
        map_function=add,
        map_iterdata=[1, 2, 3],
        extra_params={'y': 1},
        dag=dag,
    )
    # Returns: [2, 3, 4]
    ```
  • LithopsMapReduceOperator

    It invokes multiple parallel tasks, one for every element in `map_iterdata`. It applies `map_function` to every element of `map_iterdata`. Finally, a single `reduce_function` is invoked to gather all the map results.

    | Parameter | Description | Default | Type |
    | --- | --- | --- | --- |
    | `map_function` | Python callable | mandatory | callable |
    | `map_iterdata` | Iterable. Invokes a function for every element in `map_iterdata` | mandatory | iterable |
    | `reduce_function` | Python callable | mandatory | callable |
    | `iterdata_form_task` | Gets the input iterdata from another task's output | `None` | iterable |
    | `extra_params` | Adds extra keyword arguments to the map function's signature | `None` | dict |
    | `map_runtime_memory` | Memory to use to run the map functions | loaded from config | int |
    | `reduce_runtime_memory` | Memory to use to run the reduce function | loaded from config | int |
    | `chunk_size` | Splits the object in chunks, and every chunk gets this many bytes as input data (one invocation per chunk). `None` to process the whole file in one function activation | `None` | int |
    | `chunk_n` | Splits the object in N chunks (one invocation per chunk). `None` to process the whole file in one function activation | `None` | int |
    | `remote_invocation` | Activates Lithops' remote invocation functionality | `False` | bool |
    | `invoke_pool_threads` | Number of threads to use for invocation | `500` | int |
    | `reducer_one_per_object` | Set one reducer per object after running the partitioner | `False` | bool |
    | `reducer_wait_local` | Wait for results locally | `False` | bool |

    Example:

    ```python
    # my_functions.py
    def add(x, y):
        return x + y

    def mult(results):
        result = 1
        for n in results:
            result *= n
        return result
    ```

    ```python
    from my_functions import add, mult

    mapreduce_task = LithopsMapReduceOperator(
        task_id='mapreduce_task',
        map_function=add,
        reduce_function=mult,
        map_iterdata=[1, 2, 3],
        extra_params={'y': 1},
        dag=dag,
    )
    # Returns: 24
    ```

Inherited parameters

All operators inherit from a common base operator that accepts the following parameters:

| Parameter | Description | Default | Type |
| --- | --- | --- | --- |
| `lithops_config` | Lithops config, as a dictionary | `{}` | dict |
| `async_invoke` | Invokes functions asynchronously; does not wait for function completion | `False` | bool |
| `get_result` | Downloads results upon completion | `True` | bool |
| `clean_data` | Deletes Lithops metadata from COS | `False` | bool |
| `extra_env` | Adds environment variables to the function's runtime | `None` | dict |
| `runtime_memory` | Runtime memory, in MB | `256` | int |
| `timeout` | Time the functions have to complete their execution before raising a timeout | default from config | int |
| `include_modules` | Explicitly pickle these dependencies | `[]` | list |
| `exclude_modules` | Explicitly exclude these modules from the pickled dependencies | `[]` | list |
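As an illustration, the `lithops_config` dictionary follows the layout of a Lithops configuration file; the backend and storage values below are placeholders for whatever your deployment uses, and the operator call in the comment reuses names from the examples above:

```python
# Illustrative Lithops config passed in-line to an operator instead of
# relying on a config file. The concrete values are placeholders.
lithops_config = {
    'lithops': {
        'backend': 'ibm_cf',   # compute backend (placeholder)
        'storage': 'ibm_cos',  # storage backend (placeholder)
    },
}

# It could then be combined with the inherited parameters, e.g.:
#   map_task = LithopsMapOperator(
#       task_id='map_task',
#       map_function=add,
#       map_iterdata=[1, 2, 3],
#       lithops_config=lithops_config,
#       runtime_memory=512,
#       dag=dag,
#   )
```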

License

Apache 2 license
