Java task patterns
The ecommerce sample application demonstrates best practices for using Dataflow to implement streaming data analytics and real-time AI. The example contains task patterns that show the best way to accomplish Java programming tasks. These tasks are commonly needed to create ecommerce applications.
The application contains the following Java task patterns:
- Use Apache Beam schemas to work with structured data
- Use JsonToRow to convert JSON data
- Use the AutoValue code generator to generate plain old Java objects (POJOs)
- Queue unprocessable data for further analysis
- Apply data validation transforms serially
- Use DoFn.StartBundle to micro-batch calls to external services
- Use an appropriate side-input pattern
Use Apache Beam schemas to work with structured data
You can use Apache Beam schemas to make processing structured data easier.
Converting your objects to Rows lets you produce clean Java code, which makes building your directed acyclic graph (DAG) easier. You can also reference object properties as fields within the analytics statements that you create, instead of having to call methods.
Example
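The following is a minimal sketch, not the sample application code. It assumes a hypothetical Transaction type and shows how a registered schema lets you select fields by name instead of calling methods:

import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

// Hypothetical transaction type. Registering a schema lets Beam treat it as structured data.
@DefaultSchema(JavaFieldSchema.class)
public class Transaction {
  public String storeId;
  public double amount;
}

// Because a schema is registered, fields can be referenced by name in relational-style
// transforms instead of calling getter methods on each object.
PCollection<Transaction> transactions = ...;
PCollection<Row> storeAmounts = transactions
    .apply("Select store and amount fields", Select.fieldNames("storeId", "amount"));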
Use JsonToRow to convert JSON data
Processing JSON strings in Dataflow is a common need. For example, JSON strings are processed when streaming clickstream information captured from web applications. To process JSON strings, you need to convert them into either Rows or plain old Java objects (POJOs) during pipeline processing.
You can use the Apache Beam built-in transform JsonToRow to convert JSON strings to Rows. However, if you want a queue for processing unsuccessful messages, you need to build that separately; see Queuing unprocessable data for further analysis.
If you need to convert a JSON string to a POJO using AutoValue, register a schema for the type by using the @DefaultSchema(AutoValueSchema.class) annotation, and then use the Convert utility class. The resulting code is similar to the following:
PCollection<String> json = ...;

PCollection<MyUserType> myUserTypes = json
    .apply("Parse JSON to Beam Rows", JsonToRow.withSchema(expectedSchema))
    .apply("Convert to a user type with a compatible schema registered", Convert.to(MyUserType.class));

For more information, including what different Java types you can infer schemas from, see Creating Schemas.
If JsonToRow does not work with your data, Gson is an alternative. Gson is fairly relaxed in its default processing of data, which might require you to build more validation into the data conversion process.
Examples
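As an illustration of the Gson alternative, the following sketch (not taken from the sample application) parses JSON strings inside a DoFn; the ClickEvent type and the validation check are hypothetical:

import com.google.gson.Gson;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical POJO that mirrors the incoming JSON structure. Registering a schema
// lets Beam infer a coder for the type.
@DefaultSchema(JavaFieldSchema.class)
public class ClickEvent {
  public String sessionId;
  public String pageUrl;
}

// Gson is lenient by default, so add whatever validation your data requires.
public class ParseClickEventFn extends DoFn<String, ClickEvent> {
  private transient Gson gson;

  @Setup
  public void setup() {
    gson = new Gson();
  }

  @ProcessElement
  public void processElement(@Element String json, OutputReceiver<ClickEvent> out) {
    ClickEvent event = gson.fromJson(json, ClickEvent.class);
    if (event != null && event.sessionId != null) {
      out.output(event);
    }
    // Elements that fail validation can be routed to an unprocessed-messages queue
    // with a multi-output transform, as described later on this page.
  }
}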
Use the AutoValue code generator to generate POJOs
Apache Beam schemas are often the best way to represent objects in a pipeline because of the way they let you work with structured data. Nevertheless, at times a plain old Java object (POJO) is needed, such as when dealing with key-value objects or handling object state. Hand-building POJOs requires you to code overrides for the equals() and hashCode() methods, which can be time consuming and error prone. Incorrect overrides might result in inconsistent application behavior or data loss.
To generate POJOs, use the AutoValue class builder. This option ensures that the necessary overrides are used and lets you avoid potential errors. AutoValue is heavily used within the Apache Beam codebase, so familiarity with this class builder is useful if you want to develop Apache Beam pipelines on Dataflow using Java.
You can also use AutoValue with Apache Beam schemas if you add an @DefaultSchema(AutoValueSchema.class) annotation. For more information, see Creating Schemas.
For more information about AutoValue, see Why AutoValue? and the AutoValue docs.
Example
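The following is a minimal sketch of an AutoValue class registered with a Beam schema; the StoreLocation type and its fields are illustrative, not part of the sample application:

import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;

// AutoValue generates the constructor and the equals() and hashCode() overrides,
// and the @DefaultSchema annotation also registers a Beam schema for the type.
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class StoreLocation {
  public abstract String storeId();
  public abstract double latitude();
  public abstract double longitude();

  @SchemaCreate
  public static StoreLocation create(String storeId, double latitude, double longitude) {
    // AutoValue_StoreLocation is generated at compile time.
    return new AutoValue_StoreLocation(storeId, latitude, longitude);
  }
}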
Queue unprocessable data for further analysis
In production systems, it is important to handle problematic data. If possible, you validate and correct data in-stream. When correction isn't possible, log the value to an unprocessed messages queue, sometimes called a dead-letter queue, for later analysis. Issues commonly occur when converting data from one format to another, for example when converting JSON strings to Rows.
To address this issue, use a multi-output transform to shuttle the elements containing the unprocessed data into another PCollection for further analysis. This processing is a common operation that you might want to use in many places in a pipeline. Try to make the transform generic enough to use in multiple places. First, create an error object to wrap common properties, including the original data. Next, create a sink transform that has multiple options for the destination.
Examples
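As a sketch of the multi-output pattern (not the sample application's error object and sink transform), the following routes elements that fail parsing to a separate PCollection; parse() is a hypothetical helper and MyUserType stands in for your own type:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

// Tags that identify the successfully parsed output and the unprocessed output.
final TupleTag<MyUserType> parsedTag = new TupleTag<MyUserType>() {};
final TupleTag<String> unprocessedTag = new TupleTag<String>() {};

PCollection<String> json = ...;

PCollectionTuple results = json.apply("Parse with an unprocessed-messages output",
    ParDo.of(new DoFn<String, MyUserType>() {
      @ProcessElement
      public void processElement(@Element String element, MultiOutputReceiver out) {
        try {
          // parse() is a hypothetical helper that converts the JSON string.
          out.get(parsedTag).output(parse(element));
        } catch (Exception e) {
          // Keep the original payload so that it can be analyzed and replayed later.
          out.get(unprocessedTag).output(element);
        }
      }
    }).withOutputTags(parsedTag, TupleTagList.of(unprocessedTag)));

PCollection<MyUserType> parsed = results.get(parsedTag);
PCollection<String> unprocessed = results.get(unprocessedTag);
// Write the unprocessed collection to a sink such as BigQuery or Pub/Sub for later analysis.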
Apply data validation transforms serially
Data collected from external systems often needs cleaning. Structure your pipeline so that it can correct problematic data in-stream when possible. Send the data to a queue for further analysis when needed.
Because a single message might suffer from multiple issues that need correction, plan out the needed directed acyclic graph (DAG). If an element contains data with multiple defects, you must ensure that the element flows through the appropriate transforms.
For example, imagine an element with the following values, neither of which should be null:
{"itemA": null,"itemB": null}
Make sure the element flows through transforms that correct both potential issues:
badElements.apply(fixItemA).apply(fixItemB)
Your pipeline might have more serial steps, but fusion helps to minimize the processing overhead that they introduce.
Example
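The following is a minimal sketch of one correction transform, assuming the elements are schema Rows; the FixItemA name, the field name, and the default value are illustrative. A FixItemB transform with the same shape can then be applied serially after it:

import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;

// Each correction step is its own transform, so the steps can be chained serially
// and reused wherever the same defect appears in the pipeline.
public class FixItemA extends PTransform<PCollection<Row>, PCollection<Row>> {
  @Override
  public PCollection<Row> expand(PCollection<Row> input) {
    return input
        .apply("Replace a null itemA with a default value", MapElements
            .into(TypeDescriptor.of(Row.class))
            .via((Row row) -> row.getString("itemA") != null
                ? row
                : Row.fromRow(row).withFieldValue("itemA", "unknown").build()))
        // The output keeps the input schema, so set it explicitly on the result.
        .setRowSchema(input.getSchema());
  }
}

// A FixItemB transform has the same shape, and the two are applied in series:
// badElements.apply(new FixItemA()).apply(new FixItemB());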
Use DoFn.StartBundle to micro-batch calls to external services
You might need to invoke external APIs as part of your pipeline. Because a pipeline distributes work across many compute resources, making a single call for each element flowing through the system can overwhelm an external service endpoint. This issue is particularly common when you haven't applied any reducing functions.
To avoid this issue, batch calls to external systems.
You can batch calls by using a GroupByKey transform or by using the Apache Beam Timer API. However, both approaches require shuffling, which introduces some processing overhead and the need for a magic number to determine the key space.
Instead, use the StartBundle and FinishBundle lifecycle elements to batch your data. With these options, no shuffling is needed.
One minor downside to this option is that bundle sizes are dynamically determined by the implementation of the runner, based on what's currently happening inside the pipeline and its workers. In stream mode, bundles are often small in size. Dataflow bundling is influenced by backend factors like sharding usage, how much data is available for a particular key, and the throughput of the pipeline.
Example
EventItemCorrectionService.java
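As a simplified sketch of the same idea (not the sample file above), the following DoFn buffers elements in StartBundle and makes one batched request in FinishBundle; callExternalService() is a hypothetical batch client, declared abstract here so a concrete subclass can supply it:

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

public abstract class EnrichEventsFn extends DoFn<String, String> {

  // Holds an element together with its timestamp and window so that it can be
  // re-emitted correctly from finishBundle().
  static class BufferedElement {
    final String value;
    final Instant timestamp;
    final BoundedWindow window;

    BufferedElement(String value, Instant timestamp, BoundedWindow window) {
      this.value = value;
      this.timestamp = timestamp;
      this.window = window;
    }
  }

  private transient List<BufferedElement> buffer;

  // Hypothetical batch client: one request for the whole bundle, returning one
  // enriched value per buffered element, in order.
  protected abstract List<String> callExternalService(List<BufferedElement> batch);

  @StartBundle
  public void startBundle() {
    buffer = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(ProcessContext context, BoundedWindow window) {
    buffer.add(new BufferedElement(context.element(), context.timestamp(), window));
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext context) {
    // One request to the external service for the whole bundle instead of one
    // request per element.
    List<String> enriched = callExternalService(buffer);
    for (int i = 0; i < buffer.size(); i++) {
      BufferedElement element = buffer.get(i);
      context.output(enriched.get(i), element.timestamp, element.window);
    }
  }
}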
Use an appropriate side-input pattern for data enrichment
In streaming analytics applications, data is often enriched with additional information that might be useful for further analysis. For example, if you have the store ID for a transaction, you might want to add information about the store location. This additional information is often added by taking an element and bringing in information from a lookup table.
For lookup tables that are both slowly changing and smaller in size, bringing the table into the pipeline as a singleton class that implements the Map<K,V> interface works well. This option lets you avoid having each element do an API call for its lookup. After you include a copy of a table in the pipeline, you need to update it periodically to keep it fresh.
To handle slowly updating side inputs, use the Apache Beam side input patterns.
Caching
Side inputs are loaded in memory and are therefore cached automatically.
You can set the size of the cache by using the --setWorkerCacheMb option.
You can share the cache across DoFn instances and use external triggers to refresh the cache.
Example
SlowMovingStoreLocationDimension.java
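As a simplified sketch of the same idea (not the sample file above), the following follows the Apache Beam slowly updating global window side input pattern. The pipeline variable is your Pipeline object, and fetchStoreLocations() and the five-minute refresh interval are illustrative assumptions:

import java.util.Map;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

// Rebuild the store-location lookup table periodically and expose the latest copy
// to the main pipeline as a singleton side input.
PCollectionView<Map<String, String>> storeLocations = pipeline
    .apply("Tick every 5 minutes",
        GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5)))
    .apply("Load lookup table", ParDo.of(new DoFn<Long, Map<String, String>>() {
      @ProcessElement
      public void processElement(ProcessContext context) {
        // fetchStoreLocations() is a hypothetical call to the source of truth,
        // for example a database or an API.
        context.output(fetchStoreLocations());
      }
    }))
    .apply("Keep only the latest copy",
        Window.<Map<String, String>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
            .discardingFiredPanes())
    .apply(View.asSingleton());

// In the enrichment DoFn, read the table with context.sideInput(storeLocations)
// and register it on the ParDo with .withSideInputs(storeLocations), instead of
// calling the external system for every element.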