- Notifications
You must be signed in to change notification settings - Fork40
Ruby-based programmatic access to Amazon's Elastic MapReduce service.
License
rslifka/elasticity
Folders and files
Name | Name | Last commit message | Last commit date | |
---|---|---|---|---|
Repository files navigation
2021 Update As you can see from the modification dates, this gem hasn't been updated for years and the build is currently failing due to an ancient version of Ruby. Please consider this repository no longer maintained and not recommended for production use. I'm leaving it available in the off chance some portion of its source may be useful to others.
Best of luck! -- Rob
Elasticity provides programmatic access to Amazon's Elastic Map Reduce service. The aim is to conveniently abstract away the complex EMR REST API and make working with job flows more productive and more enjoyable.
Elasticity provides two ways to access EMR:
- Indirectly through a JobFlow-based API. This README discusses the Elasticity API.
- Directly through access to the EMR REST API. The less-discussed hidden darkside... I use this to enable the Elasticity API. RubyDoc can be found at the RubyGemsauto-generated documentation site. Be forewarned: Making the calls directly requires that you understand how to structure EMR requests at the Amazon API level and from experience I can tell you there are more fun things you could be doing :) Scroll to the end for more information on the Amazon API.
You must be running a ruby >= 2.2
gem install elasticity
or in your Gemfile
gem 'elasticity', '~> 6.0'
This will ensure that you protect yourself from API changes, which will only be made in major revisions.
If you're familiar with the AWS EMR UI, you'll recall there are sample jobs Amazon supplies to help us get familiar with EMR. Here's how you'd kick off the "Cloudburst (Custom Jar)" sample job with Elasticity. You can run this code as-is (supplying your AWS credentials and an output location) andJobFlow#run
will return the ID of the job flow.
require'elasticity'# Specify your AWS credentialsElasticity.configuredo |c|c.access_key=ENV['AWS_ACCESS_KEY_ID']c.secret_key=ENV['AWS_SECRET_ACCESS_KEY']end# Create a job flowjobflow=Elasticity::JobFlow.new# NOTE: Amazon requires that all new accounts specify a VPC subnet when launching jobs.# If you're on an existing account, this is unnecessary however new AWS accounts require# subnet IDs be specified when launching jobs.# jobflow.ec2_subnet_id = 'YOUR_SUBNET_ID_HERE'# This is the first step in the jobflow - running a custom jarstep=Elasticity::CustomJarStep.new('s3n://elasticmapreduce/samples/cloudburst/cloudburst.jar')# Here are the arguments to pass to the jar (replace OUTPUT_BUCKET)step.arguments=%w(s3n://elasticmapreduce/samples/cloudburst/input/s_suis.brs3n://elasticmapreduce/samples/cloudburst/input/100k.brs3n://OUTPUT_BUCKET/cloudburst/output/2012-06-223630124048242412816)# Add the step to the jobflowjobflow.add_step(step)# Let's go!jobflow.run
Note that this example is only forCustomJarStep
. Other steps will have different means of passing parameters.
Job flows are the center of the EMR universe. The general order of operations is:
- Specify AWS credentials
- Create a job flow.
- Specify options.
- (optional) Configure instance groups.
- (optional) Add bootstrap actions.
- (optional) Add steps.
- (optional) Upload assets.
- Run the job flow.
- (optional) Add additional steps.
- (optional) Wait for the job flow to complete.
- (optional) Shutdown the job flow.
Elasticity.configuredo |c|c.access_key=ENV['AWS_ACCESS_KEY_ID']# requiredc.secret_key=ENV['AWS_SECRET_ACCESS_KEY']# requiredc.security_token=ENV['AWS_SECURITY_TOKEN']# optional, if you're using STSend
jobflow=Elasticity::JobFlow.new
If you want to access a job flow that's already running:
jobflow=Elasticity::JobFlow.from_jobflow_id('jobflow ID','region')
This is useful if you'd like to attach to a running job flow and add more steps, etc. Theregion
parameter is necessary because job flows are only accessible from the the API when you connect to the same endpoint that created them (e.g. us-west-1). If you don't specify theregion
parameter, us-east-1 is assumed.
Configuration job flow options, shown below with default values. Note that these defaults are subject to change - they are reasonable defaults at the time(s) I work on them (e.g. the latest version of Hadoop).
These options are sent up as part of job flow submission (i.e.JobFlow#run
), so be sure to configure these before running the job.
jobflow.name='Elasticity Job Flow'# For new AWS accounts, this is required to be setjobflow.ec2_subnet_id=niljobflow.job_flow_role=niljobflow.service_role=niljobflow.action_on_failure='TERMINATE_JOB_FLOW'jobflow.keep_job_flow_alive_when_no_steps=falsejobflow.log_uri=niljobflow.enable_debugging=false# Requires a log_uri to enable# >= 4.0.0 release label is now the defaultjobflow.release_label='4.3.0'# < 4.0.0 ... Haven't used this before? just set the release label then.jobflow.ami_version='latest'jobflow.tags={name:"app-name",department:'marketing'}jobflow.ec2_key_name=niljobflow.visible_to_all_users=falsejobflow.placement='us-east-1a'jobflow.region='us-east-1'jobflow.instance_count=2jobflow.master_instance_type='m1.small'jobflow.slave_instance_type='m1.small'jobflow.additonal_info='additonal info'jobflow.additional_master_security_groups=['sg-1111','sg-2222']jobflow.additional_slave_security_groups=['sg-1111','sg-2222']
With the release of EMR 4.0.0 you can now supply applications which EMR will install for you on boot(rather than a manual bootstrap action. Which you can still use if required). You must set therelease_label
for the jobflow(>=4.0.0)
jobflow.release_label='4.3.0'# the simple wayjobflow.add_application("Spark")# Pig, Hive, Mahout# more verbosespark=Elasticity::Application.new({name:'Spark',arguments:'--webui-port 18080',version:'1.0.1',additional_info:''# This option is for advanced users only. This is meta information about third-party applications that third-party vendors use for testing purposes.})jobflow.add_application(spark)
Further reading:http://docs.aws.amazon.com/ElasticMapReduce/latest/ReleaseGuide/emr-configure-apps.html
Technically this is optional since Elasticity creates MASTER and CORE instance groups for you (one m1.small instance in each). If you'd like your jobs to finish in an appreciable amount of time, you'll want to at least add a few instances to the CORE group :)
If all you'd like to do is change the type or number of instances,JobFlow
provides a few shortcuts to do just that.
jobflow.instance_count=10jobflow.master_instance_type='m1.small'jobflow.slave_instance_type='c1.medium'
This says "I want 10 instances from EMR: one m1.small MASTER instance and nine c1.medium CORE instances."
Elasticity supports all EMR instance group types and all configuration options. The MASTER, CORE and TASK instance groups can be configured viaJobFlow#set_master_instance_group
,JobFlow#set_core_instance_group
andJobFlow#set_task_instance_group
respectively.
These instances will be available for the life of your EMR job, versus Spot instances which are transient depending on your bid price (see below).
ig=Elasticity::InstanceGroup.newig.count=10# Provision 10 instancesig.type='c1.medium'# See the EMR docs for a list of supported typesig.set_on_demand_instances# This is the default settingjobflow.set_core_instance_group(ig)
When Amazon EC2 has unused capacity, it offers EC2 instances at a reduced cost, called the Spot Price. This price fluctuates based on availability and demand. You can purchase Spot Instances by placing a request that includes the highest bid price you are willing to pay for those instances. When the Spot Price is below your bid price, your Spot Instances are launched and you are billed the Spot Price. If the Spot Price rises above your bid price, Amazon EC2 terminates your Spot Instances. -EMR Developer Guide
ig=Elasticity::InstanceGroup.newig.count=10# Provision 10 instancesig.type='c1.medium'# See the EMR docs for a list of supported typesig.set_spot_instances(0.25)# Makes this a SPOT group with a $0.25 bid pricejobflow.set_core_instance_group(ig)
Bootstrap actions are run as part of setting up the job flow, so be sure to configure these before running the job.
With the basicBootstrapAction
you specify everything about the action - the script, options and arguments.
action=Elasticity::BootstrapAction.new('s3n://my-bucket/my-script','-g','100')jobflow.add_bootstrap_action(action)
HadoopBootstrapAction
handles passing Hadoop configuration options through.
[Elasticity::HadoopBootstrapAction.new('-m','mapred.map.tasks=101'),Elasticity::HadoopBootstrapAction.new('-m','mapred.reduce.child.java.opts=-Xmx200m')Elasticity::HadoopBootstrapAction.new('-m','mapred.tasktracker.map.tasks.maximum=14')].eachdo |action|jobflow.add_bootstrap_action(action)end
With EMR's current limit of 15 bootstrap actions, chances are you're going to create a configuration file full of your options and opt to use that instead of passing all the options individually. In that case, use theHadoopFileBootstrapAction
, supplying the location of your configuration file.
action=Elasticity::HadoopFileBootstrapAction.new('s3n://my-bucket/job-config.xml')jobflow.add_bootstrap_action(action)
Each type of step has#name
and#action_on_failure
fields that can be specified. Apart from that, steps are configured differently - exhaustively described below.
# Path to the Pig scriptpig_step=Elasticity::PigStep.new('s3n://mybucket/script.pig')# (optional) These variables are available during the execution of your scriptpig_step.variables={'VAR1'=>'VALUE1','VAR2'=>'VALUE2'}jobflow.add_step(pig_step)
Given the importance of specifying a reasonable value for [the number of parallel reducers](http://pig.apache.org/docs/r0.8.1/cookbook.html#Use+the+Parallel+Features PARALLEL), Elasticity calculates and passes through a reasonable default up with every invocation in the form of a script variable called E_PARALLELS. This default value is based off of the formula in the Pig Cookbook and the number of reducers AWS configures per instance.
For example, if you had 8 instances in total and your slaves were m1.xlarge, the value is 26 (as shown below).
s3://elasticmapreduce/libs/pig/pig-script --run-pig-script --args -p INPUT=s3n://elasticmapreduce/samples/pig-apache/input -p OUTPUT=s3n://slif-elasticity/pig-apache/output/2011-05-04 -p E_PARALLELS=26 s3n://elasticmapreduce/samples/pig-apache/do-reports.pig
Use this as you would any other Pig variable.
A =LOAD'myfile'AS (t,u,v);B =GROUPABYtPARALLEL $E_PARALLELS; ...
# Path to the Hive Scripthive_step=Elasticity::HiveStep.new('s3n://mybucket/script.hql')# (optional) These variables are available during the execution of your scripthive_step.variables={'VAR1'=>'VALUE1','VAR2'=>'VALUE2'}jobflow.add_step(hive_step)
# Input bucket, output bucket, mapper script,reducer scriptstreaming_step=Elasticity::StreamingStep.new('s3n://elasticmapreduce/samples/wordcount/input','s3n://elasticityoutput/wordcount/output/2012-07-23','s3n://elasticmapreduce/samples/wordcount/wordSplitter.py','aggregate')# Optionally, include additional *arguments# streaming_step = Elasticity::StreamingStep.new('s3n://elasticmapreduce/samples/wordcount/input', 's3n://elasticityoutput/wordcount/output/2012-07-23', 's3n://elasticmapreduce/samples/wordcount/wordSplitter.py', 'aggregate', '-arg1', 'value1')jobflow.add_step(streaming_step)
# Path to your jarjar_step=Elasticity::CustomJarStep.new('s3n://mybucket/my.jar')# (optional) Arguments passed to the jarjar_step.arguments=['arg1','arg2']jobflow.add_step(jar_step)
# Path to your script, plus argumentsscript_step=Elasticity::ScriptStep.new('script_location','arg1','arg2')jobflow.add_step(script_step)
For a complete list of supported arguments, please see theAmazon EMR guide.
copy_step=Elasticity::S3DistCpStep.newcopy_step.arguments=[...]jobflow.add_step(copy_step)# For AMI < 4.x you need to specifify legacy argumentcopy_step=Elasticity::S3DistCpStep.new(true)
scalding_step=Elasticity::ScaldingStep.new('jar_location','main_class_fqcn',{'arg1'=>'value1'})jobflow.add_step(scalding_step)
This will result in the following command line arguments:
main_class_fqcn --hdfs --arg1 value1
spark_step=Elasticity::SparkStep.new('jar_location','main_class_fqcn')# Specifying arguments relative to Sparkspark_step.spark_arguments={'driver-memory'=>'2G'}# Specifying arguments relative to your applicationspark_step.app_arguments={'arg1'=>'value1'}
This will be equivalent to the following script:
spark-submit \ --driver-memory 2G \ --class main_class_fqcn \ jar_location \ --arg1 value1
This isn't part ofJobFlow
; more of an aside. Elasticity provides a very basic means of uploading assets to S3 so that your EMR job has access to them. Most commonly this will be a set of resources to run the job (e.g. JAR files, streaming scripts, etc.) and a set of resources used by the job itself (e.g. a TSV file with a range of valid values, join tables, etc.).
# Specify the bucket name, AWS credentials and regions3=Elasticity::SyncToS3.new('my-bucket','access','secret','region')# Alternatively, specify nothing :)# - Use the standard environment variables (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY)# - Use the 'us-east-1' region by default# s3 = Elasticity::SyncToS3('my-bucket')# Recursively sync the contents of '/foo' under the remote location 'remote-dir/this-job's3.sync('/foo','remote-dir/this-job')# Sync a single file to a remote directorys3.sync('/foo/this-job/tables/join.tsv','remote-dir/this-job/tables')
If the bucket doesn't exist, it will be created.
If a file already exists, there is an MD5 checksum evaluation. If the checksums are the same, the file will be skipped. Now you can use something likes3n://my-bucket/remote-dir/this-job/tables/join.tsv
in your EMR jobs.
Submit the job flow to Amazon, storing the ID of the running job flow.
jobflow_id=jobflow.run
Steps can be added to a running jobflow just by calling#add_step
on the job flow exactly how you add them prior to submitting the job.
Elasticity has the ability to block until the status of a job flow is not STARTING or RUNNING. There are two flavours. Without a status callback:
# Blocks until status changesjobflow.wait_for_completion
And with a status callback, providing the elapsed time and an instance ofElasticity::JobFlowStatus
so you can inspect the progress of the job.
# Blocks until status changes, calling back every 60 secondsjobflow.wait_for_completiondo |elapsed_time,job_flow_status|puts"Waiting for#{elapsed_time}, jobflow status:#{job_flow_status.state}"end
By default, job flows are set to terminate when there are no more running steps. You can tell the job flow to stay alive when it has nothing left to do:
jobflow.keep_job_flow_alive_when_no_steps=true
If that's the case, or if you'd just like to terminate a running jobflow before waiting for it to finish:
jobflow.shutdown
Elasticity supports a handful of configuration options, all of which are shown below.
Elasticity.configuredo |config|# AWS credentialsconfig.access_key=ENV['AWS_ACCESS_KEY_ID']config.secret_key=ENV['AWS_SECRET_ACCESS_KEY']# if you use federated Identity Management#config.security_token = ENV['AWS_SECURITY_TOKEN']# If using Hive, it will be configured via the directives hereconfig.hive_site='s3://bucket/hive-site.xml'end
Elasticity wraps all of the EMR API calls. Please see the Amazon guide for details on these operations because the default values aren't obvious (e.g. the meaning ofDescribeJobFlows
without parameters).
You may opt for "direct" access to the API where you specify the params and Elasticity takes care of the signing for you, responding with the XML from Amazon.
In addition to theAWS EMR site, there are three primary resources of reference information for EMR:
Unfortunately, the documentation is sometimes incorrect and sometimes missing. E.g. the allowable values forAddInstanceGroups
are present in thePDF version of the API reference but not in theHTML version. Elasticity implements the API as specified in the PDF reference as that is the most complete description I could find.
- Alexander Dean has been a constant source of excellent suggestions. He's also working onUnified Log Processing, which you should of course purchase several copies of, post haste :)
- AWS signing was used fromRightScale's amazingright_aws gem which works extraordinarily well! If you need access to any AWS service (EC2, S3, etc.), have a look.
camelize
was used from ActiveSupport to assist in converting parmeters to AWS request format.
Copyright 2011-2015 Robert Slifka Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
About
Ruby-based programmatic access to Amazon's Elastic MapReduce service.