Enrich streaming data Stay organized with collections Save and categorize content based on your preferences.
Apache Beam simplifies the data enrichment workflow by providing a turnkeyenrichment transform that you can add to your pipeline. This page explains howto use the Apache Beam enrichment transform to enrich your streaming data.
When you enrich data, you augment the raw data from one source by adding relateddata from a second source. The additional data can come from a variety ofsources, such asBigtable orBigQuery. The Apache Beam enrichmenttransform uses a key-value lookup to connect the additional data to the raw data.
The following examples provide some cases where data enrichment is useful:
- You want to create an ecommerce pipeline that captures user activities from awebsite or app and provides customized recommendations. The transformincorporates the activities into your pipeline data so that you can providethe customized recommendations.
- You have user data that you want to join with geographical data to dogeography-based analytics.
- You want to create a pipeline that gathers data from internet-of-things (IOT)devices that send out telemetry events.
Benefits
The enrichment transform has the following benefits:
- Transforms your data without requiring you to write complex code or manageunderlying libraries.
- Provides built-in source handlers.
- Use the
BigTableEnrichmentHandlerhandlerto enrich your data by using aBigtable source without passing configuration details. - Use the
BigQueryEnrichmentHandlerhandlerto enrich your data by using aBigQuery source without passing configuration details. - Use the
VertexAIFeatureStoreEnrichmentHandlerhandlerwithVertex AI Feature StoreandBigtable online serving.
- Use the
- Uses client-side throttling to manage rate limitingthe requests. The requests are exponentially backed off with a default retrystrategy. You can configure rate limiting to suit your use case.
Support and limitations
The enrichment transform has the following requirements:
- Available for batch and streaming pipelines.
- The
BigTableEnrichmentHandlerhandler is available in the Apache BeamPython SDK versions 2.54.0 and later. - The
BigQueryEnrichmentHandlerhandler is available in the Apache BeamPython SDK versions 2.57.0 and later. - The
VertexAIFeatureStoreEnrichmentHandlerhandler is available in the Apache BeamPython SDK versions 2.55.0 and later. - When using the Apache Beam Python SDK versions 2.55.0 and later, youalso need to install thePython client for Redis.
- Dataflow jobs must useRunner v2.
Use the enrichment transform
To use the enrichment transform, include the following code inyour pipeline:
importapache_beamasbeamfromapache_beam.transforms.enrichmentimportEnrichmentfromapache_beam.transforms.enrichment_handlers.bigtableimportBigTableEnrichmentHandlerbigtable_handler=BigTableEnrichmentHandler(...)withbeam.Pipeline()asp:output=(p...|"Create" >>beam.Create(data)|"Enrich with Bigtable" >>Enrichment(bigtable_handler)...)Because the enrichment transform performs a cross join by default, design thecustom join to enrich the input data. This design ensures that the join includesonly the specified fields.
In the following example,left is the input element of the enrichmenttransform, andright is data fetched from an external service for that inputelement.
defcustom_join(left:Dict[str,Any],right:Dict[str,Any]):enriched={}enriched['FIELD_NAME']=left['FIELD_NAME']...returnbeam.Row(**enriched)Parameters
To use the enrichment transform, theEnrichmentHandler parameter is required.
You can also use a configuration parameter to specify alambda function for a joinfunction, a timeout, a throttler, or a repeater (retry strategy). The followingconfiguration parameters are available:
join_fn: Alambdafunction that takes dictionaries as input and returns anenriched row (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]). Theenriched row specifies how to join the data fetched from the API.Defaults to a cross join.timeout: The number of seconds to wait for the request to be completed bythe API before timing out. Defaults to 30 seconds.throttler: Specifies the throttling mechanism. The only supported option isdefault client-side adaptive throttling.repeater: Specifies the retry strategy when errors likeTooManyRequestsandTimeoutExceptionoccur. Defaults toExponentialBackOffRepeater.
What's next
- For more examples, seeEnrichment transformin the Apache Beam transform catalog.
- Use Apache Beam and Bigtable to enrich data.
- Use Apache Beam and BigQuery to enrich data.
- Use Apache Beam and Vertex AI Feature Store to enrich data.
Except as otherwise noted, the content of this page is licensed under theCreative Commons Attribution 4.0 License, and code samples are licensed under theApache 2.0 License. For details, see theGoogle Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
Last updated 2026-02-19 UTC.