Use Dataflow Insights

You can use Dataflow Insights to help optimize job performance. This topic demonstrates how to interact with Dataflow Insights using gcloud or the REST API. You can also review insights in the Dataflow console. For more information on reviewing insights in the console, see Recommendations.

Overview

Dataflow Insights provides insights on improving job performance, reducing cost, and troubleshooting errors. Dataflow Insights is part of the Recommender service and is available through the google.dataflow.diagnostics.Insight type.

When you're working with Dataflow Insights, keep in mind that some recommendations might not be relevant to your use case.

Before you begin

Before you can begin using Dataflow Insights, you must complete the following steps.

  1. Enable the Recommender API.
  2. Set up authentication.

    Select the tab for how you plan to use the samples on this page:

    gcloud

    In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

    REST

    To use the REST API samples on this page in a local development environment, you use the credentials you provide to the gcloud CLI.

      Install the Google Cloud CLI. After installation, initialize the Google Cloud CLI by running the following command:

      gcloud init

      If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

    For more information, see Authenticate for using REST in the Google Cloud authentication documentation.

  3. Ensure that your account has the following permissions:

    • recommender.dataflowDiagnosticsInsights.get
    • recommender.dataflowDiagnosticsInsights.list
    • recommender.dataflowDiagnosticsInsights.update

    You can grant these permissions individually, or you can grant one of the following roles:

    • roles/recommender.dataflowDiagnosticsViewer
    • roles/recommender.dataflowDiagnosticsAdmin
    • roles/dataflow.viewer
    • roles/dataflow.developer
    • roles/dataflow.admin

Request Dataflow insights

You can list Dataflow insights as shown below. For other types of insight interactions, see the insights guide for the Recommender API.

List Dataflow insights

To list all Dataflow insights for your project in a given region,use one of the following methods:

gcloud

You can use the gcloud recommender insights list command to view all Dataflow insights for your project in a specified region.

Before you run the command, replace the following values:

  • PROJECT_ID: The ID of the project that you want to list insights for.
  • REGION: The region where your Dataflow jobs are running. For example: us-west1.
gcloud recommender insights list \
  --insight-type=google.dataflow.diagnostics.Insight \
  --project=PROJECT_ID \
  --location=REGION

The output lists all of the Dataflow insights for your project in the specified region.

REST

You can use the Recommender API's insights.list method to list all Dataflow insights for your project in a specified region.

Before using any of the request data, make the following replacements:

  • PROJECT_ID: The ID of the project that you want to list insights for.
  • REGION: The region where your Dataflow jobs are running. For example: us-west1.

HTTP method and URL:

GET https://recommender.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/insightTypes/google.dataflow.diagnostics.Insight/insights

To send your request using curl (Linux, macOS, or Cloud Shell), run the following command:

curl -X GET \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  "https://recommender.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/insightTypes/google.dataflow.diagnostics.Insight/insights"

Get a single Dataflow insight

To get more information about a single insight, including the insight's description, status, and any recommendations associated with it, use one of the following methods:

gcloud

Use the gcloud recommender insights describe command with your insight ID to view information about a single insight. Before you run the command, replace the following values:

  • INSIGHT_ID: The ID of the insight that you want to view.
  • PROJECT_ID: The ID of the project that you want to list insights for.
  • REGION: The region where your Dataflow jobs are running. For example: us-west1.
gcloud recommender insights describe INSIGHT_ID \
  --insight-type=google.dataflow.diagnostics.Insight \
  --project=PROJECT_ID \
  --location=REGION

The output shows the insight in detail.

REST

The Recommender API's insights.get method gets a single insight. Before using any of the request data, make the following replacements:

  • PROJECT_ID: The ID of the project that you want to list insights for.
  • REGION: The region where your Dataflow jobs are running. For example: us-west1.
  • INSIGHT_ID: The ID of the insight that you want to view.

HTTP method and URL:

GET https://recommender.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/insightTypes/google.dataflow.diagnostics.Insight/insights/INSIGHT_ID

To send your request using curl (Linux, macOS, or Cloud Shell), run the following command:

curl -X GET \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  "https://recommender.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/insightTypes/google.dataflow.diagnostics.Insight/insights/INSIGHT_ID"

Interpret Dataflow insights

After you get an insight, you can review its contents to understand the pattern of resource usage that it highlights. In addition to the standard insight attributes, Dataflow Insights provides the following subtypes:

  • AUTOSCALING_NOT_ENABLED: Autoscaling could be enabled. The job has high CPU utilization and is using the maximum number of workers set. Enabling autoscaling could improve performance.
  • HIGH_FAN_OUT: A fusion break could be inserted after one or more transforms to increase parallelism.
  • MAX_NUM_WORKERS: Autoscaling: The maximum number of workers could be increased. The job is using autoscaling, has high CPU utilization, and is using the maximum number of workers set. Increasing the maximum number of workers could improve performance.
  • WORKER_OUT_OF_MEMORY: Some of the workers for the job failed due to running out of memory, which could slow down the job or cause it to fail.
  • PREBUILD_NOT_UTILIZED: Use the worker image pre-building workflow to improve worker startup time and autoscaling reliability.
  • ACTIVE_KEYS (Preview): The total number of active keys is less than the total number of cores, and scaling up won't help.
  • LONG_WORK_ITEM: Work in a fused stage is taking too long to process, indicating a slow-running or stuck operation.

To learn more about how to mitigate problems identified by Dataflow Insights, see Insights.

Dataflow Insights also provides a special content field that contains subfields with additional information and metadata about an insight. Depending on your use case, the following content subfields might be useful, as shown in the sketch after this list:

  • jobName: The Dataflow job name.
  • description: A description of the insight in English.
  • title: The title of the insight in English.
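
If you want to work with insights programmatically rather than through gcloud or curl, the following minimal Java sketch, assuming the google-cloud-recommender client library, lists the insights for a project and region and prints the insightSubtype, the content.jobName subfield, and the description. PROJECT_ID and REGION are placeholders.

import com.google.cloud.recommender.v1.Insight;
import com.google.cloud.recommender.v1.InsightTypeName;
import com.google.cloud.recommender.v1.RecommenderClient;
import com.google.protobuf.Value;

public class ListDataflowInsights {
  public static void main(String[] args) throws Exception {
    // Placeholders: your project ID and the region where your Dataflow jobs run.
    InsightTypeName parent =
        InsightTypeName.of("PROJECT_ID", "REGION", "google.dataflow.diagnostics.Insight");

    try (RecommenderClient client = RecommenderClient.create()) {
      for (Insight insight : client.listInsights(parent).iterateAll()) {
        // content is a Struct; jobName, title, and description are among its subfields.
        Value jobName = insight.getContent().getFieldsMap().get("jobName");
        System.out.printf("%s | %s | %s%n",
            insight.getInsightSubtype(),
            jobName == null ? "" : jobName.getStringValue(),
            insight.getDescription());
      }
    }
  }
}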

Insights

High fan-out detected

When Dataflow detects that a job has one or more transforms with a high fan-out, the following message appears:

High fan-out detected

This message displays when a ParDo that has a high output-to-input element count ratio is fused with a subsequent ParDo. In this situation, the second ParDo runs sequentially with the first, which forces all the output elements of a given input onto the same worker, reducing parallelism and slowing down performance.

To resolve this issue:

  • Insert a GroupByKey and ungroup after your first ParDo. The Dataflow service never fuses ParDo operations across an aggregation. For more information, see Fusion optimization.
  • Pass the intermediate PCollection as a side input to another ParDo. The Dataflow service always materializes side inputs.
  • Insert a Reshuffle step. Reshuffle prevents fusion, checkpoints the data, and reconfigures the windowing strategy so that no data is dropped. Reshuffle is supported by Dataflow even though it is marked deprecated in the Apache Beam documentation (note that reshuffling data can increase the cost of running your pipeline). A minimal example follows this list.
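
The following minimal Apache Beam Java sketch illustrates the Reshuffle option: it places Reshuffle.viaRandomKey() between a hypothetical high fan-out step and the downstream processing step so that the two are not fused. The input data and transform names are placeholders.

import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.TypeDescriptors;

public class HighFanOutExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(Create.of("a,b,c", "d,e,f"))
        // High fan-out step: each input element produces many output elements.
        .apply("Expand", FlatMapElements.into(TypeDescriptors.strings())
            .via((String line) -> Arrays.asList(line.split(","))))
        // Breaking fusion here redistributes the expanded elements across workers.
        .apply("BreakFusion", Reshuffle.viaRandomKey())
        // The downstream step now runs in parallel instead of on the producing worker.
        .apply("Process", MapElements.into(TypeDescriptors.strings())
            .via((String word) -> word.toUpperCase()));

    p.run();
  }
}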

Autoscaling: Maximum number of workers could be increased

When Dataflow detects that a job is using the maximum number of allowed workers, maxNumWorkers (or max_num_workers), and that the job might use more workers if this maximum were increased, the following message appears:

maximum number of workers could be increased

For example, this recommendation occurs for a batch or streaming job that has maxNumWorkers set to 50 when all 50 workers are being used with an average worker CPU utilization above 80%. This recommendation also occurs for streaming jobs that have maxNumWorkers set to 50 when all 50 workers are being used with an average worker CPU utilization above 50% and the job has an estimated processing time over 2 minutes.

Typically, increasing maxNumWorkers increases pipeline throughput. A batch pipeline could complete in less time, and a streaming pipeline could handle larger spikes in data and process more elements per second. However, this might come at an increased cost. For more information, see Worker resources pricing. For details about how the Autoscaling algorithm works and how to configure it, see the Autoscaling guide.

To resolve this issue:

  • Increase or remove the maxNumWorkers pipeline option. Without the option, Dataflow uses the defaults listed in the Autoscaling guide. A sketch of setting this option follows this list.
  • It's okay to do nothing if pipeline performance is adequate.
    • For batch pipelines, check that the total running time meets your requirements.
    • For streaming pipelines, check the Data freshness graph on the Job Metrics tab of the job page. Verify that the values in the graph aren't continuously increasing and that they are within acceptable bounds.
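
For example, a minimal Java sketch of raising the ceiling through a pipeline option; the value 100 is a placeholder, and the same setting is available as the --maxNumWorkers command-line flag:

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class MaxWorkersOptionExample {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);

    // Raise the autoscaling ceiling so Dataflow can scale out further under load.
    options.setMaxNumWorkers(100);

    // Pass these options to Pipeline.create(options) when launching the job.
  }
}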

Autoscaling: Setting the initial number of workers could improve the job performance

When Dataflow detects that a job is using a certain number of workers for more than 50% of the running time, setting the initial number of workers to the recommended value could improve job performance by reducing the running time for batch jobs or by preventing the backlog from growing when updating a streaming job.
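
As a sketch, assuming the Java SDK, the initial worker count can be set alongside the autoscaling ceiling when the job is launched; 25 and 100 are placeholder values, and the equivalent flags are --numWorkers and --maxNumWorkers:

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class InitialWorkersOptionExample {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);

    options.setNumWorkers(25);      // initial number of workers (placeholder value)
    options.setMaxNumWorkers(100);  // autoscaling ceiling (placeholder value)

    // Pass these options to Pipeline.create(options) when launching the job.
  }
}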

Workers are failing with OutOfMemory errors

When Dataflow detects that workers for a job are failing because of out-of-memory errors, the following message appears:

Some workers are out of memory

Some workers for the job have failed due to being out of memory. Although it is possible the job will finish, it is also possible these errors will prevent the job from completing successfully or otherwise slow down performance.

To resolve this issue, see Troubleshoot Dataflow out of memory errors. One common mitigation, running workers on a machine type with more memory, is sketched below.
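
A minimal Java sketch of that mitigation; the machine type is a placeholder, and the equivalent flag is --workerMachineType:

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class HighMemoryWorkersExample {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);

    // Choose a machine type with more memory per vCPU (placeholder choice).
    options.setWorkerMachineType("n2-highmem-4");

    // Pass these options to Pipeline.create(options) when launching the job.
  }
}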

Pre-build workflow not utilized

When Dataflow detects a pipeline where the worker image pre-building workflow is not used, the following message appears:

pre-build workflow not utilized

When the worker image pre-building workflow is not used, the pipeline has dependencies that are repeatedly installed at runtime. This configuration slows worker startup time, which degrades the throughput of the job and causes unreliable autoscaling behavior.

To resolve this issue, use the worker image pre-building workflow when launching the pipeline. For more information, see Pre-build Python dependencies.

If a customized, pre-built container is already in use, to avoid unnecessary installations, add the '--sdk_location=container' option, and remove the following options:

  • '--setup_file'
  • '--requirements_file'
  • '--extra_package(s)'

Active keys are low

Preview

This product or feature is subject to the "Pre-GA Offerings Terms" in the General Service Terms section of the Service Specific Terms. Pre-GA products and features are available "as is" and might have limited support. For more information, see the launch stage descriptions.

When Dataflow detects that a job is falling behind because the number of active keys is less than the total number of cores and scaling up won't help, the following message appears:

Active keys can be increased

To run user code in jobs, Dataflow uses workers. Each thread maps to a key that is responsible for a set of data to process, and a key can only run on a single core at a time for correctness reasons.

In some cases, some cores are overworked while others are idle. To resolve this issue, increase the number of keys, which also increases the number of active threads.

Potential solutions to increase keys:

  • You can increase the number of keys by using a more specific key type. For example, if the key type is IP address, fewer keys are available. However, if you change the key type to IP + [user identifier], more keys are available, which increases parallelism.
  • For pipelines that write to BigQuery where sinks could potentially be the bottleneck, refer to this article.
  • For other sources and sinks, check whether they have a numShards parameter and increase it. In general, one shard maps to one key.
  • For more general guidance on the Dataflow execution model, refer to this article.
  • Fanout can be used to take a single input key and add a hash to it to produce multiple output keys (Reference). A sketch of this approach follows this list.
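
The following Apache Beam Java sketch is a hypothetical illustration of the last two approaches: a helper that shards a key to produce more keys, and a per-key combine that uses the built-in hot-key fanout. The key format, shard count, and fanout value are placeholders.

import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class KeyFanoutExample {
  // Hypothetical helper: append a shard suffix so one hot key becomes numShards keys,
  // letting more cores work on it at once. Downstream logic must combine per-shard results.
  static PCollection<KV<String, Long>> shardKeys(
      PCollection<KV<String, Long>> input, int numShards) {
    return input.apply("ShardKeys",
        MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.longs()))
            .via((KV<String, Long> kv) -> KV.of(
                kv.getKey() + "#" + ThreadLocalRandom.current().nextInt(numShards),
                kv.getValue())));
  }

  // For combines, Beam can fan out hot keys automatically instead of manual sharding.
  static PCollection<KV<String, Long>> sumWithFanout(PCollection<KV<String, Long>> input) {
    return input.apply("SumPerKey",
        Combine.<String, Long, Long>perKey(Sum.ofLongs()).withHotKeyFanout(16));
  }
}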

Stage spending too long on work

When Dataflow detects that work has frequently taken too long to finish processing, the following message appears:

Stage spending too long on work

Dataflow sends work to fused stages in bundles of elements to be processed, and each bundle is considered complete once all elements and their outputs have been processed for the stage. Streaming pipelines are optimized around bundles of work that take less than a minute to fully process, so long processing times can cause further performance issues in pipelines.

This issue can be caused by user transforms that are stuck or slow. These transforms can be identified by warnings emitted in Cloud Logging and shown in its Diagnostics tab, with the key phrases "Operation ongoing" or "Processing stuck". To diagnose whether this issue is caused by a user transform, use Cloud Profiler to inspect the performance of user transforms, then trace which code is causing the slowdown and how frequently it occurs. For more information, see Troubleshooting Common Dataflow Errors.

If your investigation reveals that the long processing times are not caused by user transforms, we recommend contacting Cloud Support and describing the steps taken to investigate.

Job stuck on work item

When Dataflow detects that a key is stuck because a single work item has repeatedly failed and then been retried, the following message appears:

Job is stuck due to failed and retried work item

In Dataflow, all messages in a pipeline are processed under a particular key. When an error occurs while processing a message, that message is retried. It's acceptable if a message is retried two or three times. However, if errors occur over and over again, such as ten times in a row, it usually indicates a fundamental problem with the pipeline code. When a particular message on a key gets retried, other messages under the same key can't make progress. If a message fails 10 or more times, the issue will likely not resolve on its own. This message failure can cause pipeline problems such as:

  • delaying the watermark
  • accruing backlog
  • preventing a drain operation from completing

To debug this issue, investigate the stage that the recommendation reported and review the logs to identify the problematic code. Then, update the job with the new pipeline code to get the job unstuck.
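
As a sketch, assuming the Java SDK, the corrected pipeline can be relaunched as an in-place update of the running streaming job; the job name is a placeholder and must match the stuck job, and the equivalent flag is --update:

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class UpdateStuckJobExample {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);

    options.setUpdate(true);                 // replace the running job instead of starting a new one
    options.setJobName("EXISTING_JOB_NAME"); // placeholder; must match the name of the stuck job

    // Rebuild the pipeline with the fixed transform code and run it with these options.
  }
}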

Streaming Engine not enabled

When Dataflow detects that a streaming job doesn't have Streaming Engine enabled, the following message appears:

This job isn't using Streaming Engine. It might benefit from having Streaming Engine enabled.

Using Streaming Engine has various potential benefits, including better horizontal autoscaling, improved supportability, and reduced CPU, memory, and Persistent Disk storage resource usage on the worker VMs. Streaming Engine also supports resource-based billing.
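
As a sketch, assuming the Java SDK, Streaming Engine can be enabled through a pipeline option when the job is launched; the equivalent command-line flag is --enableStreamingEngine:

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class EnableStreamingEngineExample {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);

    // Move streaming shuffle and state processing to the Dataflow service backend.
    options.setEnableStreamingEngine(true);

    // Pass these options to Pipeline.create(options) when launching the job.
  }
}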

Kafka partitions low

When Dataflow detects that a streaming job reads from Kafka and the read stage keys are identified as a bottleneck, the following message appears:

The partition count provided by the Kafka source is too low, consider increasing partitions or redistributing to more keys.

We recommend that you increase the number of partitions, or include a Redistribute transform with withRedistribute() and consider offset deduplication mode with withOffsetDeduplication(). For additional information about Kafka read best practices for parallelism, see Read from Kafka.
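
The following minimal sketch, assuming the Beam Java SDK's KafkaIO in a recent SDK release, shows where withRedistribute() and withOffsetDeduplication() attach to a read; the broker, topic, and deserializers are placeholders.

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaReadSketch {
  // Minimal read configuration; attach it to a pipeline with p.apply(read).
  static KafkaIO.Read<String, String> buildRead() {
    return KafkaIO.<String, String>read()
        .withBootstrapServers("BROKER:9092")            // placeholder broker address
        .withTopic("TOPIC")                             // placeholder topic
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withRedistribute()           // spread records across more keys after the read
        .withOffsetDeduplication();   // offset deduplication mode, reducing shuffle persistence cost
  }
}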

Kafka persistence cost

When Dataflow detects that a streaming job reads from Kafka with Redistribute and the cost of persisting outputs is high, the following message appears:

Redistribute requires persistence of Kafka outputs as part of the shuffle phase, consider reducing latency and cost with offset deduplication mode.

To minimize the latency and cost of the shuffle, use offset deduplication mode with withOffsetDeduplication(). For additional information about Kafka read best practices for parallelism, see Read from Kafka.
