Run an Apache Airflow DAG in Cloud Composer 3 (Google Cloud CLI)


This quickstart guide shows you how to create a Cloud Composer environment and run an Apache Airflow DAG in Cloud Composer 3.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    Note: If you don't plan to keep the resources that you create in this procedure, create a project instead of selecting an existing project. After you finish these steps, you can delete the project, removing all resources associated with the project.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.
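    One way to check this from the command line is sketched below; this assumes the gcloud billing command group is available in your gcloud version. The command prints True when billing is enabled:

    gcloud billing projects describe PROJECT_ID \
        --format="value(billingEnabled)"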


  7. Enable the Cloud Composer API:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable composer.googleapis.com
  8. To get the permissions that you need to complete this quickstart, ask your administrator to grant you the following IAM roles on your project:

    For more information about granting roles, see Manage access to projects, folders, and organizations.

    You might also be able to get the required permissions through custom roles or other predefined roles.
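    As a sketch, an administrator can grant a role on your project with a command like the following, where USER_EMAIL and ROLE_NAME are placeholders for your user account and one of the required roles:

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="user:USER_EMAIL" \
        --role="ROLE_NAME"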

Create an environment's service account

When you create an environment, you specify a service account. This service account is called the environment's service account. Your environment uses this service account to perform most operations.

The service account for your environment is not a user account. A service account is a special kind of account used by an application or a virtual machine (VM) instance, not a person.

To create a service account for your environment:

  1. Create a new service account, as described in the Identity and Access Management documentation.

  2. Grant a role to it, as described in the Identity and Access Management documentation. The required role is Composer Worker (composer.worker). A gcloud CLI sketch of both steps follows this list.
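A minimal sketch of both steps with the gcloud CLI, where the service account name composer-env-sa is an example and PROJECT_ID is your project's ID:

gcloud iam service-accounts create composer-env-sa \
    --display-name="Cloud Composer environment service account"

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:composer-env-sa@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/composer.worker"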

Create an environment

Create a new environment named example-environment in the us-central1 region, with the latest Cloud Composer 3 version.

gcloud composer environments create example-environment \
    --location us-central1 \
    --image-version composer-3-airflow-2.10.5-build.23 \
    --service-account ENVIRONMENT_SERVICE_ACCOUNT

Replace ENVIRONMENT_SERVICE_ACCOUNT with the service account for your environment that you created earlier.
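Environment creation can take tens of minutes. To check whether the environment is ready, you can query its state; RUNNING indicates that it is ready to use:

gcloud composer environments describe example-environment \
    --location us-central1 \
    --format="value(state)"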

Create a DAG file

An Airflow DAG is a collection of organized tasks that you want to schedule and run. DAGs are defined in standard Python files.

This guide uses an example Airflow DAG defined in the quickstart.py file. The Python code in this file does the following:

  1. Creates a DAG, composer_quickstart. This DAG runs every day.
  2. Executes one task, print_dag_run_conf. The task prints the DAG run's configuration by using the bash operator.

Save a copy of the quickstart.py file on your local machine:

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )
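Optionally, you can check the file for syntax and import errors before uploading it by running it with Python. This assumes the apache-airflow package is installed locally; the command produces no output when the DAG file is valid:

python quickstart.py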

Upload the DAG file to your environment's bucket

Every Cloud Composer environment has a Cloud Storage bucket associated with it. Airflow in Cloud Composer schedules only DAGs that are located in the /dags folder in this bucket.
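If you need the bucket's path, you can read it from the environment's configuration. The dagGcsPrefix field points directly at the /dags folder:

gcloud composer environments describe example-environment \
    --location us-central1 \
    --format="value(config.dagGcsPrefix)"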

To schedule your DAG, upload quickstart.py from your local machine to your environment's /dags folder:

To upload quickstart.py with Google Cloud CLI, run the following command in the folder where the quickstart.py file is located:

gcloud composer environments storage dags import \
    --environment example-environment --location us-central1 \
    --source quickstart.py
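To confirm that the file arrived in the /dags folder, you can list the folder's contents:

gcloud composer environments storage dags list \
    --environment example-environment --location us-central1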

View the DAG

After you upload the DAG file, Airflow does the following:

  1. Parses the DAG file that you uploaded. It might take a few minutes for the DAG to become available to Airflow.
  2. Adds the DAG to the list of available DAGs.
  3. Executes the DAG according to the schedule you provided in the DAG file.

Check that your DAG is processed without errors and is available in Airflow by viewing it in the DAG UI. The DAG UI is the Cloud Composer interface for viewing DAG information in the Google Cloud console. Cloud Composer also provides access to the Airflow UI, which is the native Airflow web interface.

  1. Wait about five minutes to give Airflow time to process the DAG file that you uploaded previously, and to complete the first DAG run (explained later).

  2. Run the following command in Google Cloud CLI. This command executes the dags list Airflow CLI command that lists DAGs in your environment.

    gcloud composer environments run example-environment \
        --location us-central1 \
        dags list
  3. Check that the composer_quickstart DAG is listed in the command's output.

    Example output:

    Executing the command: [ airflow dags list ]...
    Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
    Use ctrl-c to interrupt the command

    dag_id              | filepath              | owner            | paused
    ====================+=======================+==================+=======
    airflow_monitoring  | airflow_monitoring.py | airflow          | False
    composer_quickstart | dag-quickstart-af2.py | Composer Example | False
    Note: The airflow_monitoring DAG is used by Cloud Composer to monitor the health of your environment.

View DAG run details

A single execution of a DAG is called a DAG run. Airflow immediately executes a DAG run for the example DAG because the start date in the DAG file is set to yesterday. In this way, Airflow catches up to the specified DAG's schedule.

The example DAG contains one task, print_dag_run_conf, which runs the echo command in the console. This command outputs meta information about the DAG run (the DAG run's numeric identifier).
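If you don't want to wait for the scheduled run, you can also start a run manually with the dags trigger Airflow CLI command. This is optional; it creates an additional, manually triggered DAG run:

gcloud composer environments run example-environment \
    --location us-central1 \
    dags trigger -- composer_quickstart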

Run the following command in Google Cloud CLI. This command lists DAG runs for the composer_quickstart DAG:

gcloud composer environments run example-environment \
    --location us-central1 \
    dags list-runs -- --dag-id composer_quickstart

Example output:

dag_id              | run_id                                      | state   | execution_date                   | start_date                       | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00
Note: You can use the tasks states-for-dag-run command to get the status of individual tasks.
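Following the same pattern as the previous command, one possible invocation is sketched below, where RUN_ID is the run_id value from the output above:

gcloud composer environments run example-environment \
    --location us-central1 \
    tasks states-for-dag-run -- composer_quickstart RUN_ID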

The Airflow CLI does not provide a command to view task logs. You can use other methods to view Airflow task logs: the Cloud Composer DAG UI, the Airflow UI, or Cloud Logging. This guide shows a way to query Cloud Logging for logs from a specific DAG run.

Run the following command in Google Cloud CLI. This command reads logs from Cloud Logging for a specific DAG run of the composer_quickstart DAG. The --format argument formats the output so that only the text of the log message is displayed.

gcloud logging read \
    --format="value(textPayload)" \
    --order=asc \
    "resource.type=cloud_composer_environment \
    resource.labels.location=us-central1 \
    resource.labels.environment_name=example-environment \
    labels.workflow=composer_quickstart \
    (labels.\"execution-date\"=\"RUN_ID\")"

Replace:

  • RUN_ID with the execution_date value from the output of the dags list-runs command that you ran previously. For example, 2024-02-17T15:38:38.969307+00:00.

Example output:

...
Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17 15:38:38.969307+00:00
Started process 22544 to run task
...
Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746
...
Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart, task_id=print_dag_run_conf, execution_date=20240217T153838, start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check

Clean up

To avoid incurring charges to your Google Cloud account for the resources used on this page, delete the Google Cloud project with the resources.

Delete the resources used in this tutorial:

  1. Delete the Cloud Composer environment:

    1. In the Google Cloud console, go to the Environments page.


    2. Select example-environment and click Delete.

    3. Wait until the environment is deleted.

  2. Delete your environment's bucket. Deleting the Cloud Composer environment does not delete its bucket.

    1. In the Google Cloud console, go to the Storage > Browser page.


    2. Select the environment's bucket and click Delete. For example, this bucket can be named us-central1-example-environ-c1616fe8-bucket.
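Alternatively, because this quickstart uses the Google Cloud CLI, you can delete both resources from the command line. This is a sketch; replace the bucket name with your environment's actual bucket:

gcloud composer environments delete example-environment \
    --location us-central1

gcloud storage rm --recursive gs://us-central1-example-environ-c1616fe8-bucket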
