Enrich streaming data

Apache Beam simplifies the data enrichment workflow by providing a turnkey enrichment transform that you can add to your pipeline. This page explains how to use the Apache Beam enrichment transform to enrich your streaming data.

When you enrich data, you augment the raw data from one source by adding related data from a second source. The additional data can come from a variety of sources, such as Bigtable or BigQuery. The Apache Beam enrichment transform uses a key-value lookup to connect the additional data to the raw data.
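To make the key-value lookup concrete, the following minimal sketch models it in plain Python. It is not the Beam API. A dictionary stands in for the external source (for example, a Bigtable table), and the names PRODUCTS, enrich, and the product_id key are hypothetical.

```python
from typing import Any, Dict, List

# Hypothetical side data keyed by product ID. It stands in for a Bigtable
# or BigQuery table that the enrichment transform would query.
PRODUCTS: Dict[str, Dict[str, Any]] = {
    "p1": {"name": "Laptop", "category": "electronics"},
    "p2": {"name": "Mug", "category": "kitchen"},
}


def enrich(events: List[Dict[str, Any]], key: str = "product_id") -> List[Dict[str, Any]]:
    """Adds the side-data fields that match each event's lookup key."""
    enriched = []
    for event in events:
        # Key-value lookup: the event's key field selects the related data.
        side = PRODUCTS.get(event[key], {})
        # Merge the raw event and the fetched fields into one record.
        enriched.append({**event, **side})
    return enriched


events = [{"user_id": "u7", "product_id": "p1"}, {"user_id": "u9", "product_id": "p3"}]
print(enrich(events))
```

In this sketch, an event whose key has no match (p3) passes through unchanged; how a real handler treats missing keys depends on the handler.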

The following examples provide some cases where data enrichment is useful:

  • You want to create an ecommerce pipeline that captures user activities from a website or app and provides customized recommendations. The transform incorporates the activities into your pipeline data so that you can provide the customized recommendations.
  • You have user data that you want to join with geographical data to do geography-based analytics.
  • You want to create a pipeline that gathers data from internet-of-things (IoT) devices that send out telemetry events.

Benefits

The enrichment transform has the following benefits:

  • Transforms your data without requiring you to write complex code or manage underlying libraries.
  • Provides built-in source handlers.
  • Uses client-side throttling to rate-limit requests. Failed requests are retried with exponential backoff, which is the default retry strategy. You can configure rate limiting to suit your use case.
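The retry behavior described above follows the exponential backoff pattern. The following sketch shows that pattern in plain Python. It is not Beam's ExponentialBackOffRepeater; the call_with_backoff function, the TransientError exception, and the delay values are assumptions chosen for illustration.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical error type for retryable failures, such as rate limiting."""


def call_with_backoff(
    fn: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Calls fn, retrying on TransientError with exponentially growing delays."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except TransientError:
            if attempt == max_retries:
                raise  # Out of retries: surface the last error.
            # The delay doubles on every retry: base, 2 * base, 4 * base, ...
            sleep(base_delay * (2 ** attempt))
    raise AssertionError("unreachable")


# Simulate a service that rejects the first two requests.
attempts = {"count": 0}
delays: List[float] = []


def flaky_request() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise TransientError("too many requests")
    return "ok"


result = call_with_backoff(flaky_request, sleep=delays.append)
print(result, delays)
```

Production retry strategies usually also cap the delay and add random jitter so that many clients do not retry in lockstep; the sketch omits both for clarity.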

Support and limitations

The enrichment transform has the following support and requirements:

  • Available for batch and streaming pipelines.
  • The BigTableEnrichmentHandler handler is available in the Apache Beam Python SDK versions 2.54.0 and later.
  • The BigQueryEnrichmentHandler handler is available in the Apache Beam Python SDK versions 2.57.0 and later.
  • The VertexAIFeatureStoreEnrichmentHandler handler is available in the Apache Beam Python SDK versions 2.55.0 and later.
  • When using the Apache Beam Python SDK versions 2.55.0 and later, you also need to install the Python client for Redis.
  • Dataflow jobs must use Runner v2.
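Because each handler has a different minimum SDK version, a pipeline can check the installed version before it builds the transform. The following sketch encodes the minimums listed above. The MIN_VERSIONS table and the parse_version and supports_handler helpers are hypothetical, and the simple parser assumes plain release versions such as 2.57.0 (it does not handle strings such as 2.57.0rc1).

```python
from typing import Dict, Tuple

# Minimum Apache Beam Python SDK versions, taken from the requirements above.
MIN_VERSIONS: Dict[str, Tuple[int, int, int]] = {
    "BigTableEnrichmentHandler": (2, 54, 0),
    "VertexAIFeatureStoreEnrichmentHandler": (2, 55, 0),
    "BigQueryEnrichmentHandler": (2, 57, 0),
}


def parse_version(version: str) -> Tuple[int, int, int]:
    """Parses a release string like '2.57.0' into a comparable tuple."""
    major, minor, patch = (int(part) for part in version.split(".")[:3])
    return (major, minor, patch)


def supports_handler(installed: str, handler: str) -> bool:
    """Returns True if the installed SDK version meets the handler's minimum."""
    return parse_version(installed) >= MIN_VERSIONS[handler]


print(supports_handler("2.56.0", "BigQueryEnrichmentHandler"))
```

In a real environment, you could pass apache_beam.__version__ as the installed version.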

Use the enrichment transform

To use the enrichment transform, include the following code in your pipeline:

import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler

bigtable_handler = BigTableEnrichmentHandler(...)
with beam.Pipeline() as p:
  output = (p
            ...
            | "Create" >> beam.Create(data)
            | "Enrich with Bigtable" >> Enrichment(bigtable_handler)
            ...
            )

Because the enrichment transform performs a cross join by default, design a custom join function that controls how the input data is enriched. A custom join ensures that the output includes only the fields you specify.
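To see why a custom join can matter, the following plain-Python sketch approximates a cross-join-style merge, in which every field from the fetched data is added to the input element. The function name and the sample fields are hypothetical, and the rule that the input element's value wins when both sides share a field name is this sketch's assumption; Beam's default join may resolve such collisions differently.

```python
from typing import Any, Dict


def cross_join_sketch(left: Dict[str, Any], right: Dict[str, Any]) -> Dict[str, Any]:
    """Merges all fields of right into left; left keeps its value on name clashes."""
    merged = dict(left)  # Copy so that the input element is not mutated.
    for name, value in right.items():
        merged.setdefault(name, value)
    return merged


left = {"user_id": "u7", "product_id": "p1"}
right = {"product_id": "p1", "name": "Laptop", "internal_cost": 812.5}
print(cross_join_sketch(left, right))
```

Every fetched field, including ones that downstream steps may not need (the hypothetical internal_cost column here), ends up in the output. A custom join avoids carrying such fields through the pipeline.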

In the following example, left is the input element of the enrichment transform, and right is the data fetched from the external service for that input element.

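from typing import Any, Dict

import apache_beam as beam

def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
  # Copy only the fields you need from the input element (left)
  # and from the fetched data (right).
  enriched = {}
  enriched['FIELD_NAME'] = left['FIELD_NAME']
  ...
  return beam.Row(**enriched)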
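The custom_join example above returns a beam.Row. To test the join logic on its own, you can keep the field selection in a plain function that returns a dictionary and wrap the result with beam.Row(**enriched) inside the pipeline. In the following sketch, the project_join helper and the field names are hypothetical.

```python
from typing import Any, Dict


def project_join(left: Dict[str, Any], right: Dict[str, Any]) -> Dict[str, Any]:
    """Keeps only the chosen fields from the input element and the fetched data."""
    enriched: Dict[str, Any] = {}
    # Fields kept from the input element.
    enriched["user_id"] = left["user_id"]
    enriched["product_id"] = left["product_id"]
    # Field kept from the fetched data, renamed to avoid clashing with other
    # "name" fields; .get() returns None if the column is missing.
    enriched["product_name"] = right.get("name")
    return enriched


left = {"user_id": "u7", "product_id": "p1", "session": "s42"}
right = {"product_id": "p1", "name": "Laptop", "internal_cost": 812.5}
print(project_join(left, right))
```

Inside the pipeline, a wrapper such as lambda l, r: beam.Row(**project_join(l, r)) could then be passed as the join_fn parameter.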

Parameters

To use the enrichment transform, the EnrichmentHandler parameter is required.

You can also use configuration parameters to specify a join function, a timeout, a throttler, or a repeater (retry strategy). The following configuration parameters are available:

  • join_fn: A function, such as a lambda, that takes two dictionaries as input and returns an enriched row (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]). The function specifies how to join the input element with the data fetched from the API. Defaults to a cross join.
  • timeout: The number of seconds to wait for the API to complete the request before timing out. Defaults to 30 seconds.
  • throttler: Specifies the throttling mechanism. The only supported option is the default client-side adaptive throttling.
  • repeater: Specifies the retry strategy when errors such as TooManyRequests and TimeoutException occur. Defaults to ExponentialBackOffRepeater.
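The default throttler applies client-side adaptive throttling. A common formulation of that technique, described in Google's Site Reliability Engineering book, rejects new requests locally with probability max(0, (requests - K * accepts) / (requests + 1)), which rises as the backend accepts fewer requests. The following sketch implements that formula in plain Python. The class name, the ratio K = 2.0, and the unwindowed counters are assumptions; Beam's throttler may track requests over a sliding time window and use different defaults.

```python
import random
from typing import Optional


class AdaptiveThrottlerSketch:
    """Drops requests locally when the backend has been rejecting many of them."""

    def __init__(self, overload_ratio: float = 2.0, rng: Optional[random.Random] = None):
        self.overload_ratio = overload_ratio  # K in the formula.
        self.requests = 0  # All attempted requests (not windowed in this sketch).
        self.accepts = 0  # Requests that the backend accepted.
        self.rng = rng or random.Random()

    def reject_probability(self) -> float:
        # max(0, (requests - K * accepts) / (requests + 1))
        return max(0.0, (self.requests - self.overload_ratio * self.accepts) / (self.requests + 1))

    def should_throttle(self) -> bool:
        """Counts an attempted request and decides whether to drop it locally."""
        probability = self.reject_probability()
        self.requests += 1
        return self.rng.random() < probability

    def record_success(self) -> None:
        """Records that the backend accepted a request."""
        self.accepts += 1


# Healthy backend: every request that is sent gets accepted.
throttler = AdaptiveThrottlerSketch(rng=random.Random(0))
for _ in range(10):
    if not throttler.should_throttle():
        throttler.record_success()
print(throttler.reject_probability())
```

With a healthy backend, the rejection probability stays at zero. When the backend stops accepting requests, the probability climbs toward 1, so the client sheds load instead of sending requests that would likely fail.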


Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.

Last updated 2026-02-19 UTC.