Running Delta Lake on Amazon EMR Serverless
Amazon EMR Serverless is a new AWS service made generally available on June 1st, 2022. With this service, you can run serverless Spark clusters that process terabyte-scale data easily, using any open-source Spark library. Getting started with EMR Serverless can be a bit tricky, though. The goal of this post is to help you get your Spark + Delta jobs up and running "serverlessly". Let's get to it!
Setup - Authentication
In order to run EMR Serverless you'll need to configure two IAM roles: a service-linked role and an access authorization role for your Spark jobs. The service-linked role is very straightforward to create. Go to IAM on the AWS console, click on Roles and then on Create role,
choose Amazon EMR Serverless,
and keep the default settings until you finish creating the role. Next, create a job role with permissions to access S3 and Glue. For didactic purposes, we will create a very open role (not a best practice). In a production environment, you should make your permissions much stricter.
Click Create role again in the Roles section of the AWS console and select Custom Trust Policy. In the "Service" key below, replace "{}" with emr-serverless.amazonaws.com.
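The resulting trust policy should look roughly like this (a sketch of the standard service-principal trust policy; double-check it against what the console generates for you):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "emr-serverless.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```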
Next, select two AWS managed policies, "AmazonS3FullAccess" and "AWSGlueConsoleFullAccess". Click next, give your new role an easily identifiable name (like "EMRServerlessJobRole") and finish creating the role.
Setup - Data
For this post, we are working with the (very famous) titanic dataset, which you can download here and upload to S3.
Data Pipeline Strategy
Delta Lake is a great tool that implements the Lakehouse architecture. It has many cool features (such as schema evolution, data time travel, transaction logs and ACID transactions) and it is fundamentally valuable when we have a case of incremental data ingestion. Thus, we are going to simulate some changes in the titanic dataset. We will include two new passengers (Ney and Sarah) and we will update information on two passengers that were presumed dead but found alive (!!!), Mr. Owen Braund and Mr. William Allen.
The first version of the data is written as a Delta table and the updates will be written with an upsert transaction.
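Before jumping into Spark, the merge semantics we rely on can be sketched in plain Python (a toy illustration of the idea, not Delta's actual implementation): rows that match on the key get updated, rows that don't get inserted.

```python
def upsert(old_rows, new_rows, key="PassengerId"):
    """Toy version of Delta's merge: whenMatchedUpdateAll / whenNotMatchedInsertAll."""
    merged = {row[key]: row for row in old_rows}
    for row in new_rows:
        merged[row[key]] = row  # update if the key already exists, insert otherwise
    return sorted(merged.values(), key=lambda r: r[key])

# Passenger 1 is updated to Survived=1, passenger 892 is a brand-new row
old = [{"PassengerId": 1, "Survived": 0}, {"PassengerId": 2, "Survived": 1}]
new = [{"PassengerId": 1, "Survived": 1}, {"PassengerId": 892, "Survived": 1}]
result = upsert(old, new)
```

Delta does exactly this, but atomically and at scale, while recording each version in the transaction log so we can time-travel back later.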
The Python code to do those operations is presented below:
```python
from pyspark.sql import functions as f
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

from delta.tables import *

print("Reading CSV file from S3...")
schema = "PassengerId int, Survived int, Pclass int, Name string, Sex string, Age double, SibSp int, Parch int, Ticket string, Fare double, Cabin string, Embarked string"
df = spark.read.csv("s3://<YOUR-BUCKET>/titanic", header=True, schema=schema, sep=";")

print("Writing titanic dataset as a delta table...")
df.write.format("delta").save("s3://<YOUR-BUCKET>/silver/titanic_delta")

print("Updating and inserting new rows...")
new = df.where("PassengerId IN (1, 5)")
new = new.withColumn("Survived", f.lit(1))
newrows = [
    (892, 1, 1, "Sarah Crepalde", "female", 23.0, 1, 0, None, None, None, None),
    (893, 0, 1, "Ney Crepalde", "male", 35.0, 1, 0, None, None, None, None),
]
newrowsdf = spark.createDataFrame(newrows, schema=schema)
new = new.union(newrowsdf)

print("Create a delta table object...")
old = DeltaTable.forPath(spark, "s3://<YOUR-BUCKET>/silver/titanic_delta")

print("UPSERT...")
# UPSERT
(
    old.alias("old")
    .merge(new.alias("new"), "old.PassengerId = new.PassengerId")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

print("Checking if everything is ok")
print("New data...")
(
    spark.read.format("delta")
    .load("s3://<YOUR-BUCKET>/silver/titanic_delta")
    .where("PassengerId < 6 OR PassengerId > 888")
    .show()
)
print("Old data - with time travel")
(
    spark.read.format("delta")
    .option("versionAsOf", "0")
    .load("s3://<YOUR-BUCKET>/silver/titanic_delta")
    .where("PassengerId < 6 OR PassengerId > 888")
    .show()
)
```
This .py file should be uploaded to S3.
Dependencies
One caveat of the latest EMR Serverless release available (6.6.0) is that the spark-submit flag --packages is not available yet (😢). So we have an extra step to handle Java dependencies and Python dependencies.
Jars
To use Java dependencies, we have to build them manually into a single .jar file. AWS has provided a Dockerfile that we can use to build the dependencies without having to install Maven locally (😍). I used this pom.xml file to define the dependencies:
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.serverless-samples</groupId>
  <artifactId>jars</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>jars</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>io.delta</groupId>
      <artifactId>delta-core_2.12</artifactId>
      <version>1.2.1</version>
      <scope>compile</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <finalName>uber-${artifactId}-${version}</finalName>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
```
We build the final .jar file with the command:

```shell
docker build -f Dockerfile.jars --output . .
```
The output uber-jars-1.0-SNAPSHOT.jar must be uploaded to S3.
With this .jar file, we can use .format("delta") in our Python code, but if we try to import delta.tables we will get a Python dependency error.
Python
We can provide Python dependencies in two ways: uploading dependency files to S3 or building a virtual environment for EMR Serverless to use. For this post, uploading a zip file with the Delta Python library is the simpler route.
```shell
mkdir dependencies
pip install delta-spark==1.2.1 --target dependencies
cd dependencies
zip -r9 ../emrserverless_dependencies.zip .
```
The emrserverless_dependencies.zip file must also be uploaded to S3.
Now, we are ready to configure our serverless Spark application.
EMR Serverless
First, we must create an EMR Studio. If you don't have any studios created yet, this is very straightforward. After clicking Get started on the EMR Serverless home page, you can click to create a studio automatically.
Then click to enter the studio url.
With EMR Serverless, we don't have to create a cluster. Instead, we work with the application concept. To create a new EMR Serverless application, click Create application, type an application name, select a release version and click Create application again at the bottom of the page.
Now, the last thing to do is to submit a Spark job. If you have the AWS CLI installed, the command below will submit one.
```shell
aws emr-serverless start-job-run \
    --name Delta-Upsert \
    --application-id <YOUR-APPLICATION-ID> \
    --execution-role-arn arn:aws:iam::<ACCOUNT-NUMBER>:role/EMRServerlessJobRole \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://<YOUR-BUCKET>/pyspark/emrserverless_delta_titanic.py",
            "sparkSubmitParameters": "--jars s3://<YOUR-BUCKET>/pyspark/jars/uber-jars-1.0-SNAPSHOT.jar --conf spark.submit.pyFiles=s3://<YOUR-BUCKET>/pyspark/dependencies/emrserverless_dependencies.zip"
        }
    }' \
    --configuration-overrides '{
        "monitoringConfiguration": {
            "s3MonitoringConfiguration": {
                "logUri": "s3://<YOUR-BUCKET>/emr-serverless-logs/"
            }
        }
    }'
```
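If you prefer Python over the CLI, the same request can be sent with boto3's EMR Serverless client. The sketch below just assembles the start_job_run arguments (the bucket, application ID and role ARN are placeholder assumptions you must replace); the actual call is left commented out since it needs AWS credentials:

```python
def build_job_request(bucket, application_id, role_arn):
    """Assemble the start_job_run arguments for the Delta upsert job."""
    return {
        "applicationId": application_id,
        "executionRoleArn": role_arn,
        "name": "Delta-Upsert",
        "jobDriver": {
            "sparkSubmit": {
                "entryPoint": f"s3://{bucket}/pyspark/emrserverless_delta_titanic.py",
                "sparkSubmitParameters": (
                    f"--jars s3://{bucket}/pyspark/jars/uber-jars-1.0-SNAPSHOT.jar "
                    f"--conf spark.submit.pyFiles=s3://{bucket}/pyspark/dependencies/emrserverless_dependencies.zip"
                ),
            }
        },
        "configurationOverrides": {
            "monitoringConfiguration": {
                "s3MonitoringConfiguration": {"logUri": f"s3://{bucket}/emr-serverless-logs/"}
            }
        },
    }

request = build_job_request(
    "my-bucket", "00f0abcdefexample", "arn:aws:iam::123456789012:role/EMRServerlessJobRole"
)
# import boto3  # uncomment to actually submit the job
# client = boto3.client("emr-serverless")
# response = client.start_job_run(**request)
```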
Some important parameters:
- entryPoint sets the S3 path for your PySpark script
- sparkSubmitParameters: add the Java dependencies with the --jars flag and set --conf spark.submit.pyFiles=<YOUR .py/.zip/.egg FILE> for the Python dependencies
- s3MonitoringConfiguration sets the S3 path that will be used to save job logs
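Depending on your Delta Lake version and how your script builds its SparkSession, you may also need to enable Delta's SQL extension and catalog in sparkSubmitParameters. These are the standard Delta configuration keys; verify them against the Delta Lake docs for your version:

```
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
```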
If you wish to use the console instead, set the job name, role and script location, as well as the .jar and .zip file locations, as follows
The Spark job should start after this. When it finishes, check the logs folder in S3 (look for your application ID, job ID and the SPARK_DRIVER logs). You should see something like this:
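The driver stdout lands under a predictable key inside the log bucket. A small helper makes it easy to locate; the path layout below matches what EMR Serverless wrote for me, so treat it as an assumption and confirm against your own bucket:

```python
def driver_stdout_key(log_uri, application_id, job_run_id):
    """Build the S3 location of the Spark driver stdout log (gzip-compressed)."""
    return (
        f"{log_uri.rstrip('/')}/applications/{application_id}"
        f"/jobs/{job_run_id}/SPARK_DRIVER/stdout.gz"
    )

# Hypothetical IDs for illustration
key = driver_stdout_key(
    "s3://my-bucket/emr-serverless-logs/", "00f0abcdefexample", "00f0jobrunexample"
)
```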
```
Reading CSV file from S3...
Writing titanic dataset as a delta table...
Updating and inserting new rows...
Create a delta table object...
UPSERT...
Checking if everything is ok
New data...
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       1|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       1|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|        889|       0|     3|"Johnston, Miss. ...|female|null|    1|    2|      W./C. 6607|  23.45| null|       S|
|        890|       1|     1|Behr, Mr. Karl Ho...|  male|26.0|    0|    0|          111369|   30.0| C148|       C|
|        891|       0|     3| Dooley, Mr. Patrick|  male|32.0|    0|    0|          370376|   7.75| null|       Q|
|        892|       1|     1|      Sarah Crepalde|female|23.0|    1|    0|            null|   null| null|    null|
|        893|       0|     1|        Ney Crepalde|  male|35.0|    1|    0|            null|   null| null|    null|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
Old data - with time travel
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|        889|       0|     3|"Johnston, Miss. ...|female|null|    1|    2|      W./C. 6607|  23.45| null|       S|
|        890|       1|     1|Behr, Mr. Karl Ho...|  male|26.0|    0|    0|          111369|   30.0| C148|       C|
|        891|       0|     3| Dooley, Mr. Patrick|  male|32.0|    0|    0|          370376|   7.75| null|       Q|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
```
Notice the same Delta table shown at two different moments using time travel: the latest version, with Mr. Braund and Mr. Allen marked as alive and the new passengers included, in the first table, and the original version of the dataset in the second.