Run an LLM in a streaming pipeline

This tutorial shows how to run a large language model (LLM) in a streaming Dataflow pipeline by using the Apache Beam RunInference API.

For more information about the RunInference API, see About Beam ML in the Apache Beam documentation.

The example code is available on GitHub.

Objectives

  • Create Pub/Sub topics and subscriptions for the model's input and responses.
  • Load the model into Cloud Storage by using a Vertex AI custom job.
  • Run the pipeline.
  • Ask the model a question and get a response.

Costs

In this document, you use the following billable components of Google Cloud:

  • Dataflow
  • Compute Engine
  • Cloud Storage
  • Pub/Sub
  • Vertex AI

To generate a cost estimate based on your projected usage, use the pricing calculator.

New Google Cloud users might be eligible for a free trial.

When you finish the tasks that are described in this document, you can avoid continued billing by deleting the resources that you created. For more information, see Clean up.

Before you begin

Run this tutorial on a machine that has at least 5 GB of free disk space to install the dependencies.

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    Note: If you don't plan to keep the resources that you create in this procedure, create a project instead of selecting an existing project. After you finish these steps, you can delete the project, removing all resources associated with the project.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project that you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Storage, Pub/Sub, and Vertex AI APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable dataflow.googleapis.com compute.googleapis.com storage.googleapis.com pubsub.googleapis.com aiplatform.googleapis.com
  8. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: Your project ID.
    • USER_IDENTIFIER: The identifier for your user account. For example, myemail@example.com.
    • ROLE: The IAM role that you grant to your user account.
  10. Grant roles to your Compute Engine default service account. Run the following command once for each of the following IAM roles:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/aiplatform.user
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • PROJECT_NUMBER: your project number. To find your project number, use the gcloud projects describe command.
    • SERVICE_ACCOUNT_ROLE: each individual role.
  11. Copy the Google Cloud project ID. You need this value later in this tutorial.

Create the Google Cloud resources

This section explains how to create the following resources:

  • A Cloud Storage bucket to use as a temporary storage location
  • A Pub/Sub topic for the model's prompts
  • A Pub/Sub topic and subscription for the model's responses

Create a Cloud Storage bucket

Create a Cloud Storage bucket by using the gcloud CLI. This bucket is used as a temporary storage location by the Dataflow pipeline.

To create the bucket, use the gcloud storage buckets create command:

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

Replace the following:

  • BUCKET_NAME: a name for your Cloud Storage bucket that meets the bucket naming requirements. Cloud Storage bucket names must be globally unique.
  • LOCATION: the location for the bucket.

Copy the bucket name. You need this value later in this tutorial.

Create Pub/Sub topics and subscriptions

Create two Pub/Sub topics and one subscription. One topic is for the input prompts that you send to the model. The other topic and its attached subscription are for the model's responses.

  1. To create the topics, run the gcloud pubsub topics create command twice, once for each topic:

    gcloud pubsub topics create PROMPTS_TOPIC_ID
    gcloud pubsub topics create RESPONSES_TOPIC_ID

    Replace the following:

    • PROMPTS_TOPIC_ID: the topic ID for the input prompts to send to the model, such as prompts
    • RESPONSES_TOPIC_ID: the topic ID for the model's responses, such as responses
  2. To create the subscription and attach it to your responses topic, use the gcloud pubsub subscriptions create command:

    gcloud pubsub subscriptions create RESPONSES_SUBSCRIPTION_ID --topic=RESPONSES_TOPIC_ID

    Replace RESPONSES_SUBSCRIPTION_ID with the subscription ID for the model's responses, such as responses-subscription.

Copy the topic IDs and the subscription ID. You need these values later in this tutorial.

Prepare your environment

Download the code samples and then set up your environment to run the tutorial.

The code samples in the python-docs-samples GitHub repository provide the code that you need to run this pipeline. When you are ready to build your own pipeline, you can use this sample code as a template.

You create an isolated Python virtual environment to run your pipeline project by using venv. A virtual environment lets you isolate the dependencies of one project from the dependencies of other projects. For more information about how to install Python and create a virtual environment, see Setting up a Python development environment.

  1. Use the git clone command to clone the GitHub repository:

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
  2. Navigate to the run-inference directory:

    cd python-docs-samples/dataflow/run-inference
  3. If you're using a command prompt, check that you have Python 3 and pip available on your system:

    python --version
    python -m pip --version

    If required, install Python 3.

    If you're using Cloud Shell, you can skip this step because Cloud Shell already has Python installed.

  4. Create a Python virtual environment:

    python -m venv /tmp/env
    source /tmp/env/bin/activate
  5. Install the dependencies:

    pip install -r requirements.txt --no-cache-dir

Model loading code sample

The model loading code in this tutorial launches a Vertex AI custom job that loads the model's state_dict object into Cloud Storage.

The starter file looks like the following:

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Loads the state_dict for an LLM model into Cloud Storage."""

from __future__ import annotations

import os

import torch
from transformers import AutoModelForSeq2SeqLM


def run_local(model_name: str, state_dict_path: str) -> None:
    """Loads the state dict and saves it into the desired path.

    If the `state_dict_path` is a Cloud Storage location starting
    with "gs://", this assumes Cloud Storage is mounted with
    Cloud Storage FUSE in `/gcs`. Vertex AI is set up like this.

    Args:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
    """
    print(f"Loading model: {model_name}")
    model = AutoModelForSeq2SeqLM.from_pretrained(
        model_name, torch_dtype=torch.bfloat16
    )
    print(f"Model loaded, saving state dict to: {state_dict_path}")

    # Assume Cloud Storage FUSE is mounted in `/gcs`.
    state_dict_path = state_dict_path.replace("gs://", "/gcs/")
    directory = os.path.dirname(state_dict_path)
    if directory and not os.path.exists(directory):
        os.makedirs(os.path.dirname(state_dict_path), exist_ok=True)
    torch.save(model.state_dict(), state_dict_path)
    print("State dict saved successfully!")


def run_vertex_job(
    model_name: str,
    state_dict_path: str,
    job_name: str,
    project: str,
    bucket: str,
    location: str = "us-central1",
    machine_type: str = "e2-highmem-2",
    disk_size_gb: int = 100,
) -> None:
    """Launches a Vertex AI custom job to load the state dict.

    If the model is too large to fit into memory or disk, we can launch
    a Vertex AI custom job with a large enough VM for this to work.

    Depending on the model's size, it might require a different VM
    configuration. The model MUST fit into the VM's memory, and there
    must be enough disk space to stage the entire model while it gets
    copied to Cloud Storage.

    Args:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        job_name: Job display name in the Vertex AI console.
        project: Google Cloud Project ID.
        bucket: Cloud Storage bucket name, without the "gs://" prefix.
        location: Google Cloud regional location.
        machine_type: Machine type for the VM to run the job.
        disk_size_gb: Disk size in GB for the VM to run the job.
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project, staging_bucket=bucket, location=location)

    job = aiplatform.CustomJob.from_local_script(
        display_name=job_name,
        container_uri="us-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-13:latest",
        script_path="download_model.py",
        args=[
            "local",
            f"--model-name={model_name}",
            f"--state-dict-path={state_dict_path}",
        ],
        machine_type=machine_type,
        boot_disk_size_gb=disk_size_gb,
        requirements=["transformers"],
    )
    job.run()


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(required=True)

    parser_local = subparsers.add_parser("local")
    parser_local.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser_local.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    parser_local.set_defaults(run=run_local)

    parser_vertex = subparsers.add_parser("vertex")
    parser_vertex.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser_vertex.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    parser_vertex.add_argument(
        "--job-name", required=True, help="Job display name in the Vertex AI console"
    )
    parser_vertex.add_argument(
        "--project", required=True, help="Google Cloud Project ID"
    )
    parser_vertex.add_argument(
        "--bucket",
        required=True,
        help='Cloud Storage bucket name, without the "gs://" prefix',
    )
    parser_vertex.add_argument(
        "--location", default="us-central1", help="Google Cloud regional location"
    )
    parser_vertex.add_argument(
        "--machine-type",
        default="e2-highmem-2",
        help="Machine type for the VM to run the job",
    )
    parser_vertex.add_argument(
        "--disk-size-gb",
        type=int,
        default=100,
        help="Disk size in GB for the VM to run the job",
    )
    parser_vertex.set_defaults(run=run_vertex_job)

    args = parser.parse_args()
    kwargs = args.__dict__.copy()
    kwargs.pop("run")
    args.run(**kwargs)

Pipeline code sample

The pipeline code in this tutorial deploys a Dataflow pipeline that does the following things:

  • Reads a prompt from Pub/Sub and encodes the text into token tensors (see the short sketch after this list).
  • Runs the RunInference transform.
  • Decodes the output token tensors into text and writes the response to Pub/Sub.
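
The encode and decode steps around RunInference map directly onto the Transformers tokenizer API. The following is a minimal sketch of that round trip. It is not part of the sample code; it assumes the transformers package from requirements.txt is installed and uses google/flan-t5-small only as an illustrative model name.

from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("google/flan-t5-small")

# Encode a prompt into a 1-D tensor of token IDs, as the "To tensors" step does.
input_ids = tokenizer(
    "Translate to Spanish: My name is Luka", return_tensors="pt"
).input_ids[0]

# After RunInference returns output token IDs, decode them back into text,
# as the "Get response" step does. Here the input IDs stand in for the output.
text = tokenizer.decode(input_ids, skip_special_tokens=True)
print(input_ids.shape, text)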

The starter file looks like the following:

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Runs a streaming RunInference Language Model pipeline."""

from __future__ import annotations

import logging

import apache_beam as beam
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.pytorch_inference import make_tensor_model_fn
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
from apache_beam.options.pipeline_options import PipelineOptions
import torch
from transformers import AutoConfig
from transformers import AutoModelForSeq2SeqLM
from transformers import AutoTokenizer
from transformers.tokenization_utils import PreTrainedTokenizer

MAX_RESPONSE_TOKENS = 256


def to_tensors(input_text: str, tokenizer: PreTrainedTokenizer) -> torch.Tensor:
    """Encodes input text into token tensors.

    Args:
        input_text: Input text for the language model.
        tokenizer: Tokenizer for the language model.

    Returns: Tokenized input tokens.
    """
    return tokenizer(input_text, return_tensors="pt").input_ids[0]


def decode_response(result: PredictionResult, tokenizer: PreTrainedTokenizer) -> str:
    """Decodes output token tensors into text.

    Args:
        result: Prediction results from the RunInference transform.
        tokenizer: Tokenizer for the language model.

    Returns: The model's response as text.
    """
    output_tokens = result.inference
    return tokenizer.decode(output_tokens, skip_special_tokens=True)


class AskModel(beam.PTransform):
    """Asks a language model a prompt message and gets its responses.

    Attributes:
        model_name: HuggingFace model name compatible with AutoModelForSeq2SeqLM.
        state_dict_path: File path to the model's state_dict, can be in Cloud Storage.
        max_response_tokens: Maximum number of tokens for the model to generate.
    """

    def __init__(
        self,
        model_name: str,
        state_dict_path: str,
        max_response_tokens: int = MAX_RESPONSE_TOKENS,
    ) -> None:
        self.model_handler = PytorchModelHandlerTensor(
            state_dict_path=state_dict_path,
            model_class=AutoModelForSeq2SeqLM.from_config,
            model_params={"config": AutoConfig.from_pretrained(model_name)},
            inference_fn=make_tensor_model_fn("generate"),
        )
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.max_response_tokens = max_response_tokens

    def expand(self, pcollection: beam.PCollection[str]) -> beam.PCollection[str]:
        return (
            pcollection
            | "To tensors" >> beam.Map(to_tensors, self.tokenizer)
            | "RunInference"
            >> RunInference(
                self.model_handler,
                inference_args={"max_new_tokens": self.max_response_tokens},
            )
            | "Get response" >> beam.Map(decode_response, self.tokenizer)
        )


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--messages-topic",
        required=True,
        help="Pub/Sub topic for input text messages",
    )
    parser.add_argument(
        "--responses-topic",
        required=True,
        help="Pub/Sub topic for output text responses",
    )
    parser.add_argument(
        "--model-name",
        required=True,
        help="HuggingFace model name compatible with AutoModelForSeq2SeqLM",
    )
    parser.add_argument(
        "--state-dict-path",
        required=True,
        help="File path to the model's state_dict, can be in Cloud Storage",
    )
    args, beam_args = parser.parse_known_args()

    logging.getLogger().setLevel(logging.INFO)
    beam_options = PipelineOptions(
        beam_args,
        pickle_library="cloudpickle",
        streaming=True,
    )

    simple_name = args.model_name.split("/")[-1]
    pipeline = beam.Pipeline(options=beam_options)
    _ = (
        pipeline
        | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(args.messages_topic)
        | "Decode bytes" >> beam.Map(lambda msg: msg.decode("utf-8"))
        | f"Ask {simple_name}" >> AskModel(args.model_name, args.state_dict_path)
        | "Encode bytes" >> beam.Map(lambda msg: msg.encode("utf-8"))
        | "Write to Pub/Sub" >> beam.io.WriteToPubSub(args.responses_topic)
    )
    pipeline.run()

Load the model

LLMs can be very large models. Larger models that are trained with more parameters generally give better results. However, larger models require a bigger machine and more memory to run. Larger models can also be slower to run on CPUs.

Before you run a PyTorch model on Dataflow, you need to load the model's state_dict object. A model's state_dict object stores the weights for the model.
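
If you're less familiar with PyTorch, the following sketch shows what saving a state_dict and restoring it into a model rebuilt from its configuration looks like. It is not part of the sample code; it assumes PyTorch and Transformers are installed and uses a hypothetical local path. The pipeline's model handler follows the same pattern.

import torch
from transformers import AutoConfig, AutoModelForSeq2SeqLM

model = AutoModelForSeq2SeqLM.from_pretrained("google/flan-t5-small")

# The state_dict is a mapping from parameter names to weight tensors.
torch.save(model.state_dict(), "/tmp/flan-t5-small.pt")

# Later, rebuild the architecture from its config and load the saved weights.
config = AutoConfig.from_pretrained("google/flan-t5-small")
restored = AutoModelForSeq2SeqLM.from_config(config)
restored.load_state_dict(torch.load("/tmp/flan-t5-small.pt"))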

In a Dataflow pipeline that uses the Apache Beam RunInference transform, the model's state_dict object must be loaded to Cloud Storage. The machine that you use to load the state_dict object to Cloud Storage needs to have enough memory to load the model. The machine also needs a fast internet connection to download the weights and to upload them to Cloud Storage.

The following table shows the number of parameters for each model and the minimum memory that's needed to load each model.

Model | Parameters | Memory needed
google/flan-t5-small | 80 million | > 320 MB
google/flan-t5-base | 250 million | > 1 GB
google/flan-t5-large | 780 million | > 3.2 GB
google/flan-t5-xl | 3 billion | > 12 GB
google/flan-t5-xxl | 11 billion | > 44 GB
google/flan-ul2 | 20 billion | > 80 GB

Although you can load a smaller model locally, this tutorial shows how to launch a Vertex AI custom job that loads the model with an appropriately sized VM.

Because LLMs can be so large, the example in this tutorial saves the state_dict object in float16 format instead of the default float32 format. With this configuration, each parameter uses 16 bits instead of 32 bits, making the state_dict object half the size. A smaller size minimizes the time that's needed to load the model. However, converting the format means that the VM has to fit both the model and the state_dict object into memory.
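
As a rough rule of thumb, the memory that a state_dict needs is the parameter count multiplied by the bytes per parameter: 4 bytes in float32 and 2 bytes in float16. The following back-of-the-envelope sketch, which is not part of the sample code, shows how the numbers for google/flan-t5-xl line up with the tables in this section.

def estimate_gb(parameters: float, bytes_per_parameter: int) -> float:
    """Estimates state_dict size in GB as parameters * bytes per parameter."""
    return parameters * bytes_per_parameter / 1e9

flan_t5_xl_parameters = 3e9  # roughly 3 billion parameters

print(estimate_gb(flan_t5_xl_parameters, 4))  # ~12 GB in float32 (the default)
print(estimate_gb(flan_t5_xl_parameters, 2))  # ~6 GB in float16
# Converting holds both copies in memory at once (~18 GB), which is why the
# table that follows suggests e2-highmem-4 (32 GB) for google/flan-t5-xl.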

The following table shows the minimum requirements to load a model after the state_dict object is saved in float16 format. The table also shows the suggested machine types to load a model by using Vertex AI. The minimum (and default) disk size for Vertex AI is 100 GB, but some models might require a larger disk.

Model name | Memory needed | Machine type | VM memory | VM disk
google/flan-t5-small | > 480 MB | e2-standard-4 | 16 GB | 100 GB
google/flan-t5-base | > 1.5 GB | e2-standard-4 | 16 GB | 100 GB
google/flan-t5-large | > 4.8 GB | e2-standard-4 | 16 GB | 100 GB
google/flan-t5-xl | > 18 GB | e2-highmem-4 | 32 GB | 100 GB
google/flan-t5-xxl | > 66 GB | e2-highmem-16 | 128 GB | 100 GB
google/flan-ul2 | > 120 GB | e2-highmem-16 | 128 GB | 150 GB

Load the model's state_dict object into Cloud Storage by using a Vertex AI custom job:

python download_model.py vertex \
    --model-name="MODEL_NAME" \
    --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
    --job-name="Load MODEL_NAME" \
    --project="PROJECT_ID" \
    --bucket="BUCKET_NAME" \
    --location="LOCATION" \
    --machine-type="VERTEX_AI_MACHINE_TYPE" \
    --disk-size-gb="DISK_SIZE_GB"

Replace the following:

  • MODEL_NAME: the name of the model, such as google/flan-t5-xl.
  • VERTEX_AI_MACHINE_TYPE: the type of machine to run the Vertex AI custom job on, such as e2-highmem-4.
  • DISK_SIZE_GB: the disk size for the VM, in GB. The minimum size is 100 GB.

Depending on the size of the model, it might take a few minutes to load the model. To view the status, go to the Vertex AI Custom jobs page.
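
You can also check the job's status programmatically. The following is a sketch, not part of the sample code, that assumes the google-cloud-aiplatform package is installed and uses placeholder values for the project and location:

from google.cloud import aiplatform

# Placeholders: use your own project ID and the location of the custom job.
aiplatform.init(project="PROJECT_ID", location="LOCATION")

# List recent custom jobs and print their display names and states.
for job in aiplatform.CustomJob.list():
    print(job.display_name, job.state)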


Run the pipeline

After you load the model, you run the Dataflow pipeline. To run the pipeline, each worker must have enough memory to hold both the model and the pipeline's per-worker overhead.

The following table shows the recommended machine types to run an inference pipeline.

Model name | Machine type | VM memory
google/flan-t5-small | n2-highmem-2 | 16 GB
google/flan-t5-base | n2-highmem-2 | 16 GB
google/flan-t5-large | n2-highmem-4 | 32 GB
google/flan-t5-xl | n2-highmem-4 | 32 GB
google/flan-t5-xxl | n2-highmem-8 | 64 GB
google/flan-ul2 | n2-highmem-16 | 128 GB

Run the pipeline:

python main.py \
    --messages-topic="projects/PROJECT_ID/topics/PROMPTS_TOPIC_ID" \
    --responses-topic="projects/PROJECT_ID/topics/RESPONSES_TOPIC_ID" \
    --model-name="MODEL_NAME" \
    --state-dict-path="gs://BUCKET_NAME/run-inference/MODEL_NAME.pt" \
    --runner="DataflowRunner" \
    --project="PROJECT_ID" \
    --temp_location="gs://BUCKET_NAME/temp" \
    --region="REGION" \
    --machine_type="DATAFLOW_MACHINE_TYPE" \
    --requirements_file="requirements.txt" \
    --requirements_cache="skip" \
    --experiments="use_sibling_sdk_workers" \
    --experiments="no_use_multiple_sdk_containers"

Replace the following:

  • PROJECT_ID: the project ID
  • PROMPTS_TOPIC_ID: the topic ID for the input prompts to send to the model
  • RESPONSES_TOPIC_ID: the topic ID for the model's responses
  • MODEL_NAME: the name of the model, such as google/flan-t5-xl
  • BUCKET_NAME: the name of the bucket
  • REGION: the region to deploy the job in, such as us-central1
  • DATAFLOW_MACHINE_TYPE: the VM to run the pipeline on, such as n2-highmem-4

To ensure that the model is loaded only once per worker and doesn't run out of memory, you configure workers to use a single process by setting the pipeline option --experiments=no_use_multiple_sdk_containers. You don't have to limit the number of threads because the RunInference transform shares the same model with multiple threads.
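
The sample pipeline (main.py) forwards any unparsed command-line arguments to PipelineOptions, so the flags shown above take effect without code changes. If you prefer to set the same options programmatically, a sketch under that assumption, not part of the sample code, looks like the following:

from apache_beam.options.pipeline_options import PipelineOptions

# Equivalent to passing --runner, --streaming, and the two --experiments flags
# on the command line.
beam_options = PipelineOptions(
    runner="DataflowRunner",
    streaming=True,
    experiments=[
        "use_sibling_sdk_workers",
        "no_use_multiple_sdk_containers",
    ],
)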

The pipeline in this example runs with CPUs. For a larger model, more time is required to process each request. You can enable GPUs if you need faster responses.

To view the status of the pipeline, go to the Dataflow Jobs page.


Ask the model a question

After the pipeline starts running, you provide a prompt to the model and receive a response.

  1. Send your prompt by publishing a message to Pub/Sub. Use the gcloud pubsub topics publish command:

    gcloud pubsub topics publish PROMPTS_TOPIC_ID \
        --message="PROMPT_TEXT"

    Replace PROMPT_TEXT with a string that contains the prompt that you want to provide. Surround the prompt with quotation marks.

    Use your own prompt, or try one of the following examples:

    • Translate to Spanish: My name is Luka
    • Complete this sentence: Once upon a time, there was a
    • Summarize the following text: Dataflow is a Google Cloud service that provides unified stream and batch data processing at scale. Use Dataflow to create data pipelines that read from one or more sources, transform the data, and write the data to a destination.
  2. To get the response, use the gcloud pubsub subscriptions pull command. (A Python alternative to these gcloud commands is sketched after these steps.)

    Depending on the size of the model, it might take a few minutes for the model to generate a response. Larger models take longer to deploy and to generate a response.

    gcloud pubsub subscriptions pull RESPONSES_SUBSCRIPTION_ID --auto-ack

    Replace RESPONSES_SUBSCRIPTION_ID with the subscription ID for the model's responses.
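
If you prefer to publish prompts and pull responses from Python instead of the gcloud CLI, the following sketch shows one way to do it. It is not part of the sample code; it assumes the google-cloud-pubsub package is installed, and the project, topic, and subscription IDs are placeholders to replace with your own values.

from google.cloud import pubsub_v1

project_id = "PROJECT_ID"  # placeholder: your project ID

# Publish a prompt to the prompts topic.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, "PROMPTS_TOPIC_ID")
publisher.publish(topic_path, b"Translate to Spanish: My name is Luka").result()

# Pull the response from the responses subscription. The response might take
# a few minutes to arrive, so you might need to retry the pull.
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, "RESPONSES_SUBSCRIPTION_ID"
)
response = subscriber.pull(subscription=subscription_path, max_messages=1)
for received in response.received_messages:
    print(received.message.data.decode("utf-8"))
    subscriber.acknowledge(
        subscription=subscription_path, ack_ids=[received.ack_id]
    )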

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.

Delete the project

    Caution: Deleting a project has the following effects:
    • Everything in the project is deleted. If you used an existing project for the tasks in this document, when you delete it, you also delete any other work you've done in the project.
    • Custom project IDs are lost. When you created this project, you might have created a custom project ID that you want to use in the future. To preserve the URLs that use the project ID, such as an appspot.com URL, delete selected resources inside the project instead of deleting the whole project.

    If you plan to explore multiple architectures, tutorials, or quickstarts, reusing projects can help you avoid exceeding project quota limits.

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

Delete individual resources

  1. Exit the Python virtual environment:

    deactivate
  2. Stop the pipeline:

    1. List the job IDs for the Dataflow jobs that are running, and then note the job ID for the tutorial's job:

      gcloud dataflow jobs list --region=REGION --status=active
    2. Cancel the job:

      gcloud dataflow jobs cancel JOB_ID --region=REGION
  3. Delete the bucket and anything inside of it:

    gcloud storage rm gs://BUCKET_NAME --recursive
  4. Delete the topics and the subscription:

    gcloud pubsub topics delete PROMPTS_TOPIC_ID
    gcloud pubsub topics delete RESPONSES_TOPIC_ID
    gcloud pubsub subscriptions delete RESPONSES_SUBSCRIPTION_ID
  5. Revoke the roles that you granted to the Compute Engine default service account. Run the following command once for each of the IAM roles that you granted earlier (roles/dataflow.admin, roles/dataflow.worker, roles/storage.admin, roles/pubsub.editor, and roles/aiplatform.user):

    gcloud projects remove-iam-policy-binding PROJECT_ID --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com --role=SERVICE_ACCOUNT_ROLE
  6. Optional: Revoke roles from your Google Account.

    gcloud projects remove-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountUser
  7. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  8. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

What's next

Except as otherwise noted, the content of this page is licensed under theCreative Commons Attribution 4.0 License, and code samples are licensed under theApache 2.0 License. For details, see theGoogle Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.

Last updated 2026-02-19 UTC.