Use Apache Beam notebook advanced features

Using the Apache Beam interactive runner with JupyterLab notebooks lets youiteratively develop pipelines, inspect your pipeline graph, and parse individualPCollections in a read-eval-print-loop (REPL) workflow. For a tutorialthat demonstrates how to use the Apache Beam interactive runner withJupyterLab notebooks, seeDevelop with Apache Beam notebooks.

This page provides details about advanced features that you can use with yourApache Beam notebook.

Interactive FlinkRunner on notebook-managed clusters

To work with production-sized data interactively from the notebook, you can usetheFlinkRunner with some generic pipeline options to tell the notebooksession to manage a long-lasting Dataproc cluster and to run yourApache Beam pipelines distributedly.

Prerequisites

To use this feature:

  • Enable the Dataproc API.
  • Grant an admin or editor role to the service account that runs the notebookinstance for Dataproc.
  • Use a notebook kernel with the Apache Beam SDK version 2.40.0 or later.

Configuration

At a minimum, you need the following setup:

# Set a Cloud Storage bucket to cache source recording and PCollections.# By default, the cache is on the notebook instance itself, but that does not# apply to the distributed execution scenario.ib.options.cache_root='gs://<BUCKET_NAME>/flink'# Define an InteractiveRunner that uses the FlinkRunner under the hood.interactive_flink_runner=InteractiveRunner(underlying_runner=FlinkRunner())options=PipelineOptions()# Instruct the notebook that Google Cloud is used to run the FlinkRunner.cloud_options=options.view_as(GoogleCloudOptions)cloud_options.project='PROJECT_ID'

Explicit provision (optional)

You can add the following options.

# Change this if the pipeline needs to run in a different region# than the default, 'us-central1'. For example, to set it to 'us-west1':cloud_options.region='us-west1'# Explicitly provision the notebook-managed cluster.worker_options=options.view_as(WorkerOptions)# Provision 40 workers to run the pipeline.worker_options.num_workers=40# Use the default subnetwork.worker_options.subnetwork='default'# Choose the machine type for the workers.worker_options.machine_type='n1-highmem-8'# When working with non-official Apache Beam releases, such as Apache Beam built from source# code, configure the environment to use a compatible released SDK container.# If needed, build a custom container and use it. For more information, see:# https://beam.apache.org/documentation/runtime/environments/options.view_as(PortableOptions).environment_config='apache/beam_python3.7_sdk:2.41.0 orLOCATION.pkg.dev/PROJECT_ID/REPOSITORY/your_custom_container'

Usage

# The parallelism is applied to each step, so if your pipeline has 10 steps, you# end up having 10 * 10 = 100 tasks scheduled, which can be run in parallel.options.view_as(FlinkRunnerOptions).parallelism=10p_word_count=beam.Pipeline(interactive_flink_runner,options=options)word_counts=(p_word_count|'read' >>ReadWordsFromText('gs://apache-beam-samples/shakespeare/kinglear.txt')|'count' >>beam.combiners.Count.PerElement())# The notebook session automatically starts and manages a cluster to run# your pipelines with the FlinkRunner.ib.show(word_counts)# Interactively adjust the parallelism.options.view_as(FlinkRunnerOptions).parallelism=150# The BigQuery read needs a Cloud Storage bucket as a temporary location.options.view_as(GoogleCloudOptions).temp_location=ib.options.cache_rootp_bq=beam.Pipeline(runner=interactive_flink_runner,options=options)delays_by_airline=(p_bq|'Read Dataset from BigQuery' >>beam.io.ReadFromBigQuery(project=project,use_standard_sql=True,query=('SELECT airline, arrival_delay ''FROM `bigquery-samples.airline_ontime_data.flights` ''WHERE date >= "2010-01-01"'))|'Rebalance Data to TM Slots' >>beam.Reshuffle(num_buckets=1000)|'Extract Delay Info' >>beam.Map(lambdae:(e['airline'],e['arrival_delay'] >0))|'Filter Delayed' >>beam.Filter(lambdae:e[1])|'Count Delayed Flights Per Airline' >>beam.combiners.Count.PerKey())# This step reuses the existing cluster.ib.collect(delays_by_airline)# Describe the cluster running the pipelines.# You can access the Flink dashboard from the printed link.ib.clusters.describe()# Cleans up all long-lasting clusters managed by the notebook session.ib.clusters.cleanup(force=True)

