Neylson Crepalde for AWS Community Builders

Running Delta Lake on Amazon EMR Serverless

Amazon EMR Serverless is a new AWS service made generally available on June 1st, 2022. With it, you can run serverless Spark clusters that process TB-scale data very easily, using any open-source Spark libraries. Getting started with EMR Serverless can be a bit tricky, though. The goal of this post is to help you get your Spark + Delta jobs up and running "serverlessly". Let's get to it!

Setup - Authentication

In order to run EMR Serverless, you'll need to configure two IAM roles: a service-linked role and a job role that authorizes access for your Spark jobs. The service-linked role is very straightforward to create. Go to IAM in the AWS console, click Roles, then click Create role,

IAM Role console

choose Amazon EMR Serverless,

Configurations to create a Service role

and keep the default settings until you finish creating the role. Next, create a job role with permissions to access S3 and Glue. We will create a very permissive role (not a best practice) for didactic purposes. In a production environment, you should make your permissions much stricter.

Click Create role again in the Roles section of the AWS console and select Custom trust policy. In the policy editor, in the "Service" key, replace "{}" with emr-serverless.amazonaws.com.
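For reference, a custom trust policy with that service principal typically looks like the following. This is a sketch based on the standard IAM trust-policy format; double-check it against what the console generates for you:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "emr-serverless.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```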

Configurations for Custom trust policy

Next, select two AWS managed policies, "AmazonS3FullAccess" and "AWSGlueConsoleFullAccess". Click next, give your new role an easily identifiable name (like "EMRServerlessJobRole"), and finish creating the role.

Setup - Data

For this post, we are working with the (very famous) titanic dataset, which you can download here and upload to S3.

Data Pipeline Strategy

Delta Lake is a great tool that implements the Lakehouse architecture. It has many cool features (such as schema evolution, data time travel, transaction logs, and ACID transactions) and it is especially valuable in cases of incremental data ingestion. So, we are going to simulate some changes to the titanic dataset. We will insert two new passengers (Ney and Sarah) and update information on two passengers who were presumed dead but found alive(!!!), Mr. Owen Braund and Mr. William Allen.

The first version of the data is written as a Delta table, and the updates are applied with an upsert transaction.
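Conceptually, an upsert merges incoming rows into the existing table by key: rows that match an existing key are updated, and the rest are inserted. Before looking at the Spark version, here is a minimal plain-Python sketch of that logic (no Spark or Delta involved; the dict-based "tables" keyed by PassengerId are purely illustrative):

```python
def upsert(old: dict, new: dict) -> dict:
    """Merge `new` rows into `old` by key: update matches, insert the rest."""
    merged = dict(old)           # start from the existing table
    for key, row in new.items():
        merged[key] = row        # whenMatched -> update, whenNotMatched -> insert
    return merged

old = {1: {"Name": "Braund", "Survived": 0}, 5: {"Name": "Allen", "Survived": 0}}
new = {1: {"Name": "Braund", "Survived": 1}, 892: {"Name": "Sarah Crepalde", "Survived": 1}}

result = upsert(old, new)
print(result[1]["Survived"])   # passenger 1 updated to survived
print(892 in result)           # passenger 892 inserted
```

Delta Lake's `merge` API expresses exactly this pattern declaratively, as we'll see in the script below, but with ACID guarantees and a transaction log.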

The Python code for these operations is presented below:

```python
from pyspark.sql import functions as f
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

from delta.tables import *

print("Reading CSV file from S3...")
schema = ("PassengerId int, Survived int, Pclass int, Name string, Sex string, "
          "Age double, SibSp int, Parch int, Ticket string, Fare double, "
          "Cabin string, Embarked string")
df = spark.read.csv("s3://<YOUR-BUCKET>/titanic", header=True, schema=schema, sep=";")

print("Writing titanic dataset as a delta table...")
df.write.format("delta").save("s3://<YOUR-BUCKET>/silver/titanic_delta")

print("Updating and inserting new rows...")
new = df.where("PassengerId IN (1, 5)")
new = new.withColumn("Survived", f.lit(1))
newrows = [
    (892, 1, 1, "Sarah Crepalde", "female", 23.0, 1, 0, None, None, None, None),
    (893, 0, 1, "Ney Crepalde", "male", 35.0, 1, 0, None, None, None, None),
]
newrowsdf = spark.createDataFrame(newrows, schema=schema)
new = new.union(newrowsdf)

print("Create a delta table object...")
old = DeltaTable.forPath(spark, "s3://<YOUR-BUCKET>/silver/titanic_delta")

print("UPSERT...")
# UPSERT
(
    old.alias("old")
    .merge(new.alias("new"), "old.PassengerId = new.PassengerId")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

print("Checking if everything is ok")
print("New data...")
(
    spark.read.format("delta")
    .load("s3://<YOUR-BUCKET>/silver/titanic_delta")
    .where("PassengerId < 6 OR PassengerId > 888")
    .show()
)

print("Old data - with time travel")
(
    spark.read.format("delta")
    .option("versionAsOf", "0")
    .load("s3://<YOUR-BUCKET>/silver/titanic_delta")
    .where("PassengerId < 6 OR PassengerId > 888")
    .show()
)
```

This .py file should be uploaded to S3.

Dependencies

One limitation of the latest EMR Serverless release available (6.6.0) is that the spark-submit flag --packages is not available yet (😢). So we have extra steps to handle Java and Python dependencies.

Jars

To use Java dependencies, we have to build them manually into a single .jar file. AWS has provided a Dockerfile that we can use to build the dependencies without having to install Maven locally (😍). I used this pom.xml file to define the dependencies:

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.serverless-samples</groupId>
  <artifactId>jars</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>jars</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>io.delta</groupId>
      <artifactId>delta-core_2.12</artifactId>
      <version>1.2.1</version>
      <scope>compile</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <finalName>uber-${artifactId}-${version}</finalName>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
```

We build the final .jar file with the command

```shell
docker build -f Dockerfile.jars --output . .
```

The output uber-jars-1.0-SNAPSHOT.jar must be uploaded to S3.

With this .jar file, we can use .format("delta") in our Python code, but if we try to import delta.tables we will get a Python dependency error.

Python

We can provide Python dependencies in two ways: uploading the dependency files to S3, or building a virtual environment to use in EMR Serverless. For this post, uploading a zip file with the delta-spark Python library was the simplest option.

```shell
mkdir dependencies
pip install delta-spark==1.2.1 --target dependencies
cd dependencies
zip -r9 ../emrserverless_dependencies.zip .
```

The emrserverless_dependencies.zip file must also be uploaded to S3.

Now, we are ready to configure our serverless Spark application.

EMR Serverless

First, we must create an EMR Studio. If you don't have a studio created yet, this is very straightforward: after clicking Get started on the EMR Serverless home page, you can click to create a studio automatically.

Create EMR Studio automatically

Then click the studio URL to enter the studio.

Studio url

With EMR Serverless, we don't have to create a cluster. Instead, we work with the application concept. To create a new EMR Serverless application, click Create application, type an application name, select a release version, and click Create application again at the bottom of the page.

Application creation page

Now, the last thing to do is submit a Spark job. If you have the AWS CLI installed, the command below will submit the job:

```shell
aws emr-serverless start-job-run \
    --name Delta-Upsert \
    --application-id <YOUR-APPLICATION-ID> \
    --execution-role-arn arn:aws:iam::<ACCOUNT-NUMBER>:role/EMRServerlessJobRole \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://<YOUR-BUCKET>/pyspark/emrserverless_delta_titanic.py",
            "sparkSubmitParameters": "--jars s3://<YOUR-BUCKET>/pyspark/jars/uber-jars-1.0-SNAPSHOT.jar --conf spark.submit.pyFiles=s3://<YOUR-BUCKET>/pyspark/dependencies/emrserverless_dependencies.zip"
        }
    }' \
    --configuration-overrides '{
        "monitoringConfiguration": {
            "s3MonitoringConfiguration": {
                "logUri": "s3://<YOUR-BUCKET>/emr-serverless-logs/"
            }
        }
    }'
```

Some important parameters:

  • entryPoint sets the S3 path for your PySpark script.
  • sparkSubmitParameters: add the Java dependencies with the --jars flag and set --conf spark.submit.pyFiles=<YOUR .py/.zip/.egg FILE>.
  • s3MonitoringConfiguration sets the S3 path where job logs will be saved.
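If you prefer to script the submission in Python, the same job can be launched with boto3's emr-serverless client, whose start_job_run call mirrors the CLI flags above. This is a sketch: the application ID, account number, and bucket names are placeholders to replace, and the boto3 calls are commented out so the payload can be inspected on its own:

```python
# Requires boto3 (pip install boto3); calls are commented out below.
# import boto3

APP_ID = "<YOUR-APPLICATION-ID>"
ROLE_ARN = "arn:aws:iam::<ACCOUNT-NUMBER>:role/EMRServerlessJobRole"
BUCKET = "<YOUR-BUCKET>"

# Equivalent of the --job-driver JSON passed to the CLI
job_driver = {
    "sparkSubmit": {
        "entryPoint": f"s3://{BUCKET}/pyspark/emrserverless_delta_titanic.py",
        "sparkSubmitParameters": (
            f"--jars s3://{BUCKET}/pyspark/jars/uber-jars-1.0-SNAPSHOT.jar "
            f"--conf spark.submit.pyFiles="
            f"s3://{BUCKET}/pyspark/dependencies/emrserverless_dependencies.zip"
        ),
    }
}

# Equivalent of the --configuration-overrides JSON
overrides = {
    "monitoringConfiguration": {
        "s3MonitoringConfiguration": {"logUri": f"s3://{BUCKET}/emr-serverless-logs/"}
    }
}

# client = boto3.client("emr-serverless")
# response = client.start_job_run(
#     name="Delta-Upsert",
#     applicationId=APP_ID,
#     executionRoleArn=ROLE_ARN,
#     jobDriver=job_driver,
#     configurationOverrides=overrides,
# )
```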

If you wish to use the console instead, set the job name, role, and script location

Job initial parameters

and the .jar and .zip file locations as follows:

Jar files parameters

The Spark job should start after this. When it finishes, check the logs folder in S3 (look for your application ID, job ID, and the SPARK_DRIVER logs). You should see something like this:

```
Reading CSV file from S3...
Writing titanic dataset as a delta table...
Updating and inserting new rows...
Create a delta table object...
UPSERT...
Checking if everything is ok
New data...
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       1|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       1|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|        889|       0|     3|"Johnston, Miss. ...|female|null|    1|    2|      W./C. 6607|  23.45| null|       S|
|        890|       1|     1|Behr, Mr. Karl Ho...|  male|26.0|    0|    0|          111369|   30.0| C148|       C|
|        891|       0|     3| Dooley, Mr. Patrick|  male|32.0|    0|    0|          370376|   7.75| null|       Q|
|        892|       1|     1|      Sarah Crepalde|female|23.0|    1|    0|            null|   null| null|    null|
|        893|       0|     1|        Ney Crepalde|  male|35.0|    1|    0|            null|   null| null|    null|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
Old data - with time travel
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|        889|       0|     3|"Johnston, Miss. ...|female|null|    1|    2|      W./C. 6607|  23.45| null|       S|
|        890|       1|     1|Behr, Mr. Karl Ho...|  male|26.0|    0|    0|          111369|   30.0| C148|       C|
|        891|       0|     3| Dooley, Mr. Patrick|  male|32.0|    0|    0|          370376|   7.75| null|       Q|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
```

Notice the same Delta table shown at two different moments using time travel: the latest version in the first table, with Mr. Braund and Mr. Allen marked as alive and the new passengers included, and the original version of the dataset in the second.
