Run a Data Analytics DAG in Google Cloud using data from AWS

This tutorial is a modification of Run a Data Analytics DAG in Google Cloud that shows how to connect your Cloud Composer environment to Amazon Web Services to use data stored there. It shows how to use Cloud Composer to create an Apache Airflow DAG. The DAG joins data from a BigQuery public dataset and a CSV file stored in an Amazon Web Services (AWS) S3 bucket and then runs a Google Cloud Serverless for Apache Spark batch job to process the joined data.

The BigQuery public dataset in this tutorial is ghcn_d, an integrated database of climate summaries across the globe. The CSV file contains information about the dates and names of US holidays from 1997 to 2021.

The question we want to answer using the DAG is: "How warm was it in Chicago on Thanksgiving for the past 25 years?"

Note: This tutorial uses Airflow 2.

Objectives

  • Create a Cloud Composer environment in the default configuration
  • Create a bucket in AWS S3
  • Create an empty BigQuery dataset
  • Create a new Cloud Storage bucket
  • Create and run a DAG that includes the following tasks:
    • Load an external dataset from S3 to Cloud Storage
    • Load an external dataset from Cloud Storage to BigQuery
    • Join two datasets in BigQuery
    • Run a data analytics PySpark job

Before you begin

Manage permissions in AWS

  1. Create an AWS account.

  2. Follow the "Creating policies with the visual editor" section of the Creating IAM Policies AWS tutorial to create a customized IAM policy for AWS S3 with the following configuration:

    • Service: S3
    • ListAllMyBuckets (s3:ListAllMyBuckets), for viewing your S3 bucket
    • CreateBucket (s3:CreateBucket), for creating a bucket
    • PutBucketOwnershipControls (s3:PutBucketOwnershipControls), for creating a bucket
    • ListBucket (s3:ListBucket), for granting permission to list objects in an S3 bucket
    • PutObject (s3:PutObject), for uploading files to a bucket
    • GetBucketVersioning (s3:GetBucketVersioning), for deleting an object in a bucket
    • DeleteObject (s3:DeleteObject), for deleting an object in a bucket
    • ListBucketVersions (s3:ListBucketVersions), for deleting a bucket
    • DeleteBucket (s3:DeleteBucket), for deleting a bucket
    • Resources: Choose "Any" next to "bucket" and "object" to grant permissions to any resources of that type.
    • Tag: None
    • Name: TutorialPolicy

    Refer to the list of actions supported in Amazon S3 for more information about each configuration. A boto3 sketch of an equivalent policy follows the caution below.

  3. Add the TutorialPolicy IAM policy to your identity.

Caution: Choosing "Any" for "bucket" and "object" will grant permission to any resources in S3. To narrow the scope of this policy, you can limit the permissions to a specific bucket and object once you have created them later in the tutorial.
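
If you prefer to create the policy with a script instead of the visual editor, the following boto3 sketch builds an equivalent TutorialPolicy from the actions listed above; the wildcard resource mirrors the "Any" choice and should be narrowed for production use. Attach the resulting policy to your identity as in step 3.

import json

import boto3

# Sketch only: creates an IAM policy equivalent to TutorialPolicy above.
# Assumes your AWS credentials are already configured locally.
iam = boto3.client("iam")

policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListAllMyBuckets",
                "s3:CreateBucket",
                "s3:PutBucketOwnershipControls",
                "s3:ListBucket",
                "s3:PutObject",
                "s3:GetBucketVersioning",
                "s3:DeleteObject",
                "s3:ListBucketVersions",
                "s3:DeleteBucket",
            ],
            # "*" mirrors the "Any" resource choice; narrow this in production.
            "Resource": "*",
        }
    ],
}

iam.create_policy(
    PolicyName="TutorialPolicy",
    PolicyDocument=json.dumps(policy_document),
)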

Enable APIs

Enable the following APIs:

Console

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.

Roles required to enable APIs

To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enable permission.Learn how to grant roles.

gcloud

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:

gcloud services enable dataproc.googleapis.com \
    composer.googleapis.com \
    bigquery.googleapis.com \
    storage.googleapis.com

Grant permissions

Grant the following roles and permissions to your user account:

Create and prepare your Cloud Composer environment

  1. Create a Cloud Composer environment with default parameters:

    Note: The BigQuery portion of this tutorial must run in the US multiregion. We recommend choosing a US region for your Cloud Composer environment to reduce cost and latency, but the tutorial can still run if your Cloud Composer environment is in another region.
  2. Grant the following roles to the service account used in your Cloud Composer environment in order for the Airflow workers to successfully run DAG tasks:

Create and modify related resources in Google Cloud

  1. Install the apache-airflow-providers-amazon PyPI package in your Cloud Composer environment.

  2. Create an empty BigQuery dataset with the following parameters (a scripted alternative is sketched after this list):

    • Name: holiday_weather
    • Region: US
  3. Create a new Cloud Storage bucket in the US multiregion.

  4. Run the following command to enable Private Google Access on the default subnet in the region where you would like to run Google Cloud Serverless for Apache Spark to fulfill networking requirements. We recommend using the same region as your Cloud Composer environment.

    gcloud compute networks subnets update default \
        --region DATAPROC_SERVERLESS_REGION \
        --enable-private-ip-google-access
Caution: The empty BigQuery dataset must be located in the US region to satisfy colocation requirements for transferring the BigQuery public dataset ghcn_d.
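
If you prefer to script steps 2 and 3, the following sketch uses the BigQuery and Cloud Storage Python clients under the tutorial's assumptions (dataset holiday_weather in the US multiregion); the project and bucket names are placeholders.

from google.cloud import bigquery, storage

PROJECT_ID = "your-project-id"  # placeholder
GCS_BUCKET_NAME = "your-gcs-bucket-name"  # placeholder; must be globally unique

# Create the empty BigQuery dataset in the US multiregion.
bq_client = bigquery.Client(project=PROJECT_ID)
dataset = bigquery.Dataset(f"{PROJECT_ID}.holiday_weather")
dataset.location = "US"
bq_client.create_dataset(dataset, exists_ok=True)

# Create the Cloud Storage bucket in the US multiregion.
storage_client = storage.Client(project=PROJECT_ID)
storage_client.create_bucket(GCS_BUCKET_NAME, location="US")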

Create related resources in AWS

Create an S3 bucket with default settings in your preferred region.
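
If you prefer to create the bucket with a script, here is a minimal boto3 sketch; the bucket name and region are placeholders, and S3 rejects a LocationConstraint for us-east-1, hence the branch.

import boto3

S3_BUCKET_NAME = "your-s3-bucket-name"  # placeholder; must be globally unique
AWS_REGION = "us-west-1"  # placeholder; your preferred region

s3 = boto3.client("s3", region_name=AWS_REGION)

# Buckets in us-east-1 must be created without a LocationConstraint.
if AWS_REGION == "us-east-1":
    s3.create_bucket(Bucket=S3_BUCKET_NAME)
else:
    s3.create_bucket(
        Bucket=S3_BUCKET_NAME,
        CreateBucketConfiguration={"LocationConstraint": AWS_REGION},
    )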

Connect to AWS from Cloud Composer

Note: We recommend storing all credentials for connections in Secret Manager. If you plan to use this tutorial for production purposes, consider configuring Secret Manager for your environment. For more information, see Configure Secret Manager for your environment.
  1. Get your AWS access key ID and secret access key.
  2. Add your AWS S3 connection using the Airflow UI:

    1. Go to Admin > Connections.
    2. Create a new connection with the following configuration:

      • Connection Id: aws_s3_connection
      • Connection Type: Amazon S3
      • Extras (or Extra Fields JSON): {"aws_access_key_id": "your_aws_access_key_id", "aws_secret_access_key": "your_aws_secret_access_key"}
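
To confirm that Airflow can reach your bucket through this connection, you can run a small check with the S3Hook from the provider package installed earlier, for example inside a temporary DAG task. A minimal sketch, with a placeholder bucket name:

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

S3_BUCKET_NAME = "your-s3-bucket-name"  # placeholder

# Uses the connection created above; prints True if the bucket is reachable.
hook = S3Hook(aws_conn_id="aws_s3_connection")
print(hook.check_for_bucket(bucket_name=S3_BUCKET_NAME))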

Data processing using Google Cloud Serverless for Apache Spark

This section describes processing data with Google Cloud Serverless for Apache Spark.

Explore the example PySpark Job

The code shown below is an example PySpark job that converts temperature from tenths of a degree Celsius to degrees Celsius. This job converts the temperature data from the dataset into a different format.

import sys

from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

if __name__ == "__main__":
    BUCKET_NAME = sys.argv[1]
    READ_TABLE = sys.argv[2]
    WRITE_TABLE = sys.argv[3]

    # Create a SparkSession, viewable via the Spark UI
    spark = SparkSession.builder.appName("data_processing").getOrCreate()

    # Load data into dataframe if READ_TABLE exists
    try:
        df = spark.read.format("bigquery").load(READ_TABLE)
    except Py4JJavaError as e:
        raise Exception(f"Error reading {READ_TABLE}") from e

    # Convert temperature from tenths of a degree in celsius to degrees celsius
    df = df.withColumn("value", col("value") / 10)

    # Display sample of rows
    df.show(n=20)

    # Write results to GCS
    if "--dry-run" in sys.argv:
        print("Data will not be uploaded to BigQuery")
    else:
        # Set GCS temp location
        temp_path = BUCKET_NAME

        # Saving the data to BigQuery using the "indirect path" method and the spark-bigquery connector
        # Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
        # See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
        # for other save mode options
        (
            df.write.format("bigquery")
            .option("temporaryGcsBucket", temp_path)
            .mode("overwrite")
            .save(WRITE_TABLE)
        )

        print("Data written to BigQuery")
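
Later in the tutorial, the DAG submits this file as a batch with DataprocCreateBatchOperator. If you want to smoke-test the job outside Airflow first, the rough sketch below uses the google-cloud-dataproc Python client; the project, region, and bucket values are placeholders that mirror the DAG's batch configuration, and the batch runs with the default service account unless you add an environment_config. Note that the job reads holiday_weather.holidays_weather_joined, so a standalone run only succeeds after that table has been populated.

from google.cloud import dataproc_v1

PROJECT_ID = "your-project-id"        # placeholder
REGION = "us-central1"                # placeholder; your Private Google Access region
BUCKET_NAME = "your-gcs-bucket-name"  # placeholder

client = dataproc_v1.BatchControllerClient(
    client_options={"api_endpoint": f"{REGION}-dataproc.googleapis.com:443"}
)

batch = dataproc_v1.Batch(
    pyspark_batch=dataproc_v1.PySparkBatch(
        main_python_file_uri=f"gs://{BUCKET_NAME}/data_analytics_process.py",
        jar_file_uris=["gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"],
        args=[
            BUCKET_NAME,
            "holiday_weather.holidays_weather_joined",
            "holiday_weather.holidays_weather_normalized",
        ],
    )
)

# Submit the batch and wait for it to finish.
operation = client.create_batch(
    parent=f"projects/{PROJECT_ID}/locations/{REGION}",
    batch=batch,
    batch_id="data-processing-manual-test",
)
print(operation.result().state)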

Upload the PySpark file to Cloud Storage

To upload the PySpark file to Cloud Storage:

  1. Save data_analytics_process.py to your local machine.

  2. In the Google Cloud console, go to the Cloud Storage browser page:

    Go to Cloud Storage browser

  3. Click the name of the bucket you created earlier.

  4. In the Objects tab for the bucket, click the Upload files button, select data_analytics_process.py in the dialog that appears, and click Open.
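
If you prefer to script the upload, a short sketch using the Cloud Storage Python client is shown below; the bucket name is a placeholder for the bucket you created earlier, and the object is placed at the bucket root, where the DAG expects it.

from google.cloud import storage

GCS_BUCKET_NAME = "your-gcs-bucket-name"  # placeholder; the bucket created earlier

# Upload the local PySpark file to the top level of the bucket.
client = storage.Client()
bucket = client.bucket(GCS_BUCKET_NAME)
bucket.blob("data_analytics_process.py").upload_from_filename("data_analytics_process.py")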

Upload the CSV file to AWS S3

To upload the holidays.csv file:

  1. Save holidays.csv on your local machine.
  2. Follow the AWS guide to upload the file to your bucket.
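
Alternatively, here is a boto3 sketch of the same upload; the bucket name is a placeholder, and the object key holidays.csv at the bucket root matches what the DAG's ingestion task expects after the S3-to-GCS copy.

import boto3

S3_BUCKET_NAME = "your-s3-bucket-name"  # placeholder

s3 = boto3.client("s3")
# Upload the local CSV to the bucket root; the DAG's S3ToGCSOperator copies it from there.
s3.upload_file("holidays.csv", S3_BUCKET_NAME, "holidays.csv")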

Data analytics DAG

This section describes configuring and using the data analytics DAG.

Explore the example DAG

The DAG uses multiple operators to transform and unify the data:

import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"
REGION = "{{var.value.gce_region}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
BQ_NORMALIZED_TABLE_NAME = "holidays_weather_normalized"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process.py"

# S3 configs
S3_BUCKET_NAME = "{{var.value.s3_bucket}}"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "pyspark_batch": {
        "jar_file_uris": [PYSPARK_JAR],
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "s3_to_gcs_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    s3_to_gcs_op = S3ToGCSOperator(
        task_id="s3_to_gcs",
        bucket=S3_BUCKET_NAME,
        gcp_conn_id="google_cloud_default",
        aws_conn_id="aws_s3_connection",
        dest_gcs=f"gs://{BUCKET_NAME}",
    )

    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region=REGION,
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )

    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["holidays.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.holidays",
        source_format="CSV",
        schema_fields=[
            {"name": "Date", "type": "DATE"},
            {"name": "Holiday", "type": "STRING"},
        ],
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:
        for year in range(1997, 2022):
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
            # Specifically query a Chicago weather station
            WEATHER_HOLIDAYS_JOIN_QUERY = f"""
            SELECT Holidays.Date, Holiday, id, element, value
            FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
            JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table
            WHERE Table.element="TMAX" AND Table.id="USW00094846") AS Weather
            ON Holidays.Date = Weather.Date;
            """

            # For demo purposes we are using WRITE_APPEND
            # but if you run the DAG repeatedly it will continue to append
            # Your use case may be different, see the Job docs
            # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
            # for alternative values for the writeDisposition
            # or consider using partitioned tables
            # https://cloud.google.com/bigquery/docs/partitioned-tables
            bq_join_holidays_weather_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_holidays_weather_data_{str(year)}",
                configuration={
                    "query": {
                        "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

    s3_to_gcs_op >> load_external_dataset >> bq_join_group >> create_batch
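Before uploading the file in a later section, you can optionally check that it parses cleanly in a local Python environment that has Airflow and the Google and Amazon provider packages installed. A minimal sketch, assuming the file is saved as s3togcsoperator_tutorial.py in the current directory:

from airflow.models import DagBag

# Parse only the tutorial DAG file and report any import errors.
dag_bag = DagBag(dag_folder="s3togcsoperator_tutorial.py", include_examples=False)
assert not dag_bag.import_errors, dag_bag.import_errors
print(dag_bag.dags.keys())  # expect 's3_to_gcs_dag'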

Use the Airflow UI to add variables

In Airflow, variables are a universal way to store and retrieve arbitrary settings or configurations as a simple key-value store. This DAG uses Airflow variables to store common values. To add them to your environment:

  1. Access the Airflow UI from the Google Cloud console.

  2. Go to Admin > Variables.

  3. Add the following variables:

    • s3_bucket: the name of the S3 bucket you created earlier.

    • gcp_project: your project ID.

    • gcs_bucket: the name of the bucket you created earlier (without the gs:// prefix).

    • gce_region: the region where you want to run your Google Cloud Serverless for Apache Spark batch job; it must meet the Serverless for Apache Spark networking requirements. This is the region where you enabled Private Google Access earlier.

    • dataproc_service_account: the service account for your Cloud Composer environment. You can find this service account on the environment configuration tab for your Cloud Composer environment.

Upload the DAG to your environment's bucket

Cloud Composer schedules DAGs that are located in the /dags folder in your environment's bucket. To upload the DAG using the Google Cloud console:

  1. On your local machine, save s3togcsoperator_tutorial.py.

  2. In the Google Cloud console, go to the Environments page.

    Go to Environments

  3. In the list of environments, in the DAGs folder column, click the DAGs link. The DAGs folder of your environment opens.

  4. ClickUpload files.

  5. Select s3togcsoperator_tutorial.py on your local machine and click Open.

Trigger the DAG

  1. In your Cloud Composer environment, click the DAGs tab.

  2. Click the DAG ID s3_to_gcs_dag.

  3. ClickTrigger DAG.

  4. Wait about five to ten minutes until you see a green check indicating the tasks have been completed successfully.

Validate the DAG's success

  1. In Google Cloud console, go to theBigQuery page.

    Go to BigQuery

  2. In theExplorer panel, click your project name.

  3. Clickholidays_weather_joined.

  4. Click preview to view the resulting table. Note that the numbers in the value column are in tenths of a degree Celsius.

  5. Clickholidays_weather_normalized.

  6. Click preview to view the resulting table. Note that the numbers in the value column are in degrees Celsius.
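
The tables confirm the pipeline ran; to answer the tutorial's question directly, you can query the normalized table with the BigQuery Python client. This is a sketch: the column names come from the DAG's join query, but the exact holiday label depends on holidays.csv, so the LIKE filter below is an assumption you may need to adjust.

from google.cloud import bigquery

client = bigquery.Client()

# Maximum daily temperature (TMAX, in degrees Celsius) recorded at the Chicago
# station on each Thanksgiving in the joined years.
query = """
    SELECT Date, value AS tmax_celsius
    FROM `holiday_weather.holidays_weather_normalized`
    WHERE Holiday LIKE '%Thanksgiving%'
    ORDER BY Date
"""
for row in client.query(query).result():
    print(row.Date, row.tmax_celsius)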

Cleanup

Delete individual resources that you created for this tutorial:

What's next