Notebook-managed clusters

  • By default, if you don't provide any pipeline options, InteractiveApache Beamalways reuses the most recently used clusterto run a pipeline with theFlinkRunner.
    • To avoid this behavior, for example, to run another pipeline in the samenotebook session with a FlinkRunner not hosted by the notebook, runib.clusters.set_default_cluster(None).
  • When instantiating a new pipeline that uses a project, region, andprovisioning configuration that map to an existing Dataproc cluster,Dataflow also reuses the cluster, though it might not use the mostrecently used cluster.
  • However, whenever a provisioning change is given, such as when resizing acluster, anew cluster is created to actuate the desired change. If you intend to resizea cluster, to avoid exhausting cloud resources, clean up unnecessaryclusters by usingib.clusters.cleanup(pipeline).
  • When a Flinkmaster_url is specified, if it belongs to a cluster that ismanaged by the notebook session, Dataflow reuses the managed cluster.
    • If themaster_url is unknown to the notebook session, it means that auser-self-hostedFlinkRunner is desired. The notebook doesn't do anythingimplicitly.

Troubleshooting

This section provides information to help you troubleshoot and debug theInteractive FlinkRunner on notebook-managed clusters.

Flink IOException: Insufficient number of network buffers

For simplicity, the Flink network buffer configuration is not exposed forconfiguration.

If your job graph is too complicated or your parallelism is set too high, thecardinality of steps multiplied by parallelism might be too big, cause too manytasks to be scheduled in parallel, and fail the execution.

Use the following tips to improve the velocity of interactive runs:

  • Only assign thePCollection that you want to inspect to a variable.
  • InspectPCollections one by one.
  • Use reshuffle after high fanout transforms.
  • Adjust parallelism based on the data size. Sometimes smaller is faster.

It takes too long to inspect the data

Check the Flink dashboard for the running job. You might see a step wherehundreds of tasks have finished and only one remains, because in-flight dataresides on a single machine and is not shuffled.

Always use reshuffle after a high fanout transform, such as when:

  • Reading rows from a file
  • Reading rows from a BigQuery table

Without reshuffle, fanout data is always run on the same worker,and you can't take advantage of parallelism.

How many workers do I need?

As a rule of thumb, the Flink cluster has about the number of vCPUs multipliedby the number of worker slots.For example, if you have 40 n1-highmem-8 workers, the Flink cluster has at most320 slots, or 8 multiplied by 40.

Ideally, the worker can manage a job that reads, maps, and combines withparallelism set in the hundreds, which schedules thousands of tasks in parallel.

Does it work with streaming?

Streaming pipelines are not currently compatible with the interactive Flink onnotebook-managed cluster feature.

Beam SQL andbeam_sql magic

Beam SQL allows youto query bounded and unboundedPCollections with SQL statements. If you'reworking in an Apache Beam notebook, you can use the IPythoncustom magicbeam_sql to speed up your pipeline development.

You can check thebeam_sql magic usage with the-h or--help option:

Check the beam_sql help

You can create aPCollection from constant values:

Create PCollection from constant values

You can join multiplePCollections:

Join multiple PCollections

You can launch a Dataflow job with the-r DataflowRunner or--runner DataflowRunner option:

Launch Dataflow job with Apache Beam SQL

To learn more, see the example notebookApache Beam SQL in notebooks.

Accelerate using JIT compiler and GPU

You can use libraries such asnumba andGPUs to accelerate your Python code andApache Beam pipelines. In the Apache Beam notebook instance created withannvidia-tesla-t4 GPU, to run on GPUs, compile your Python code withnumba.cuda.jit. Optionally, to speed up the execution on CPUs, compile yourPython code into machine code withnumba.jit ornumba.njit.

The following example creates aDoFn that processes on GPUs:

classSampler(beam.DoFn):def__init__(self,blocks=80,threads_per_block=64):# Uses only 1 cuda grid with below config.self.blocks=blocksself.threads_per_block=threads_per_blockdefsetup(self):importnumpyasnp# An array on host as the prototype of arrays on GPU to# hold accumulated sub count of points in the circle.self.h_acc=np.zeros(self.threads_per_block*self.blocks,dtype=np.float32)defprocess(self,element:Tuple[int,int]):fromnumbaimportcudafromnumba.cuda.randomimportcreate_xoroshiro128p_statesfromnumba.cuda.randomimportxoroshiro128p_uniform_float32@cuda.jitdefgpu_monte_carlo_pi_sampler(rng_states,sub_sample_size,acc):"""Uses GPU to sample random values and accumulates the sub count            of values within a circle of radius 1.            """pos=cuda.grid(1)ifpos <acc.shape[0]:sub_acc=0foriinrange(sub_sample_size):x=xoroshiro128p_uniform_float32(rng_states,pos)y=xoroshiro128p_uniform_float32(rng_states,pos)if(x*x+y*y) <=1.0:sub_acc+=1acc[pos]=sub_accrng_seed,sample_size=elementd_acc=cuda.to_device(self.h_acc)sample_size_per_thread=sample_size//self.h_acc.shape[0]rng_states=create_xoroshiro128p_states(self.h_acc.shape[0],seed=rng_seed)gpu_monte_carlo_pi_sampler[self.blocks,self.threads_per_block](rng_states,sample_size_per_thread,d_acc)yieldd_acc.copy_to_host()

The following image demonstrates the notebook running on a GPU:

