SQL Server to BigQuery template

The SQL Server to BigQuery template is a batch pipeline that copies data from a SQL Server table into an existing BigQuery table. This pipeline uses JDBC to connect to SQL Server. For an extra layer of protection, you can also pass in a Cloud KMS key along with Base64-encoded username, password, and connection string parameters encrypted with the Cloud KMS key. For more information about encrypting your username, password, and connection string parameters, see the Cloud KMS API encryption endpoint.
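The Cloud KMS encrypt endpoint takes the value to encrypt as a Base64-encoded plaintext string. The following minimal Java sketch, using only the standard library, shows one way to prepare that request body for a connection string. The class name is illustrative, and the HTTP call, authentication, and key name are not shown. The sketch assumes that the Base64 ciphertext returned by the endpoint is the value you then pass in connectionURL, username, or password together with KMSEncryptionKey.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class KmsPlaintextSketch {

  // Base64-encodes a value (for example, a JDBC connection string) from its UTF-8 bytes.
  static String toBase64(String value) {
    return Base64.getEncoder().encodeToString(value.getBytes(StandardCharsets.UTF_8));
  }

  // Builds a minimal JSON body for the Cloud KMS encrypt endpoint. Base64 output
  // contains only A-Z, a-z, 0-9, '+', '/', and '=', so no JSON escaping is needed.
  static String encryptRequestBody(String value) {
    return "{\"plaintext\": \"" + toBase64(value) + "\"}";
  }

  public static void main(String[] args) {
    String connectionUrl = "jdbc:sqlserver://localhost;databaseName=sampledb";
    // Assumption: this body is sent to the encrypt endpoint (not shown), and the
    // "ciphertext" field of the response is what you pass to the template.
    System.out.println(encryptRequestBody(connectionUrl));
  }
}
```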

Pipeline requirements

  • The BigQuery table must exist before pipeline execution, unless createDisposition is set to CREATE_IF_NEEDED.
  • If the BigQuery table already exists, it must have a compatible schema.
  • The relational database must be accessible from the subnet where Dataflow runs.

Template parameters

Required parameters

  • connectionURL: The JDBC connection URL string. Can be passed in as a string that's Base64-encoded and then encrypted with a Cloud KMS key, or can be a Secret Manager secret in the form projects/{project}/secrets/{secret}/versions/{secret_version}. For example, jdbc:sqlserver://localhost;databaseName=sampledb.
  • outputTable: The BigQuery output table location. For example, <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.
  • bigQueryLoadingTemporaryDirectory: The temporary directory for the BigQuery loading process. For example, gs://your-bucket/your-files/temp_dir.
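The outputTable value combines three identifiers in one string. The following Java sketch is illustrative and not part of the template (the class TableSpecSketch and its parse method are hypothetical). It splits a value in the <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME> form into its parts, which can help you catch a malformed value before launching a job.

```java
public class TableSpecSketch {

  // The three parts of a PROJECT_ID:DATASET_NAME.TABLE_NAME spec.
  static final class TableSpec {
    final String project;
    final String dataset;
    final String table;

    TableSpec(String project, String dataset, String table) {
      this.project = project;
      this.dataset = dataset;
      this.table = table;
    }
  }

  // Splits a spec such as "my-project:my_dataset.my_table". The last ':' is used
  // so that domain-scoped project IDs like "example.com:my-project" stay intact.
  static TableSpec parse(String spec) {
    int colon = spec.lastIndexOf(':');
    int dot = spec.indexOf('.', colon + 1);
    if (colon <= 0 || dot <= colon + 1 || dot == spec.length() - 1) {
      throw new IllegalArgumentException(
          "Expected PROJECT_ID:DATASET_NAME.TABLE_NAME, got: " + spec);
    }
    return new TableSpec(
        spec.substring(0, colon), spec.substring(colon + 1, dot), spec.substring(dot + 1));
  }

  public static void main(String[] args) {
    TableSpec spec = parse("my-project:my_dataset.my_table");
    System.out.println(spec.project + " / " + spec.dataset + " / " + spec.table);
  }
}
```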

Optional parameters

  • connectionProperties: The properties string to use for the JDBC connection. The format of the string must be [propertyName=property;]*. For more information, see Configuration Properties (https://dev.mysql.com/doc/connector-j/en/connector-j-reference-configuration-properties.html) in the MySQL documentation. For example, unicode=true;characterEncoding=UTF-8.
  • username: The username to use for the JDBC connection. Can be passed in as a string that's encrypted with a Cloud KMS key, or can be a Secret Manager secret in the form projects/{project}/secrets/{secret}/versions/{secret_version}.
  • password: The password to use for the JDBC connection. Can be passed in as a string that's encrypted with a Cloud KMS key, or can be a Secret Manager secret in the form projects/{project}/secrets/{secret}/versions/{secret_version}.
  • query: The query to run on the source to extract the data. Either query, or both table and partitionColumn, must be specified. Some JDBC SQL and BigQuery types share the same name but differ in behavior; in particular, SQL DATETIME maps to BigQuery TIMESTAMP. Type casting might be required if your schemas don't match. For example, select * from sampledb.sample_table.
  • KMSEncryptionKey: The Cloud KMS encryption key to use to decrypt the username, password, and connection string. If you pass in a Cloud KMS key, you must also encrypt the username, password, and connection string. For example, projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key.
  • useColumnAlias: If set to true, the pipeline uses the column alias (AS) instead of the column name to map the rows to BigQuery. Defaults to false.
  • isTruncate: If set to true, the pipeline truncates the output table before loading data into BigQuery. Defaults to false, which causes the pipeline to append data.
  • partitionColumn: If partitionColumn is specified along with table, JdbcIO reads the table in parallel by running multiple instances of the query on the same table (subquery) using ranges. Currently, Long and DateTime partition columns are supported. Pass the column type through partitionColumnType.
  • partitionColumnType: The type of partitionColumn. Accepts either long or datetime. Defaults to: long.
  • table: The table to read from when using partitions. This parameter also accepts a subquery in parentheses. For example, (select id, name from Person) as subq.
  • numPartitions: The number of partitions. Together with the lower and upper bounds, this value forms partition strides for the generated WHERE clause expressions that split the partition column evenly. When the input is less than 1, the number is set to 1.
  • lowerBound: The lower bound to use in the partition scheme. If not provided, this value is automatically inferred by Apache Beam for the supported types. For a datetime partitionColumnType, specify the lower bound in the format yyyy-MM-dd HH:mm:ss.SSSZ. For example, 2024-02-20 07:55:45.000+03:30. The template applies the bounds only when both lowerBound and upperBound are set.
  • upperBound: The upper bound to use in the partition scheme. If not provided, this value is automatically inferred by Apache Beam for the supported types. For a datetime partitionColumnType, specify the upper bound in the format yyyy-MM-dd HH:mm:ss.SSSZ. For example, 2024-02-20 07:55:45.000+03:30. Used only when lowerBound is also set.
  • fetchSize: The number of rows to fetch from the database at a time. Not used for partitioned reads. Defaults to: 50000.
  • createDisposition: The BigQuery CreateDisposition to use. For example, CREATE_IF_NEEDED or CREATE_NEVER. Defaults to: CREATE_NEVER.
  • bigQuerySchemaPath: The Cloud Storage path for the BigQuery JSON schema. If createDisposition is set to CREATE_IF_NEEDED, this parameter must be specified. For example, gs://your-bucket/your-schema.json.
  • outputDeadletterTable: The BigQuery table to use for messages that failed to reach the output table, formatted as "PROJECT_ID:DATASET_NAME.TABLE_NAME". If the table doesn't exist, it is created when the pipeline runs. If this parameter is not specified, the pipeline fails on write errors. This parameter can only be specified if useStorageWriteApi or useStorageWriteApiAtLeastOnce is set to true.
  • disabledAlgorithms: Comma-separated algorithms to disable. If this value is set to none, no algorithm is disabled. Use this parameter with caution, because the algorithms disabled by default might have vulnerabilities or performance issues. For example, SSLv3, RC4.
  • extraFilesToStage: Comma-separated Cloud Storage paths or Secret Manager secrets for files to stage in the worker. These files are saved in the /extra_files directory in each worker. For example, gs://<BUCKET_NAME>/file.txt,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<VERSION_ID>.
  • useStorageWriteApi: If true, the pipeline uses the BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). The default value is false. For more information, see Using the Storage Write API (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce: When using the Storage Write API, specifies the write semantics. To use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), set this parameter to true. To use exactly-once semantics, set the parameter to false. This parameter applies only when useStorageWriteApi is true. The default value is false.
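The numPartitions, lowerBound, and upperBound parameters work together to split a partitioned read. As an illustration of the stride idea only, the following Java sketch divides a long range into numPartitions contiguous ranges and produces one WHERE clause per range, treating values below 1 as 1. The class and method names are hypothetical, and JdbcIO.ReadWithPartitions computes its own ranges, which may differ from these in boundary handling and query text.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionStrideSketch {

  // Illustration only, not JdbcIO's actual range logic.
  // Splits [lower, upper] on a numeric column into contiguous ranges and returns
  // one WHERE clause per range. The last range uses <= so that rows equal to
  // upper are included.
  static List<String> whereClauses(String column, long lower, long upper, int numPartitions) {
    if (upper < lower) {
      throw new IllegalArgumentException("upperBound must be >= lowerBound");
    }
    int n = Math.max(1, numPartitions); // values below 1 are treated as 1
    long span = upper - lower; // assumes the difference fits in a long
    long stride = span / n;
    long remainder = span % n;
    List<String> clauses = new ArrayList<>();
    long start = lower;
    for (int i = 0; i < n; i++) {
      // Spread the remainder over the first ranges so sizes differ by at most 1.
      long end = start + stride + (i < remainder ? 1 : 0);
      boolean last = i == n - 1;
      clauses.add(column + " >= " + start + " AND " + column + (last ? " <= " : " < ") + end);
      start = end;
    }
    return clauses;
  }

  public static void main(String[] args) {
    whereClauses("id", 0, 100, 4).forEach(System.out::println);
  }
}
```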

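For a datetime partitionColumnType, the template source parses lowerBound and upperBound with Joda-Time's DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSZ").withOffsetParsed(). To check bound values before launching a job, the following sketch uses java.time instead. It assumes the pattern letters XXX as the closest java.time match for an offset written with a colon, such as +03:30, so it approximates the template's parser rather than reproducing it. The class and method names are hypothetical.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

public class DateTimeBoundSketch {

  // java.time approximation of the template's Joda pattern "yyyy-MM-dd HH:mm:ss.SSSZ".
  // XXX parses an offset with a colon, such as +03:30, or Z for UTC.
  private static final DateTimeFormatter BOUND_FORMAT =
      DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSXXX");

  // Parses a lowerBound or upperBound value and keeps its UTC offset.
  static OffsetDateTime parseBound(String value) {
    return OffsetDateTime.parse(value, BOUND_FORMAT);
  }

  // Returns true if the value matches the expected bound format.
  static boolean isValidBound(String value) {
    try {
      parseBound(value);
      return true;
    } catch (DateTimeParseException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    OffsetDateTime lower = parseBound("2024-02-20 07:55:45.000+03:30");
    System.out.println(lower.toInstant()); // prints 2024-02-20T04:25:45Z
    System.out.println(isValidBound("2024-02-20T07:55:45+03:30")); // prints false
  }
}
```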
Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the SQL Server to BigQuery template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

Note: To use the Google Cloud CLI to run flex templates, you must have Google Cloud CLI version 284.0.0 or later.

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
  --project=PROJECT_ID \
  --region=REGION_NAME \
  --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/SQLServer_to_BigQuery \
  --parameters \
connectionURL=JDBC_CONNECTION_URL,\
query=SOURCE_SQL_QUERY,\
outputTable=PROJECT_ID:DATASET.TABLE_NAME,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS,\
connectionProperties=CONNECTION_PROPERTIES,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

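Replace the following:

  • JOB_NAME: a unique job name of your choice
  • PROJECT_ID: the ID of the Google Cloud project where you want to run the Dataflow job
  • REGION_NAME: the region where you want to deploy your Dataflow job, for example us-central1
  • VERSION: the version of the template that you want to use, for example latest
  • JDBC_CONNECTION_URL: the JDBC connection URL
  • SOURCE_SQL_QUERY: the SQL query to run on the source database
  • DATASET: the BigQuery dataset of the output table
  • TABLE_NAME: the BigQuery output table name
  • PATH_TO_TEMP_DIR_ON_GCS: the Cloud Storage path to the temporary directory for the BigQuery loading process
  • CONNECTION_PROPERTIES: the JDBC connection properties, if needed
  • CONNECTION_USERNAME: the JDBC connection username
  • CONNECTION_PASSWORD: the JDBC connection password
  • KMS_ENCRYPTION_KEY: the Cloud KMS encryption key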
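By default, gcloud splits the --parameters value on commas, so a value that itself contains a comma, such as a query or a table subquery like (select id, name from Person) as subq, needs gcloud's alternate-delimiter syntax (see gcloud topic escaping), where a ^DELIM^ prefix sets a new delimiter. The following Java sketch, with the hypothetical helper parametersArg, builds the argument and switches the delimiter to ~ only when a value contains a comma. It assumes that no key or value contains ~.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class GcloudParametersSketch {

  // Joins template parameters into one --parameters argument. If any value
  // contains a comma, the ^~^ prefix tells gcloud to split on '~' instead.
  // Assumes no key or value contains '~'.
  static String parametersArg(Map<String, String> params) {
    boolean needsAltDelimiter = params.values().stream().anyMatch(v -> v.contains(","));
    String delimiter = needsAltDelimiter ? "~" : ",";
    String joined =
        params.entrySet().stream()
            .map(e -> e.getKey() + "=" + e.getValue())
            .collect(Collectors.joining(delimiter));
    return "--parameters=" + (needsAltDelimiter ? "^~^" : "") + joined;
  }

  public static void main(String[] args) {
    Map<String, String> params = new LinkedHashMap<>(); // keeps insertion order
    params.put("connectionURL", "jdbc:sqlserver://localhost;databaseName=sampledb");
    params.put("query", "select id, name from sample_table");
    params.put("outputTable", "my-project:my_dataset.my_table");
    params.put("bigQueryLoadingTemporaryDirectory", "gs://your-bucket/your-files/temp_dir");
    // The result contains spaces: pass it as a single argument (for example,
    // with ProcessBuilder) or quote it in a shell.
    System.out.println(parametersArg(params));
  }
}
```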

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launchParameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/SQLServer_to_BigQuery",
    "parameters": {
      "connectionURL": "JDBC_CONNECTION_URL",
      "query": "SOURCE_SQL_QUERY",
      "outputTable": "PROJECT_ID:DATASET.TABLE_NAME",
      "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS",
      "connectionProperties": "CONNECTION_PROPERTIES",
      "username": "CONNECTION_USERNAME",
      "password": "CONNECTION_PASSWORD",
      "KMSEncryptionKey": "KMS_ENCRYPTION_KEY"
    },
    "environment": { "zone": "us-central1-f" }
  }
}

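Replace the following:

  • PROJECT_ID: the ID of the Google Cloud project where you want to run the Dataflow job
  • LOCATION: the region where you want to deploy your Dataflow job, for example us-central1
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use, for example latest
  • JDBC_CONNECTION_URL: the JDBC connection URL
  • SOURCE_SQL_QUERY: the SQL query to run on the source database
  • DATASET: the BigQuery dataset of the output table
  • TABLE_NAME: the BigQuery output table name
  • PATH_TO_TEMP_DIR_ON_GCS: the Cloud Storage path to the temporary directory for the BigQuery loading process
  • CONNECTION_PROPERTIES: the JDBC connection properties, if needed
  • CONNECTION_USERNAME: the JDBC connection username
  • CONNECTION_PASSWORD: the JDBC connection password
  • KMS_ENCRYPTION_KEY: the Cloud KMS encryption key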
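To send the launch request from Java, a minimal sketch using the JDK's java.net.http client (Java 11 or later) could look like the following. It assumes that you saved the JSON request body shown above to a file with the placeholders replaced, and that an OAuth 2.0 access token is available in an ACCESS_TOKEN environment variable, for example from gcloud auth print-access-token. The class name, environment variable, and argument order are illustrative.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

public class FlexLaunchRequestSketch {

  // Builds the POST request for the flexTemplates:launch endpoint.
  static HttpRequest buildLaunchRequest(
      String projectId, String location, String accessToken, String jsonBody) {
    URI uri =
        URI.create(
            "https://dataflow.googleapis.com/v1b3/projects/"
                + projectId
                + "/locations/"
                + location
                + "/flexTemplates:launch");
    return HttpRequest.newBuilder(uri)
        .header("Authorization", "Bearer " + accessToken)
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
        .build();
  }

  // Usage: ACCESS_TOKEN=... java FlexLaunchRequestSketch PROJECT_ID LOCATION request.json
  public static void main(String[] args) throws Exception {
    String token = System.getenv("ACCESS_TOKEN"); // assumption: token supplied by the caller
    if (args.length != 3 || token == null) {
      System.err.println(
          "Usage: ACCESS_TOKEN=... java FlexLaunchRequestSketch PROJECT_ID LOCATION request.json");
      return;
    }
    String body = Files.readString(Path.of(args[2]));
    HttpRequest request = buildLaunchRequest(args[0], args[1], token, body);
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode() + " " + response.body());
  }
}
```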

Template source code

Java

/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.transforms.BigQueryConverters.wrapBigQueryInsertError;
import static com.google.cloud.teleport.v2.utils.GCSUtils.getGcsFileAsString;
import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt;

import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.JdbcToBigQueryOptions;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.GCSAwareValueProvider;
import com.google.cloud.teleport.v2.utils.JdbcConverters;
import com.google.cloud.teleport.v2.utils.ResourceUtils;
import com.google.cloud.teleport.v2.utils.SecretManagerUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

/**
 * A template that copies data from a relational database using JDBC to an existing BigQuery table.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/jdbc-and-googlecloud/README_Jdbc_to_BigQuery_Flex.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Jdbc_to_BigQuery_Flex",
    category = TemplateCategory.BATCH,
    displayName = "JDBC to BigQuery with BigQuery Storage API support",
    description = {
      "The JDBC to BigQuery template is a batch pipeline that copies data from a relational database table into an existing BigQuery table. "
          + "This pipeline uses JDBC to connect to the relational database. You can use this template to copy data from any relational database with available JDBC drivers into BigQuery.",
      "For an extra layer of protection, you can also pass in a Cloud KMS key along with a Base64-encoded username, password, and connection string parameters encrypted with the Cloud KMS key. "
          + "See the <a href=\"https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt\">Cloud KMS API encryption endpoint</a> for additional details on encrypting your username, password, and connection string parameters."
    },
    optionsClass = JdbcToBigQueryOptions.class,
    flexContainerName = "jdbc-and-googlecloud",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/jdbc-to-bigquery",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "The JDBC drivers for the relational database must be available.",
      "If BigQuery table already exist before pipeline execution, it must have a compatible schema.",
      "The relational database must be accessible from the subnet where Dataflow runs."
    })
public class JdbcToBigQuery {

  /** Coder for FailsafeElement. */
  private static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  /**
   * Main entry point for executing the pipeline. This will run the pipeline asynchronously. If
   * blocking execution is required, use the {@link JdbcToBigQuery#run} method to start the pipeline
   * and invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    // Parse the user options passed from the command-line
    JdbcToBigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(JdbcToBigQueryOptions.class);

    run(options, writeToBQTransform(options));
  }

  /**
   * Create the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @param writeToBQ the transform that outputs {@link TableRow}s to BigQuery.
   * @return The result of the pipeline execution.
   */
  @VisibleForTesting
  static PipelineResult run(JdbcToBigQueryOptions options, Write<TableRow> writeToBQ) {
    // Validate BQ STORAGE_WRITE_API options
    BigQueryIOUtils.validateBQStorageApiOptionsBatch(options);
    if (!options.getUseStorageWriteApi()
        && !options.getUseStorageWriteApiAtLeastOnce()
        && !Strings.isNullOrEmpty(options.getOutputDeadletterTable())) {
      throw new IllegalArgumentException(
          "outputDeadletterTable can only be specified if BigQuery Storage Write API is enabled either with useStorageWriteApi or useStorageWriteApiAtLeastOnce.");
    }

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    /*
     * Steps: 1) Read records via JDBC and convert to TableRow via RowMapper
     *        2) Append TableRow to BigQuery via BigQueryIO
     */
    JdbcIO.DataSourceConfiguration dataSourceConfiguration =
        JdbcIO.DataSourceConfiguration.create(
                StaticValueProvider.of(options.getDriverClassName()),
                maybeDecrypt(
                    maybeParseSecret(options.getConnectionURL()), options.getKMSEncryptionKey()))
            .withUsername(
                maybeDecrypt(maybeParseSecret(options.getUsername()), options.getKMSEncryptionKey()))
            .withPassword(
                maybeDecrypt(maybeParseSecret(options.getPassword()), options.getKMSEncryptionKey()));

    if (options.getDriverJars() != null) {
      dataSourceConfiguration = dataSourceConfiguration.withDriverJars(options.getDriverJars());
    }

    if (options.getConnectionProperties() != null) {
      dataSourceConfiguration =
          dataSourceConfiguration.withConnectionProperties(options.getConnectionProperties());
    }

    /*
     * Step 1: Read records via JDBC and convert to TableRow
     *         via {@link org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper}
     */
    PCollection<TableRow> rows;
    if (options.getPartitionColumn() != null && options.getTable() != null) {
      // Read with Partitions
      JdbcIO.ReadWithPartitions<TableRow, ?> readIO = null;
      final String partitionColumnType = options.getPartitionColumnType();
      if (partitionColumnType == null || "long".equals(partitionColumnType)) {
        JdbcIO.ReadWithPartitions<TableRow, Long> longTypeReadIO =
            JdbcIO.<TableRow, Long>readWithPartitions(TypeDescriptors.longs())
                .withDataSourceConfiguration(dataSourceConfiguration)
                .withTable(options.getTable())
                .withPartitionColumn(options.getPartitionColumn())
                .withRowMapper(JdbcConverters.getResultSetToTableRow(options.getUseColumnAlias()));
        if (options.getLowerBound() != null && options.getUpperBound() != null) {
          // Check if lower bound and upper bound are long type.
          try {
            longTypeReadIO =
                longTypeReadIO
                    .withLowerBound(Long.valueOf(options.getLowerBound()))
                    .withUpperBound(Long.valueOf(options.getUpperBound()));
          } catch (NumberFormatException e) {
            throw new NumberFormatException(
                "Expected Long values for lowerBound and upperBound, received : "
                    + e.getMessage());
          }
        }
        readIO = longTypeReadIO;
      } else if ("datetime".equals(partitionColumnType)) {
        JdbcIO.ReadWithPartitions<TableRow, DateTime> dateTimeReadIO =
            JdbcIO.<TableRow, DateTime>readWithPartitions(TypeDescriptor.of(DateTime.class))
                .withDataSourceConfiguration(dataSourceConfiguration)
                .withTable(options.getTable())
                .withPartitionColumn(options.getPartitionColumn())
                .withRowMapper(JdbcConverters.getResultSetToTableRow(options.getUseColumnAlias()));
        if (options.getLowerBound() != null && options.getUpperBound() != null) {
          DateTimeFormatter dateFormatter =
              DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSZ").withOffsetParsed();
          // Check if lowerBound and upperBound are DateTime type.
          try {
            dateTimeReadIO =
                dateTimeReadIO
                    .withLowerBound(dateFormatter.parseDateTime(options.getLowerBound()))
                    .withUpperBound(dateFormatter.parseDateTime(options.getUpperBound()));
          } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                "Expected DateTime values in the format for lowerBound and upperBound, received : "
                    + e.getMessage());
          }
        }
        readIO = dateTimeReadIO;
      } else {
        throw new IllegalStateException("Received unsupported partitionColumnType.");
      }
      if (options.getNumPartitions() != null) {
        readIO = readIO.withNumPartitions(options.getNumPartitions());
      }
      if (options.getFetchSize() != null && options.getFetchSize() > 0) {
        readIO = readIO.withFetchSize(options.getFetchSize());
      }
      rows = pipeline.apply("Read from JDBC with Partitions", readIO);
    } else {
      if (options.getQuery() == null) {
        throw new IllegalArgumentException(
            "Either 'query' or both 'table' AND 'PartitionColumn' must be specified to read from JDBC");
      }
      JdbcIO.Read<TableRow> readIO =
          JdbcIO.<TableRow>read()
              .withDataSourceConfiguration(dataSourceConfiguration)
              .withQuery(new GCSAwareValueProvider(options.getQuery()))
              .withCoder(TableRowJsonCoder.of())
              .withRowMapper(JdbcConverters.getResultSetToTableRow(options.getUseColumnAlias()));

      if (options.getFetchSize() != null && options.getFetchSize() > 0) {
        readIO = readIO.withFetchSize(options.getFetchSize());
      }

      rows = pipeline.apply("Read from JdbcIO", readIO);
    }

    /*
     * Step 2: Append TableRow to an existing BigQuery table
     */
    WriteResult writeResult = rows.apply("Write to BigQuery", writeToBQ);

    /*
     * Step 3.
     * If using Storage Write API, capture failed inserts and either
     *   a) write error rows to DLQ
     *   b) fail the pipeline
     */
    if (options.getUseStorageWriteApi() || options.getUseStorageWriteApiAtLeastOnce()) {
      PCollection<BigQueryInsertError> insertErrors =
          BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options);

      if (!Strings.isNullOrEmpty(options.getOutputDeadletterTable())) {
        /*
         * Step 3a.
         * Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
         */
        PCollection<FailsafeElement<String, String>> failedInserts =
            insertErrors
                .apply(
                    "WrapInsertionErrors",
                    MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                        .via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
                .setCoder(FAILSAFE_ELEMENT_CODER);

        /*
         * Step 3a Contd.
         * Insert records that failed insert into deadletter table
         */
        failedInserts.apply(
            "WriteFailedRecords",
            ErrorConverters.WriteStringMessageErrors.newBuilder()
                .setErrorRecordsTable(options.getOutputDeadletterTable())
                .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
                .setUseWindowedTimestamp(false)
                .build());
      } else {
        /*
         * Step 3b.
         * Fail pipeline upon write errors if no DLQ was specified
         */
        insertErrors.apply(ParDo.of(new ThrowWriteErrorsDoFn()));
      }
    }

    // Execute the pipeline and return the result.
    return pipeline.run();
  }

  static class ThrowWriteErrorsDoFn extends DoFn<BigQueryInsertError, Void> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      BigQueryInsertError insertError = Objects.requireNonNull(c.element());
      List<String> errorMessages =
          insertError.getError().getErrors().stream()
              .map(ErrorProto::getMessage)
              .collect(Collectors.toList());
      String stackTrace = String.join("\nCaused by:", errorMessages);

      throw new IllegalStateException(
          String.format(
              "Failed to insert row %s.\nCaused by: %s", insertError.getRow(), stackTrace));
    }
  }

  /**
   * Create the {@link Write} transform that outputs the collection to BigQuery as per input option.
   */
  @VisibleForTesting
  static Write<TableRow> writeToBQTransform(JdbcToBigQueryOptions options) {
    // Needed for loading GCS filesystem before Pipeline.Create call
    FileSystems.setDefaultPipelineOptions(options);
    Write<TableRow> write =
        BigQueryIO.writeTableRows()
            .withoutValidation()
            .withCreateDisposition(Write.CreateDisposition.valueOf(options.getCreateDisposition()))
            .withWriteDisposition(
                options.getIsTruncate()
                    ? Write.WriteDisposition.WRITE_TRUNCATE
                    : Write.WriteDisposition.WRITE_APPEND)
            .withCustomGcsTempLocation(
                StaticValueProvider.of(options.getBigQueryLoadingTemporaryDirectory()))
            .withExtendedErrorInfo()
            .to(options.getOutputTable());

    if (Write.CreateDisposition.valueOf(options.getCreateDisposition())
        != Write.CreateDisposition.CREATE_NEVER) {
      write = write.withJsonSchema(getGcsFileAsString(options.getBigQuerySchemaPath()));
    }
    if (options.getUseStorageWriteApi() || options.getUseStorageWriteApiAtLeastOnce()) {
      write = write.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors());
    }
    return write;
  }

  /**
   * Retrieves a secret value from SecretManagerUtils if the input string matches the specified
   * pattern.
   *
   * @param secret The input string representing a potential secret.
   * @return The secret value if the input matches the pattern and the secret is found, otherwise
   *     the original input string.
   */
  private static String maybeParseSecret(String secret) {
    // Check if the input string is not null.
    if (secret != null) {
      // Check if the input string matches the pattern for secrets stored in SecretManagerUtils.
      if (secret.matches("projects/.*/secrets/.*/versions/.*")) { // Use .* to match any characters
        // Retrieve the secret value from SecretManagerUtils.
        return SecretManagerUtils.getSecret(secret);
      }
    }

    // If the input is null or doesn't match the pattern, return the original input.
    return secret;
  }
}
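In the template source, connectionURL, username, and password each pass through maybeParseSecret and then maybeDecrypt, so a Secret Manager reference is resolved first and the result is then handed to maybeDecrypt together with KMSEncryptionKey. The following sketch isolates only the detection step: it reuses the regular expression from maybeParseSecret to show which values are treated as secret references. The class and method names are hypothetical, and fetching the secret is not shown.

```java
import java.util.regex.Pattern;

public class SecretReferenceSketch {

  // Same regular expression that maybeParseSecret uses in the template source.
  // matches() requires the whole value to match, so it must start with "projects/".
  private static final Pattern SECRET_PATTERN =
      Pattern.compile("projects/.*/secrets/.*/versions/.*");

  // Returns true if the value would be looked up in Secret Manager.
  static boolean isSecretReference(String value) {
    return value != null && SECRET_PATTERN.matcher(value).matches();
  }

  public static void main(String[] args) {
    System.out.println(isSecretReference("projects/my-project/secrets/db-password/versions/1"));
    System.out.println(isSecretReference("jdbc:sqlserver://localhost;databaseName=sampledb"));
  }
}
```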


Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.

Last updated 2026-02-19 UTC.