Elastic Data Processing (EDP)

updated: 'Thu Jun 29 08:54:09 2017, commit 506f85b'

Overview

Sahara’s Elastic Data Processing facility, or EDP, allows the execution of jobs on clusters created from sahara. EDP supports:

  • Hive, Pig, MapReduce, MapReduce.Streaming, Java, and Shell job types on Hadoop clusters
  • Spark jobs on Spark standalone clusters, MapR (v5.0.0 - v5.2.0) clusters, Vanilla clusters (v2.7.1), and CDH clusters (v5.3.0 or higher)
  • storage of job binaries in the OpenStack Object Storage service (swift), the OpenStack Shared file systems service (manila), or sahara’s own database
  • access to input and output data sources in
    • HDFS for all job types
    • swift for all types excluding Hive
    • manila (NFS shares only) for all types excluding Pig
  • configuration of jobs at submission time
  • execution of jobs on existing clusters or transient clusters

Interfaces

The EDP features can be used from the sahara web UI, which is described in the Sahara (Data Processing) UI User Guide.

The EDP features can also be used directly by a client through the REST API.

EDP Concepts

Sahara EDP uses a collection of simple objects to define and execute jobs. These objects are stored in the sahara database when they are created, allowing them to be reused. This modular approach with database persistence allows code and data to be reused across multiple jobs.

The essential components of a job are:

  • executable code to run
  • input and output data paths, as needed for the job
  • any additional configuration values needed for the job run

These components are supplied through the objects described below.

Job Binaries

A Job Binary object stores a URL to a single script or Jar file and any credentials needed to retrieve the file. The file itself may be stored in swift, in manila, or in the sahara internal database (the last option is now deprecated).

Deprecated: Files in the sahara database are stored as raw bytes in a Job Binary Internal object. This object’s sole purpose is to store a file for later retrieval. No extra credentials need to be supplied for files stored internally.

Sahara requires credentials (username and password) to access files stored in swift unless swift proxy users are configured as described in the Sahara Advanced Configuration Guide. The swift service must be running in the same OpenStack installation referenced by sahara.

To reference a binary file stored in manila, create the job binary with the URL manila://{share_id}/{path}. This assumes that you have already stored that file in the appropriate path on the share. The share will be automatically mounted to any cluster nodes which require access to the file, if it is not mounted already.

There is a configurable limit on the size of a single job binary that may be retrieved by sahara. The limit is 5MB by default and may be changed with the job_binary_max_KB setting in the sahara.conf configuration file.
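
As a rough sketch of the rules above, the helpers below build a manila URL, assemble a description of a job binary, and check a file against the size limit. The function names, the `extra` credentials layout, and the 5120 KB constant (5MB expressed in KB) are assumptions made for illustration; this is not sahara's own code.

```python
# Sketch only: helper names and the body layout are illustrative assumptions.
JOB_BINARY_MAX_KB = 5120  # assumed to mirror job_binary_max_KB (5MB)


def manila_url(share_id, path):
    """Build a manila://{share_id}/{path} job binary URL."""
    return "manila://{}/{}".format(share_id, path.lstrip("/"))


def job_binary_body(name, url, user=None, password=None):
    """Describe a job binary; swift URLs carry credentials unless
    swift proxy users are configured (then user is left as None)."""
    body = {"name": name, "url": url}
    if url.startswith("swift://") and user is not None:
        body["extra"] = {"user": user, "password": password}
    return body


def within_size_limit(size_bytes, max_kb=JOB_BINARY_MAX_KB):
    """Return True if a binary of size_bytes fits under the limit."""
    return size_bytes <= max_kb * 1024
```

For instance, `manila_url("share-uuid", "/jars/wordcount.jar")` yields a URL in the form described above, with no doubled slash between the share id and the path.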

Jobs

A Job object specifies the type of the job and lists all of the individual Job Binary objects that are required for execution. An individual Job Binary may be referenced by multiple Jobs. A Job object specifies a main binary and/or supporting libraries depending on its type:

Job type             Main binary  Libraries
Hive                 required     optional
Pig                  required     optional
MapReduce            not used     required
MapReduce.Streaming  not used     optional
Java                 not used     required
Shell                required     optional
Spark                required     optional
Storm                required     not used
Storm Pyelus         required     not used
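
The table can be read as a small validation rule set. The sketch below encodes it as a dictionary and reports which binaries are missing or superfluous for a given job type; `REQUIREMENTS` and `check_job` are hypothetical names, and sahara's actual validation may differ.

```python
# (main binary rule, libraries rule) per job type, copied from the table.
REQUIREMENTS = {
    "Hive": ("required", "optional"),
    "Pig": ("required", "optional"),
    "MapReduce": ("not used", "required"),
    "MapReduce.Streaming": ("not used", "optional"),
    "Java": ("not used", "required"),
    "Shell": ("required", "optional"),
    "Spark": ("required", "optional"),
    "Storm": ("required", "not used"),
    "Storm Pyelus": ("required", "not used"),
}


def check_job(job_type, mains, libs):
    """Return a list of problems with a job's main binary and libraries."""
    main_rule, lib_rule = REQUIREMENTS[job_type]
    problems = []
    checks = (("main binary", main_rule, mains), ("libraries", lib_rule, libs))
    for label, rule, items in checks:
        if rule == "required" and not items:
            problems.append("{} requires {}".format(job_type, label))
        elif rule == "not used" and items:
            problems.append("{} does not use {}".format(job_type, label))
    return problems
```

An empty list means the combination is consistent with the table.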

Data Sources

A Data Source object stores a URL which designates the location of input or output data and any credentials needed to access the location.

Sahara supports data sources in swift. The swift service must be running in the same OpenStack installation referenced by sahara.

Sahara also supports data sources in HDFS. Any HDFS instance running on a sahara cluster in the same OpenStack installation is accessible without manual configuration. Other instances of HDFS may be used as well, provided that the URL is resolvable from the node executing the job.

Sahara supports data sources in manila as well. To reference a path on an NFS share as a data source, create the data source with the URL manila://{share_id}/{path}. As in the case of job binaries, the specified share will be automatically mounted to your cluster’s nodes as needed to access the data source.

Some job types require the use of data source objects to specify input and output when a job is launched. For example, when running a Pig job the UI will prompt the user for input and output data source objects.

Other job types like Java or Spark do not require the user to specify data sources. For these job types, data paths are passed as arguments. For convenience, sahara allows data source objects to be referenced by name or id. The section Using Data Source References as Arguments gives further details.

Job Execution

Job objects must be launched or executed in order for them to run on the cluster. During job launch, a user specifies execution details including data sources, configuration values, and program arguments. The relevant details will vary by job type. The launch will create a Job Execution object in sahara which is used to monitor and manage the job.

To execute Hadoop jobs, sahara generates an Oozie workflow and submits it to the Oozie server running on the cluster. Familiarity with Oozie is not necessary for using sahara but it may be beneficial to the user. A link to the Oozie web console can be found in the sahara web UI in the cluster details.

For Spark jobs, sahara uses the spark-submit shell script. On a Spark cluster the job is executed from the master node; in other cases it is executed from the Spark Job History server. Logs of Spark jobs run by sahara can be found on this node under the /tmp/spark-edp directory.

General Workflow

The general workflow for defining and executing a job in sahara is essentially the same whether using the web UI or the REST API.

  1. Launch a cluster from sahara if there is not one already available
  2. Create all of the Job Binaries needed to run the job, stored in the sahara database, in swift, or in manila
    • When using the REST API and internal storage of job binaries, the Job Binary Internal objects must be created first
    • Once the Job Binary Internal objects are created, Job Binary objects may be created which refer to them by URL
  3. Create a Job object which references the Job Binaries created in step 2
  4. Create an input Data Source which points to the data you wish to process
  5. Create an output Data Source which points to the location for output data
  6. Create a Job Execution object specifying the cluster and Job object plus relevant data sources, configuration values, and program arguments
    • When using the web UI this is done with the Launch On Existing Cluster or Launch on New Cluster buttons on the Jobs tab
    • When using the REST API this is done via the /jobs/<job_id>/execute method

The workflow is simpler when using existing objects. For example, to construct a new job which uses existing binaries and input data, a user may only need to perform steps 3, 5, and 6 above. To repeat the same job multiple times, a user would need only step 6.

Specifying Configuration Values, Parameters, and Arguments

Jobs can be configured at launch. The job type determines the kinds of values that may be set:

Job type             Configuration Values  Parameters  Arguments
Hive                 Yes                   Yes         No
Pig                  Yes                   Yes         Yes
MapReduce            Yes                   No          No
MapReduce.Streaming  Yes                   No          No
Java                 Yes                   No          Yes
Shell                Yes                   Yes         Yes
Spark                Yes                   No          Yes
Storm                Yes                   No          Yes
Storm Pyelus         Yes                   No          Yes

  • Configuration values are key/value pairs.
    • The EDP configuration values have names beginning with edp. and are consumed by sahara
    • Other configuration values may be read at runtime by Hadoop jobs
    • Currently additional configuration values are not available to Spark jobs at runtime
  • Parameters are key/value pairs. They supply values for the Hive and Pig parameter substitution mechanisms. In Shell jobs, they are passed as environment variables.
  • Arguments are strings passed as command line arguments to a shell or main program

These values can be set on the Configure tab during job launch through the web UI or through the job_configs parameter when using the /jobs/<job_id>/execute REST method.
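
A minimal sketch of how a client might assemble the job_configs value while respecting the table above. The `configs`, `params`, and `args` keys follow the job_configs layout used elsewhere in this document; `ALLOWED` and `build_job_configs` are hypothetical names, and the check is illustrative rather than a copy of sahara's validation.

```python
# Which kinds of values each job type accepts, copied from the table.
ALLOWED = {
    "Hive": {"configs", "params"},
    "Pig": {"configs", "params", "args"},
    "MapReduce": {"configs"},
    "MapReduce.Streaming": {"configs"},
    "Java": {"configs", "args"},
    "Shell": {"configs", "params", "args"},
    "Spark": {"configs", "args"},
    "Storm": {"configs", "args"},
    "Storm Pyelus": {"configs", "args"},
}


def build_job_configs(job_type, configs=None, params=None, args=None):
    """Return {"job_configs": ...}, rejecting kinds the job type lacks."""
    supplied = {"configs": configs, "params": params, "args": args}
    job_configs = {}
    for kind, value in supplied.items():
        if not value:
            continue  # nothing supplied for this kind
        if kind not in ALLOWED[job_type]:
            raise ValueError("{} jobs do not accept {}".format(job_type, kind))
        job_configs[kind] = value
    return {"job_configs": job_configs}
```

The resulting dictionary could then be serialized to JSON as part of the body sent to the execute method.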

In some cases sahara generates configuration values or parameters automatically. Values set explicitly by the user during launch will override those generated by sahara.

Using Data Source References as Arguments

Sometimes it’s necessary or desirable to pass a data path as an argument to a job. In these cases, a user may simply type out the path as an argument when launching a job. If the path requires credentials, the user can manually add the credentials as configuration values. However, if a data source object has been created that contains the desired path and credentials, there is no need to specify this information manually.

As a convenience, sahara allows data source objects to be referenced by name or id in arguments, configuration values, or parameters. When the job is executed, sahara will replace the reference with the path stored in the data source object and will add any necessary credentials to the job configuration. Referencing an existing data source object is much faster than adding this information by hand. This is particularly useful for job types like Java or Spark that do not use data source objects directly.

There are two job configuration parameters that enable data source references. They may be used with any job type and are set on the Configuration tab when the job is launched:

  • edp.substitute_data_source_for_name (default False) If set to True, causes sahara to look for data source object name references in configuration values, arguments, and parameters when a job is launched. Name references have the form datasource://name_of_the_object.

    For example, assume a user has a WordCount application that takes an input path as an argument. If there is a data source object named my_input, a user may simply set the edp.substitute_data_source_for_name configuration parameter to True and add datasource://my_input as an argument when launching the job.

  • edp.substitute_data_source_for_uuid (default False) If set to True, causes sahara to look for data source object ids in configuration values, arguments, and parameters when a job is launched. A data source object id is a uuid, so it is unique. The id of a data source object is available through the UI or the sahara command line client. A user may simply use the id as a value.
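
The substitution described above can be pictured with the following sketch. It is only a model of the behaviour, not sahara's implementation: `substitute`, `by_name`, and `by_id` are hypothetical, the flags are assumed to be Python booleans, and the step in which sahara adds credentials to the job configuration is left out.

```python
PREFIX = "datasource://"


def substitute(values, configs, by_name, by_id):
    """Replace data source references in values with stored URLs.

    by_name maps data source names to URLs, by_id maps ids to URLs.
    """
    use_name = configs.get("edp.substitute_data_source_for_name", False)
    use_uuid = configs.get("edp.substitute_data_source_for_uuid", False)
    result = []
    for value in values:
        if use_name and value.startswith(PREFIX):
            # datasource://my_input -> URL stored under "my_input"
            value = by_name[value[len(PREFIX):]]
        elif use_uuid and value in by_id:
            # a bare id is replaced by the URL of that data source
            value = by_id[value]
        result.append(value)
    return result
```

With the name flag enabled and a data source named my_input, an argument list such as `["datasource://my_input", "out"]` has its first element replaced by the stored path while the second is passed through unchanged.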

Creating an Interface for Your Job

In order to better document your job for cluster operators (or for yourself in the future), sahara allows the addition of an interface (or method signature) to your job template. A sample interface for the Teragen Hadoop example might be:

Name           Mapping Type  Location          Value Type   Required  Default
Example Class  args          0                 string       false     teragen
Rows           args          1                 number       true      unset
Output Path    args          2                 data_source  false     hdfs://ip:port/path
Mapper Count   configs       mapred.map.tasks  number       false     unset

A “Description” field may also be added to each interface argument.

To create such an interface via the REST API, provide an “interface” argument, the value of which consists of a list of JSON objects, one per interface argument. The first argument from the table above would be given as:

    [
        {
            "name": "Example Class",
            "description": "Indicates which example job class should be used.",
            "mapping_type": "args",
            "location": "0",
            "value_type": "string",
            "required": false,
            "default": "teragen"
        }
    ]

Creating this interface would allow you to specify a configuration for any execution of the job template by passing an “interface” map similar to:

    {
        "Rows": "1000000",
        "Mapper Count": "3",
        "Output Path": "hdfs://mycluster:8020/user/myuser/teragen-output"
    }

The specified arguments would be automatically placed into the args, configs, and params for the job, according to the mapping type and location fields of each interface argument. The final job_configs map would be:

    {
        "job_configs": {
            "configs": {
                "mapred.map.tasks": "3"
            },
            "args": [
                "teragen",
                "1000000",
                "hdfs://mycluster:8020/user/myuser/teragen-output"
            ]
        }
    }

Rules for specifying an interface are as follows:

  • Mapping Type must be one of configs, params, or args. Only types supported for your job type are allowed (see above).
  • Location must be a string for configs and params, and an integer for args. The set of args locations must be an unbroken series of integers starting from 0.
  • Value Type must be one of string, number, or data_source. Data sources may be passed as UUIDs or as valid paths (see above). All values should be sent as JSON strings. (Note that booleans and null values are serialized differently in different languages. Please specify them as a string representation of the appropriate constants for your data processing engine.)
  • args that are not required must be given a default value.
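
The mapping from an interface map to job_configs, together with the args location rule, can be sketched as follows. The interface definition mirrors the Teragen table; `apply_interface` is a hypothetical helper, and representing an "unset" default as `None` is an assumption made for the sketch.

```python
INTERFACE = [
    {"name": "Example Class", "mapping_type": "args", "location": "0",
     "value_type": "string", "required": False, "default": "teragen"},
    {"name": "Rows", "mapping_type": "args", "location": "1",
     "value_type": "number", "required": True, "default": None},
    {"name": "Output Path", "mapping_type": "args", "location": "2",
     "value_type": "data_source", "required": False,
     "default": "hdfs://ip:port/path"},
    {"name": "Mapper Count", "mapping_type": "configs",
     "location": "mapred.map.tasks", "value_type": "number",
     "required": False, "default": None},
]


def apply_interface(interface, values):
    """Place interface values into configs, params, and args."""
    configs, params, args = {}, {}, {}
    for arg in interface:
        value = values.get(arg["name"], arg["default"])
        if value is None:
            if arg["required"]:
                raise ValueError("missing required value: " + arg["name"])
            continue  # optional and unset: leave it out
        if arg["mapping_type"] == "args":
            args[int(arg["location"])] = value
        elif arg["mapping_type"] == "params":
            params[arg["location"]] = value
        else:
            configs[arg["location"]] = value
    # args locations must form an unbroken series starting from 0
    if sorted(args) != list(range(len(args))):
        raise ValueError("args locations must be 0, 1, 2, ... with no gaps")
    job_configs = {}
    if configs:
        job_configs["configs"] = configs
    if params:
        job_configs["params"] = params
    if args:
        job_configs["args"] = [args[i] for i in range(len(args))]
    return {"job_configs": job_configs}
```

Calling `apply_interface(INTERFACE, {...})` with the interface map shown earlier produces the same job_configs structure as the final map in this section.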

The additional one-time complexity of specifying an interface on your template allows a simpler repeated execution path, and also allows us to generate a customized form for your job in the Horizon UI. This may be particularly useful in cases in which an operator who is not a data processing job developer will be running and administering the jobs.

Generation of Swift Properties for Data Sources

If swift proxy users are not configured (see the Sahara Advanced Configuration Guide) and a job is run with data source objects containing swift paths, sahara will automatically generate swift username and password configuration values based on the credentials in the data sources. If the input and output data sources are both in swift, it is expected that they specify the same credentials.

The swift credentials may be set explicitly with the following configuration values:

Name
fs.swift.service.sahara.username
fs.swift.service.sahara.password

Setting the swift credentials explicitly is required when passing literal swift paths as arguments instead of using data source references. When possible, use data source references as described in Using Data Source References as Arguments.
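
As an illustration of the explicit case, the sketch below adds the two credential values to a job_configs dictionary whenever one of the arguments is a literal swift path. `with_swift_credentials` is a hypothetical helper; only the two configuration names come from this document.

```python
def with_swift_credentials(job_configs, username, password):
    """Return a copy of job_configs with explicit swift credentials
    when any argument is a literal swift:// path."""
    result = dict(job_configs)
    args = result.get("args", [])
    if any(arg.startswith("swift://") for arg in args):
        configs = dict(result.get("configs", {}))
        # keep values the caller already set explicitly
        configs.setdefault("fs.swift.service.sahara.username", username)
        configs.setdefault("fs.swift.service.sahara.password", password)
        result["configs"] = configs
    return result
```

Job configurations without swift paths in their arguments are returned unchanged.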

Additional Details for Hive jobs

Sahara will automatically generate values for the INPUT and OUTPUT parameters required by Hive based on the specified data sources.

Additional Details for Pig jobs

Sahara will automatically generate values for the INPUT and OUTPUT parameters required by Pig based on the specified data sources.

For Pig jobs, arguments should be thought of as command line arguments separated by spaces and passed to the pig shell.

Parameters are a shorthand and are actually translated to the arguments -param name=value.
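
The translation of parameters into arguments can be sketched as below. `pig_arguments` is a hypothetical helper, and placing the expanded parameters before the explicit arguments is an assumption; the document only states that each parameter becomes `-param name=value`.

```python
def pig_arguments(params, args=()):
    """Expand params into -param name=value pairs, then append args."""
    expanded = []
    for name, value in params.items():
        expanded.extend(["-param", "{}={}".format(name, value)])
    return expanded + list(args)
```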

Additional Details for MapReduce jobs

Important!

If the job type is MapReduce, the mapper and reducer classes must be specified as configuration values.

Note that the UI will not prompt the user for these required values; they must be added manually with the Configure tab.

Make sure to add these values with the correct names:

Name                        Example Value
mapred.mapper.new-api       true
mapred.reducer.new-api      true
mapreduce.job.map.class     org.apache.oozie.example.SampleMapper
mapreduce.job.reduce.class  org.apache.oozie.example.SampleReducer
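
Because the UI does not prompt for these values, a client-side check before launch can help. The sketch below uses the example values from the table; `REQUIRED_KEYS` and `missing_mapreduce_keys` are hypothetical names, and treating only the two class settings as mandatory is an assumption.

```python
# Example configuration values taken from the table above.
MAPREDUCE_CONFIGS = {
    "mapred.mapper.new-api": "true",
    "mapred.reducer.new-api": "true",
    "mapreduce.job.map.class": "org.apache.oozie.example.SampleMapper",
    "mapreduce.job.reduce.class": "org.apache.oozie.example.SampleReducer",
}

REQUIRED_KEYS = ("mapreduce.job.map.class", "mapreduce.job.reduce.class")


def missing_mapreduce_keys(configs):
    """Return the mapper/reducer settings that are absent or empty."""
    return [key for key in REQUIRED_KEYS if not configs.get(key)]
```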

Additional Details for MapReduce.Streaming jobs

Important!

If the job type is MapReduce.Streaming, the streaming mapper and reducer classes must be specified.

In this case, the UI will prompt the user to enter mapper and reducer values on the form and will take care of adding them to the job configuration with the appropriate names. If using the python client, however, be certain to add these values to the job configuration manually with the correct names:

Name                   Example Value
edp.streaming.mapper   /bin/cat
edp.streaming.reducer  /usr/bin/wc

Additional Details for Java jobs

Data Source objects are not used directly with Java job types. Instead, any input or output paths must be specified as arguments at job launch either explicitly or by reference as described in Using Data Source References as Arguments. Using data source references is the recommended way to pass paths to Java jobs.

If configuration values are specified, they must be added to the job’s Hadoop configuration at runtime. There are two methods of doing this. The simplest way is to use the edp.java.adapt_for_oozie option described below. The other method is to use the code from this example to explicitly load the values.

The following special configuration values are read by sahara and affect how Java jobs are run:

  • edp.java.main_class (required) Specifies the full name of the class containing main(String[] args)

    A Java job will execute the main method of the specified main class. Any arguments set during job launch will be passed to the program through the args array.

  • oozie.libpath (optional) Specifies configuration values for the Oozie share libs; these libs can be shared by different workflows

  • edp.java.java_opts (optional) Specifies configuration values for the JVM

  • edp.java.adapt_for_oozie (optional) Specifies that sahara should perform special handling of configuration values and exit conditions. The default is False.

    If this configuration value is set to True, sahara will modify the job’s Hadoop configuration before invoking the specified main method. Any configuration values specified during job launch (excluding those beginning with edp.) will be automatically set in the job’s Hadoop configuration and will be available through standard methods.

    In addition, setting this option to True ensures that Oozie will handle program exit conditions correctly.

At this time, the following special configuration value only applies when running jobs on a cluster generated by the Cloudera plugin with the Enable Hbase Common Lib cluster config set to True (the default value):

  • edp.hbase_common_lib (optional) Specifies that a common Hbase lib generated by sahara in HDFS be added to the oozie.libpath. This is for use when an Hbase application is driven from a Java job. Default is False.

The edp-wordcount example bundled with sahara shows how to use configuration values, arguments, and swift data paths in a Java job type. Note that the example does not use the edp.java.adapt_for_oozie option but includes the code to load the configuration values explicitly.
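
To make the effect of edp.java.adapt_for_oozie concrete, the sketch below models which launch-time configuration values would end up in the job's Hadoop configuration. It is an illustration of the rule stated above, not sahara code; `hadoop_visible_configs` is a hypothetical name and the sample class name is made up.

```python
def hadoop_visible_configs(configs):
    """Model the values copied into the Hadoop configuration.

    Nothing is copied automatically unless edp.java.adapt_for_oozie
    is True; values whose names begin with "edp." are never copied.
    """
    flag = configs.get("edp.java.adapt_for_oozie", False)
    if str(flag).lower() != "true":
        return {}
    return {name: value for name, value in configs.items()
            if not name.startswith("edp.")}


# Hypothetical launch-time configuration for a Java job.
JAVA_CONFIGS = {
    "edp.java.main_class": "org.example.Main",  # made-up class name
    "edp.java.adapt_for_oozie": True,
    "mapred.reduce.tasks": "2",
}
```

With the option enabled, only `mapred.reduce.tasks` from `JAVA_CONFIGS` is treated as visible to the job at runtime.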

Additional Details for Shell jobs

A shell job will execute the script specified as main, and will place any files specified as libs in the same working directory (on both the filesystem and in HDFS). Command line arguments may be passed to the script through the args array, and any params values will be passed as environment variables.

Data Source objects are not used directly with Shell job types but data source references may be used as described in Using Data Source References as Arguments.

The edp-shell example bundled with sahara contains a script which will output the executing user to a file specified by the first command line argument.

Additional Details for Spark jobs

Data Source objects are not used directly with Spark job types. Instead, any input or output paths must be specified as arguments at job launch either explicitly or by reference as described in Using Data Source References as Arguments. Using data source references is the recommended way to pass paths to Spark jobs.

Spark jobs use some special configuration values:

  • edp.java.main_class (required) Specifies the full name of the class containing the Java or Scala main method:

    • main(String[] args) for Java
    • main(args: Array[String]) for Scala

    A Spark job will execute the main method of the specified main class. Any arguments set during job launch will be passed to the program through the args array.

  • edp.spark.adapt_for_swift (optional) If set to True, instructs sahara to modify the job’s Hadoop configuration so that swift paths may be accessed. Without this configuration value, swift paths will not be accessible to Spark jobs. The default is False.

  • edp.spark.driver.classpath (optional) If set to an empty string, sahara will use the default classpath for the cluster during job execution. Otherwise, the value overrides the cluster default for that particular job execution.

The edp-spark example bundled with sahara contains a Spark program for estimating Pi.
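
A sketch of how the Spark configuration values fit together in a job_configs map. `spark_job_configs` is a hypothetical helper, and switching on edp.spark.adapt_for_swift automatically when an argument is a swift path is a design choice of this sketch, not documented sahara behaviour.

```python
def spark_job_configs(main_class, args, driver_classpath=None):
    """Assemble job_configs for a Spark job."""
    configs = {"edp.java.main_class": main_class}
    # swift paths are not accessible to Spark jobs without this value
    if any(str(arg).startswith("swift://") for arg in args):
        configs["edp.spark.adapt_for_swift"] = True
    if driver_classpath is not None:
        configs["edp.spark.driver.classpath"] = driver_classpath
    return {"job_configs": {"configs": configs, "args": list(args)}}
```

Passing `driver_classpath=""` would request the cluster's default classpath, as described for edp.spark.driver.classpath above.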

Special Sahara URLs

Sahara uses custom URLs to refer to objects stored in swift, in manila, or in the sahara internal database. These URLs are not meant to be used outside of sahara.

Sahara swift URLs passed to running jobs as input or output sources include a “.sahara” suffix on the container, for example:

swift://container.sahara/object

You may notice these swift URLs in job logs. However, you do not need to add the suffix to the containers yourself. Sahara will add the suffix if necessary, so when using the UI or the python client you may write the above URL simply as:

swift://container/object

Sahara internal database URLs have the form:

internal-db://sahara-generated-uuid

This indicates a file object in the sahara database which has the given uuid as a key.

Manila NFS filesystem reference URLs take the form:

manila://share-uuid/path

This format should be used when referring to a job binary or a data source stored in a manila NFS share.
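
The URL forms above can be handled with the standard library's URL parser. The sketch below shows the ".sahara" suffix being added to a swift container and a manila URL being split into share id and path; both helper names are hypothetical and the code only models the conventions described here.

```python
from urllib.parse import urlsplit, urlunsplit

SUFFIX = ".sahara"


def add_sahara_suffix(url):
    """Add the .sahara suffix to the container of a swift URL."""
    parts = urlsplit(url)
    if parts.scheme != "swift" or parts.netloc.endswith(SUFFIX):
        return url  # not a swift URL, or already suffixed
    return urlunsplit(parts._replace(netloc=parts.netloc + SUFFIX))


def parse_manila_url(url):
    """Split manila://share-uuid/path into (share id, path)."""
    parts = urlsplit(url)
    if parts.scheme != "manila":
        raise ValueError("not a manila URL: " + url)
    return parts.netloc, parts.path
```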

EDP Requirements

The OpenStack installation and the cluster launched from sahara must meet the following minimum requirements in order for EDP to function:

OpenStack Services

When a Hadoop job is executed, binaries are first uploaded to a cluster node and then moved from the node local filesystem to HDFS. Therefore, there must be an instance of HDFS available to the nodes in the sahara cluster.

If the swift service is not running in the OpenStack installation:

  • Job binaries may only be stored in the sahara internal database
  • Data sources require a long-running HDFS

If the swift service is running in the OpenStack installation:

  • Job binaries may be stored in swift or the sahara internal database
  • Data sources may be in swift or a long-running HDFS

Cluster Processes

Requirements for EDP support depend on the EDP job type and plugin used for the cluster. For example, a Vanilla sahara cluster must run at least one instance of these processes to support EDP:

  • For Hadoop version 1:
    • jobtracker
    • namenode
    • oozie
    • tasktracker
    • datanode
  • For Hadoop version 2:
    • namenode
    • datanode
    • resourcemanager
    • nodemanager
    • historyserver
    • oozie
    • spark history server

EDP Technical Considerations

Several aspects of EDP require attention in order for it to work properly. They are listed below.

Transient Clusters

EDP allows running jobs on transient clusters. In this case the cluster is created specifically for the job and is shut down automatically once the job is finished.

Two config parameters control the behaviour of transient clusters:

  • periodic_enable - if set to ‘false’, sahara will do nothing to a transient cluster once the job it was created for is completed. If it is set to ‘true’, then the behaviour depends on the value of the next parameter.
  • use_identity_api_v3 - set it to ‘false’ if your OpenStack installation does not provide keystone API v3. In that case sahara will not terminate unneeded clusters. Instead it will set their state to ‘AwaitingTermination’, meaning that they can be manually deleted by a user. If the parameter is set to ‘true’, sahara will itself terminate the cluster. The limitation is caused by the lack of the ‘trusts’ feature in Keystone API versions older than v3.

If both parameters are set to ‘true’, sahara works with transient clusters in the following manner:

  1. When a user requests that a job be executed on a transient cluster, sahara creates such a cluster.
  2. Sahara drops the user’s credentials once the cluster is created, but prior to that it creates a trust allowing it to operate with the cluster instances in the future without user credentials.
  3. Once a cluster is not needed, sahara terminates its instances using the stored trust. Sahara drops the trust after that.
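
The interaction of the two parameters can be summarized as a small decision function. This is a sketch of the behaviour described above, with hypothetical return labels; it does not call any sahara or keystone API.

```python
def transient_cluster_action(periodic_enable, use_identity_api_v3):
    """Describe what happens to a transient cluster after its job ends."""
    if not periodic_enable:
        return "none"  # sahara does nothing to the cluster
    if not use_identity_api_v3:
        # no trusts before keystone v3: leave deletion to the user
        return "AwaitingTermination"
    return "terminate"  # sahara terminates the cluster via the trust
```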
Except where otherwise noted, this document is licensed under the Creative Commons Attribution 3.0 License.