Run DoFn on GPU

More details can be found in the example notebookUse GPUs with Apache Beam.

Build a custom container

In most cases, if your pipeline doesn't require additional Python dependenciesor executables, Apache Beam can automatically use its official containerimages to run your user-defined code. These images come with many commonPython modules, and you don't have to build or explicitly specify them.

In some cases, you might have extra Python dependencies or evennon-Python dependencies. In these scenarios, you can build a custom containerand make it available to the Flink cluster to run. The following list providesthe advantages of using a custom container:

  • Faster setup time for consecutive and interactive executions
  • Stable configurations and dependencies
  • More flexibility: you can set up more than Python dependencies

The container build process might be tedious, but you can do everythingin the notebook using the following usage pattern.

Create a local workspace

First, create a local work directory under the Jupyter home directory.

!mkdir-p/home/jupyter/.flink

Prepare Python dependencies

Next, install all the extra Python dependencies that you might use, and exportthem into a requirements file.

%pipinstalldep_a%pipinstalldep_b...

You can explicitly create a requirements file by using the%%writefilenotebook magic.

%%writefile/home/jupyter/.flink/requirements.txtdep_adep_b...

Alternatively, you can freeze all local dependencies into a requirements file.This option might introduce unintended dependencies.

%pipfreeze >/home/jupyter/.flink/requirements.txt

Prepare your non-Python dependencies

Copy all non-Python dependencies into the workspace. If you don't have anynon-Python dependencies, skip this step.

!cp/path/to/your-dep/home/jupyter/.flink/your-dep...

Create a Dockerfile

Create a Dockerfile with the%%writefile notebook magic. For example:

%%writefile/home/jupyter/.flink/DockerfileFROMapache/beam_python3.7_sdk:2.40.0COPYrequirements.txt/tmp/requirements.txtCOPYyour_dep/tmp/your_dep...RUNpython-mpipinstall-r/tmp/requirements.txt

The example container uses the image of the Apache Beam SDK version 2.40.0with Python 3.7 as the base,adds ayour_dep file, and installs the extra Python dependencies.Use this Dockerfile as a template, and edit it for your use case.

In your Apache Beam pipelines, when referring to non-Python dependencies, use theirCOPYdestinations. For example,/tmp/your_dep is the file path of theyour_dep file.

Build a container image in Artifact Registry by using Cloud Build

  1. Enable the Cloud Build and Artifact Registry services, if not already enabled.

    !gcloudservicesenablecloudbuild.googleapis.com!gcloudservicesenableartifactregistry.googleapis.com
  2. Create an Artifact Registry repository so that you can upload artifacts. Eachrepository can contain artifacts for a single supported format.

    All repository content is encrypted using either Google-owned and Google-managed encryption keys orcustomer-managed encryption keys. Artifact Registry usesGoogle-owned and Google-managed encryption keys by default and no configuration is requiredfor this option.

    You must have at leastArtifact Registry Writer accessto the repository.

    Run the following command to create a new repository. The command uses the--async flag and returns immediately, without waiting for the operation inprogress to complete.

    gcloudartifactsrepositoriescreateREPOSITORY\--repository-format=docker\--location=LOCATION\--async

    Replace the following values:

    • REPOSITORY: a name for your repository. For eachrepository location in a project, repository names must be unique.
    • LOCATION: thelocationfor your repository.
  3. Before you can push or pull images, configure Docker to authenticate requestsfor Artifact Registry. To set up authentication to Docker repositories, run thefollowing command:

    gcloudauthconfigure-dockerLOCATION-docker.pkg.dev

    The command updates your Docker configuration. You can now connect withArtifact Registry in your Google Cloud project to push images.

  4. Use Cloud Build to build the container image, and save itto Artifact Registry.

    !cd/home/jupyter/.flink\&&gcloudbuildssubmit\--tagLOCATION.pkg.dev/PROJECT_ID/REPOSITORY/flink:latest\--timeout=20m

    ReplacePROJECT_ID with the project ID of your project.

Warning: Don't build the container image on the notebook instance itself,because it isn't a consistent build environment.

Use custom containers

Depending on the runner, you can use custom containers for different purposes.

For general Apache Beam container usage, see:

For Dataflow container usage, see:

Disable external IP addresses

When creating an Apache Beam notebook instance, to increase security,disable external IP addresses. Because notebook instances need to downloadsome public internet resources, such asArtifact Registry,you need to firstcreate a new VPC network without an external IP address.Then,create a Cloud NAT gateway for this VPC network.For more information about Cloud NAT, see theCloud NAT documentation.Use the VPC network and Cloud NAT gateway to access the necessary public internetresources without enabling external IP addresses.

Except as otherwise noted, the content of this page is licensed under theCreative Commons Attribution 4.0 License, and code samples are licensed under theApache 2.0 License. For details, see theGoogle Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.

Last updated 2026-02-19 UTC.