Run a Data Analytics DAG in Google Cloud using data from AWS
This tutorial is a modification of Run a Data Analytics DAG in Google Cloud that shows how to connect your Cloud Composer environment to Amazon Web Services to utilize data stored there. It shows how to use Cloud Composer to create an Apache Airflow DAG. The DAG joins data from a BigQuery public dataset and a CSV file stored in an Amazon Web Services (AWS) S3 bucket and then runs a Google Cloud Serverless for Apache Spark batch job to process the joined data.
The BigQuery public dataset in this tutorial is ghcn_d, an integrated database of climate summaries across the globe. The CSV file contains information about the dates and names of US holidays from 1997 to 2021.
The question we want to answer using the DAG is: "How warm was it in Chicago on Thanksgiving for the past 25 years?"
Note: This tutorial uses Airflow 2.

Objectives
- Create a Cloud Composer environment in the default configuration
- Create a bucket in AWS S3
- Create an empty BigQuery dataset
- Create a new Cloud Storage bucket
- Create and run a DAG that includes the following tasks:
- Load an external dataset from S3 to Cloud Storage
- Load an external dataset from Cloud Storage to BigQuery
- Join two datasets in BigQuery
- Run a data analytics PySpark job
Before you begin
Manage permissions in AWS
Follow the "Creating policies with the visual editor section" of theCreating IAM Policies AWS tutorial to create a customizedIAM policy for AWS S3 with the following configuration:
- Service: S3
- ListAllMyBuckets (s3:ListAllMyBuckets), for viewing your S3 bucket
- CreateBucket (s3:CreateBucket), for creating a bucket
- PutBucketOwnershipControls (s3:PutBucketOwnershipControls), for creating a bucket
- ListBucket (s3:ListBucket), for granting permission to list objects in an S3 bucket
- PutObject (s3:PutObject), for uploading files to a bucket
- GetBucketVersioning (s3:GetBucketVersioning), for deleting an object in a bucket
- DeleteObject (s3:DeleteObject), for deleting an object in a bucket
- ListBucketVersions (s3:ListBucketVersions), for deleting a bucket
- DeleteBucket (s3:DeleteBucket), for deleting a bucket
- Resources: Choose "Any" next to "bucket" and "object" to grant permissions to any resources of that type.
- Tag: None
- Name: TutorialPolicy
Refer to the list of actions supported in Amazon S3 for more information about each configuration.
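If you prefer to script this step instead of using the visual editor, the following boto3 sketch creates an equivalent policy. It assumes boto3 is installed and your AWS credentials are already configured locally; the policy document mirrors the action list above.

import json

import boto3

# Policy document equivalent to the TutorialPolicy configuration above.
policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListAllMyBuckets",
                "s3:CreateBucket",
                "s3:PutBucketOwnershipControls",
                "s3:ListBucket",
                "s3:PutObject",
                "s3:GetBucketVersioning",
                "s3:DeleteObject",
                "s3:ListBucketVersions",
                "s3:DeleteBucket",
            ],
            # "Any" bucket and object resources, as selected in the visual editor.
            "Resource": "*",
        }
    ],
}

iam = boto3.client("iam")
iam.create_policy(
    PolicyName="TutorialPolicy",
    PolicyDocument=json.dumps(policy_document),
)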
Enable APIs
Enable the following APIs:
Console
Enable the Dataproc, Cloud Composer, BigQuery, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, and Cloud Storage APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.
gcloud services enable dataproc.googleapis.com composer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Grant permissions
Grant the following roles and permissions to your user account:
- Grant roles for managing Cloud Composer environments and environment buckets.
- Grant the BigQuery Data Owner (roles/bigquery.dataOwner) role to create a BigQuery dataset.
- Grant the Storage Admin (roles/storage.admin) role to create a Cloud Storage bucket.
Create and prepare your Cloud Composer environment
Create a Cloud Composer environment with default parameters:
- Choose a US-based region.
- Choose the latest Cloud Composer version.

The BigQuery public dataset used in this tutorial is stored in the US multiregion. We recommend choosing a US region for your Cloud Composer environment to reduce cost and latency, but the tutorial can still run if your Cloud Composer environment is in another region.

Grant the following roles to the service account used in your Cloud Composer environment in order for the Airflow workers to successfully run DAG tasks:

- BigQuery User (roles/bigquery.user)
- BigQuery Data Owner (roles/bigquery.dataOwner)
- Service Account User (roles/iam.serviceAccountUser)
- Dataproc Editor (roles/dataproc.editor)
- Dataproc Worker (roles/dataproc.worker)
Create and modify related resources in Google Cloud
- Install the apache-airflow-providers-amazon PyPI package in your Cloud Composer environment.
- Create an empty BigQuery dataset with the following parameters (a scripted alternative for this step and the next is sketched after this list):
  - Name: holiday_weather
  - Region: US
- Create a new Cloud Storage bucket in the US multiregion.
- Run the following command to enable Private Google Access on the default subnet in the region where you would like to run Google Cloud Serverless for Apache Spark to fulfill networking requirements. We recommend using the same region as your Cloud Composer environment.

gcloud compute networks subnets update default \
    --region DATAPROC_SERVERLESS_REGION \
    --enable-private-ip-google-access
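If you prefer the client libraries over the console, a minimal sketch using google-cloud-bigquery and google-cloud-storage can create the dataset and the bucket described above. The bucket name is a placeholder you must replace; Cloud Storage bucket names are globally unique.

from google.cloud import bigquery, storage

# Placeholder: replace with your own globally unique bucket name.
BUCKET_NAME = "your-unique-bucket-name"

# Create the empty BigQuery dataset named holiday_weather in the US multiregion.
bq_client = bigquery.Client()
dataset = bigquery.Dataset(f"{bq_client.project}.holiday_weather")
dataset.location = "US"
bq_client.create_dataset(dataset, exists_ok=True)

# Create the Cloud Storage bucket in the US multiregion.
storage_client = storage.Client()
storage_client.create_bucket(BUCKET_NAME, location="US")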
Create related resources in AWS
Create an S3 bucket with default settings in your preferred region.
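If you would rather create the bucket with a script, here is a small boto3 sketch. The bucket name and region are placeholders; note that us-east-1 does not accept a LocationConstraint, so omit that argument there.

import boto3

# Placeholders: replace with your own bucket name and preferred region.
S3_BUCKET_NAME = "your-unique-s3-bucket-name"
AWS_REGION = "us-east-2"

s3 = boto3.client("s3", region_name=AWS_REGION)
s3.create_bucket(
    Bucket=S3_BUCKET_NAME,
    # Required outside us-east-1; remove this argument for us-east-1.
    CreateBucketConfiguration={"LocationConstraint": AWS_REGION},
)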
Connect to AWS from Cloud Composer
Note: We recommend storing all credentials for connections in Secret Manager. If you plan to use this tutorial for production purposes, consider configuring Secret Manager for your environment. For more information, see Configure Secret Manager for your environment.

- Get your AWS access key ID and secret access key.
- Add your AWS S3 connection using the Airflow UI:
- Go to Admin > Connections.
Create a new connection with the following configuration:
- Connection Id: aws_s3_connection
- Connection Type: Amazon S3
- Extras (or Extra Fields JSON): {"aws_access_key_id": "your_aws_access_key_id", "aws_secret_access_key": "your_aws_secret_access_key"}
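If you later move this connection into Secret Manager, as the note above recommends, one common representation for the secret value is Airflow's connection URI. The sketch below uses Airflow's Connection model to generate that URI from the same placeholder credentials; it assumes the generic aws connection type (which accepts the same credential extras as the Amazon S3 type selected in the UI), and the secret name in the comment assumes Composer's default Secret Manager prefix.

import json

from airflow.models.connection import Connection

# Placeholder credentials: substitute your real AWS access key pair.
conn = Connection(
    conn_id="aws_s3_connection",
    conn_type="aws",  # generic Amazon provider type; adjust if you want to match the UI's "Amazon S3" type exactly
    extra=json.dumps(
        {
            "aws_access_key_id": "your_aws_access_key_id",
            "aws_secret_access_key": "your_aws_secret_access_key",
        }
    ),
)

# With Composer's default Secret Manager configuration, this URI would typically be
# stored in a secret named airflow-connections-aws_s3_connection.
print(conn.get_uri())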
Data processing using Google Cloud Serverless for Apache Spark
This section describes processing data with Google Cloud Serverless for Apache Spark.
Explore the example PySpark Job
The code shown below is an example PySpark job that converts temperature from tenths of a degree in Celsius to degrees Celsius. This job converts temperature data from the dataset into a different format.
import sys

from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

if __name__ == "__main__":
    BUCKET_NAME = sys.argv[1]
    READ_TABLE = sys.argv[2]
    WRITE_TABLE = sys.argv[3]

    # Create a SparkSession, viewable via the Spark UI
    spark = SparkSession.builder.appName("data_processing").getOrCreate()

    # Load data into dataframe if READ_TABLE exists
    try:
        df = spark.read.format("bigquery").load(READ_TABLE)
    except Py4JJavaError as e:
        raise Exception(f"Error reading {READ_TABLE}") from e

    # Convert temperature from tenths of a degree in celsius to degrees celsius
    df = df.withColumn("value", col("value") / 10)
    # Display sample of rows
    df.show(n=20)

    # Write results to GCS
    if "--dry-run" in sys.argv:
        print("Data will not be uploaded to BigQuery")
    else:
        # Set GCS temp location
        temp_path = BUCKET_NAME

        # Saving the data to BigQuery using the "indirect path" method and the spark-bigquery connector
        # Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
        # See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
        # for other save mode options
        df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode(
            "overwrite"
        ).save(WRITE_TABLE)

        print("Data written to BigQuery")

Upload the PySpark file to Cloud Storage
To upload the PySpark file to Cloud Storage:
Save data_analytics_process.py to your local machine.

In the Google Cloud console, go to the Cloud Storage browser page.
Click the name of the bucket you created earlier.
In the Objects tab for the bucket, click the Upload files button, select data_analytics_process.py in the dialog that appears, and click Open.
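As an alternative to the console upload, this google-cloud-storage sketch uploads the file from the current directory. The bucket name is a placeholder; keep the object name data_analytics_process.py because the DAG references that exact path at the bucket root.

from google.cloud import storage

# Placeholder: replace with the Cloud Storage bucket you created earlier.
BUCKET_NAME = "your-unique-bucket-name"

client = storage.Client()
bucket = client.bucket(BUCKET_NAME)

# The DAG builds the URI gs://<bucket>/data_analytics_process.py, so keep this name.
blob = bucket.blob("data_analytics_process.py")
blob.upload_from_filename("data_analytics_process.py")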
Upload the CSV file to AWS S3
To upload the holidays.csv file:

- Save holidays.csv on your local machine.
- Follow the AWS guide to upload the file to your bucket.
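You can also upload the file with boto3, as in this sketch. The bucket name is a placeholder; keep the object key holidays.csv, which is what the DAG's transfer task copies.

import boto3

# Placeholder: replace with the S3 bucket you created earlier.
S3_BUCKET_NAME = "your-unique-s3-bucket-name"

s3 = boto3.client("s3")
# The DAG copies objects from this bucket to Cloud Storage, so keep the key name.
s3.upload_file("holidays.csv", S3_BUCKET_NAME, "holidays.csv")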
Data analytics DAG
This section describes configuring and using the data analytics DAG.
Explore the example DAG
The DAG uses multiple operators to transform and unify the data:
- The S3ToGCSOperator transfers the holidays.csv file from your AWS S3 bucket to your Cloud Storage bucket.
- The GCSToBigQueryOperator ingests the holidays.csv file from Cloud Storage into a new table in the BigQuery holiday_weather dataset you created earlier.
- The DataprocCreateBatchOperator creates and runs a PySpark batch job using Serverless for Apache Spark.
- The BigQueryInsertJobOperator joins the data from holidays.csv on the "Date" column with weather data from the BigQuery public dataset ghcn_d. The BigQueryInsertJobOperator tasks are dynamically generated using a for loop, and these tasks are in a TaskGroup for better readability in the Graph View of the Airflow UI.
import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"
REGION = "{{var.value.gce_region}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
BQ_NORMALIZED_TABLE_NAME = "holidays_weather_normalized"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process.py"

# S3 configs
S3_BUCKET_NAME = "{{var.value.s3_bucket}}"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "pyspark_batch": {
        "jar_file_uris": [PYSPARK_JAR],
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "s3_to_gcs_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    s3_to_gcs_op = S3ToGCSOperator(
        task_id="s3_to_gcs",
        bucket=S3_BUCKET_NAME,
        gcp_conn_id="google_cloud_default",
        aws_conn_id="aws_s3_connection",
        dest_gcs=f"gs://{BUCKET_NAME}",
    )

    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region=REGION,
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )

    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["holidays.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.holidays",
        source_format="CSV",
        schema_fields=[
            {"name": "Date", "type": "DATE"},
            {"name": "Holiday", "type": "STRING"},
        ],
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:
        for year in range(1997, 2022):
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
            # Specifically query a Chicago weather station
            WEATHER_HOLIDAYS_JOIN_QUERY = f"""
            SELECT Holidays.Date, Holiday, id, element, value
            FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
            JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table
            WHERE Table.element="TMAX" AND Table.id="USW00094846") AS Weather
            ON Holidays.Date = Weather.Date;
            """

            # For demo purposes we are using WRITE_APPEND
            # but if you run the DAG repeatedly it will continue to append
            # Your use case may be different, see the Job docs
            # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
            # for alternative values for the writeDisposition
            # or consider using partitioned tables
            # https://cloud.google.com/bigquery/docs/partitioned-tables
            bq_join_holidays_weather_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_holidays_weather_data_{str(year)}",
                configuration={
                    "query": {
                        "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

    s3_to_gcs_op >> load_external_dataset >> bq_join_group >> create_batch

Use the Airflow UI to add variables
In Airflow, variables are a universal way to store and retrieve arbitrary settings or configurations as a simple key-value store. This DAG uses Airflow variables to store common values. To add them to your environment:
Access the Airflow UI from the Google Cloud console.

Go to Admin > Variables.
Add the following variables:
- s3_bucket: the name of the S3 bucket you created earlier.
- gcp_project: your project ID.
- gcs_bucket: the name of the bucket you created earlier (without the gs:// prefix).
- gce_region: the region where you want to run your Dataproc batch job. It must meet the Google Cloud Serverless for Apache Spark networking requirements; this is the region where you enabled Private Google Access earlier.
- dataproc_service_account: the service account for your Cloud Composer environment. You can find this service account on the environment configuration tab for your Cloud Composer environment.
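The DAG file reads these values with Jinja templates such as {{var.value.gcs_bucket}}. If you ever need the same values inside Python code, for example in a task callable, you could read them with Variable.get, as in this short sketch.

from airflow.models import Variable

# Equivalent to the {{var.value.gcs_bucket}} and {{var.value.s3_bucket}} templates
# used in the DAG file.
gcs_bucket = Variable.get("gcs_bucket")
s3_bucket = Variable.get("s3_bucket")
print(gcs_bucket, s3_bucket)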
Upload the DAG to your environment's bucket
Cloud Composer schedules DAGs that are located in the /dags folder in your environment's bucket. To upload the DAG using the Google Cloud console:

On your local machine, save s3togcsoperator_tutorial.py.
In Google Cloud console, go to theEnvironments page.
In the list of environments, in the DAG folder column, click the DAGs link. The DAGs folder of your environment opens.
ClickUpload files.
Select s3togcsoperator_tutorial.py on your local machine and click Open.
Trigger the DAG
In your Cloud Composer environment, click the DAGs tab.

Click the DAG ID s3_to_gcs_dag.

Click Trigger DAG.

Wait about five to ten minutes until you see a green check indicating that the tasks have completed successfully.
Validate the DAG's success
In the Google Cloud console, go to the BigQuery page.

In the Explorer panel, click your project name.
Click holidays_weather_joined, then click Preview to view the resulting table. Note that the numbers in the value column are in tenths of a degree Celsius.

Click holidays_weather_normalized, then click Preview to view the resulting table. Note that the numbers in the value column are in degrees Celsius.
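To answer the tutorial's question directly, you can also query the normalized table with the BigQuery client library. This is only a sketch: the exact holiday strings in holidays.csv may vary, so it filters with a LIKE pattern, and it assumes the query runs in the project that contains the holiday_weather dataset.

from google.cloud import bigquery

client = bigquery.Client()

# value in holidays_weather_normalized holds TMAX in degrees Celsius.
query = """
    SELECT Date, Holiday, value AS tmax_celsius
    FROM `holiday_weather.holidays_weather_normalized`
    WHERE Holiday LIKE '%Thanksgiving%'
    ORDER BY Date
"""

for row in client.query(query).result():
    print(row.Date, row.Holiday, row.tmax_celsius)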
Cleanup
Delete individual resources that you created for this tutorial:
- Delete the holidays.csv file in your AWS S3 bucket.
- Delete the AWS S3 bucket that you created.
- Delete the Cloud Storage bucket that you created for this tutorial.
- Delete the Cloud Composer environment, including manually deleting the environment's bucket.
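The AWS and Cloud Storage cleanup can be scripted as well; the sketch below uses boto3 and google-cloud-storage with placeholder bucket names. Deleting the Cloud Composer environment itself remains a separate step.

import boto3
from google.cloud import storage

# Placeholders: replace with the buckets you created for this tutorial.
S3_BUCKET_NAME = "your-unique-s3-bucket-name"
GCS_BUCKET_NAME = "your-unique-bucket-name"

# Delete holidays.csv, then the S3 bucket (the bucket must be empty first).
s3 = boto3.client("s3")
s3.delete_object(Bucket=S3_BUCKET_NAME, Key="holidays.csv")
s3.delete_bucket(Bucket=S3_BUCKET_NAME)

# Delete the Cloud Storage bucket and any objects remaining in it.
storage.Client().bucket(GCS_BUCKET_NAME).delete(force=True)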
What's next