Spanner Change Streams to Source Database template

Streaming pipeline. Reads change data from Spanner change streams and writes it to a source database.

Note: The Spanner change streams connector doesn't support draining a job. To stop the pipeline, cancel the job instead of draining it. Draining a job with the Spanner change streams connector can cause it to become stuck in a draining state. For more information, see Draining in the Spanner documentation.
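For example, a running job can be cancelled from the gcloud CLI as shown below; JOB_ID and REGION_NAME are placeholders for your own job ID and region:

gcloud dataflow jobs cancel JOB_ID --region=REGION_NAME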

Template parameters

Parameter | Description
changeStreamName | The name of the Spanner change stream that the pipeline reads from.
instanceId | The name of the Spanner instance where the change stream is present.
databaseId | The name of the Spanner database that the change stream monitors.
spannerProjectId | The name of the Spanner project.
metadataInstance | The instance that stores the metadata used by the connector to control the consumption of the change stream API data.
metadataDatabase | The database that stores the metadata used by the connector to control the consumption of the change stream API data.
sourceShardsFilePath | The path to a Cloud Storage file containing connection profile information for the source shards.
startTimestamp | Optional: The starting timestamp for reading changes. Defaults to empty.
endTimestamp | Optional: The end timestamp for reading changes. If no timestamp is provided, reads indefinitely. Defaults to empty.
shadowTablePrefix | Optional: The prefix used to name shadow tables. Default: shadow_.
sessionFilePath | Optional: The session path in Cloud Storage that contains mapping information from HarbourBridge.
filtrationMode | Optional: The mode of filtration, which specifies how to drop certain records based on a criterion. Supported modes are: none (filter nothing) and forward_migration (filter records written by the forward migration pipeline). Defaults to forward_migration.
shardingCustomJarPath | Optional: The custom JAR file location in Cloud Storage that contains the customization logic for fetching the shard ID. If you set this parameter, also set the shardingCustomClassName parameter. Defaults to empty.
shardingCustomClassName | Optional: The fully qualified class name that has the custom shard ID implementation. If shardingCustomJarPath is specified, this parameter is required. Defaults to empty.
shardingCustomParameters | Optional: A string containing any custom parameters to be passed to the custom sharding class. Defaults to empty.
sourceDbTimezoneOffset | Optional: The timezone offset from UTC for the source database. Example value: +10:00. Defaults to +00:00.
dlqGcsPubSubSubscription | Optional: The Pub/Sub subscription used in a Cloud Storage notification policy for the DLQ retry directory when running in regular mode. The name should be in the format projects/<project-id>/subscriptions/<subscription-name>. When set, deadLetterQueueDirectory and dlqRetryMinutes are ignored.
skipDirectoryName | Optional: The directory to which records skipped from reverse replication are written. The default directory name is skip.
maxShardConnections | Optional: The maximum number of connections that a given shard can accept. Defaults to 10000.
deadLetterQueueDirectory | Optional: The path used to store the error queue output. The default path is a directory under the Dataflow job's temp location.
dlqMaxRetryCount | Optional: The maximum number of times that temporary errors can be retried through the dead-letter queue. Defaults to 500.
runMode | Optional: The run mode type. Supported values: regular and retryDLQ. Default: regular. Specify retryDLQ to retry only the severe dead-letter queue records.
dlqRetryMinutes | Optional: The number of minutes between dead-letter queue retries. Defaults to 10.
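The file referenced by sourceShardsFilePath describes the connection profile of each source shard. The sketch below is illustrative only: the exact schema is defined by the template's shard file reader, the field names are inferred from the Shard accessors in the template source (getLogicalShardId, getHost, getPort, getUserName, getPassword, getDbName), and a production file would typically reference a Secret Manager secret instead of an inline password.

[
  {
    "logicalShardId": "shard1",
    "host": "10.128.0.5",
    "port": "3306",
    "user": "replication-user",
    "password": "replication-password",
    "dbName": "db1"
  }
]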

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. Go to Create job from template
  3. In the Job name field, enter a unique job name.
  4. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  5. From the Dataflow template drop-down menu, select the Spanner Change Streams to Source Database template.
  6. In the provided parameter fields, enter your parameter values.
  7. Click Run job.

gcloud CLI

Note: To use the Google Cloud CLI to run flex templates, you must have Google Cloud CLI version 284.0.0 or later.
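If you are unsure which version you have installed, you can check it with the standard version command:

gcloud version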

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_to_SourceDb \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
changeStreamName=CHANGE_STREAM_NAME,\
instanceId=INSTANCE_ID,\
databaseId=DATABASE_ID,\
spannerProjectId=SPANNER_PROJECT_ID,\
metadataInstance=METADATA_INSTANCE,\
metadataDatabase=METADATA_DATABASE,\
sourceShardsFilePath=SOURCE_SHARDS_FILE_PATH

Replace the following:

  - JOB_NAME: a unique job name of your choice
  - VERSION: the version of the template that you want to use, for example latest
  - REGION_NAME: the region where you want to deploy your Dataflow job, for example us-central1
  - PROJECT_ID: the Google Cloud project ID
  - CHANGE_STREAM_NAME, INSTANCE_ID, DATABASE_ID, SPANNER_PROJECT_ID, METADATA_INSTANCE, METADATA_DATABASE, SOURCE_SHARDS_FILE_PATH: values for the corresponding template parameters described in the parameter table
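Optional parameters can be appended to the same --parameters list. As an illustrative sketch (the project, instance, and bucket names below are placeholders, not values from this document), an invocation that also sets the run mode and source database timezone offset might look like this:

gcloud dataflow flex-template run my-spanner-to-source-job \
    --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Spanner_to_SourceDb \
    --project=my-project \
    --region=us-central1 \
    --parameters \
changeStreamName=my-change-stream,\
instanceId=my-instance,\
databaseId=my-database,\
spannerProjectId=my-project,\
metadataInstance=my-metadata-instance,\
metadataDatabase=my-metadata-database,\
sourceShardsFilePath=gs://my-bucket/source-shards.json,\
runMode=regular,\
sourceDbTimezoneOffset=+00:00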

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launchParameter": {
    "jobName": "JOB_NAME",
    "parameters": {
      "changeStreamName": "CHANGE_STREAM_NAME",
      "instanceId": "INSTANCE_ID",
      "databaseId": "DATABASE_ID",
      "spannerProjectId": "SPANNER_PROJECT_ID",
      "metadataInstance": "METADATA_INSTANCE",
      "metadataDatabase": "METADATA_DATABASE",
      "sourceShardsFilePath": "SOURCE_SHARDS_FILE_PATH"
    },
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_to_SourceDb",
    "environment": { "maxWorkers": "10" }
  }
}

Replace the following:

  - PROJECT_ID: the Google Cloud project ID
  - JOB_NAME: a unique job name of your choice
  - VERSION: the version of the template that you want to use, for example latest
  - LOCATION: the region where you want to deploy your Dataflow job, for example us-central1
  - CHANGE_STREAM_NAME, INSTANCE_ID, DATABASE_ID, SPANNER_PROJECT_ID, METADATA_INSTANCE, METADATA_DATABASE, SOURCE_SHARDS_FILE_PATH: values for the corresponding template parameters described in the parameter table
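As a sketch of how the request might be sent from a shell, assuming the gcloud CLI is installed and authenticated (the body mirrors the example above, with the same placeholders):

curl -X POST "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch" \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d '{
      "launchParameter": {
        "jobName": "JOB_NAME",
        "parameters": {
          "changeStreamName": "CHANGE_STREAM_NAME",
          "instanceId": "INSTANCE_ID",
          "databaseId": "DATABASE_ID",
          "spannerProjectId": "SPANNER_PROJECT_ID",
          "metadataInstance": "METADATA_INSTANCE",
          "metadataDatabase": "METADATA_DATABASE",
          "sourceShardsFilePath": "SOURCE_SHARDS_FILE_PATH"
        },
        "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_to_SourceDb"
      }
    }'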

Template source code

Java

/*
 * Copyright (C) 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.MYSQL_SOURCE_TYPE;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.v2.cdc.dlq.DeadLetterQueueManager;
import com.google.cloud.teleport.v2.cdc.dlq.PubSubNotifiedDlqIO;
import com.google.cloud.teleport.v2.cdc.dlq.StringDeadLetterQueueSanitizer;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
import com.google.cloud.teleport.v2.spanner.migrations.shard.CassandraShard;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
import com.google.cloud.teleport.v2.spanner.migrations.utils.CassandraConfigFileReader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.CassandraDriverConfigLoader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.DataflowWorkerMachineTypeValidator;
import com.google.cloud.teleport.v2.spanner.migrations.utils.SecretManagerAccessorImpl;
import com.google.cloud.teleport.v2.spanner.migrations.utils.ShardFileReader;
import com.google.cloud.teleport.v2.spanner.sourceddl.CassandraInformationSchemaScanner;
import com.google.cloud.teleport.v2.spanner.sourceddl.MySqlInformationSchemaScanner;
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceSchema;
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceSchemaScanner;
import com.google.cloud.teleport.v2.templates.SpannerToSourceDb.Options;
import com.google.cloud.teleport.v2.templates.changestream.TrimmedShardedDataChangeRecord;
import com.google.cloud.teleport.v2.templates.constants.Constants;
import com.google.cloud.teleport.v2.templates.transforms.AssignShardIdFn;
import com.google.cloud.teleport.v2.templates.transforms.ConvertChangeStreamErrorRecordToFailsafeElementFn;
import com.google.cloud.teleport.v2.templates.transforms.ConvertDlqRecordToTrimmedShardedDataChangeRecordFn;
import com.google.cloud.teleport.v2.templates.transforms.FilterRecordsFn;
import com.google.cloud.teleport.v2.templates.transforms.PreprocessRecordsFn;
import com.google.cloud.teleport.v2.templates.transforms.SourceWriterTransform;
import com.google.cloud.teleport.v2.templates.transforms.SpannerInformationSchemaProcessorTransform;
import com.google.cloud.teleport.v2.templates.transforms.UpdateDlqMetricsFn;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Strings;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerServiceFactoryImpl;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** This pipeline reads Spanner Change streams data and writes them to a source DB. */
@Template(
    name = "Spanner_to_SourceDb",
    category = TemplateCategory.STREAMING,
    displayName = "Spanner Change Streams to Source Database",
    description =
        "Streaming pipeline. Reads data from Spanner Change Streams and"
            + " writes them to a source.",
    optionsClass = Options.class,
    flexContainerName = "spanner-to-sourcedb",
    contactInformation = "https://cloud.google.com/support",
    hidden = false,
    streaming = true)
public class SpannerToSourceDb {

  private static final Logger LOG = LoggerFactory.getLogger(SpannerToSourceDb.class);

  /**
   * Options supported by the pipeline.
   *
   * <p>Inherits standard configuration options.
   */
  public interface Options extends PipelineOptions, StreamingOptions {

    @TemplateParameter.Text(
        order = 1,
        optional = false,
        description = "Name of the change stream to read from",
        helpText = "This is the name of the Spanner change stream that the pipeline will read from.")
    String getChangeStreamName();
    void setChangeStreamName(String value);

    @TemplateParameter.Text(
        order = 2,
        optional = false,
        description = "Cloud Spanner Instance Id.",
        helpText = "This is the name of the Cloud Spanner instance where the changestream is present.")
    String getInstanceId();
    void setInstanceId(String value);

    @TemplateParameter.Text(
        order = 3,
        optional = false,
        description = "Cloud Spanner Database Id.",
        helpText = "This is the name of the Cloud Spanner database that the changestream is monitoring")
    String getDatabaseId();
    void setDatabaseId(String value);

    @TemplateParameter.ProjectId(
        order = 4,
        optional = false,
        description = "Cloud Spanner Project Id.",
        helpText = "This is the name of the Cloud Spanner project.")
    String getSpannerProjectId();
    void setSpannerProjectId(String projectId);

    @TemplateParameter.Text(
        order = 5,
        optional = false,
        description = "Cloud Spanner Instance to store metadata when reading from changestreams",
        helpText = "This is the instance to store the metadata used by the connector to control the"
            + " consumption of the change stream API data.")
    String getMetadataInstance();
    void setMetadataInstance(String value);

    @TemplateParameter.Text(
        order = 6,
        optional = false,
        description = "Cloud Spanner Database to store metadata when reading from changestreams",
        helpText = "This is the database to store the metadata used by the connector to control the"
            + " consumption of the change stream API data.")
    String getMetadataDatabase();
    void setMetadataDatabase(String value);

    @TemplateParameter.Text(
        order = 7,
        optional = true,
        description = "Cloud Spanner metadata table name",
        helpText = "The Spanner change streams connector metadata table name to use. If not provided,"
            + " Spanner automatically creates the streams connector metadata table during the pipeline flow"
            + " change. You must provide this parameter when updating an existing pipeline to ensure"
            + " that the metadata table from the original job is carried over.")
    String getSpannerMetadataTableName();
    void setSpannerMetadataTableName(String value);

    @TemplateParameter.Text(
        order = 8,
        optional = true,
        description = "Changes are read from the given timestamp",
        helpText = "Read changes from the given timestamp.")
    @Default.String("")
    String getStartTimestamp();
    void setStartTimestamp(String value);

    @TemplateParameter.Text(
        order = 9,
        optional = true,
        description = "Changes are read until the given timestamp",
        helpText = "Read changes until the given timestamp. If no timestamp provided, reads indefinitely.")
    @Default.String("")
    String getEndTimestamp();
    void setEndTimestamp(String value);

    @TemplateParameter.Text(
        order = 10,
        optional = true,
        description = "Cloud Spanner shadow table prefix.",
        helpText = "The prefix used to name shadow tables. Default: `shadow_`.")
    @Default.String("rev_shadow_")
    String getShadowTablePrefix();
    void setShadowTablePrefix(String value);

    @TemplateParameter.GcsReadFile(
        order = 11,
        optional = false,
        description = "Path to GCS file containing the the Source shard details",
        helpText = "Path to GCS file containing connection profile info for source shards.")
    String getSourceShardsFilePath();
    void setSourceShardsFilePath(String value);

    @TemplateParameter.GcsReadFile(
        order = 12,
        optional = true,
        description = "Session File Path in Cloud Storage",
        helpText = "Session file path in Cloud Storage that contains mapping information from"
            + " HarbourBridge")
    String getSessionFilePath();
    void setSessionFilePath(String value);

    @TemplateParameter.Enum(
        order = 13,
        optional = true,
        enumOptions = {@TemplateEnumOption("none"), @TemplateEnumOption("forward_migration")},
        description = "Filtration mode",
        helpText = "Mode of Filtration, decides how to drop certain records based on a criteria. Currently"
            + " supported modes are: none (filter nothing), forward_migration (filter records"
            + " written via the forward migration pipeline). Defaults to forward_migration.")
    @Default.String("forward_migration")
    String getFiltrationMode();
    void setFiltrationMode(String value);

    @TemplateParameter.GcsReadFile(
        order = 14,
        optional = true,
        description = "Custom jar location in Cloud Storage",
        helpText = "Custom jar location in Cloud Storage that contains the customization logic"
            + " for fetching shard id.")
    @Default.String("")
    String getShardingCustomJarPath();
    void setShardingCustomJarPath(String value);

    @TemplateParameter.Text(
        order = 15,
        optional = true,
        description = "Custom class name",
        helpText = "Fully qualified class name having the custom shard id implementation.  It is a"
            + " mandatory field in case shardingCustomJarPath is specified")
    @Default.String("")
    String getShardingCustomClassName();
    void setShardingCustomClassName(String value);

    @TemplateParameter.Text(
        order = 16,
        optional = true,
        description = "Custom sharding logic parameters",
        helpText = "String containing any custom parameters to be passed to the custom sharding class.")
    @Default.String("")
    String getShardingCustomParameters();
    void setShardingCustomParameters(String value);

    @TemplateParameter.Text(
        order = 17,
        optional = true,
        description = "SourceDB timezone offset",
        helpText = "This is the timezone offset from UTC for the source database. Example value: +10:00")
    @Default.String("+00:00")
    String getSourceDbTimezoneOffset();
    void setSourceDbTimezoneOffset(String value);

    @TemplateParameter.PubsubSubscription(
        order = 18,
        optional = true,
        description = "The Pub/Sub subscription being used in a Cloud Storage notification policy for DLQ"
            + " retry directory when running in regular mode.",
        helpText = "The Pub/Sub subscription being used in a Cloud Storage notification policy for DLQ"
            + " retry directory when running in regular mode. The name should be in the format"
            + " of projects/<project-id>/subscriptions/<subscription-name>. When set, the"
            + " deadLetterQueueDirectory and dlqRetryMinutes are ignored.")
    String getDlqGcsPubSubSubscription();
    void setDlqGcsPubSubSubscription(String value);

    @TemplateParameter.Text(
        order = 19,
        optional = true,
        description = "Directory name for holding skipped records",
        helpText = "Records skipped from reverse replication are written to this directory. Default"
            + " directory name is skip.")
    @Default.String("skip")
    String getSkipDirectoryName();
    void setSkipDirectoryName(String value);

    @TemplateParameter.Long(
        order = 20,
        optional = true,
        description = "Maximum connections per shard.",
        helpText = "This will come from shard file eventually.")
    @Default.Long(10000)
    Long getMaxShardConnections();
    void setMaxShardConnections(Long value);

    @TemplateParameter.Text(
        order = 21,
        optional = true,
        description = "Dead letter queue directory.",
        helpText = "The file path used when storing the error queue output. "
            + "The default file path is a directory under the Dataflow job's temp location.")
    @Default.String("")
    String getDeadLetterQueueDirectory();
    void setDeadLetterQueueDirectory(String value);

    @TemplateParameter.Integer(
        order = 22,
        optional = true,
        description = "Dead letter queue maximum retry count",
        helpText = "The max number of times temporary errors can be retried through DLQ. Defaults to 500.")
    @Default.Integer(500)
    Integer getDlqMaxRetryCount();
    void setDlqMaxRetryCount(Integer value);

    @TemplateParameter.Enum(
        order = 23,
        optional = true,
        description = "Run mode - currently supported are : regular or retryDLQ",
        enumOptions = {@TemplateEnumOption("regular"), @TemplateEnumOption("retryDLQ")},
        helpText = "This is the run mode type, whether regular or with retryDLQ.Default is regular."
            + " retryDLQ is used to retry the severe DLQ records only.")
    @Default.String("regular")
    String getRunMode();
    void setRunMode(String value);

    @TemplateParameter.Integer(
        order = 24,
        optional = true,
        description = "Dead letter queue retry minutes",
        helpText = "The number of minutes between dead letter queue retries. Defaults to 10.")
    @Default.Integer(10)
    Integer getDlqRetryMinutes();
    void setDlqRetryMinutes(Integer value);

    @TemplateParameter.Enum(
        order = 25,
        optional = true,
        description = "Source database type, ex: mysql",
        enumOptions = {@TemplateEnumOption("mysql"), @TemplateEnumOption("cassandra")},
        helpText = "The type of source database to reverse replicate to.")
    @Default.String("mysql")
    String getSourceType();
    void setSourceType(String value);

    @TemplateParameter.GcsReadFile(
        order = 26,
        optional = true,
        description = "Custom transformation jar location in Cloud Storage",
        helpText = "Custom jar location in Cloud Storage that contains the custom transformation logic for processing records"
            + " in reverse replication.")
    @Default.String("")
    String getTransformationJarPath();
    void setTransformationJarPath(String value);

    @TemplateParameter.Text(
        order = 27,
        optional = true,
        description = "Custom class name for transformation",
        helpText = "Fully qualified class name having the custom transformation logic.  It is a"
            + " mandatory field in case transformationJarPath is specified")
    @Default.String("")
    String getTransformationClassName();
    void setTransformationClassName(String value);

    @TemplateParameter.Text(
        order = 28,
        optional = true,
        description = "Custom parameters for transformation",
        helpText = "String containing any custom parameters to be passed to the custom transformation class.")
    @Default.String("")
    String getTransformationCustomParameters();
    void setTransformationCustomParameters(String value);

    @TemplateParameter.Text(
        order = 29,
        optional = true,
        description = "Table name overrides from spanner to source",
        regexes = "^\\[([[:space:]]*\\{[[:graph:]]+[[:space:]]*,[[:space:]]*[[:graph:]]+[[:space:]]*\\}[[:space:]]*(,[[:space:]]*)*)*\\]$",
        example = "[{Singers, Vocalists}, {Albums, Records}]",
        helpText = "These are the table name overrides from spanner to source. They are written in the"
            + "following format: [{SpannerTableName1, SourceTableName1}, {SpannerTableName2, SourceTableName2}]"
            + "This example shows mapping Singers table to Vocalists and Albums table to Records.")
    @Default.String("")
    String getTableOverrides();
    void setTableOverrides(String value);

    @TemplateParameter.Text(
        order = 30,
        optional = true,
        description = "Column name overrides from spanner to source",
        regexes = "^\\[([[:space:]]*\\{[[:space:]]*[[:graph:]]+\\.[[:graph:]]+[[:space:]]*,[[:space:]]*[[:graph:]]+\\.[[:graph:]]+[[:space:]]*\\}[[:space:]]*(,[[:space:]]*)*)*\\]$",
        example = "[{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}]",
        helpText = "These are the column name overrides from spanner to source. They are written in the"
            + "following format: [{SpannerTableName1.SpannerColumnName1, SpannerTableName1.SourceColumnName1}, {SpannerTableName2.SpannerColumnName1, SpannerTableName2.SourceColumnName1}]"
            + "Note that the SpannerTableName should remain the same in both the spanner and source pair. To override table names, use tableOverrides."
            + "The example shows mapping SingerName to TalentName and AlbumName to RecordName in Singers and Albums table respectively.")
    @Default.String("")
    String getColumnOverrides();
    void setColumnOverrides(String value);

    @TemplateParameter.GcsReadFile(
        order = 31,
        optional = true,
        description = "File based overrides from spanner to source",
        helpText = "A file which specifies the table and the column name overrides from spanner to source.")
    @Default.String("")
    String getSchemaOverridesFilePath();
    void setSchemaOverridesFilePath(String value);

    @TemplateParameter.Text(
        order = 32,
        optional = true,
        description = "Directory name for holding filtered records",
        helpText = "Records skipped from reverse replication are written to this directory. Default"
            + " directory name is skip.")
    @Default.String("filteredEvents")
    String getFilterEventsDirectoryName();
    void setFilterEventsDirectoryName(String value);

    @TemplateParameter.Boolean(
        order = 33,
        optional = true,
        description = "Boolean setting if reverse migration is sharded",
        helpText = "Sets the template to a sharded migration. If source shard template contains more"
            + " than one shard, the value will be set to true. This value defaults to false.")
    @Default.Boolean(false)
    Boolean getIsShardedMigration();
    void setIsShardedMigration(Boolean value);

    @TemplateParameter.Text(
        order = 34,
        optional = true,
        description = "Failure injection parameter",
        helpText = "Failure injection parameter. Only used for testing.")
    @Default.String("")
    String getFailureInjectionParameter();
    void setFailureInjectionParameter(String value);

    @TemplateParameter.Enum(
        order = 35,
        enumOptions = {
          @TemplateEnumOption("LOW"), @TemplateEnumOption("MEDIUM"), @TemplateEnumOption("HIGH")
        },
        optional = true,
        description = "Priority for Spanner RPC invocations",
        helpText = "The request priority for Cloud Spanner calls. The value must be one of:"
            + " [`HIGH`,`MEDIUM`,`LOW`]. Defaults to `HIGH`.")
    @Default.Enum("HIGH")
    RpcPriority getSpannerPriority();
    void setSpannerPriority(RpcPriority value);
  }

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    LOG.info("Starting Spanner change streams to sink");

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    options.setStreaming(true);

    run(options);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(Options options) {

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .getOptions()
        .as(DataflowPipelineWorkerPoolOptions.class)
        .setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);

    // calculate the max connections per worker
    int maxNumWorkers =
        pipeline.getOptions().as(DataflowPipelineWorkerPoolOptions.class).getMaxNumWorkers() > 0
            ? pipeline.getOptions().as(DataflowPipelineWorkerPoolOptions.class).getMaxNumWorkers()
            : 1;
    int connectionPoolSizePerWorker = (int) (options.getMaxShardConnections() / maxNumWorkers);
    if (connectionPoolSizePerWorker < 1) {
      // This can happen when the number of workers is more than max.
      // This can cause overload on the source database. Error out and let the user know.
      LOG.error(
          "Max workers {} is more than max shard connections {}, this can lead to more database"
              + " connections than desired",
          maxNumWorkers,
          options.getMaxShardConnections());
      throw new IllegalArgumentException(
          "Max Dataflow workers "
              + maxNumWorkers
              + " is more than max per shard connections: "
              + options.getMaxShardConnections()
              + " this can lead to more"
              + " database connections than desired. Either reduce the max allowed workers or"
              + " incease the max shard connections");
    }

    String workerMachineType =
        pipeline.getOptions().as(DataflowPipelineWorkerPoolOptions.class).getWorkerMachineType();
    DataflowWorkerMachineTypeValidator.validateMachineSpecs(workerMachineType, 4);

    // Prepare Spanner config
    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withProjectId(ValueProvider.StaticValueProvider.of(options.getSpannerProjectId()))
            .withInstanceId(ValueProvider.StaticValueProvider.of(options.getInstanceId()))
            .withDatabaseId(ValueProvider.StaticValueProvider.of(options.getDatabaseId()))
            .withRpcPriority(ValueProvider.StaticValueProvider.of(options.getSpannerPriority()));

    // Create shadow tables
    // Note that there is a limit on the number of tables that can be created per DB: 5000.
    // If we create shadow tables per shard, there will be an explosion of tables.
    // Anyway the shadow table has Spanner PK so no need to again separate by the shard
    // Lookup by the Spanner PK should be sufficient.

    // Prepare Spanner config
    SpannerConfig spannerMetadataConfig =
        SpannerConfig.create()
            .withProjectId(ValueProvider.StaticValueProvider.of(options.getSpannerProjectId()))
            .withInstanceId(ValueProvider.StaticValueProvider.of(options.getMetadataInstance()))
            .withDatabaseId(ValueProvider.StaticValueProvider.of(options.getMetadataDatabase()))
            .withRpcPriority(ValueProvider.StaticValueProvider.of(options.getSpannerPriority()));

    // Fetch DDLs and create shadow tables in a DoFn to avoid launcher-side timeout.
    PCollectionTuple ddlTuple =
        pipeline.apply(
            "Process Information Schema",
            new SpannerInformationSchemaProcessorTransform(
                spannerConfig, spannerMetadataConfig, options.getShadowTablePrefix()));

    final PCollectionView<Ddl> ddlView =
        ddlTuple
            .get(SpannerInformationSchemaProcessorTransform.MAIN_DDL_TAG)
            .apply("View Main DDL", View.asSingleton());

    DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);

    final PCollectionView<Ddl> shadowTableDdlView =
        ddlTuple
            .get(SpannerInformationSchemaProcessorTransform.SHADOW_TABLE_DDL_TAG)
            .apply("View Shadow DDL", View.asSingleton());

    List<Shard> shards;
    String shardingMode;
    if (MYSQL_SOURCE_TYPE.equals(options.getSourceType())) {
      ShardFileReader shardFileReader = new ShardFileReader(new SecretManagerAccessorImpl());
      shards = shardFileReader.getOrderedShardDetails(options.getSourceShardsFilePath());
      shardingMode = Constants.SHARDING_MODE_MULTI_SHARD;
    } else {
      CassandraConfigFileReader cassandraConfigFileReader = new CassandraConfigFileReader();
      shards = cassandraConfigFileReader.getCassandraShard(options.getSourceShardsFilePath());
      LOG.info("Cassandra config is: {}", shards.get(0));
      shardingMode = Constants.SHARDING_MODE_SINGLE_SHARD;
    }

    SourceSchema sourceSchema = fetchSourceSchema(options, shards);
    LOG.info("Source schema: {}", sourceSchema);

    if (shards.size() == 1 && !options.getIsShardedMigration()) {
      shardingMode = Constants.SHARDING_MODE_SINGLE_SHARD;
      Shard shard = shards.get(0);
      if (shard.getLogicalShardId() == null) {
        shard.setLogicalShardId(Constants.DEFAULT_SHARD_ID);
        LOG.info(
            "Logical shard id was not found, hence setting it to : " + Constants.DEFAULT_SHARD_ID);
      }
    }
    boolean isRegularMode = "regular".equals(options.getRunMode());
    PCollectionTuple reconsumedElements = null;
    DeadLetterQueueManager dlqManager = buildDlqManager(options);

    int reshuffleBucketSize =
        maxNumWorkers
            * (debugOptions.getNumberOfWorkerHarnessThreads() > 0
                ? debugOptions.getNumberOfWorkerHarnessThreads()
                : Constants.DEFAULT_WORKER_HARNESS_THREAD_COUNT);

    if (isRegularMode && (!Strings.isNullOrEmpty(options.getDlqGcsPubSubSubscription()))) {
      reconsumedElements =
          dlqManager.getReconsumerDataTransformForFiles(
              pipeline.apply(
                  "Read retry from PubSub",
                  new PubSubNotifiedDlqIO(
                      options.getDlqGcsPubSubSubscription(),
                      // file paths to ignore when re-consuming for retry
                      new ArrayList<String>(
                          Arrays.asList(
                              "/severe/",
                              "/tmp_retry",
                              "/tmp_severe/",
                              ".temp",
                              "/tmp_skip/",
                              "/" + options.getSkipDirectoryName())))));
    } else {
      reconsumedElements =
          dlqManager.getReconsumerDataTransform(
              pipeline.apply(dlqManager.dlqReconsumer(options.getDlqRetryMinutes())));
    }

    PCollection<FailsafeElement<String, String>> dlqJsonStrRecords =
        reconsumedElements
            .get(DeadLetterQueueManager.RETRYABLE_ERRORS)
            .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

    PCollection<TrimmedShardedDataChangeRecord> dlqRecords =
        dlqJsonStrRecords.apply(
            "Convert DLQ records to TrimmedShardedDataChangeRecord",
            ParDo.of(new ConvertDlqRecordToTrimmedShardedDataChangeRecordFn()));
    PCollection<TrimmedShardedDataChangeRecord> mergedRecords = null;
    if (options.getFailureInjectionParameter() != null
        && !options.getFailureInjectionParameter().isBlank()) {
      spannerConfig =
          SpannerServiceFactoryImpl.createSpannerService(
              spannerConfig, options.getFailureInjectionParameter());
    }
    if (isRegularMode) {
      PCollection<TrimmedShardedDataChangeRecord> changeRecordsFromDB =
          pipeline
              .apply(getReadChangeStreamDoFn(options, spannerConfig))
              // This emits PCollection<DataChangeRecord> which is Spanner
              // change
              // stream data
              .apply("Reshuffle", Reshuffle.viaRandomKey())
              .apply("Filteration", ParDo.of(new FilterRecordsFn(options.getFiltrationMode())))
              .apply("Preprocess", ParDo.of(new PreprocessRecordsFn()));
      mergedRecords =
          PCollectionList.of(changeRecordsFromDB)
              .and(dlqRecords)
              .apply("Flatten", Flatten.pCollections());
    } else {
      mergedRecords = dlqRecords;
    }
    CustomTransformation customTransformation =
        CustomTransformation.builder(
                options.getTransformationJarPath(), options.getTransformationClassName())
            .setCustomParameters(options.getTransformationCustomParameters())
            .build();
    if (options.getFailureInjectionParameter() != null
        && !options.getFailureInjectionParameter().isBlank()) {
      spannerMetadataConfig =
          SpannerServiceFactoryImpl.createSpannerService(
              spannerMetadataConfig, options.getFailureInjectionParameter());
    }
    SourceWriterTransform.Result sourceWriterOutput =
        mergedRecords
            .apply(
                "AssignShardId",
                // This emits PCollection<KV<Long,
                // TrimmedShardedDataChangeRecord>> which is Spanner change stream data with key as
                // PK
                // mod
                // number of parallelism
                ParDo.of(
                        new AssignShardIdFn(
                            spannerConfig,
                            ddlView,
                            sourceSchema,
                            shardingMode,
                            shards.get(0).getLogicalShardId(),
                            options.getSkipDirectoryName(),
                            options.getShardingCustomJarPath(),
                            options.getShardingCustomClassName(),
                            options.getShardingCustomParameters(),
                            options.getMaxShardConnections() * shards.size(),
                            options.getSourceType(),
                            options.getSessionFilePath(),
                            options.getSchemaOverridesFilePath(),
                            options.getTableOverrides(),
                            options.getColumnOverrides()))
                    // currently assume that all shards accept
                    // the
                    // same source type
                    .withSideInputs(ddlView))
            .setCoder(
                KvCoder.of(VarLongCoder.of(), AvroCoder.of(TrimmedShardedDataChangeRecord.class)))
            .apply("Reshuffle2", Reshuffle.of())
            .apply(
                "Write to source",
                new SourceWriterTransform(
                    shards,
                    spannerMetadataConfig,
                    options.getSourceDbTimezoneOffset(),
                    ddlView,
                    shadowTableDdlView,
                    sourceSchema,
                    options.getShadowTablePrefix(),
                    options.getSkipDirectoryName(),
                    connectionPoolSizePerWorker,
                    options.getSourceType(),
                    customTransformation,
                    options.getSessionFilePath(),
                    options.getSchemaOverridesFilePath(),
                    options.getTableOverrides(),
                    options.getColumnOverrides()));

    PCollection<FailsafeElement<String, String>> dlqPermErrorRecords =
        reconsumedElements
            .get(DeadLetterQueueManager.PERMANENT_ERRORS)
            .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

    PCollection<FailsafeElement<String, String>> permErrorsFromSourceWriter =
        sourceWriterOutput
            .permanentErrors()
            .setCoder(StringUtf8Coder.of())
            .apply(
                "Reshuffle3", Reshuffle.<String>viaRandomKey().withNumBuckets(reshuffleBucketSize))
            .apply(
                "Convert permanent errors from source writer to DLQ format",
                ParDo.of(new ConvertChangeStreamErrorRecordToFailsafeElementFn()))
            .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

    PCollection<FailsafeElement<String, String>> permanentErrors =
        PCollectionList.of(dlqPermErrorRecords)
            .and(permErrorsFromSourceWriter)
            .apply(Flatten.pCollections())
            .apply("Reshuffle", Reshuffle.viaRandomKey());

    permanentErrors
        .apply("Update DLQ metrics", ParDo.of(new UpdateDlqMetricsFn(isRegularMode)))
        .apply(
            "DLQ: Write Severe errors to GCS",
            MapElements.via(new StringDeadLetterQueueSanitizer()))
        .setCoder(StringUtf8Coder.of())
        .apply(
            "Write To DLQ for severe errors",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getSevereDlqDirectoryWithDateTime())
                .withTmpDirectory((options).getDeadLetterQueueDirectory() + "/tmp_severe/")
                .setIncludePaneInfo(true)
                .build());

    PCollection<FailsafeElement<String, String>> retryErrors =
        sourceWriterOutput
            .retryableErrors()
            .setCoder(StringUtf8Coder.of())
            .apply(
                "Reshuffle4", Reshuffle.<String>viaRandomKey().withNumBuckets(reshuffleBucketSize))
            .apply(
                "Convert retryable errors from source writer to DLQ format",
                ParDo.of(new ConvertChangeStreamErrorRecordToFailsafeElementFn()))
            .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

    retryErrors
        .apply(
            "DLQ: Write retryable Failures to GCS",
            MapElements.via(new StringDeadLetterQueueSanitizer()))
        .setCoder(StringUtf8Coder.of())
        .apply(
            "Write To DLQ for retryable errors",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getRetryDlqDirectoryWithDateTime())
                .withTmpDirectory(options.getDeadLetterQueueDirectory() + "/tmp_retry/")
                .setIncludePaneInfo(true)
                .build());

    PCollection<FailsafeElement<String, String>> skippedRecords =
        sourceWriterOutput
            .skippedSourceWrites()
            .setCoder(StringUtf8Coder.of())
            .apply(
                "Reshuffle5", Reshuffle.<String>viaRandomKey().withNumBuckets(reshuffleBucketSize))
            .apply(
                "Convert skipped records from source writer to DLQ format",
                ParDo.of(new ConvertChangeStreamErrorRecordToFailsafeElementFn()))
            .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

    skippedRecords
        .apply(
            "Write skipped records to GCS",
            MapElements.via(new StringDeadLetterQueueSanitizer()))
        .setCoder(StringUtf8Coder.of())
        .apply(
            "Writing skipped records to GCS",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(
                    options.getDeadLetterQueueDirectory() + "/" + options.getSkipDirectoryName())
                .withTmpDirectory(options.getDeadLetterQueueDirectory() + "/tmp_skip/")
                .setIncludePaneInfo(true)
                .build());

    return pipeline.run();
  }

  public static SpannerIO.ReadChangeStream getReadChangeStreamDoFn(
      Options options, SpannerConfig spannerConfig) {

    Timestamp startTime = Timestamp.now();
    if (!options.getStartTimestamp().equals("")) {
      startTime = Timestamp.parseTimestamp(options.getStartTimestamp());
    }
    SpannerIO.ReadChangeStream readChangeStreamDoFn =
        SpannerIO.readChangeStream()
            .withSpannerConfig(spannerConfig)
            .withChangeStreamName(options.getChangeStreamName())
            .withMetadataInstance(options.getMetadataInstance())
            .withMetadataDatabase(options.getMetadataDatabase())
            .withInclusiveStartAt(startTime)
            .withRpcPriority(options.getSpannerPriority());
    if (options.getSpannerMetadataTableName() != null
        && !options.getSpannerMetadataTableName().isEmpty()) {
      readChangeStreamDoFn =
          readChangeStreamDoFn.withMetadataTable(options.getSpannerMetadataTableName());
    }
    if (!options.getEndTimestamp().equals("")) {
      return readChangeStreamDoFn.withInclusiveEndAt(
          Timestamp.parseTimestamp(options.getEndTimestamp()));
    }
    return readChangeStreamDoFn;
  }

  private static DeadLetterQueueManager buildDlqManager(Options options) {
    String tempLocation =
        options.as(DataflowPipelineOptions.class).getTempLocation().endsWith("/")
            ? options.as(DataflowPipelineOptions.class).getTempLocation()
            : options.as(DataflowPipelineOptions.class).getTempLocation() + "/";
    String dlqDirectory =
        options.getDeadLetterQueueDirectory().isEmpty()
            ? tempLocation + "dlq/"
            : options.getDeadLetterQueueDirectory();

    LOG.info("Dead-letter queue directory: {}", dlqDirectory);
    options.setDeadLetterQueueDirectory(dlqDirectory);
    if ("regular".equals(options.getRunMode())) {
      return DeadLetterQueueManager.create(dlqDirectory, options.getDlqMaxRetryCount(), true);
    } else {
      String retryDlqUri =
          FileSystems.matchNewResource(dlqDirectory, true)
              .resolve("severe", StandardResolveOptions.RESOLVE_DIRECTORY)
              .toString();
      LOG.info("Dead-letter retry directory: {}", retryDlqUri);
      return DeadLetterQueueManager.create(dlqDirectory, retryDlqUri, 0, true);
    }
  }

  private static Connection createJdbcConnection(Shard shard) {
    try {
      String sourceConnectionUrl =
          "jdbc:mysql://" + shard.getHost() + ":" + shard.getPort() + "/" + shard.getDbName();
      HikariConfig config = new HikariConfig();
      config.setJdbcUrl(sourceConnectionUrl);
      config.setUsername(shard.getUserName());
      config.setPassword(shard.getPassword());
      config.setDriverClassName("com.mysql.cj.jdbc.Driver");

      HikariDataSource ds = new HikariDataSource(config);
      return ds.getConnection();
    } catch (java.sql.SQLException e) {
      LOG.error("Sql error while discovering mysql schema: {}", e);
      throw new RuntimeException(e);
    }
  }

  /**
   * Creates a {@link CqlSession} for the given {@link CassandraShard}.
   *
   * @param cassandraShard The shard containing connection details.
   * @return A {@link CqlSession} instance.
   */
  private static CqlSession createCqlSession(CassandraShard cassandraShard) {
    CqlSessionBuilder builder = CqlSession.builder();
    DriverConfigLoader configLoader =
        CassandraDriverConfigLoader.fromOptionsMap(cassandraShard.getOptionsMap());
    builder.withConfigLoader(configLoader);
    return builder.build();
  }

  private static SourceSchema fetchSourceSchema(Options options, List<Shard> shards) {
    SourceSchemaScanner scanner = null;
    SourceSchema sourceSchema = null;
    try {
      if (options.getSourceType().equals(MYSQL_SOURCE_TYPE)) {
        Connection connection = createJdbcConnection(shards.get(0));
        scanner = new MySqlInformationSchemaScanner(connection, shards.get(0).getDbName());
        sourceSchema = scanner.scan();
        connection.close();
      } else {
        try (CqlSession session = createCqlSession((CassandraShard) shards.get(0))) {
          scanner =
              new CassandraInformationSchemaScanner(
                  session, ((CassandraShard) shards.get(0)).getKeySpaceName());
          sourceSchema = scanner.scan();
        }
      }
    } catch (SQLException e) {
      throw new RuntimeException("Unable to discover jdbc schema", e);
    }
    return sourceSchema;
  }
}

