Pub/Sub to BigQuery with Python UDF template Stay organized with collections Save and categorize content based on your preferences.
The Pub/Sub to BigQuery with Python UDF template is a streaming pipeline that reads JSON-formatted messages from Pub/Sub and writes them to a BigQuery table. Optionally, you can provide a user-defined function (UDF) written in Python to process the incoming messages.
Pipeline requirements
- The BigQuery table must exist and have a schema.
- The Pub/Sub message data must use JSON format, or you must provide a UDF that converts the message data to JSON. The JSON data must match the BigQuery table schema. For example, if the JSON payloads are formatted as
{"k1":"v1", "k2":"v2"}, the BigQuery table must have two string columns namedk1andk2. - Specify the
inputSubscriptionorinputTopicparameter, but not both.
Template parameters
Required parameters
- outputTableSpec: The BigQuery table to write to, formatted as
PROJECT_ID:DATASET_NAME.TABLE_NAME.
Optional parameters
- inputTopic: The Pub/Sub topic to read from, formatted as
projects/<PROJECT_ID>/topics/<TOPIC_NAME>. - inputSubscription: The Pub/Sub subscription to read from, formatted as
projects/<PROJECT_ID>/subscriptions/<SUBCRIPTION_NAME>. - outputDeadletterTable: The BigQuery table to use for messages that failed to reach the output table, formatted as
PROJECT_ID:DATASET_NAME.TABLE_NAME. If the table doesn't exist, it is created when the pipeline runs. If this parameter is not specified, the valueOUTPUT_TABLE_SPEC_error_recordsis used instead. - useStorageWriteApiAtLeastOnce: When using the Storage Write API, specifies the write semantics. To use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), set this parameter to true. To use exactly-once semantics, set the parameter to
false. This parameter applies only whenuseStorageWriteApiistrue. The default value isfalse. - useStorageWriteApi: If true, the pipeline uses the BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). The default value is
false. For more information, see Using the Storage Write API (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams: When using the Storage Write API, specifies the number of write streams. If
useStorageWriteApiistrueanduseStorageWriteApiAtLeastOnceisfalse, then you must set this parameter. Defaults to: 0. - storageWriteApiTriggeringFrequencySec: When using the Storage Write API, specifies the triggering frequency, in seconds. If
useStorageWriteApiistrueanduseStorageWriteApiAtLeastOnceisfalse, then you must set this parameter. - pythonExternalTextTransformGcsPath: The Cloud Storage path pattern for the Python code containing your user-defined functions. For example,
gs://your-bucket/your-function.py. - pythonExternalTextTransformFunctionName: The name of the function to call from your Python file. Use only letters, digits, and underscores. For example,
'transform' or 'transform_udf1'.
User-defined function
Optionally, you can extend this template by writing a user-defined function(UDF). The template calls the UDF for each input element. Element payloads areserialized as JSON strings. For more information, seeCreateuser-defined functions for Dataflow templates.
Function specification
The UDF has the following specification:
Run the template
Console
- Go to the DataflowCreate job from template page. Go to Create job from template
- In theJob name field, enter a unique job name.
- Optional: ForRegional endpoint, select a value from the drop-down menu. The default region is
us-central1.For a list of regions where you can run a Dataflow job, seeDataflow locations.
- From theDataflow template drop-down menu, select thePub/Sub to BigQuery with Python UDF template.
- In the provided parameter fields, enter your parameter values.
- Optional: To switch from exactly-once processing toat-least-once streaming mode, selectAt Least Once.
- ClickRun job.
gcloud
Note: To use the Google Cloud CLI to run flex templates, you must haveGoogle Cloud CLI version 284.0.0 or later.In your shell or terminal, run the template:
gclouddataflowflex-templaterunJOB_NAME\--template-file-gcs-locationgs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_BigQuery_Xlang\--regionREGION_NAME\--staging-locationSTAGING_LOCATION\--parameters\inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME
Replace the following:
JOB_NAME: a unique job name of your choiceREGION_NAME: theregion where you want todeploy your Dataflow job—for example,us-central1VERSION: the version of the template that you want to useYou can use the following values:
latestto use the latest version of the template, which is available in thenon-dated parent folder in the bucket—gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket—gs://dataflow-templates-REGION_NAME/
STAGING_LOCATION: the location for staging local files (for example,gs://your-bucket/staging)TOPIC_NAME: your Pub/Sub topic nameDATASET: your BigQuery datasetTABLE_NAME: your BigQuery table name
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, seeprojects.templates.launch.
POSThttps://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch{"launch_parameter":{"jobName":"JOB_NAME","parameters":{"inputTopic":"projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME","outputTableSpec":"PROJECT_ID:DATASET.TABLE_NAME"},"containerSpecGcsPath":"gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_BigQuery_Xlang",}}
Replace the following:
PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow jobJOB_NAME: a unique job name of your choiceLOCATION: theregion where you want todeploy your Dataflow job—for example,us-central1VERSION: the version of the template that you want to useYou can use the following values:
latestto use the latest version of the template, which is available in thenon-dated parent folder in the bucket—gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket—gs://dataflow-templates-REGION_NAME/
STAGING_LOCATION: the location for staging local files (for example,gs://your-bucket/staging)TOPIC_NAME: your Pub/Sub topic nameDATASET: your BigQuery datasetTABLE_NAME: your BigQuery table name
Template source code
Java
/* * Copyright (C) 2018 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of * the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations under * the License. */packagecom.google.cloud.teleport.v2.templates;import staticcom.google.cloud.teleport.v2.templates.TextToBigQueryStreaming.wrapBigQueryInsertError;importcom.google.api.services.bigquery.model.TableRow;importcom.google.cloud.teleport.metadata.MultiTemplate;importcom.google.cloud.teleport.metadata.Template;importcom.google.cloud.teleport.metadata.TemplateCategory;importcom.google.cloud.teleport.metadata.TemplateParameter;importcom.google.cloud.teleport.v2.coders.FailsafeElementCoder;importcom.google.cloud.teleport.v2.common.UncaughtExceptionLogger;importcom.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;importcom.google.cloud.teleport.v2.templates.PubSubToBigQuery.Options;importcom.google.cloud.teleport.v2.transforms.BigQueryConverters.FailsafeJsonToTableRow;importcom.google.cloud.teleport.v2.transforms.ErrorConverters;importcom.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.FailsafeJavascriptUdf;importcom.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer;importcom.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.PythonExternalTextTransformerOptions;importcom.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.RowToPubSubFailsafeElementFn;importcom.google.cloud.teleport.v2.utils.BigQueryIOUtils;importcom.google.cloud.teleport.v2.utils.ResourceUtils;importcom.google.cloud.teleport.v2.values.FailsafeElement;importcom.google.common.base.Strings;importcom.google.common.collect.ImmutableList;importjava.nio.charset.StandardCharsets;importorg.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;importorg.apache.beam.sdk.Pipeline;importorg.apache.beam.sdk.PipelineResult;importorg.apache.beam.sdk.coders.CoderRegistry;importorg.apache.beam.sdk.coders.StringUtf8Coder;importorg.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;importorg.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;importorg.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;importorg.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;importorg.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;importorg.apache.beam.sdk.io.gcp.bigquery.WriteResult;importorg.apache.beam.sdk.io.gcp.pubsub.PubsubIO;importorg.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;importorg.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;importorg.apache.beam.sdk.options.Default;importorg.apache.beam.sdk.options.PipelineOptions;importorg.apache.beam.sdk.options.PipelineOptionsFactory;importorg.apache.beam.sdk.transforms.DoFn;importorg.apache.beam.sdk.transforms.Flatten;importorg.apache.beam.sdk.transforms.MapElements;importorg.apache.beam.sdk.transforms.PTransform;importorg.apache.beam.sdk.transforms.ParDo;importorg.apache.beam.sdk.values.PCollection;importorg.apache.beam.sdk.values.PCollectionList;importorg.apache.beam.sdk.values.PCollectionTuple;importorg.apache.beam.sdk.values.Row;importorg.apache.beam.sdk.values.TupleTag;importorg.apache.beam.sdk.values.TupleTagList;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;/** * The {@link PubSubToBigQuery} pipeline is a streaming pipeline which ingests data in JSON format * from Cloud Pub/Sub, executes a UDF, and outputs the resulting records to BigQuery. Any errors * which occur in the transformation of the data or execution of the UDF will be output to a * separate errors table in BigQuery. The errors table will be created if it does not exist prior to * execution. Both output and error tables are specified by the user as template parameters. * * <p><b>Pipeline Requirements</b> * * <ul> * <li>The Pub/Sub topic exists. * <li>The BigQuery output table exists. * </ul> * * <p>Check out <a * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_PubSub_to_BigQuery_Flex.md">README</a> * for instructions on how to use or modify this template. */@MultiTemplate({@Template(name="PubSub_to_BigQuery_Flex",category=TemplateCategory.STREAMING,displayName="Pub/Sub to BigQuery",description="The Pub/Sub to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub topic or subscription, and writes them to a BigQuery table. "+"You can use the template as a quick solution to move Pub/Sub data to BigQuery. "+"The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.",optionsClass=Options.class,skipOptions={"pythonExternalTextTransformGcsPath","pythonExternalTextTransformFunctionName",},flexContainerName="googlecloud-to-googlecloud",documentation="https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-bigquery",contactInformation="https://cloud.google.com/support",requirements={"The <a href=\"https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage\">`data` field</a> of Pub/Sub messages must use the JSON format, described in this <a href=\"https://developers.google.com/api-client-library/java/google-http-java-client/json\">JSON guide</a>. For example, messages with values in the `data` field formatted as `{\"k1\":\"v1\", \"k2\":\"v2\"}` can be inserted into a BigQuery table with two columns, named `k1` and `k2`, with a string data type.","The output table must exist prior to running the pipeline. The table schema must match the input JSON objects."},streaming=true,supportsAtLeastOnce=true,supportsExactlyOnce=true),@Template(name="PubSub_to_BigQuery_Xlang",category=TemplateCategory.STREAMING,displayName="Pub/Sub to BigQuery with Python UDFs",type=Template.TemplateType.XLANG,description="The Pub/Sub to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub topic or subscription, and writes them to a BigQuery table. "+"You can use the template as a quick solution to move Pub/Sub data to BigQuery. "+"The template reads JSON-formatted messages from Pub/Sub and converts them to BigQuery elements.",optionsClass=Options.class,skipOptions={"javascriptTextTransformGcsPath","javascriptTextTransformFunctionName","javascriptTextTransformReloadIntervalMinutes"},flexContainerName="googlecloud-to-googlecloud-xlang",documentation="https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-bigquery",contactInformation="https://cloud.google.com/support",requirements={"The <a href=\"https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage\">`data` field</a> of Pub/Sub messages must use the JSON format, described in this <a href=\"https://developers.google.com/api-client-library/java/google-http-java-client/json\">JSON guide</a>. For example, messages with values in the `data` field formatted as `{\"k1\":\"v1\", \"k2\":\"v2\"}` can be inserted into a BigQuery table with two columns, named `k1` and `k2`, with a string data type.","The output table must exist prior to running the pipeline. The table schema must match the input JSON objects."},streaming=true,supportsAtLeastOnce=true,supportsExactlyOnce=true)})publicclassPubSubToBigQuery{/** The log to output status messages to. */privatestaticfinalLoggerLOG=LoggerFactory.getLogger(PubSubToBigQuery.class);/** The tag for the main output for the UDF. */publicstaticfinalTupleTag<FailsafeElement<PubsubMessage,String>>UDF_OUT=newTupleTag<FailsafeElement<PubsubMessage,String>>(){};/** The tag for the main output of the json transformation. */publicstaticfinalTupleTag<TableRow>TRANSFORM_OUT=newTupleTag<TableRow>(){};/** The tag for the dead-letter output of the udf. */publicstaticfinalTupleTag<FailsafeElement<PubsubMessage,String>>UDF_DEADLETTER_OUT=newTupleTag<FailsafeElement<PubsubMessage,String>>(){};/** The tag for the dead-letter output of the json to table row transform. */publicstaticfinalTupleTag<FailsafeElement<PubsubMessage,String>>TRANSFORM_DEADLETTER_OUT=newTupleTag<FailsafeElement<PubsubMessage,String>>(){};/** The default suffix for error tables if dead letter table is not specified. */publicstaticfinalStringDEFAULT_DEADLETTER_TABLE_SUFFIX="_error_records";/** Pubsub message/string coder for pipeline. */publicstaticfinalFailsafeElementCoder<PubsubMessage,String>CODER=FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(),StringUtf8Coder.of());/** String/String Coder for FailsafeElement. */publicstaticfinalFailsafeElementCoder<String,String>FAILSAFE_ELEMENT_CODER=FailsafeElementCoder.of(StringUtf8Coder.of(),StringUtf8Coder.of());/** * The {@link Options} class provides the custom execution options passed by the executor at the * command-line. */publicinterfaceOptionsextendsPipelineOptions,BigQueryStorageApiStreamingOptions,PythonExternalTextTransformerOptions,DataflowPipelineWorkerPoolOptions{@TemplateParameter.BigQueryTable(order=1,groupName="Target",description="BigQuery output table",helpText="The BigQuery table to write to, formatted as `PROJECT_ID:DATASET_NAME.TABLE_NAME`.")StringgetOutputTableSpec();voidsetOutputTableSpec(Stringvalue);@TemplateParameter.PubsubTopic(order=2,groupName="Source",optional=true,description="Input Pub/Sub topic",helpText="The Pub/Sub topic to read from, formatted as `projects/<PROJECT_ID>/topics/<TOPIC_NAME>`.")StringgetInputTopic();voidsetInputTopic(Stringvalue);@TemplateParameter.PubsubSubscription(order=3,groupName="Source",optional=true,description="Pub/Sub input subscription",helpText="The Pub/Sub subscription to read from, "+"formatted as `projects/<PROJECT_ID>/subscriptions/<SUBCRIPTION_NAME>`.")StringgetInputSubscription();voidsetInputSubscription(Stringvalue);@TemplateParameter.BigQueryTable(order=4,optional=true,description="Table for messages failed to reach the output table (i.e., Deadletter table)",helpText="The BigQuery table to use for messages that failed to reach the output table, "+"formatted as `PROJECT_ID:DATASET_NAME.TABLE_NAME`. If the table "+"doesn't exist, it is created when the pipeline runs. "+"If this parameter is not specified, "+"the value `OUTPUT_TABLE_SPEC_error_records` is used instead.")StringgetOutputDeadletterTable();voidsetOutputDeadletterTable(Stringvalue);@TemplateParameter.Boolean(order=5,optional=true,parentName="useStorageWriteApi",parentTriggerValues={"true"},description="Use at at-least-once semantics in BigQuery Storage Write API",helpText="When using the Storage Write API, specifies the write semantics. "+"To use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)"+", set this parameter to true. "+"To use exactly-once semantics, set the parameter to `false`. "+"This parameter applies only when `useStorageWriteApi` is `true`. "+"The default value is `false`.")@Default.Boolean(false)@OverrideBooleangetUseStorageWriteApiAtLeastOnce();voidsetUseStorageWriteApiAtLeastOnce(Booleanvalue);}/** * The main entry-point for pipeline execution. This method will start the pipeline but will not * wait for it's execution to finish. If blocking execution is required, use the {@link * PubSubToBigQuery#run(Options)} method to start the pipeline and invoke {@code * result.waitUntilFinish()} on the {@link PipelineResult}. * * @param args The command-line args passed by the executor. */publicstaticvoidmain(String[]args){UncaughtExceptionLogger.register();Optionsoptions=PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);// options.setWorkerDiskType(//// "compute.googleapis.com/projects/cloud-teleport-testing/zones/us-central1-a/diskTypes/t2a-test");run(options);}/** * Runs the pipeline to completion with the specified options. This method does not wait until the * pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result * object to block until the pipeline is finished running if blocking programmatic execution is * required. * * @param options The execution options. * @return The pipeline result. */publicstaticPipelineResultrun(Optionsoptions){booleanuseInputSubscription=!Strings.isNullOrEmpty(options.getInputSubscription());booleanuseInputTopic=!Strings.isNullOrEmpty(options.getInputTopic());if(useInputSubscription==useInputTopic){thrownewIllegalArgumentException("Either input topic or input subscription must be provided, but not both.");}Pipelinepipeline=Pipeline.create(options);CoderRegistrycoderRegistry=pipeline.getCoderRegistry();coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(),CODER);/* * Steps: * 1) Read messages in from Pub/Sub * 2) Transform the PubsubMessages into TableRows * - Transform message payload via UDF * - Convert UDF result to TableRow objects * 3) Write successful records out to BigQuery * 4) Write failed records out to BigQuery *//* * Step #1: Read messages in from Pub/Sub * Either from a Subscription or Topic */PCollection<PubsubMessage>messages=null;if(useInputSubscription){messages=pipeline.apply("ReadPubSubSubscription",PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()));}else{messages=pipeline.apply("ReadPubSubTopic",PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()));}PCollectionTupleconvertedTableRows=messages/* * Step #2: Transform the PubsubMessages into TableRows */.apply("ConvertMessageToTableRow",newPubsubMessageToTableRow(options));/* * Step #3: Write the successful records out to BigQuery */WriteResultwriteResult=convertedTableRows.get(TRANSFORM_OUT).apply("WriteSuccessfulRecords",BigQueryIO.writeTableRows().withoutValidation().withCreateDisposition(CreateDisposition.CREATE_NEVER).withWriteDisposition(WriteDisposition.WRITE_APPEND).withExtendedErrorInfo().withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()).to(options.getOutputTableSpec()));/* * Step 3 Contd. * Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement */PCollection<FailsafeElement<String,String>>failedInserts=BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult,options).apply("WrapInsertionErrors",MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor()).via((BigQueryInsertErrore)->wrapBigQueryInsertError(e))).setCoder(FAILSAFE_ELEMENT_CODER);/* * Step #4: Write records that failed table row transformation * or conversion out to BigQuery deadletter table. */PCollectionList.of(ImmutableList.of(convertedTableRows.get(UDF_DEADLETTER_OUT),convertedTableRows.get(TRANSFORM_DEADLETTER_OUT))).apply("Flatten",Flatten.pCollections()).apply("WriteFailedRecords",ErrorConverters.WritePubsubMessageErrors.newBuilder().setErrorRecordsTable(!Strings.isNullOrEmpty(options.getOutputDeadletterTable())?options.getOutputDeadletterTable():options.getOutputTableSpec()+DEFAULT_DEADLETTER_TABLE_SUFFIX).setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson()).build());// 5) Insert records that failed insert into deadletter tablefailedInserts.apply("WriteFailedRecords",ErrorConverters.WriteStringMessageErrors.newBuilder().setErrorRecordsTable(!Strings.isNullOrEmpty(options.getOutputDeadletterTable())?options.getOutputDeadletterTable():options.getOutputTableSpec()+DEFAULT_DEADLETTER_TABLE_SUFFIX).setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson()).build());returnpipeline.run();}/** * The {@link PubsubMessageToTableRow} class is a {@link PTransform} which transforms incoming * {@link PubsubMessage} objects into {@link TableRow} objects for insertion into BigQuery while * applying an optional UDF to the input. The executions of the UDF and transformation to {@link * TableRow} objects is done in a fail-safe way by wrapping the element with it's original payload * inside the {@link FailsafeElement} class. The {@link PubsubMessageToTableRow} transform will * output a {@link PCollectionTuple} which contains all output and dead-letter {@link * PCollection}. * * <p>The {@link PCollectionTuple} output will contain the following {@link PCollection}: * * <ul> * <li>{@link PubSubToBigQuery#UDF_OUT} - Contains all {@link FailsafeElement} records * successfully processed by the optional UDF. * <li>{@link PubSubToBigQuery#UDF_DEADLETTER_OUT} - Contains all {@link FailsafeElement} * records which failed processing during the UDF execution. * <li>{@link PubSubToBigQuery#TRANSFORM_OUT} - Contains all records successfully converted from * JSON to {@link TableRow} objects. * <li>{@link PubSubToBigQuery#TRANSFORM_DEADLETTER_OUT} - Contains all {@link FailsafeElement} * records which couldn't be converted to table rows. * </ul> */staticclassPubsubMessageToTableRowextendsPTransform<PCollection<PubsubMessage>,PCollectionTuple>{privatefinalOptionsoptions;PubsubMessageToTableRow(Optionsoptions){this.options=options;}@OverridepublicPCollectionTupleexpand(PCollection<PubsubMessage>input){booleanuseJavascriptUdf=!Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());booleanusePythonUdf=!Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());if(useJavascriptUdf &&usePythonUdf){thrownewIllegalArgumentException("Either javascript or Python gcs path must be provided, but not both.");}PCollectionTupleudfOut;if(usePythonUdf){PCollection<Row>udfRowsOut=input// Map the incoming messages into FailsafeElements so we can recover from failures// across multiple transforms..apply("MapToRecord",PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.pubSubMappingFunction()).setRowSchema(PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.ROW_SCHEMA).apply("InvokeUDF",PythonExternalTextTransformer.FailsafePythonExternalUdf.newBuilder().setFileSystemPath(options.getPythonExternalTextTransformGcsPath()).setFunctionName(options.getPythonExternalTextTransformFunctionName()).build());udfOut=udfRowsOut.apply("MapRowsToFailsafeElements",ParDo.of(newRowToPubSubFailsafeElementFn(UDF_OUT,UDF_DEADLETTER_OUT)).withOutputTags(UDF_OUT,TupleTagList.of(UDF_DEADLETTER_OUT)));}else{udfOut=input// Map the incoming messages into FailsafeElements so we can recover from failures// across multiple transforms..apply("MapToRecord",ParDo.of(newPubsubMessageToFailsafeElementFn())).apply("InvokeUDF",FailsafeJavascriptUdf.<PubsubMessage>newBuilder().setFileSystemPath(options.getJavascriptTextTransformGcsPath()).setFunctionName(options.getJavascriptTextTransformFunctionName()).setReloadIntervalMinutes(options.getJavascriptTextTransformReloadIntervalMinutes()).setSuccessTag(UDF_OUT).setFailureTag(UDF_DEADLETTER_OUT).build());}// Convert the records which were successfully processed by the UDF into TableRow objects.PCollectionTuplejsonToTableRowOut=udfOut.get(UDF_OUT).apply("JsonToTableRow",FailsafeJsonToTableRow.<PubsubMessage>newBuilder().setSuccessTag(TRANSFORM_OUT).setFailureTag(TRANSFORM_DEADLETTER_OUT).build());// Re-wrap the PCollections so we can return a single PCollectionTuplereturnPCollectionTuple.of(UDF_OUT,udfOut.get(UDF_OUT)).and(UDF_DEADLETTER_OUT,udfOut.get(UDF_DEADLETTER_OUT)).and(TRANSFORM_OUT,jsonToTableRowOut.get(TRANSFORM_OUT)).and(TRANSFORM_DEADLETTER_OUT,jsonToTableRowOut.get(TRANSFORM_DEADLETTER_OUT));}}/** * The {@link PubsubMessageToFailsafeElementFn} wraps an incoming {@link PubsubMessage} with the * {@link FailsafeElement} class so errors can be recovered from and the original message can be * output to a error records table. */staticclassPubsubMessageToFailsafeElementFnextendsDoFn<PubsubMessage,FailsafeElement<PubsubMessage,String>>{@ProcessElementpublicvoidprocessElement(ProcessContextcontext){PubsubMessagemessage=context.element();context.output(FailsafeElement.of(message,newString(message.getPayload(),StandardCharsets.UTF_8)));}}}What's next
- Learn aboutDataflow templates.
- See the list ofGoogle-provided templates.
Except as otherwise noted, the content of this page is licensed under theCreative Commons Attribution 4.0 License, and code samples are licensed under theApache 2.0 License. For details, see theGoogle Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
Last updated 2026-02-19 UTC.