Import, export, and modify data using Dataflow
This page describes how to use the Dataflow connector for Spanner to import, export, and modify data in Spanner GoogleSQL-dialect databases and PostgreSQL-dialect databases.
Dataflow is a managed service for transforming and enriching data. The Dataflow connector for Spanner lets you read data from and write data to Spanner in a Dataflow pipeline, optionally transforming or modifying the data. You can also create pipelines that transfer data between Spanner and other Google Cloud products.
The Dataflow connector is the recommended method for efficiently moving data into and out of Spanner in bulk. It's also the recommended method for performing large transformations to a database that aren't supported by Partitioned DML, such as table moves and bulk deletes that require a JOIN. When working with individual databases, there are other methods you can use to import and export data:
- Use the Google Cloud console to export an individual database from Spanner to Cloud Storage in Avro format.
- Use the Google Cloud console to import a database back into Spanner from files you exported to Cloud Storage.
- Use the REST API or Google Cloud CLI to run export or import jobs from Spanner to Cloud Storage and back, also using Avro format.
The Dataflow connector for Spanner is part of the Apache Beam Java SDK, and it provides an API for performing the previous actions. For more information about some of the concepts discussed in this page, such as PCollection objects and transforms, see the Apache Beam programming guide.
Add the connector to your Maven project
To add the Google Cloud Dataflow connector to a Maven project, add the beam-sdks-java-io-google-cloud-platform Maven artifact to your pom.xml file as a dependency.
For example, assuming that your pom.xml file sets beam.version to the appropriate version number, you would add the following dependency:

    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
      <version>${beam.version}</version>
    </dependency>

Read data from Spanner
To read from Spanner, apply the SpannerIO.read transform. Configure the read using the methods in the SpannerIO.Read class. Applying the transform returns a PCollection<Struct>, where each element in the collection represents an individual row returned by the read operation. You can read from Spanner with or without a specific SQL query, depending on the output you need.
Applying the SpannerIO.read transform returns a consistent view of data by performing a strong read. Unless you specify otherwise, the result of the read is snapshotted at the time that you started the read. See reads for more information about the different types of reads Spanner can perform.
Read data using a query
To read a specific set of data from Spanner, configure the transform using the SpannerIO.Read.withQuery method to specify a SQL query. For example:

    // Query for all the columns and rows in the specified Spanner table
    PCollection<Struct> records = pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withQuery("SELECT * FROM " + options.getTable()));

Read data without specifying a query
To read from a database without using a query, you can specify a table name using the SpannerIO.Read.withTable method, and specify a list of columns to read using the SpannerIO.Read.withColumns method. For example:
GoogleSQL

    // Read the listed columns from all rows in the specified Spanner table
    PCollection<Struct> records = pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Singers")
            .withColumns("singerId", "firstName", "lastName"));

PostgreSQL

    // Read the listed columns from all rows in the specified Spanner table
    PCollection<Struct> records = pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("singers")
            .withColumns("singer_id", "first_name", "last_name"));

To limit the rows read, you can specify a set of primary keys to read using the SpannerIO.Read.withKeySet method.
You can also read a table using a specified secondary index. As with the readUsingIndex API call, the index must contain all of the data that appears in the query results.
To do so, specify the table as shown in the previous example, and specify the index that contains the needed column values using the SpannerIO.Read.withIndex method. The index must store all the columns that the transform needs to read. The base table's primary key is implicitly stored. For example, to read the Songs table using the index SongsBySongName, you use the following code:
GoogleSQL

    // Read the indexed columns from all rows in the specified index.
    PCollection<Struct> records = pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the Songs table,
            .withColumns("SingerId", "AlbumId", "TrackId", "SongName"));

PostgreSQL

    // Read the indexed columns from all rows in the specified index.
    PCollection<Struct> records = pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the songs table,
            .withColumns("singer_id", "album_id", "track_id", "song_name"));

Note: Don't use the SpannerIO.Read withQuery and withTable methods together. This is because withQuery overrides values that you pass into the withTable method.

Control the staleness of transaction data
A transform is guaranteed to be executed on a consistent snapshot of data. To control the staleness of data, use the SpannerIO.Read.withTimestampBound method. See transactions for more information.
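To make the idea of a timestamp bound concrete, the following is a minimal toy model in plain Java. It is not the Spanner or Beam API: the class StalenessDemo and all of its methods are hypothetical names invented for this sketch. It keeps every committed version of a single value and shows that a read with exact staleness returns whatever was visible at the current time minus the staleness.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy multi-version cell. Hypothetical illustration only; NOT the Spanner or Beam API.
final class StalenessDemo {
    // Commit timestamp in milliseconds -> value committed at that time.
    private final TreeMap<Long, String> versions = new TreeMap<>();

    void commit(long commitTimeMs, String value) {
        versions.put(commitTimeMs, value);
    }

    // Returns the newest value committed at or before readTimeMs, or null if none exists.
    String readAt(long readTimeMs) {
        Map.Entry<Long, String> entry = versions.floorEntry(readTimeMs);
        return entry == null ? null : entry.getValue();
    }

    // Models an exact-staleness read: the snapshot is taken at (now - staleness).
    String readWithExactStaleness(long nowMs, long stalenessMs) {
        return readAt(nowMs - stalenessMs);
    }

    public static void main(String[] args) {
        StalenessDemo cell = new StalenessDemo();
        cell.commit(100L, "v1");
        cell.commit(200L, "v2");
        System.out.println("fresh read at t=250: " + cell.readAt(250L));
        System.out.println("read 100 ms stale at t=250: " + cell.readWithExactStaleness(250L, 100L));
    }
}
```

In a real pipeline you don't implement this yourself. You pass a TimestampBound (for example, one built with TimestampBound.ofExactStaleness) to withTimestampBound, and Spanner selects the snapshot.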
Read from multiple tables in the same transaction
If you want to read data from multiple tables at the same point in time to ensure data consistency, perform all of the reads in a single transaction. To do this, apply a createTransaction transform, creating a PCollectionView<Transaction> object which then creates a transaction. The resulting view can be passed to a read operation using SpannerIO.Read.withTransaction.
GoogleSQL

    SpannerConfig spannerConfig =
        SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
    PCollectionView<Transaction> tx = pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
    PCollection<Struct> singers = pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerID, FirstName, LastName FROM Singers")
            .withTransaction(tx));
    PCollection<Struct> albums = pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")
            .withTransaction(tx));

PostgreSQL

    SpannerConfig spannerConfig =
        SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
    PCollectionView<Transaction> tx = pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
    PCollection<Struct> singers = pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, first_name, last_name FROM singers")
            .withTransaction(tx));
    PCollection<Struct> albums = pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, album_id, album_title FROM albums")
            .withTransaction(tx));

Read data from all available tables
You can read data from all available tables in a Spanner database.
GoogleSQL

    PCollection<Struct> allRecords = pipeline
        .apply(SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withBatching(false)
            .withQuery("SELECT t.table_name FROM information_schema.tables AS t WHERE t"
                + ".table_catalog = '' AND t.table_schema = ''"))
        .apply(MapElements.into(TypeDescriptor.of(ReadOperation.class))
            .via((SerializableFunction<Struct, ReadOperation>) input -> {
              String tableName = input.getString(0);
              return ReadOperation.create().withQuery("SELECT * FROM " + tableName);
            }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

PostgreSQL

    PCollection<Struct> allRecords = pipeline
        .apply(SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withBatching(false)
            .withQuery(Statement.newBuilder(
                    "SELECT t.table_name FROM information_schema.tables AS t "
                        + "WHERE t.table_catalog = $1 AND t.table_schema = $2")
                .bind("p1").to(spannerConfig.getDatabaseId().get())
                .bind("p2").to("public")
                .build()))
        .apply(MapElements.into(TypeDescriptor.of(ReadOperation.class))
            .via((SerializableFunction<Struct, ReadOperation>) input -> {
              String tableName = input.getString(0);
              return ReadOperation.create().withQuery("SELECT * FROM \"" + tableName + "\"");
            }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

Troubleshoot unsupported queries
The Dataflow connector only supports Spanner SQL queries where the first operator in the query execution plan is a DistributedUnion. If you attempt to read data from Spanner using a query and you get an exception stating that the query does not have a DistributedUnion at the root, follow the steps in Understand how Spanner executes queries to retrieve an execution plan for your query using the Google Cloud console.
If your SQL query isn't supported, simplify it to a query that has a distributed union as the first operator in the query execution plan. Remove aggregate functions, table joins, and the operators DISTINCT, GROUP BY, and ORDER BY, because they are the operators most likely to prevent the query from working.
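As a rough pre-flight check, you could scan a query string for the constructs listed above before submitting the pipeline. The following is a minimal sketch in plain Java; the class QueryLint and its suspects method are hypothetical names invented here, and the keyword list is an assumption based only on the operators named in this section. It is a text heuristic, so it can miss problems or flag harmless text such as keywords inside string literals. Only the query execution plan is authoritative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper: flags SQL constructs that this page says are likely to
// prevent a DistributedUnion from being the first operator in the plan.
final class QueryLint {
    // Each entry is {label, case-insensitive regular expression}.
    private static final String[][] CHECKS = {
        {"DISTINCT", "\\bDISTINCT\\b"},
        {"GROUP BY", "\\bGROUP\\s+BY\\b"},
        {"ORDER BY", "\\bORDER\\s+BY\\b"},
        {"JOIN", "\\bJOIN\\b"},
        {"aggregate function", "\\b(COUNT|SUM|AVG|MIN|MAX)\\s*\\("},
    };

    // Returns the labels of all suspicious constructs found, in CHECKS order.
    static List<String> suspects(String sql) {
        List<String> found = new ArrayList<>();
        for (String[] check : CHECKS) {
            Pattern pattern = Pattern.compile(check[1], Pattern.CASE_INSENSITIVE);
            if (pattern.matcher(sql).find()) {
                found.add(check[0]);
            }
        }
        return found;
    }

    public static void main(String[] args) {
        System.out.println(suspects("SELECT DISTINCT LastName FROM Singers ORDER BY LastName"));
        System.out.println(suspects("SELECT SingerId, FirstName FROM Singers WHERE SingerId > 10"));
    }
}
```

An empty result doesn't prove that the query is supported. Treat a non-empty result as a hint about which part of the query to simplify first, then confirm with the execution plan.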
Create mutations for a write
For Java pipelines, use the Mutation class's newInsertOrUpdateBuilder method instead of the newInsertBuilder method unless an insert-only mutation is absolutely necessary. For Python pipelines, use SpannerInsertOrUpdate instead of SpannerInsert. Dataflow provides at-least-once guarantees, meaning that the mutation might be written several times. As a result, INSERT-only mutations might generate com.google.cloud.spanner.SpannerException: ALREADY_EXISTS errors that cause the pipeline to fail. To prevent this error, use the INSERT_OR_UPDATE mutation instead, which adds a new row or updates column values if the row already exists. The INSERT_OR_UPDATE mutation can be applied more than once.
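The difference between the two mutation types under at-least-once delivery can be seen in a small in-memory model. This is a sketch in plain Java, not the Spanner client: the class AtLeastOnceDemo and its methods are hypothetical, and a HashMap stands in for a table keyed by primary key. Replaying the same insert fails the second time, while replaying an insert-or-update leaves the table unchanged.

```java
import java.util.HashMap;
import java.util.Map;

// Toy table keyed by primary key. Hypothetical illustration; NOT the Spanner client API.
final class AtLeastOnceDemo {
    private final Map<Long, String> rows = new HashMap<>();

    // Models an INSERT mutation: fails if the row already exists.
    void insert(long key, String value) {
        if (rows.containsKey(key)) {
            throw new IllegalStateException("ALREADY_EXISTS: key " + key);
        }
        rows.put(key, value);
    }

    // Models an INSERT_OR_UPDATE mutation: adds the row or overwrites its value.
    void insertOrUpdate(long key, String value) {
        rows.put(key, value);
    }

    int rowCount() {
        return rows.size();
    }

    String get(long key) {
        return rows.get(key);
    }

    // Returns true if delivering the same INSERT twice fails on the second delivery.
    static boolean insertFailsOnReplay() {
        AtLeastOnceDemo table = new AtLeastOnceDemo();
        table.insert(1L, "Total Junk");
        try {
            table.insert(1L, "Total Junk"); // same mutation redelivered by a retry
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        AtLeastOnceDemo table = new AtLeastOnceDemo();
        table.insertOrUpdate(1L, "Total Junk");
        table.insertOrUpdate(1L, "Total Junk"); // replay has no further effect
        System.out.println("rows after replayed upsert: " + table.rowCount());
        System.out.println("plain insert fails on replay: " + insertFailsOnReplay());
    }
}
```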
Write to Spanner and transform data
You can write data to Spanner with the Dataflow connector by using a SpannerIO.write transform to execute a collection of input row mutations. The Dataflow connector groups mutations into batches for efficiency.
The following example shows how to apply a write transform to a PCollection of mutations:
GoogleSQL

    albums
        // Spanner expects a Mutation object, so create it using the Album's data
        .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            Album album = c.element();
            c.output(Mutation.newInsertOrUpdateBuilder("albums")
                .set("singerId").to(album.singerId)
                .set("albumId").to(album.albumId)
                .set("albumTitle").to(album.albumTitle)
                .build());
          }
        }))
        // Write mutations to Spanner
        .apply("WriteAlbums", SpannerIO.write()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId));

PostgreSQL

    PCollectionView<Dialect> dialectView =
        pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
    albums
        // Spanner expects a Mutation object, so create it using the Album's data
        .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            Album album = c.element();
            c.output(Mutation.newInsertOrUpdateBuilder("albums")
                .set("singerId").to(album.singerId)
                .set("albumId").to(album.albumId)
                .set("albumTitle").to(album.albumTitle)
                .build());
          }
        }))
        // Write mutations to Spanner
        .apply("WriteAlbums", SpannerIO.write()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withDialectView(dialectView));

If a transform unexpectedly stops before completion, mutations that have already been applied aren't rolled back.
Note: The SpannerIO.write transform doesn't guarantee that all of the mutations in the PCollection are applied atomically in a single transaction. If a small set of mutations must be applied atomically, see Apply groups of mutations atomically.

Apply groups of mutations atomically
You can use the MutationGroup class to ensure that a group of mutations are applied together atomically. Mutations in a MutationGroup are guaranteed to be submitted in the same transaction, but the transaction might be retried.
Mutation groups perform best when they are used to group together mutations that affect data stored close together in the key space. Because Spanner interleaves parent and child table data together in the parent table, that data is always close together in the key space. We recommend that you either structure your mutation group so that it contains one mutation that's applied to a parent table and additional mutations that are applied to child tables, or so that all of its mutations modify data that's close together in the key space. For more information about how Spanner stores parent and child table data, see Schema and data model. If you don't organize your mutation groups around the recommended table hierarchies, or if the data being accessed isn't close together in the key space, Spanner might need to perform two-phase commits, which results in slower performance. For more information, see Locality tradeoffs.
To use MutationGroup, build a SpannerIO.write transform and call the SpannerIO.Write.grouped method, which returns a transform that you can then apply to a PCollection of MutationGroup objects.
When creating a MutationGroup, the first mutation listed becomes the primary mutation. If your mutation group affects both a parent and a child table, the primary mutation should be a mutation to the parent table. Otherwise, you can use any mutation as the primary mutation. The Dataflow connector uses the primary mutation to determine partition boundaries in order to efficiently batch mutations together.
For example, imagine that your application monitors behavior and flags problematic user behavior for review. For each flagged behavior, you want to update the Users table to block the user's access to your application, and you also need to record the incident in the PendingReviews table. To make sure that both of the tables are updated atomically, use a MutationGroup:
GoogleSQL

    PCollection<MutationGroup> mutations = suspiciousUserIds
        .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {
          @Override
          public MutationGroup apply(String userId) {
            // Immediately block the user.
            Mutation userMutation = Mutation.newUpdateBuilder("Users")
                .set("id").to(userId)
                .set("state").to("BLOCKED")
                .build();
            long generatedId = Hashing.sha1().newHasher()
                .putString(userId, Charsets.UTF_8)
                .putLong(timestamp.getSeconds())
                .putLong(timestamp.getNanos())
                .hash()
                .asLong();

            // Add an entry to pending review requests.
            Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
                .set("id").to(generatedId) // Must be deterministically generated.
                .set("userId").to(userId)
                .set("action").to("REVIEW ACCOUNT")
                .set("note").to("Suspicious activity detected.")
                .build();

            return MutationGroup.create(userMutation, pendingReview);
          }
        }));

    mutations.apply(SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .grouped());

PostgreSQL

    PCollectionView<Dialect> dialectView =
        pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
    PCollection<MutationGroup> mutations = suspiciousUserIds
        .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {
          @Override
          public MutationGroup apply(String userId) {
            // Immediately block the user.
            Mutation userMutation = Mutation.newUpdateBuilder("Users")
                .set("id").to(userId)
                .set("state").to("BLOCKED")
                .build();
            long generatedId = Hashing.sha1().newHasher()
                .putString(userId, Charsets.UTF_8)
                .putLong(timestamp.getSeconds())
                .putLong(timestamp.getNanos())
                .hash()
                .asLong();

            // Add an entry to pending review requests.
            Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
                .set("id").to(generatedId) // Must be deterministically generated.
                .set("userId").to(userId)
                .set("action").to("REVIEW ACCOUNT")
                .set("note").to("Suspicious activity detected.")
                .build();

            return MutationGroup.create(userMutation, pendingReview);
          }
        }));

    mutations.apply(SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withDialectView(dialectView)
        .grouped());

When creating a mutation group, the first mutation supplied as an argument becomes the primary mutation. In this case, the two tables are unrelated, so there is no clear primary mutation. We've selected userMutation as primary by placing it first. Applying the two mutations separately would be faster, but wouldn't guarantee atomicity, so the mutation group is the best choice in this situation.
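The samples mark the PendingReviews ID as one that must be deterministically generated. Because the transaction for a mutation group might be retried, an ID drawn from a random source could differ between attempts and produce an extra row, whereas an ID derived only from the input data is the same on every attempt. The following minimal sketch shows that property using the JDK's MessageDigest in place of the Guava Hashing class used in the samples. The class DeterministicIds and the method reviewId are hypothetical names, and the resulting values are not claimed to match what the Guava-based sample produces.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: derives a stable 64-bit ID from the inputs alone.
final class DeterministicIds {
    // Hashes the user ID and event timestamp with SHA-1 and keeps the first 8 bytes.
    static long reviewId(String userId, long seconds, int nanos) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            sha1.update(userId.getBytes(StandardCharsets.UTF_8));
            sha1.update(ByteBuffer.allocate(Long.BYTES + Integer.BYTES)
                .putLong(seconds)
                .putInt(nanos)
                .array());
            // The digest is 20 bytes long; getLong() reads the first 8 as a big-endian long.
            return ByteBuffer.wrap(sha1.digest()).getLong();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 is not available", e);
        }
    }

    public static void main(String[] args) {
        long first = reviewId("user-42", 1_700_000_000L, 123);
        long retry = reviewId("user-42", 1_700_000_000L, 123);
        System.out.println("same ID on retry: " + (first == retry));
    }
}
```

For this to help, the timestamp must also be fixed before the retry boundary (for example, taken from the input element) rather than read from the clock inside the function.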
What's next
- Learn more about designing an Apache Beam data pipeline.
- Export and import Spanner databases in the Google Cloud console using Dataflow.
Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
Last updated 2025-12-15 UTC.