Sahara's Elastic Data Processing facility, or EDP, allows the execution of jobs on clusters created from sahara. EDP supports several job types, along with storage of job binaries and data sources in swift, manila, HDFS, or the sahara internal database; these features are described below.
The EDP features can be used from the sahara web UI, which is described in the Sahara (Data Processing) UI User Guide.
The EDP features can also be used directly by a client through the REST API.
Sahara EDP uses a collection of simple objects to define and execute jobs. These objects are stored in the sahara database when they are created, allowing them to be reused. This modular approach with database persistence allows code and data to be reused across multiple jobs.
The essential components of a job are:
- executable code to run
- input and output data locations, as required by the job type
- any additional files needed for execution
These components are supplied through the objects described below.
A Job Binary object stores a URL to a single script or Jar file and any credentials needed to retrieve the file. The file itself may be stored in the sahara internal database (deprecated), in swift, or in manila.
Deprecated: files in the sahara database are stored as raw bytes in a Job Binary Internal object. This object's sole purpose is to store a file for later retrieval. No extra credentials need to be supplied for files stored internally.
Sahara requires credentials (username and password) to access files stored in swift unless swift proxy users are configured as described in the Sahara Advanced Configuration Guide. The swift service must be running in the same OpenStack installation referenced by sahara.
To reference a binary file stored in manila, create the job binary with the URL manila://{share_id}/{path}. This assumes that you have already stored that file in the appropriate path on the share. The share will be automatically mounted to any cluster nodes which require access to the file, if it is not mounted already.
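A small illustration of constructing such a URL is sketched below; the share id and path are hypothetical placeholders, not values from any real deployment.

    # Hedged illustration only: building a manila job binary URL from a share id
    # and a path. Both values below are hypothetical placeholders.
    share_id = "<share-uuid>"            # id of an existing manila NFS share
    path = "jobs/wordcount.jar"          # file already stored on that share
    job_binary_url = "manila://{}/{}".format(share_id, path)
    # -> "manila://<share-uuid>/jobs/wordcount.jar"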
There is a configurable limit on the size of a single job binary that may be retrieved by sahara. The default limit is 5MB and may be changed with the job_binary_max_KB setting in the sahara.conf configuration file.
A Job object specifies the type of the job and lists all of the individual Job Binary objects that are required for execution. An individual Job Binary may be referenced by multiple Jobs. A Job object specifies a main binary and/or supporting libraries depending on its type:
| Job type | Main binary | Libraries |
|---|---|---|
| Hive | required | optional |
| Pig | required | optional |
| MapReduce | not used | required |
| MapReduce.Streaming | not used | optional |
| Java | not used | required |
| Shell | required | optional |
| Spark | required | optional |
| Storm | required | not used |
| Storm.Pyleus | required | not used |
A Data Source object stores a URL which designates the location of input or output data and any credentials needed to access the location.
Sahara supports data sources in swift. The swift service must be running in the same OpenStack installation referenced by sahara.
Sahara also supports data sources in HDFS. Any HDFS instance running on a sahara cluster in the same OpenStack installation is accessible without manual configuration. Other instances of HDFS may be used as well provided that the URL is resolvable from the node executing the job.
Sahara supports data sources in manila as well. To reference a path on an NFS share as a data source, create the data source with the URL manila://{share_id}/{path}. As in the case of job binaries, the specified share will be automatically mounted to your cluster's nodes as needed to access the data source.
Some job types require the use of data source objects to specify input and output when a job is launched. For example, when running a Pig job the UI will prompt the user for input and output data source objects.
Other job types like Java or Spark do not require the user to specify data sources. For these job types, data paths are passed as arguments. For convenience, sahara allows data source objects to be referenced by name or id. The section Using Data Source References as Arguments gives further details.
Job objects must be launched or executed in order for them to run on the cluster. During job launch, a user specifies execution details including data sources, configuration values, and program arguments. The relevant details will vary by job type. The launch will create a Job Execution object in sahara which is used to monitor and manage the job.
To execute Hadoop jobs, sahara generates an Oozie workflow and submits it to the Oozie server running on the cluster. Familiarity with Oozie is not necessary for using sahara but it may be beneficial to the user. A link to the Oozie web console can be found in the sahara web UI in the cluster details.
For Spark jobs, sahara uses the spark-submit shell script and executes the Spark job from the master node in the case of a Spark cluster, or from the Spark Job History server in other cases. Logs of Spark jobs run by sahara can be found on this node under the /tmp/spark-edp directory.
The general workflow for defining and executing a job in sahara is essentially the same whether using the web UI or the REST API:
1. Launch a cluster from sahara if one is not already available.
2. Create all of the Job Binaries needed to run the job, stored in swift, in manila, or in the sahara internal database.
3. Create a Job object which references the Job Binaries created in step 2.
4. Create an input Data Source which points to the data you wish to process.
5. Create an output Data Source which points to the location for output data.
6. Create a Job Execution object specifying the cluster and Job object plus relevant data sources, configuration values, and program arguments.
(Steps 4 and 5 do not apply to Java or Spark job types; see the additional details for those job types below.)
The workflow is simpler when using existing objects. For example, to construct a new job which uses existing binaries and input data a user may only need to perform steps 3, 5, and 6 above. Of course, to repeat the same job multiple times a user would need only step 6.
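As a rough illustration, the same workflow can be driven from Python. The sketch below assumes the python-saharaclient library; the manager names, method parameters, and every id, class name, and URL shown are assumptions or placeholders to be checked against the client documentation, not a verbatim recipe.

    # A rough sketch of the EDP workflow using python-saharaclient. Manager and
    # parameter names are assumptions based on the 1.1 client API; all names,
    # URLs, and ids are placeholders.
    from keystoneauth1 import identity, session
    from saharaclient.client import Client as SaharaClient

    auth = identity.Password(auth_url="http://keystone:5000/v3",
                             username="demo", password="secret",
                             project_name="demo",
                             user_domain_id="default",
                             project_domain_id="default")
    sahara = SaharaClient("1.1", session=session.Session(auth=auth))

    # Step 2: register a binary stored in swift (credentials go in "extra")
    binary = sahara.job_binaries.create(
        name="wordcount.jar",
        url="swift://container/wordcount.jar",
        extra={"user": "swift-user", "password": "swift-pass"})

    # Step 3: create a Job object referencing the binary
    job = sahara.jobs.create(name="wordcount", type="Java",
                             mains=[], libs=[binary.id])

    # Step 6: launch the job on an existing cluster
    sahara.job_executions.create(
        job_id=job.id,
        cluster_id="<cluster-uuid>",
        configs={"configs": {"edp.java.main_class": "org.example.WordCount",
                             "edp.substitute_data_source_for_name": True},
                 "args": ["datasource://my_input", "datasource://my_output"]})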
Jobs can be configured at launch. The job type determines the kinds of values that may be set:
| Job type | Configuration Values | Parameters | Arguments |
|---|---|---|---|
| Hive | Yes | Yes | No |
| Pig | Yes | Yes | Yes |
| MapReduce | Yes | No | No |
| MapReduce.Streaming | Yes | No | No |
| Java | Yes | No | Yes |
| Shell | Yes | Yes | Yes |
| Spark | Yes | No | Yes |
| Storm | Yes | No | Yes |
| Storm.Pyleus | Yes | No | Yes |
These values can be set on the Configure tab during job launch through the web UI or through the job_configs parameter when using the /jobs/<job_id>/execute REST method.
In some cases sahara generates configuration values or parameters automatically. Values set explicitly by the user during launch will override those generated by sahara.
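For illustration, a minimal sketch of calling the execute method directly over REST is shown below. The endpoint prefix, port, token handling, and all ids are assumptions or placeholders rather than a complete client.

    # Minimal sketch: POST job_configs to the /jobs/<job_id>/execute REST method.
    # The endpoint layout, port, and authentication shown here are assumptions;
    # ids and values are placeholders.
    import requests

    SAHARA = "http://controller:8386/v1.1/<project-id>"   # assumed API prefix
    HEADERS = {"X-Auth-Token": "<keystone-token>",
               "Content-Type": "application/json"}

    payload = {
        "cluster_id": "<cluster-uuid>",
        "job_configs": {
            "configs": {"mapred.map.tasks": "3"},   # configuration values
            "params": {},                           # parameters (Hive, Pig, Shell)
            "args": ["arg1", "arg2"],               # program arguments
        },
    }

    resp = requests.post(SAHARA + "/jobs/<job-id>/execute",
                         json=payload, headers=HEADERS)
    resp.raise_for_status()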
Sometimes it's necessary or desirable to pass a data path as an argument to a job. In these cases, a user may simply type out the path as an argument when launching a job. If the path requires credentials, the user can manually add the credentials as configuration values. However, if a data source object has been created that contains the desired path and credentials there is no need to specify this information manually.
As a convenience, sahara allows data source objects to be referenced by name or id in arguments, configuration values, or parameters. When the job is executed, sahara will replace the reference with the path stored in the data source object and will add any necessary credentials to the job configuration. Referencing an existing data source object is much faster than adding this information by hand. This is particularly useful for job types like Java or Spark that do not use data source objects directly.
There are two job configuration parameters that enable data source references. They may be used with any job type and are set on the Configuration tab when the job is launched:
edp.substitute_data_source_for_name (default False) If set to True, causes sahara to look for data source object name references in configuration values, arguments, and parameters when a job is launched. Name references have the form datasource://name_of_the_object.
For example, assume a user has a WordCount application that takes an input path as an argument. If there is a data source object named my_input, a user may simply set the edp.substitute_data_source_for_name configuration parameter to True and add datasource://my_input as an argument when launching the job (a sketch of the resulting configuration follows below).
edp.substitute_data_source_for_uuid (default False) If set to True, causes sahara to look for data source object ids in configuration values, arguments, and parameters when a job is launched. A data source object id is a uuid, so it is unique. The id of a data source object is available through the UI or the sahara command line client. A user may simply use the id as a value.
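To make the WordCount example above concrete, a hedged sketch of the resulting job configuration might look like the following; the output data source and its uuid are hypothetical additions.

    # Hedged sketch of job_configs for the WordCount example above, using a name
    # reference for input and a raw uuid reference for output (the output data
    # source and its uuid are hypothetical).
    job_configs = {
        "configs": {
            "edp.substitute_data_source_for_name": True,
            "edp.substitute_data_source_for_uuid": True,
        },
        "args": [
            "datasource://my_input",                  # replaced with the stored path
            "1a2b3c4d-0000-0000-0000-000000000000",   # id of an output data source
        ],
    }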
In order to better document your job for cluster operators (or for yourself in the future), sahara allows the addition of an interface (or method signature) to your job template. A sample interface for the Teragen Hadoop example might be:
| Name | Mapping Type | Location | Value Type | Required | Default |
|---|---|---|---|---|---|
| Example Class | args | 0 | string | false | teragen |
| Rows | args | 1 | number | true | unset |
| Output Path | args | 2 | data_source | false | hdfs://ip:port/path |
| Mapper Count | configs | mapred.map.tasks | number | false | unset |
A “Description” field may also be added to each interface argument.
To create such an interface via the REST API, provide an "interface" argument, the value of which consists of a list of JSON objects, as below:
[{"name":"Example Class","description":"Indicates which example job class should be used.","mapping_type":"args","location":"0","value_type":"string","required":false,"default":"teragen"},]
Creating this interface would allow you to specify a configuration for any execution of the job template by passing an "interface" map similar to:
{"Rows":"1000000","Mapper Count":"3","Output Path":"hdfs://mycluster:8020/user/myuser/teragen-output"}
The specified arguments would be automatically placed into the args, configs, and params for the job, according to the mapping type and location fields of each interface argument. The final job_configs map would be:
{"job_configs":{"configs":{"mapred.map.tasks":"3"},"args":["teragen","1000000","hdfs://mycluster:8020/user/myuser/teragen-output"]}}
Rules for specifying an interface are as follows:
- Mapping Type must be one of configs, params, or args. Only types supported for your job type are allowed (see above).
- Location must be a string for configs and params, and an integer for args. The set of args locations must be an unbroken series of integers starting from 0.
- Value Type must be one of string, number, or data_source. Data sources may be passed as UUIDs or as valid paths (see above). All values should be sent as JSON strings. (Note that booleans and null values are serialized differently in different languages. Please specify them as a string representation of the appropriate constants for your data processing engine.)
- args that are not required must be given a default value.
The additional one-time complexity of specifying an interface on your template allows a simpler repeated execution path, and also allows us to generate a customized form for your job in the Horizon UI. This may be particularly useful in cases in which an operator who is not a data processing job developer will be running and administering the jobs.
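As a hedged sketch, an execution of the Teragen template defined above could then be requested by passing the interface map instead of raw job_configs; the exact request key names used here are assumptions and the ids are placeholders.

    # Hedged sketch: executing a job template that defines the Teragen interface
    # above by passing an "interface" map instead of raw job_configs. The exact
    # request key names are assumptions; ids are placeholders.
    payload = {
        "cluster_id": "<cluster-uuid>",
        "interface": {
            "Rows": "1000000",
            "Mapper Count": "3",
            "Output Path": "hdfs://mycluster:8020/user/myuser/teragen-output",
        },
    }
    # POST payload to /jobs/<job-id>/execute as in the earlier sketch; sahara
    # expands the interface map into the job_configs shown above.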
If swift proxy users are not configured (see the Sahara Advanced Configuration Guide) and a job is run with data source objects containing swift paths, sahara will automatically generate swift username and password configuration values based on the credentials in the data sources. If the input and output data sources are both in swift, it is expected that they specify the same credentials.
The swift credentials may be set explicitly with the following configuration values:
- fs.swift.service.sahara.username
- fs.swift.service.sahara.password
Setting the swift credentials explicitly is required when passing literal swift paths as arguments instead of using data source references. When possible, use data source references as described in Using Data Source References as Arguments.
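A hedged sketch of supplying these values when literal swift paths are used as arguments; the credentials and paths are placeholders.

    # Illustration: setting swift credentials explicitly because literal swift
    # paths (rather than data source references) are passed as arguments.
    # Credentials and paths are placeholders.
    job_configs = {
        "configs": {
            "fs.swift.service.sahara.username": "<swift-user>",
            "fs.swift.service.sahara.password": "<swift-password>",
        },
        "args": ["swift://container/input", "swift://container/output"],
    }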
Sahara will automatically generate values for the INPUT and OUTPUT parameters required by Hive based on the specified data sources.
Sahara will automatically generate values for the INPUT and OUTPUT parameters required by Pig based on the specified data sources.
For Pig jobs, arguments should be thought of as command line arguments separated by spaces and passed to the pig shell.
Parameters are a shorthand and are actually translated to the arguments -param name=value.
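For example, under the assumption of a hypothetical parameter named ROWS, the translation looks like this:

    # Illustration of the Pig shorthand: each entry in "params" is translated by
    # sahara into a "-param name=value" command line argument for the pig shell.
    # The parameter name, value, and extra argument are hypothetical.
    job_configs = {
        "params": {"ROWS": "1000000"},
        "args": ["-v"],                 # ordinary arguments are passed through as-is
    }
    # Effective invocation: pig ... -param ROWS=1000000 -v <script>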
Important!
If the job type is MapReduce, the mapper and reducer classes must be specified as configuration values.
Note that the UI will not prompt the user for these required values; they must be added manually with the Configure tab.
Make sure to add these values with the correct names:
| Name | Example Value |
|---|---|
| mapred.mapper.new-api | true |
| mapred.reducer.new-api | true |
| mapreduce.job.map.class | org.apache.oozie.example.SampleMapper |
| mapreduce.job.reduce.class | org.apache.oozie.example.SampleReducer |
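Expressed as a job configuration (for instance in a REST request), the values from the table would appear as follows:

    # The four configuration values required for a MapReduce job, using the
    # example classes from the table above.
    job_configs = {
        "configs": {
            "mapred.mapper.new-api": "true",
            "mapred.reducer.new-api": "true",
            "mapreduce.job.map.class": "org.apache.oozie.example.SampleMapper",
            "mapreduce.job.reduce.class": "org.apache.oozie.example.SampleReducer",
        }
    }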
Important!
If the job type is MapReduce.Streaming, the streaming mapper and reducer must be specified.
In this case, the UI will prompt the user to enter mapper and reducer values on the form and will take care of adding them to the job configuration with the appropriate names. If using the python client, however, be certain to add these values to the job configuration manually with the correct names:
| Name | Example Value |
|---|---|
| edp.streaming.mapper | /bin/cat |
| edp.streaming.reducer | /usr/bin/wc |
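For example, when using the python client the job configuration would include:

    # Streaming mapper and reducer commands added manually to the job
    # configuration, using the example values from the table above.
    job_configs = {
        "configs": {
            "edp.streaming.mapper": "/bin/cat",
            "edp.streaming.reducer": "/usr/bin/wc",
        }
    }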
Data Source objects are not used directly with Java job types. Instead, any input or output paths must be specified as arguments at job launch either explicitly or by reference as described in Using Data Source References as Arguments. Using data source references is the recommended way to pass paths to Java jobs.
If configuration values are specified, they must be added to the job's Hadoop configuration at runtime. There are two methods of doing this. The simplest way is to use the edp.java.adapt_for_oozie option described below. The other method is to use the code from this example to explicitly load the values.
The following special configuration values are read by sahara and affect how Java jobs are run:
edp.java.main_class (required) Specifies the full name of the class containing main(String[] args).
A Java job will execute the main method of the specified main class. Any arguments set during job launch will be passed to the program through the args array.
oozie.libpath (optional) Specifies configuration values for the Oozie share libs; these libs can be shared by different workflows.
edp.java.java_opts (optional) Specifies configuration values for the JVM.
edp.java.adapt_for_oozie (optional) Specifies that sahara should perform special handling of configuration values and exit conditions. The default is False.
If this configuration value is set to True, sahara will modify the job's Hadoop configuration before invoking the specified main method. Any configuration values specified during job launch (excluding those beginning with edp.) will be automatically set in the job's Hadoop configuration and will be available through standard methods.
Secondly, setting this option to True ensures that Oozie will handle program exit conditions correctly.
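A hedged sketch of a Java job configuration is shown below; the main class, JVM options, and paths are hypothetical, and only the configuration value names come from the text above.

    # Hedged sketch of configuration for a Java job. The main class, JVM options,
    # and paths are hypothetical placeholders.
    job_configs = {
        "configs": {
            "edp.java.main_class": "org.example.WordCount",
            "edp.java.java_opts": "-Xmx512m",
            "edp.java.adapt_for_oozie": True,
        },
        # Literal swift paths like these would also require swift credentials,
        # or data source references could be used instead (see above).
        "args": ["swift://container/input", "swift://container/output"],
    }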
At this time, the following special configuration value only applies when running jobs on a cluster generated by the Cloudera plugin with the Enable Hbase Common Lib cluster config set to True (the default value):
edp.hbase_common_lib (optional) Specifies that a common Hbase lib generated by sahara in HDFS be added to the oozie.libpath. This is for use when an Hbase application is driven from a Java job. Default is False.
The edp-wordcount example bundled with sahara shows how to use configuration values, arguments, and swift data paths in a Java job type. Note that the example does not use the edp.java.adapt_for_oozie option but includes the code to load the configuration values explicitly.
A shell job will execute the script specified as main, and will place any files specified as libs in the same working directory (on both the filesystem and in HDFS). Command line arguments may be passed to the script through the args array, and any params values will be passed as environment variables.
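A hedged sketch of a Shell job configuration; the argument and parameter names below are illustrative only.

    # Hedged sketch for a Shell job: "args" become command line arguments to the
    # script and "params" become environment variables. Names are illustrative.
    job_configs = {
        "args": ["/tmp/edp-output.txt"],           # available to the script as $1
        "params": {"EXTRA_SETTING": "some-value"}, # exported into the environment
    }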
Data Source objects are not used directly with Shell job types but data source references may be used as described in Using Data Source References as Arguments.
The edp-shell example bundled with sahara contains a script which will output the executing user to a file specified by the first command line argument.
Data Source objects are not used directly with Spark job types. Instead, any input or output paths must be specified as arguments at job launch either explicitly or by reference as described in Using Data Source References as Arguments. Using data source references is the recommended way to pass paths to Spark jobs.
Spark jobs use some special configuration values:
edp.java.main_class (required) Specifies the full name of the class containing the Java or Scala main method:
- main(String[] args) for Java
- main(args: Array[String]) for Scala
A Spark job will execute the main method of the specified main class. Any arguments set during job launch will be passed to the program through the args array.
edp.spark.adapt_for_swift (optional) If set to True, instructs sahara to modify the job's Hadoop configuration so that swift paths may be accessed. Without this configuration value, swift paths will not be accessible to Spark jobs. The default is False.
edp.spark.driver.classpath (optional) If set to an empty string, sahara will use the default classpath for the cluster during job execution. Otherwise this value will override the cluster default for the particular job execution.
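A hedged sketch of a Spark job configuration; the main class and argument are taken from the standard Spark Pi example and are illustrative only.

    # Hedged sketch of a Spark job configuration. The main class and argument
    # are from the standard Spark Pi example and are illustrative only.
    job_configs = {
        "configs": {
            "edp.java.main_class": "org.apache.spark.examples.SparkPi",
            "edp.spark.adapt_for_swift": False,   # set True only if swift paths are used
        },
        "args": ["10"],    # passed to main(args); here, the number of partitions
    }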
The edp-spark example bundled with sahara contains a Spark program for estimating Pi.
Sahara uses custom URLs to refer to objects stored in swift, in manila, or in the sahara internal database. These URLs are not meant to be used outside of sahara.
Sahara swift URLs passed to running jobs as input or output sources include a ".sahara" suffix on the container, for example:
swift://container.sahara/object
You may notice these swift URLs in job logs; however, you do not need to add the suffix to the containers yourself. Sahara will add the suffix if necessary, so when using the UI or the python client you may write the above URL simply as:
swift://container/object
Sahara internal database URLs have the form:
internal-db://sahara-generated-uuid
This indicates a file object in the sahara database which has the given uuid as a key.
Manila NFS filesystem reference URLs take the form:
manila://share-uuid/path
This format should be used when referring to a job binary or a data source stored in a manila NFS share.
The OpenStack installation and the cluster launched from sahara must meet the following minimum requirements in order for EDP to function:
When a Hadoop job is executed, binaries are first uploaded to a cluster node and then moved from the node local filesystem to HDFS. Therefore, there must be an instance of HDFS available to the nodes in the sahara cluster.
If the swift service is not running in the OpenStack installation:
- Job binaries may only be stored in the sahara internal database
- Data sources require a long-running HDFS
If the swift service is running in the OpenStack installation:
- Job binaries may be stored in swift or the sahara internal database
- Data sources may be in swift or a long-running HDFS
Requirements for EDP support depend on the EDP job type and plugin used for the cluster. For example, a Vanilla sahara cluster must run at least one instance of these processes to support EDP:
- For Hadoop version 1: jobtracker, namenode, oozie, tasktracker, and datanode
- For Hadoop version 2: resourcemanager, namenode, historyserver, oozie, nodemanager, and datanode
There are several things in EDP which require attention in order for it to work properly. They are described below.
EDP allows running jobs on transient clusters. In this case the cluster is created specifically for the job and is shut down automatically once the job is finished.
Two configuration parameters control the behaviour of transient clusters:
- periodic_enable - if set to 'false', sahara will do nothing to a transient cluster once the job it was created for is completed. If it is set to 'true', then the behaviour depends on the value of the next parameter.
- use_identity_api_v3 - set it to 'false' if your OpenStack installation does not provide keystone API v3. In that case sahara will not terminate unneeded clusters. Instead it will set their state to 'AwaitingTermination', meaning that they can be manually deleted by a user. If the parameter is set to 'true', sahara will terminate the cluster itself. The limitation is caused by the lack of the 'trusts' feature in Keystone API versions older than v3.
If both parameters are set to 'true', sahara works with transient clusters in the following manner:
- When a user requests that a job be executed on a transient cluster, sahara creates such a cluster.
- Sahara drops the user's credentials once the cluster is created, but prior to that it creates a trust allowing it to operate with the cluster instances in the future without user credentials.
- Once the cluster is no longer needed, sahara terminates its instances using the stored trust. Sahara drops the trust after that.
