Process data in bulk with Dataflow

This page gives examples of how to use Dataflow to perform bulk Cloud Firestore operations in an Apache Beam pipeline. Apache Beam supports a connector for Cloud Firestore. You can use this connector to run batch and streaming operations in Dataflow.

We recommend using Dataflow and Apache Beam for large-scale data processing workloads.

The Cloud Firestore connector for Apache Beam is available in Java. For more information about the Cloud Firestore connector, see the Apache Beam SDK for Java.

Before you begin

Before you read this page, you should be familiar with the programming model for Apache Beam.

To run the samples, you must enable the Dataflow API.
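If you use the gcloud CLI, one way to enable the API is the following (a sketch; you can also enable the API from the Google Cloud console):

```shell
# Enable the Dataflow API for the currently configured project.
gcloud services enable dataflow.googleapis.com
```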

Example Cloud Firestore pipelines

The examples below demonstrate a pipeline that writes data and one that reads and filters data. You can use these samples as a starting point for your own pipelines.

Running the sample pipelines

The source code for the samples is available in the googleapis/java-firestore GitHub repository. To run these samples, download the source code and see the README.
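To fetch the source code, something like the following works (the repository is the one named above; build and run instructions are in its README):

```shell
# Clone the repository that contains the sample pipelines.
git clone https://github.com/googleapis/java-firestore.git
cd java-firestore
```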

Example Write pipeline

The following example creates documents in the cities-beam-sample collection:

public class ExampleFirestoreBeamWrite {
  private static final FirestoreOptions FIRESTORE_OPTIONS = FirestoreOptions.getDefaultInstance();

  public static void main(String[] args) {
    runWrite(args, "cities-beam-sample");
  }

  public static void runWrite(String[] args, String collectionId) {
    // create pipeline options from the passed in arguments
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
    Pipeline pipeline = Pipeline.create(options);

    RpcQosOptions rpcQosOptions =
        RpcQosOptions.newBuilder()
            .withHintMaxNumWorkers(options.as(DataflowPipelineOptions.class).getMaxNumWorkers())
            .build();

    // create some writes
    Write write1 =
        Write.newBuilder()
            .setUpdate(
                Document.newBuilder()
                    // resolves to
                    // projects/<projectId>/databases/<databaseId>/documents/<collectionId>/NYC
                    .setName(createDocumentName(collectionId, "NYC"))
                    .putFields("name", Value.newBuilder().setStringValue("New York City").build())
                    .putFields("state", Value.newBuilder().setStringValue("New York").build())
                    .putFields("country", Value.newBuilder().setStringValue("USA").build()))
            .build();

    Write write2 =
        Write.newBuilder()
            .setUpdate(
                Document.newBuilder()
                    // resolves to
                    // projects/<projectId>/databases/<databaseId>/documents/<collectionId>/TOK
                    .setName(createDocumentName(collectionId, "TOK"))
                    .putFields("name", Value.newBuilder().setStringValue("Tokyo").build())
                    .putFields("country", Value.newBuilder().setStringValue("Japan").build())
                    .putFields("capital", Value.newBuilder().setBooleanValue(true).build()))
            .build();

    // batch write the data
    pipeline
        .apply(Create.of(write1, write2))
        .apply(FirestoreIO.v1().write().batchWrite().withRpcQosOptions(rpcQosOptions).build());

    // run the pipeline
    pipeline.run().waitUntilFinish();
  }

  private static String createDocumentName(String collectionId, String cityDocId) {
    String documentPath =
        String.format(
            "projects/%s/databases/%s/documents",
            FIRESTORE_OPTIONS.getProjectId(), FIRESTORE_OPTIONS.getDatabaseId());
    return documentPath + "/" + collectionId + "/" + cityDocId;
  }
}

The example uses the following arguments to configure and run a pipeline:

GOOGLE_CLOUD_PROJECT=project-id
REGION=region
TEMP_LOCATION=gs://temp-bucket/temp/
NUM_WORKERS=number-workers
MAX_NUM_WORKERS=max-number-workers
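These environment variables typically map onto standard Dataflow pipeline options passed as program arguments. The following sketch shows one plausible mapping; the exact invocation for these samples is in the repository's README:

```shell
# Hypothetical mapping of the variables above to Dataflow pipeline options.
--runner=DataflowRunner \
--project=${GOOGLE_CLOUD_PROJECT} \
--region=${REGION} \
--tempLocation=${TEMP_LOCATION} \
--numWorkers=${NUM_WORKERS} \
--maxNumWorkers=${MAX_NUM_WORKERS}
```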

Example Read pipeline

The following example pipeline reads documents from the cities-beam-sample collection, applies a filter for documents where the country field is set to USA, and returns the names of the matching documents.

public class ExampleFirestoreBeamRead {

  public static void main(String[] args) {
    runRead(args, "cities-beam-sample");
  }

  public static void runRead(String[] args, String collectionId) {
    FirestoreOptions firestoreOptions = FirestoreOptions.getDefaultInstance();
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
    Pipeline pipeline = Pipeline.create(options);

    RpcQosOptions rpcQosOptions =
        RpcQosOptions.newBuilder()
            .withHintMaxNumWorkers(options.as(DataflowPipelineOptions.class).getMaxNumWorkers())
            .build();

    pipeline
        .apply(Create.of(collectionId))
        .apply(
            new FilterDocumentsQuery(
                firestoreOptions.getProjectId(), firestoreOptions.getDatabaseId()))
        .apply(FirestoreIO.v1().read().runQuery().withRpcQosOptions(rpcQosOptions).build())
        .apply(
            ParDo.of(
                // transform each document to its name
                new DoFn<RunQueryResponse, String>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    c.output(Objects.requireNonNull(c.element()).getDocument().getName());
                  }
                }))
        .apply(
            ParDo.of(
                // print the document name
                new DoFn<String, Void>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    System.out.println(c.element());
                  }
                }));

    pipeline.run().waitUntilFinish();
  }

  private static final class FilterDocumentsQuery
      extends PTransform<PCollection<String>, PCollection<RunQueryRequest>> {

    private final String projectId;
    private final String databaseId;

    public FilterDocumentsQuery(String projectId, String databaseId) {
      this.projectId = projectId;
      this.databaseId = databaseId;
    }

    @Override
    public PCollection<RunQueryRequest> expand(PCollection<String> input) {
      return input.apply(
          ParDo.of(
              new DoFn<String, RunQueryRequest>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                  // select from collection "cities-collection-<uuid>"
                  StructuredQuery.CollectionSelector collection =
                      StructuredQuery.CollectionSelector.newBuilder()
                          .setCollectionId(Objects.requireNonNull(c.element()))
                          .build();
                  // filter where country is equal to USA
                  StructuredQuery.Filter countryFilter =
                      StructuredQuery.Filter.newBuilder()
                          .setFieldFilter(
                              StructuredQuery.FieldFilter.newBuilder()
                                  .setField(
                                      StructuredQuery.FieldReference.newBuilder()
                                          .setFieldPath("country")
                                          .build())
                                  .setValue(Value.newBuilder().setStringValue("USA").build())
                                  .setOp(StructuredQuery.FieldFilter.Operator.EQUAL))
                          .buildPartial();
                  RunQueryRequest runQueryRequest =
                      RunQueryRequest.newBuilder()
                          .setParent(DocumentRootName.format(projectId, databaseId))
                          .setStructuredQuery(
                              StructuredQuery.newBuilder()
                                  .addFrom(collection)
                                  .setWhere(countryFilter)
                                  .build())
                          .build();
                  c.output(runQueryRequest);
                }
              }));
    }
  }
}
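Conceptually, the query above behaves like a plain in-memory filter: keep documents whose country field equals USA and emit their names. The following self-contained sketch (not Beam code; the map-based record type is a stand-in for Firestore documents) illustrates that logic:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CountryFilterSketch {
  // Return the names of documents whose "country" field equals "USA".
  static List<String> namesOfUsaCities(List<Map<String, String>> docs) {
    return docs.stream()
        .filter(d -> "USA".equals(d.get("country")))
        .map(d -> d.get("name"))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Map<String, String>> docs =
        List.of(
            Map.of("name", "New York City", "state", "New York", "country", "USA"),
            Map.of("name", "Tokyo", "country", "Japan"));
    System.out.println(namesOfUsaCities(docs)); // prints [New York City]
  }
}
```

In the real pipeline this filtering happens server-side in Cloud Firestore, so only matching documents are returned to the Dataflow workers.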

The example uses the following arguments to configure and run a pipeline:

GOOGLE_CLOUD_PROJECT=project-id
REGION=region
TEMP_LOCATION=gs://temp-bucket/temp/
NUM_WORKERS=number-workers
MAX_NUM_WORKERS=max-number-workers

Pricing

Running a Cloud Firestore workload in Dataflow incurs costs for Cloud Firestore usage and Dataflow usage. Dataflow usage is billed for the resources that your jobs use. See the Dataflow pricing page for details. For Cloud Firestore pricing, see the Pricing page.



Last updated 2026-02-18 UTC.