Process data in bulk with Dataflow
This page gives examples of how to use Dataflow to perform bulk Cloud Firestore operations in an Apache Beam pipeline. Apache Beam supports a connector for Cloud Firestore. You can use this connector to run batch and streaming operations in Dataflow.
We recommend using Dataflow and Apache Beam for large-scale data processing workloads.
The Cloud Firestore connector for Apache Beam is available in Java. For more information about the Cloud Firestore connector, see the Apache Beam SDK for Java.
Before you begin
Before you read this page, you should be familiar with the Programming model for Apache Beam.
To run the samples, you must enable the Dataflow API.
Example Cloud Firestore pipelines
The examples below demonstrate a pipeline that writes data and one that reads and filters data. You can use these samples as a starting point for your own pipelines.
Running the sample pipelines
The source code for the samples is available in the googleapis/java-firestore GitHub repository. To run these samples, download the source code and see the README.
Example Write pipeline
The following example creates documents in the cities-beam-sample collection:
import com.google.cloud.firestore.FirestoreOptions;
import com.google.firestore.v1.Document;
import com.google.firestore.v1.Value;
import com.google.firestore.v1.Write;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreIO;
import org.apache.beam.sdk.io.gcp.firestore.RpcQosOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class ExampleFirestoreBeamWrite {
  private static final FirestoreOptions FIRESTORE_OPTIONS = FirestoreOptions.getDefaultInstance();

  public static void main(String[] args) {
    runWrite(args, "cities-beam-sample");
  }

  public static void runWrite(String[] args, String collectionId) {
    // create pipeline options from the passed in arguments
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
    Pipeline pipeline = Pipeline.create(options);

    RpcQosOptions rpcQosOptions =
        RpcQosOptions.newBuilder()
            .withHintMaxNumWorkers(options.as(DataflowPipelineOptions.class).getMaxNumWorkers())
            .build();

    // create some writes
    Write write1 =
        Write.newBuilder()
            .setUpdate(
                Document.newBuilder()
                    // resolves to
                    // projects/<projectId>/databases/<databaseId>/documents/<collectionId>/NYC
                    .setName(createDocumentName(collectionId, "NYC"))
                    .putFields("name", Value.newBuilder().setStringValue("New York City").build())
                    .putFields("state", Value.newBuilder().setStringValue("New York").build())
                    .putFields("country", Value.newBuilder().setStringValue("USA").build()))
            .build();

    Write write2 =
        Write.newBuilder()
            .setUpdate(
                Document.newBuilder()
                    // resolves to
                    // projects/<projectId>/databases/<databaseId>/documents/<collectionId>/TOK
                    .setName(createDocumentName(collectionId, "TOK"))
                    .putFields("name", Value.newBuilder().setStringValue("Tokyo").build())
                    .putFields("country", Value.newBuilder().setStringValue("Japan").build())
                    .putFields("capital", Value.newBuilder().setBooleanValue(true).build()))
            .build();

    // batch write the data
    pipeline
        .apply(Create.of(write1, write2))
        .apply(FirestoreIO.v1().write().batchWrite().withRpcQosOptions(rpcQosOptions).build());

    // run the pipeline
    pipeline.run().waitUntilFinish();
  }

  private static String createDocumentName(String collectionId, String cityDocId) {
    String documentPath =
        String.format(
            "projects/%s/databases/%s/documents",
            FIRESTORE_OPTIONS.getProjectId(), FIRESTORE_OPTIONS.getDatabaseId());

    return documentPath + "/" + collectionId + "/" + cityDocId;
  }
}
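The batchWrite transform accepts any Firestore Write operation, not only updates. As a sketch that is not part of the published sample, the documents created above could later be removed by emitting Write messages that set the delete field to the same document names:

// Sketch only: a delete Write names the document to remove instead of carrying fields.
Write delete1 =
    Write.newBuilder().setDelete(createDocumentName("cities-beam-sample", "NYC")).build();
Write delete2 =
    Write.newBuilder().setDelete(createDocumentName("cities-beam-sample", "TOK")).build();

pipeline
    .apply(Create.of(delete1, delete2))
    .apply(FirestoreIO.v1().write().batchWrite().withRpcQosOptions(rpcQosOptions).build());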
The example uses the following arguments to configure and run a pipeline:
GOOGLE_CLOUD_PROJECT=project-id
REGION=region
TEMP_LOCATION=gs://temp-bucket/temp/
NUM_WORKERS=number-workers
MAX_NUM_WORKERS=max-number-workers
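How these values reach the pipeline depends on how you launch it. As a minimal sketch, assuming a Maven build with the exec plugin and the standard Dataflow runner options, an invocation might look like the following (replace the main class with its fully qualified name from the sample source):

mvn compile exec:java \
  -Dexec.mainClass=ExampleFirestoreBeamWrite \
  -Dexec.args="--runner=DataflowRunner \
    --project=${GOOGLE_CLOUD_PROJECT} \
    --region=${REGION} \
    --tempLocation=${TEMP_LOCATION} \
    --numWorkers=${NUM_WORKERS} \
    --maxNumWorkers=${MAX_NUM_WORKERS}"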
Example Read pipeline
The following example pipeline reads documents from the cities-beam-sample collection, applies a filter for documents where the country field is set to USA, and returns the names of the matching documents.
import com.google.cloud.firestore.FirestoreOptions;
import com.google.firestore.v1.DocumentRootName;
import com.google.firestore.v1.RunQueryRequest;
import com.google.firestore.v1.RunQueryResponse;
import com.google.firestore.v1.StructuredQuery;
import com.google.firestore.v1.Value;
import java.util.Objects;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreIO;
import org.apache.beam.sdk.io.gcp.firestore.RpcQosOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class ExampleFirestoreBeamRead {

  public static void main(String[] args) {
    runRead(args, "cities-beam-sample");
  }

  public static void runRead(String[] args, String collectionId) {
    FirestoreOptions firestoreOptions = FirestoreOptions.getDefaultInstance();

    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
    Pipeline pipeline = Pipeline.create(options);

    RpcQosOptions rpcQosOptions =
        RpcQosOptions.newBuilder()
            .withHintMaxNumWorkers(options.as(DataflowPipelineOptions.class).getMaxNumWorkers())
            .build();

    pipeline
        .apply(Create.of(collectionId))
        .apply(
            new FilterDocumentsQuery(
                firestoreOptions.getProjectId(), firestoreOptions.getDatabaseId()))
        .apply(FirestoreIO.v1().read().runQuery().withRpcQosOptions(rpcQosOptions).build())
        .apply(
            ParDo.of(
                // transform each document to its name
                new DoFn<RunQueryResponse, String>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    c.output(Objects.requireNonNull(c.element()).getDocument().getName());
                  }
                }))
        .apply(
            ParDo.of(
                // print the document name
                new DoFn<String, Void>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    System.out.println(c.element());
                  }
                }));

    pipeline.run().waitUntilFinish();
  }

  private static final class FilterDocumentsQuery
      extends PTransform<PCollection<String>, PCollection<RunQueryRequest>> {

    private final String projectId;
    private final String databaseId;

    public FilterDocumentsQuery(String projectId, String databaseId) {
      this.projectId = projectId;
      this.databaseId = databaseId;
    }

    @Override
    public PCollection<RunQueryRequest> expand(PCollection<String> input) {
      return input.apply(
          ParDo.of(
              new DoFn<String, RunQueryRequest>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                  // select from the collection whose ID was passed in as an element
                  StructuredQuery.CollectionSelector collection =
                      StructuredQuery.CollectionSelector.newBuilder()
                          .setCollectionId(Objects.requireNonNull(c.element()))
                          .build();
                  // filter where country is equal to USA
                  StructuredQuery.Filter countryFilter =
                      StructuredQuery.Filter.newBuilder()
                          .setFieldFilter(
                              StructuredQuery.FieldFilter.newBuilder()
                                  .setField(
                                      StructuredQuery.FieldReference.newBuilder()
                                          .setFieldPath("country")
                                          .build())
                                  .setValue(Value.newBuilder().setStringValue("USA").build())
                                  .setOp(StructuredQuery.FieldFilter.Operator.EQUAL))
                          .buildPartial();

                  RunQueryRequest runQueryRequest =
                      RunQueryRequest.newBuilder()
                          .setParent(DocumentRootName.format(projectId, databaseId))
                          .setStructuredQuery(
                              StructuredQuery.newBuilder()
                                  .addFrom(collection)
                                  .setWhere(countryFilter)
                                  .build())
                          .build();
                  c.output(runQueryRequest);
                }
              }));
    }
  }
}
The example uses the following arguments to configure and run a pipeline:
GOOGLE_CLOUD_PROJECT=project-id
REGION=region
TEMP_LOCATION=gs://temp-bucket/temp/
NUM_WORKERS=number-workers
MAX_NUM_WORKERS=max-number-workers
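The read pipeline is launched the same way as the write pipeline above. If the documents from the write example exist, only the NYC document has country set to USA, so the final ParDo prints a single fully qualified document name similar to the following (the project and database IDs depend on your environment):

projects/project-id/databases/(default)/documents/cities-beam-sample/NYC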
Pricing
Running a Cloud Firestore workload in Dataflow incurs costs for Cloud Firestore usage and Dataflow usage. Dataflow usage is billed for resources that your jobs use. See the Dataflow pricing page for details. For Cloud Firestore pricing, see the Pricing page.
What's next
- See Using Firestore and Apache Beam for data processing for another pipeline example.
- For more about Dataflow and Apache Beam, see the Dataflow documentation.