Import metadata from a custom source using Workflows
This document describes how to import metadata from third-party sources into Dataplex Universal Catalog by setting up and running a managed connectivity pipeline in Workflows. This pipeline extracts metadata from your custom data source and imports it into Dataplex Universal Catalog, creating the necessary entry groups.
For more information about managed connectivity, see Managed connectivity overview.
Before you begin
Before you import metadata, complete the tasks in this section.
Build a connector
A connector extracts the metadata from your data source and generates a metadata import file that can be imported by Dataplex Universal Catalog. The connector is an Artifact Registry image that can be run on Google Cloud Serverless for Apache Spark.
Build a custom connector that extracts metadata from your third-party source.
For an example connector that you can use as a reference template to build your own connector, see Develop a custom connector for metadata import.
Configure Google Cloud resources
Enable the Workflows, Dataproc, Cloud Storage, Dataplex, Secret Manager, Artifact Registry, and Cloud Scheduler APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

If you don't plan to run the pipeline on a schedule, you don't need to enable the Cloud Scheduler API.
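As a minimal sketch, you can enable these APIs with the gcloud CLI. The service names below are the standard identifiers for these products; omit cloudscheduler.googleapis.com if you don't plan to schedule the pipeline:

gcloud services enable \
    workflows.googleapis.com \
    dataproc.googleapis.com \
    storage.googleapis.com \
    dataplex.googleapis.com \
    secretmanager.googleapis.com \
    artifactregistry.googleapis.com \
    cloudscheduler.googleapis.com \
    --project=PROJECT_ID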
Create secrets in Secret Manager to store the credentials for your third-party data source.
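For example, a sketch of creating a secret and adding the credentials as a secret version with the gcloud CLI (the secret ID and credential value are placeholders):

gcloud secrets create SECRET_ID --project=PROJECT_ID
echo -n "DATA_SOURCE_CREDENTIALS" | \
    gcloud secrets versions add SECRET_ID \
    --project=PROJECT_ID \
    --data-file=-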
Configure your Virtual Private Cloud (VPC) network to run Serverless for Apache Spark workloads.
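The exact network setup depends on your environment. One common step, assuming the Spark workload uses a subnet named SUBNET_NAME, is to enable Private Google Access on that subnet:

gcloud compute networks subnets update SUBNET_NAME \
    --project=PROJECT_ID \
    --region=LOCATION_ID \
    --enable-private-ip-google-access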
Create a Cloud Storage bucket to store the metadata import files.
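For example, a sketch of creating the bucket with the gcloud CLI:

gcloud storage buckets create gs://BUCKET_ID \
    --project=PROJECT_ID \
    --location=LOCATION_ID \
    --uniform-bucket-level-access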
Create the following Dataplex Universal Catalog resources:
Create custom aspect types for the entries that you want to import.
Create custom entry types for the entries that you want to import.
Required roles
A service account represents the identity of a workflow and determines what permissions the workflow has and which Google Cloud resources it can access. You need a service account for Workflows (to run the pipeline) and for Serverless for Apache Spark (to run the connector).
You can use the Compute Engine default service account (PROJECT_NUMBER-compute@developer.gserviceaccount.com), or create your own service account (or accounts) to run the managed connectivity pipeline.
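If you create your own service account, a minimal sketch with the gcloud CLI (the account name and display name are placeholders):

gcloud iam service-accounts create metadata-pipeline-sa \
    --project=PROJECT_ID \
    --display-name="Managed connectivity pipeline"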
Console
In the Google Cloud console, go to theIAM page.
Select the project that you want to import metadata into.
Click Grant Access, and then enter the service account's email address.
Assign the following roles to the service account:
- Logs Writer
- Dataplex Entry Group Owner
- Dataplex Metadata Job Owner
- Dataplex Catalog Editor
- Dataproc Editor
- Dataproc Worker
- Secret Manager Secret Accessor - on the secret that stores the credentials for your data source
- Storage Object User - on the Cloud Storage bucket
- Artifact Registry Reader - on the Artifact Registry repository that contains the connector image
- Service Account User - if you use different service accounts, grant the service account running Workflows this role on the service account running the Serverless for Apache Spark batch jobs
- Workflows Invoker - if you want to schedule the pipeline
Save your changes.
gcloud
Grant roles to the service account. Run the following commands:
gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT_ID" \
    --role=roles/logging.logWriter

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT_ID" \
    --role=roles/dataplex.entryGroupOwner

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT_ID" \
    --role=roles/dataplex.metadataJobOwner

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT_ID" \
    --role=roles/dataplex.catalogEditor

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT_ID" \
    --role=roles/dataproc.editor

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT_ID" \
    --role=roles/dataproc.worker

Replace the following:
- PROJECT_ID: the name of the target Google Cloud project to import the metadata into.
- SERVICE_ACCOUNT_ID: the service account, such as my-service-account@my-project.iam.gserviceaccount.com.
Grant the service account the following roles on the resource level:
gcloud secrets add-iam-policy-binding SECRET_ID \
    --member="serviceAccount:SERVICE_ACCOUNT_ID" \
    --role=roles/secretmanager.secretAccessor

# The --condition flag requires both an expression and a title; the title is arbitrary.
gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT_ID" \
    --role=roles/storage.objectUser \
    --condition="expression=resource.name.startsWith('projects/_/buckets/BUCKET_ID'),title=bucket-access"

gcloud artifacts repositories add-iam-policy-binding REPOSITORY \
    --location=REPOSITORY_LOCATION \
    --member="serviceAccount:SERVICE_ACCOUNT_ID" \
    --role=roles/artifactregistry.reader

Replace the following:
- SECRET_ID: the ID of the secret that stores the credentials for your data source. It uses the format projects/PROJECT_ID/secrets/SECRET_ID.
- BUCKET_ID: the name of the Cloud Storage bucket.
- REPOSITORY: the Artifact Registry repository that contains the connector image.
- REPOSITORY_LOCATION: the Google Cloud location where the repository is hosted.
Grant the service account running Workflows the roles/iam.serviceAccountUser role on the service account running the Serverless for Apache Spark batch jobs. You must grant this role even if you use the same service account for both Workflows and Serverless for Apache Spark.

gcloud iam service-accounts add-iam-policy-binding \
    SERVICE_ACCOUNT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT_ID" \
    --role=roles/iam.serviceAccountUser

If you use different service accounts, the first (positional) argument is the service account running the Serverless for Apache Spark batch jobs, and the value for the --member flag is the service account running Workflows.

If you want to schedule the pipeline, grant the service account the following role:
gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT_ID" \
    --role=roles/workflows.invoker
Import metadata
To import metadata, create and then execute a workflow that runs the managedconnectivity pipeline. Optionally, you can also create a schedule for runningthe pipeline.
Console
Create the workflow. Provide the following information:
- Service account: the service account that you configured in the Required roles section of this document.
- Encryption: select Google-managed encryption key.
  Note: Customer-managed encryption keys (CMEK) aren't propagated to Dataplex Universal Catalog and Serverless for Apache Spark jobs for end-to-end encryption.
- Define workflow: provide the following definition file:
main:params:[args]steps:-init:assign:-WORKFLOW_ID:${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}-NETWORK_URI:${default(map.get(args, "NETWORK_URI"), "")}-SUBNETWORK_URI:${default(map.get(args, "SUBNETWORK_URI"), "")}-NETWORK_TAGS:${default(map.get(args, "NETWORK_TAGS"), [])}-check_networking:switch:-condition:${NETWORK_URI != "" and SUBNETWORK_URI != ""}raise:"Error:cannotsetbothnetwork_uriandsubnetwork_uri.Pleaseselectone."-condition:${NETWORK_URI == "" and SUBNETWORK_URI == ""}steps:-submit_extract_job_with_default_network_uri:assign:-NETWORK_TYPE:"networkUri"-NETWORKING:${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}-condition:${NETWORK_URI != ""}steps:-submit_extract_job_with_network_uri:assign:-NETWORKING:${NETWORK_URI}-NETWORK_TYPE:"networkUri"-condition:${SUBNETWORK_URI != ""}steps:-submit_extract_job_with_subnetwork_uri:assign:-NETWORKING:${SUBNETWORK_URI}-NETWORK_TYPE:"subnetworkUri"next:check_create_target_entry_group-check_create_target_entry_group:switch:-condition:${args.CREATE_TARGET_ENTRY_GROUP == true}next:create_target_entry_group-condition:${args.CREATE_TARGET_ENTRY_GROUP == false}next:prepare_pyspark_job_body-create_target_entry_group:call:http.postargs:url:${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}auth:type:OAuth2scopes:"https://www.googleapis.com/auth/cloud-platform"next:prepare_pyspark_job_body-prepare_pyspark_job_body:assign:-pyspark_batch_body:mainPythonFileUri:file:///main.pyargs:-${"--target_project_id=" + args.TARGET_PROJECT_ID}-${"--target_location_id=" + args.CLOUD_REGION}-${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}-${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}-${"--output_folder=" + WORKFLOW_ID}-${args.ADDITIONAL_CONNECTOR_ARGS}next:add_jar_file_uri_if_present-add_jar_file_uri_if_present:switch:-condition:${args.JAR_FILE_URI != "" and args.JAR_FILE_URI != null}assign:-pyspark_batch_body.jarFileUris:${args.JAR_FILE_URI}next:generate_extract_job_link-generate_extract_job_link:call:sys.logargs:data:${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}severity:"INFO"next:submit_pyspark_extract_job-submit_pyspark_extract_job:call:http.postargs:url:${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}auth:type:OAuth2scopes:"https://www.googleapis.com/auth/cloud-platform"headers:Content-Type:"application/json"query:batchId:${WORKFLOW_ID}body:pysparkBatch:${pyspark_batch_body}runtimeConfig:containerImage:${args.CUSTOM_CONTAINER_IMAGE}environmentConfig:executionConfig:serviceAccount:${args.SERVICE_ACCOUNT}stagingBucket:${args.CLOUD_STORAGE_BUCKET_ID}${NETWORK_TYPE}:${NETWORKING}networkTags:${NETWORK_TAGS}result:RESPONSE_MESSAGEnext:check_pyspark_extract_job-check_pyspark_extract_job:call:http.getargs:url:${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}auth:type:OAuth2scopes:"https://www.googleapis.com/auth/cloud-platform"result:PYSPARK_EXTRACT_JOB_STATUSnext:check_pyspark_extract_job_done-check_pyspark_extract_job_done:switch:-condition:${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}next:generate_import_logs_link-condition:${PYSPARK_EXTRACT_JOB_STATUS.body.state == 
"CANCELLED"}raise:${PYSPARK_EXTRACT_JOB_STATUS}-condition:${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}raise:${PYSPARK_EXTRACT_JOB_STATUS}next:pyspark_extract_job_wait-pyspark_extract_job_wait:call:sys.sleepargs:seconds:30next:check_pyspark_extract_job-generate_import_logs_link:call:sys.logargs:data:${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}severity:"INFO"next:submit_import_job-submit_import_job:call:http.postargs:url:${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}auth:type:OAuth2scopes:"https://www.googleapis.com/auth/cloud-platform"body:type:IMPORTimport_spec:source_storage_uri:${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}entry_sync_mode:FULLaspect_sync_mode:INCREMENTALlog_level:${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}scope:entry_groups:-${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}entry_types:${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}aspect_types:${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}result:IMPORT_JOB_RESPONSEnext:get_job_start_time-get_job_start_time:assign:-importJobStartTime:${sys.now()}next:import_job_startup_wait-import_job_startup_wait:call:sys.sleepargs:seconds:30next:initial_get_import_job-initial_get_import_job:call:http.getargs:url:${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}auth:type:OAuth2scopes:"https://www.googleapis.com/auth/cloud-platform"result:IMPORT_JOB_STATUSnext:check_import_job_status_available-check_import_job_status_available:switch:-condition:${sys.now() - importJobStartTime > 300}# 5 minutes = 300 secondsnext:kill_import_job-condition:${"status" in IMPORT_JOB_STATUS.body}next:check_import_job_donenext:import_job_status_wait-import_job_status_wait:call:sys.sleepargs:seconds:30next:check_import_job_status_available-check_import_job_done:switch:-condition:${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}next:the_end-condition:${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}raise:${IMPORT_JOB_STATUS}-condition:${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}raise:${IMPORT_JOB_STATUS}-condition:${IMPORT_JOB_STATUS.body.status.state == "FAILED"}raise:${IMPORT_JOB_STATUS}-condition:${sys.now() - importJobStartTime > 43200}# 12 hours = 43200 secondsnext:kill_import_jobnext:import_job_wait-get_import_job:call:http.getargs:url:${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}auth:type:OAuth2scopes:"https://www.googleapis.com/auth/cloud-platform"result:IMPORT_JOB_STATUSnext:check_import_job_done-import_job_wait:call:sys.sleepargs:seconds:30next:get_import_job-kill_import_job:call:http.postargs:url:${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}auth:type:OAuth2scopes:"https://www.googleapis.com/auth/cloud-platform"next:get_killed_import_job-get_killed_import_job:call:http.getargs:url:${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + 
"/metadataJobs/" + WORKFLOW_ID}auth:type:OAuth2scopes:"https://www.googleapis.com/auth/cloud-platform"result:KILLED_IMPORT_JOB_STATUSnext:killed-killed:raise:${KILLED_IMPORT_JOB_STATUS}-the_end:return:${IMPORT_JOB_STATUS}
To run the pipeline on demand, execute the workflow.
Provide the following runtime arguments:
{"TARGET_PROJECT_ID":"PROJECT_ID","CLOUD_REGION":"LOCATION_ID","TARGET_ENTRY_GROUP_ID":"ENTRY_GROUP_ID","CREATE_TARGET_ENTRY_GROUP":CREATE_ENTRY_GROUP_BOOLEAN,"CLOUD_STORAGE_BUCKET_ID":"BUCKET_ID","SERVICE_ACCOUNT":"SERVICE_ACCOUNT_ID","ADDITIONAL_CONNECTOR_ARGS":[ADDITIONAL_CONNECTOR_ARGUMENTS],"CUSTOM_CONTAINER_IMAGE":"CONTAINER_IMAGE","IMPORT_JOB_SCOPE_ENTRY_TYPES":[ENTRY_TYPES],"IMPORT_JOB_SCOPE_ASPECT_TYPES":[ASPECT_TYPES],"IMPORT_JOB_LOG_LEVEL":"INFO","JAR_FILE_URI":"","NETWORK_TAGS":[],"NETWORK_URI":"","SUBNETWORK_URI":""}Replace the following:
- PROJECT_ID: the name of the target Google Cloud project to import the metadata into.
- LOCATION_ID: the target Google Cloud location where the Serverless for Apache Spark and metadata import jobs will run, and metadata will be imported into.
- ENTRY_GROUP_ID: the ID of the entry group to import metadata into. The entry group ID can contain lowercase letters, numbers, and hyphens. The full resource name of this entry group is projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.
- CREATE_ENTRY_GROUP_BOOLEAN: if you want the pipeline to create the entry group if it doesn't already exist in your project, set this value to true.
- BUCKET_ID: the name of the Cloud Storage bucket to store the metadata import file that is generated by the connector. Each workflow execution creates a new folder.
- SERVICE_ACCOUNT_ID: the service account that you configured in the Required roles section of this document. The service account runs the connector in Serverless for Apache Spark.
- ADDITIONAL_CONNECTOR_ARGUMENTS: a list of additional arguments to pass to the connector. For examples, see Develop a custom connector for metadata import. Enclose each argument in double quotation marks, and separate the arguments with commas.
- CONTAINER_IMAGE: the custom container image of the connector hosted in Artifact Registry.
- ENTRY_TYPES: a list of entry types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.
- ASPECT_TYPES: a list of aspect types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.
- Optional: For the NETWORK_TAGS argument, provide a list of network tags.
- Optional: For the NETWORK_URI argument, provide the URI of the VPC network that connects to the data source. If you provide a network, omit the subnetwork argument.
- Optional: For the SUBNETWORK_URI argument, provide the URI of the subnetwork that connects to the data source. If you provide a subnet, omit the network argument.
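For illustration only, a filled-in set of runtime arguments might look like the following. Every value here is a hypothetical placeholder (project, bucket, image, and type names are examples, not values from this guide):

{
  "TARGET_PROJECT_ID": "my-project",
  "CLOUD_REGION": "us-central1",
  "TARGET_ENTRY_GROUP_ID": "my-entry-group",
  "CREATE_TARGET_ENTRY_GROUP": true,
  "CLOUD_STORAGE_BUCKET_ID": "my-metadata-bucket",
  "SERVICE_ACCOUNT": "metadata-pipeline-sa@my-project.iam.gserviceaccount.com",
  "ADDITIONAL_CONNECTOR_ARGS": ["--jdbc_url=jdbc:example://host:1234/db"],
  "CUSTOM_CONTAINER_IMAGE": "us-central1-docker.pkg.dev/my-project/my-repo/my-connector:latest",
  "IMPORT_JOB_SCOPE_ENTRY_TYPES": ["projects/my-project/locations/us-central1/entryTypes/my-entry-type"],
  "IMPORT_JOB_SCOPE_ASPECT_TYPES": ["projects/my-project/locations/us-central1/aspectTypes/my-aspect-type"],
  "IMPORT_JOB_LOG_LEVEL": "INFO",
  "JAR_FILE_URI": "",
  "NETWORK_TAGS": [],
  "NETWORK_URI": "",
  "SUBNETWORK_URI": ""
}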
Depending on the amount of metadata that you import, the pipeline might take several minutes or longer to run. For more information about how to view the progress, see Access workflow execution results.
After the pipeline has finished running, you can search for the imported metadata in Dataplex Universal Catalog.
Optional: If you want to run the pipeline on a schedule, create a schedule by using Cloud Scheduler. Provide the following information:
- Frequency: a unix-cron expression that defines the schedule to run the pipeline (see the example after this list).
- Workflow argument: the runtime arguments for the connector, as described in the previous step.
- Service account: the service account that manages the scheduler.
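For example, the unix-cron expression 0 0 * * * runs the pipeline at midnight every day.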
gcloud
Save the following workflow definition as a YAML file:
main:params:[args]steps:-init:assign:-WORKFLOW_ID:${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}-NETWORK_URI:${default(map.get(args, "NETWORK_URI"), "")}-SUBNETWORK_URI:${default(map.get(args, "SUBNETWORK_URI"), "")}-NETWORK_TAGS:${default(map.get(args, "NETWORK_TAGS"), [])}-check_networking:switch:-condition:${NETWORK_URI != "" and SUBNETWORK_URI != ""}raise:"Error:cannotsetbothnetwork_uriandsubnetwork_uri.Pleaseselectone."-condition:${NETWORK_URI == "" and SUBNETWORK_URI == ""}steps:-submit_extract_job_with_default_network_uri:assign:-NETWORK_TYPE:"networkUri"-NETWORKING:${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}-condition:${NETWORK_URI != ""}steps:-submit_extract_job_with_network_uri:assign:-NETWORKING:${NETWORK_URI}-NETWORK_TYPE:"networkUri"-condition:${SUBNETWORK_URI != ""}steps:-submit_extract_job_with_subnetwork_uri:assign:-NETWORKING:${SUBNETWORK_URI}-NETWORK_TYPE:"subnetworkUri"next:check_create_target_entry_group-check_create_target_entry_group:switch:-condition:${args.CREATE_TARGET_ENTRY_GROUP == true}next:create_target_entry_group-condition:${args.CREATE_TARGET_ENTRY_GROUP == false}next:prepare_pyspark_job_body-create_target_entry_group:call:http.postargs:url:${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}auth:type:OAuth2scopes:"https://www.googleapis.com/auth/cloud-platform"next:prepare_pyspark_job_body-prepare_pyspark_job_body:assign:-pyspark_batch_body:mainPythonFileUri:file:///main.pyargs:-${"--target_project_id=" + args.TARGET_PROJECT_ID}-${"--target_location_id=" + args.CLOUD_REGION}-${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}-${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}-${"--output_folder=" + WORKFLOW_ID}-${args.ADDITIONAL_CONNECTOR_ARGS}next:add_jar_file_uri_if_present-add_jar_file_uri_if_present:switch:-condition:${args.JAR_FILE_URI != "" and args.JAR_FILE_URI != null}assign:-pyspark_batch_body.jarFileUris:${args.JAR_FILE_URI}next:generate_extract_job_link-generate_extract_job_link:call:sys.logargs:data:${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}severity:"INFO"next:submit_pyspark_extract_job-submit_pyspark_extract_job:call:http.postargs:url:${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}auth:type:OAuth2scopes:"https://www.googleapis.com/auth/cloud-platform"headers:Content-Type:"application/json"query:batchId:${WORKFLOW_ID}body:pysparkBatch:${pyspark_batch_body}runtimeConfig:containerImage:${args.CUSTOM_CONTAINER_IMAGE}environmentConfig:executionConfig:serviceAccount:${args.SERVICE_ACCOUNT}stagingBucket:${args.CLOUD_STORAGE_BUCKET_ID}${NETWORK_TYPE}:${NETWORKING}networkTags:${NETWORK_TAGS}result:RESPONSE_MESSAGEnext:check_pyspark_extract_job-check_pyspark_extract_job:call:http.getargs:url:${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}auth:type:OAuth2scopes:"https://www.googleapis.com/auth/cloud-platform"result:PYSPARK_EXTRACT_JOB_STATUSnext:check_pyspark_extract_job_done-check_pyspark_extract_job_done:switch:-condition:${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}next:generate_import_logs_link-condition:${PYSPARK_EXTRACT_JOB_STATUS.body.state == 
"CANCELLED"}raise:${PYSPARK_EXTRACT_JOB_STATUS}-condition:${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}raise:${PYSPARK_EXTRACT_JOB_STATUS}next:pyspark_extract_job_wait-pyspark_extract_job_wait:call:sys.sleepargs:seconds:30next:check_pyspark_extract_job-generate_import_logs_link:call:sys.logargs:data:${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}severity:"INFO"next:submit_import_job-submit_import_job:call:http.postargs:url:${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}auth:type:OAuth2scopes:"https://www.googleapis.com/auth/cloud-platform"body:type:IMPORTimport_spec:source_storage_uri:${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}entry_sync_mode:FULLaspect_sync_mode:INCREMENTALlog_level:${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}scope:entry_groups:-${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}entry_types:${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}aspect_types:${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}result:IMPORT_JOB_RESPONSEnext:get_job_start_time-get_job_start_time:assign:-importJobStartTime:${sys.now()}next:import_job_startup_wait-import_job_startup_wait:call:sys.sleepargs:seconds:30next:initial_get_import_job-initial_get_import_job:call:http.getargs:url:${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}auth:type:OAuth2scopes:"https://www.googleapis.com/auth/cloud-platform"result:IMPORT_JOB_STATUSnext:check_import_job_status_available-check_import_job_status_available:switch:-condition:${sys.now() - importJobStartTime > 300}# 5 minutes = 300 secondsnext:kill_import_job-condition:${"status" in IMPORT_JOB_STATUS.body}next:check_import_job_donenext:import_job_status_wait-import_job_status_wait:call:sys.sleepargs:seconds:30next:check_import_job_status_available-check_import_job_done:switch:-condition:${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}next:the_end-condition:${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}raise:${IMPORT_JOB_STATUS}-condition:${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}raise:${IMPORT_JOB_STATUS}-condition:${IMPORT_JOB_STATUS.body.status.state == "FAILED"}raise:${IMPORT_JOB_STATUS}-condition:${sys.now() - importJobStartTime > 43200}# 12 hours = 43200 secondsnext:kill_import_jobnext:import_job_wait-get_import_job:call:http.getargs:url:${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}auth:type:OAuth2scopes:"https://www.googleapis.com/auth/cloud-platform"result:IMPORT_JOB_STATUSnext:check_import_job_done-import_job_wait:call:sys.sleepargs:seconds:30next:get_import_job-kill_import_job:call:http.postargs:url:${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}auth:type:OAuth2scopes:"https://www.googleapis.com/auth/cloud-platform"next:get_killed_import_job-get_killed_import_job:call:http.getargs:url:${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + 
"/metadataJobs/" + WORKFLOW_ID}auth:type:OAuth2scopes:"https://www.googleapis.com/auth/cloud-platform"result:KILLED_IMPORT_JOB_STATUSnext:killed-killed:raise:${KILLED_IMPORT_JOB_STATUS}-the_end:return:${IMPORT_JOB_STATUS}Define Bash variables,create the workflow, andoptionallycreate a schedule forrunning the pipeline:
# Define Bash variables (replace with your actual values)
project_id="PROJECT_ID"
region="LOCATION_ID"
service_account="SERVICE_ACCOUNT_ID"
workflow_source="WORKFLOW_DEFINITION_FILE.yaml"
workflow_name="WORKFLOW_NAME"
workflow_args='WORKFLOW_ARGUMENTS'

# Create Workflows resource
gcloud workflows deploy ${workflow_name} \
    --project=${project_id} \
    --location=${region} \
    --source=${workflow_source} \
    --service-account=${service_account}

# Create Cloud Scheduler job
gcloud scheduler jobs create http ${workflow_name}-scheduler \
    --project=${project_id} \
    --location=${region} \
    --schedule="CRON_SCHEDULE_EXPRESSION" \
    --time-zone="UTC" \
    --uri="https://workflowexecutions.googleapis.com/v1/projects/${project_id}/locations/${region}/workflows/${workflow_name}/executions" \
    --http-method="POST" \
    --oauth-service-account-email=${service_account} \
    --headers="Content-Type=application/json" \
    --message-body='{"argument": ${workflow_args}}'

Replace the following:
- PROJECT_ID: the name of the target Google Cloud project to import the metadata into.
- LOCATION_ID: the target Google Cloud location where the Serverless for Apache Spark and metadata import jobs will run, and metadata will be imported into.
- SERVICE_ACCOUNT_ID: the service account that you configured in the Required roles section of this document.
- WORKFLOW_DEFINITION_FILE: the path to the workflow definition YAML file.
- WORKFLOW_NAME: the name of the workflow.
- WORKFLOW_ARGUMENTS: the runtime arguments to pass to the connector. The arguments are in JSON format:

  {
    "TARGET_PROJECT_ID": "PROJECT_ID",
    "CLOUD_REGION": "LOCATION_ID",
    "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
    "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
    "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
    "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
    "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
    "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
    "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
    "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
    "IMPORT_JOB_LOG_LEVEL": "INFO",
    "JAR_FILE_URI": "",
    "NETWORK_TAGS": [],
    "NETWORK_URI": "",
    "SUBNETWORK_URI": ""
  }

  For Cloud Scheduler, the double quotation marks inside the quoted string are escaped using backslashes (\). For example:
  --message-body="{\"argument\": \"{\\\"key\\\": \\\"value\\\"}\"}"

  Replace the following:
- ENTRY_GROUP_ID: the ID of the entry group to import metadata into. The entry group ID can contain lowercase letters, numbers, and hyphens. The full resource name of this entry group is projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.
- CREATE_ENTRY_GROUP_BOOLEAN: if you want the pipeline to create the entry group if it doesn't already exist in your project, set this value to true.
- BUCKET_ID: the name of the Cloud Storage bucket to store the metadata import file that is generated by the connector. Each workflow execution creates a new folder.
- ADDITIONAL_CONNECTOR_ARGUMENTS: a list of additional arguments to pass to the connector. For examples, see Develop a custom connector for metadata import.
- CONTAINER_IMAGE: the custom container image of the connector hosted in Artifact Registry.
- ENTRY_TYPES: a list of entry types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.
- ASPECT_TYPES: a list of aspect types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.
- Optional: For the NETWORK_TAGS argument, provide a list of network tags.
- Optional: For the NETWORK_URI argument, provide the URI of the VPC network that connects to the data source. If you provide a network, omit the subnetwork argument.
- Optional: For the SUBNETWORK_URI argument, provide the URI of the subnetwork that connects to the data source. If you provide a subnet, omit the network argument.
- CRON_SCHEDULE_EXPRESSION: a cron expression that defines the schedule to run the pipeline. For example, to run the schedule at midnight every day, use the expression 0 0 * * *.
To run the pipeline on demand, execute the workflow:
workflow_name="WORKFLOW_NAME"workflow_args='WORKFLOW_ARGUMENTS'gcloudworkflowsrun"${workflow_name}"--project=${project_id}--location=${location}--data'${workflow_args}'The workflow arguments are in JSON format, but not escaped.
Depending on the amount of metadata that you import, the workflow might take several minutes or longer to run. For more information about how to view the progress, see Access workflow execution results.
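As a sketch, you can also check execution status from the command line. These commands assume the Bash variables defined earlier in this section:

# List recent executions of the workflow
gcloud workflows executions list ${workflow_name} \
    --project=${project_id} \
    --location=${region} \
    --limit=5

# Describe a specific execution (replace EXECUTION_ID with an ID from the list)
gcloud workflows executions describe EXECUTION_ID \
    --workflow=${workflow_name} \
    --project=${project_id} \
    --location=${region}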
After the pipeline has finished running, you can search for the imported metadata in Dataplex Universal Catalog.
Terraform
Clone the cloud-dataplex repository. The repository includes the following Terraform files:
- main.tf: defines the Google Cloud resources to create.
- variables.tf: declares the variables.
- byo-connector.tfvars: defines the variables for your managed connectivity pipeline.
Edit the
.tfvarsfile to replace the placeholders with the informationfor your connector.project_id="PROJECT_ID"region="LOCATION_ID"service_account="SERVICE_ACCOUNT_ID"cron_schedule="CRON_SCHEDULE_EXPRESSION"workflow_args={"TARGET_PROJECT_ID":"PROJECT_ID", "CLOUD_REGION": "LOCATION_ID", "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID", "CREATE_TARGET_ENTRY_GROUP":CREATE_ENTRY_GROUP_BOOLEAN, "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID", "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID", "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS], "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE", "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES], "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES], "IMPORT_JOB_LOG_LEVEL": "INFO", "NETWORK_TAGS": [], "NETWORK_URI": "", "SUBNETWORK_URI": ""}workflow_source=<<EOFmain:params:[args]steps:-init:assign:-WORKFLOW_ID:$${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}-NETWORK_URI:$${default(map.get(args,"NETWORK_URI"), "")}-SUBNETWORK_URI:$${default(map.get(args,"SUBNETWORK_URI"), "")}-NETWORK_TAGS:$${default(map.get(args,"NETWORK_TAGS"),[])}-check_networking:switch:-condition:$${NETWORK_URI!="" and SUBNETWORK_URI != ""}raise:"Error: cannot set both network_uri and subnetwork_uri. Please select one."-condition:$${NETWORK_URI!=""}steps:-submit_extract_job_with_network_uri:assign:-NETWORKING:$${NETWORK_URI}-NETWORK_TYPE:"networkUri"-condition:$${SUBNETWORK_URI!=""}steps:-submit_extract_job_with_subnetwork_uri:assign:-NETWORKING:$${SUBNETWORK_URI}-NETWORK_TYPE:"subnetworkUri"next:set_default_networking-set_default_networking:assign:-NETWORK_TYPE:"networkUri"-NETWORKING:$${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}next:check_create_target_entry_group-check_create_target_entry_group:switch:-condition:$${args.CREATE_TARGET_ENTRY_GROUP==true}next:create_target_entry_group-condition:$${args.CREATE_TARGET_ENTRY_GROUP==false}next:generate_extract_job_link-create_target_entry_group:call:http.postargs:url:$${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id="+args.TARGET_ENTRY_GROUP_ID}auth:type:OAuth2scopes:"https://www.googleapis.com/auth/cloud-platform"next:generate_extract_job_link-generate_extract_job_link:call:sys.logargs:data:$${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project="+args.TARGET_PROJECT_ID}severity:"INFO"next:submit_pyspark_extract_job-submit_pyspark_extract_job:call:http.postargs:url:$${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}auth:type:OAuth2scopes:"https://www.googleapis.com/auth/cloud-platform"headers:Content-Type:"application/json"query:batchId:$${WORKFLOW_ID}body:pysparkBatch:mainPythonFileUri:file:///main.pyargs:-$${"--target_project_id="+args.TARGET_PROJECT_ID}-$${"--target_location_id="+args.CLOUD_REGION}-$${"--target_entry_group_id="+args.TARGET_ENTRY_GROUP_ID}-$${"--output_bucket="+args.CLOUD_STORAGE_BUCKET_ID}-$${"--output_folder="+WORKFLOW_ID}-$${args.ADDITIONAL_CONNECTOR_ARGS}runtimeConfig:containerImage:$${args.CUSTOM_CONTAINER_IMAGE}environmentConfig:executionConfig:serviceAccount:$${args.SERVICE_ACCOUNT}stagingBucket:$${args.CLOUD_STORAGE_BUCKET_ID}$${NETWORK_TYPE}:$${NETWORKING}networkTags:$${NETWORK_TAGS}result:RESPONSE_MESSAGEnext:check_pyspark_extract_job-check_pyspark_extract_job:call:http.getargs:url:$${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + 
"/locations/" + args.CLOUD_REGION + "/batches/"+WORKFLOW_ID}auth:type:OAuth2scopes:"https://www.googleapis.com/auth/cloud-platform"result:PYSPARK_EXTRACT_JOB_STATUSnext:check_pyspark_extract_job_done-check_pyspark_extract_job_done:switch:-condition:$${PYSPARK_EXTRACT_JOB_STATUS.body.state=="SUCCEEDED"}next:generate_import_logs_link-condition:$${PYSPARK_EXTRACT_JOB_STATUS.body.state=="CANCELLED"}raise:$${PYSPARK_EXTRACT_JOB_STATUS}-condition:$${PYSPARK_EXTRACT_JOB_STATUS.body.state=="FAILED"}raise:$${PYSPARK_EXTRACT_JOB_STATUS}next:pyspark_extract_job_wait-pyspark_extract_job_wait:call:sys.sleepargs:seconds:30next:check_pyspark_extract_job-generate_import_logs_link:call:sys.logargs:data:$${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}severity:"INFO"next:submit_import_job-submit_import_job:call:http.postargs:url:$${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id="+WORKFLOW_ID}auth:type:OAuth2scopes:"https://www.googleapis.com/auth/cloud-platform"body:type:IMPORTimport_spec:source_storage_uri:$${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}entry_sync_mode:FULLaspect_sync_mode:INCREMENTALlog_level:$${default(map.get(args,"IMPORT_JOB_LOG_LEVEL"), "INFO")}scope:entry_groups:-$${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/"+args.TARGET_ENTRY_GROUP_ID}entry_types:$${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}aspect_types:$${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}result:IMPORT_JOB_RESPONSEnext:get_job_start_time-get_job_start_time:assign:-importJobStartTime:$${sys.now()}next:import_job_startup_wait-import_job_startup_wait:call:sys.sleepargs:seconds:30next:initial_get_import_job-initial_get_import_job:call:http.getargs:url:$${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/"+WORKFLOW_ID}auth:type:OAuth2scopes:"https://www.googleapis.com/auth/cloud-platform"result:IMPORT_JOB_STATUSnext:check_import_job_status_available-check_import_job_status_available:switch:-condition:$${sys.now()-importJobStartTime >300} # 5 minutes = 300 secondsnext:kill_import_job-condition:$${"status"inIMPORT_JOB_STATUS.body}next:check_import_job_donenext:import_job_status_wait-import_job_status_wait:call:sys.sleepargs:seconds:30next:check_import_job_status_available-check_import_job_done:switch:-condition:$${IMPORT_JOB_STATUS.body.status.state=="SUCCEEDED"}next:the_end-condition:$${IMPORT_JOB_STATUS.body.status.state=="CANCELLED"}raise:$${IMPORT_JOB_STATUS}-condition:$${IMPORT_JOB_STATUS.body.status.state=="SUCCEEDED_WITH_ERRORS"}raise:$${IMPORT_JOB_STATUS}-condition:$${IMPORT_JOB_STATUS.body.status.state=="FAILED"}raise:$${IMPORT_JOB_STATUS}-condition:$${sys.now()-importJobStartTime >43200} # 12 hours = 43200 secondsnext:kill_import_jobnext:import_job_wait-get_import_job:call:http.getargs:url:$${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + 
"/metadataJobs/"+WORKFLOW_ID}auth:type:OAuth2scopes:"https://www.googleapis.com/auth/cloud-platform"result:IMPORT_JOB_STATUSnext:check_import_job_done-import_job_wait:call:sys.sleepargs:seconds:30next:get_import_job-kill_import_job:call:http.postargs:url:$${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}auth:type:OAuth2scopes:"https://www.googleapis.com/auth/cloud-platform"next:get_killed_import_job-get_killed_import_job:call:http.getargs:url:$${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/"+WORKFLOW_ID}auth:type:OAuth2scopes:"https://www.googleapis.com/auth/cloud-platform"result:KILLED_IMPORT_JOB_STATUSnext:killed-killed:raise:$${KILLED_IMPORT_JOB_STATUS}-the_end:return:$${IMPORT_JOB_STATUS}EOFReplace the following:
- PROJECT_ID: the name of the target Google Cloud project to import the metadata into.
- LOCATION_ID: the target Google Cloud location where the Serverless for Apache Spark and metadata import jobs will run, and metadata will be imported into.
- SERVICE_ACCOUNT_ID: the service account that you configured in the Required roles section of this document.
- CRON_SCHEDULE_EXPRESSION: a cron expression that defines the schedule to run the pipeline. For example, to run the schedule at midnight every day, use the expression 0 0 * * *.
- ENTRY_GROUP_ID: the ID of the entry group to import metadata into. The entry group ID can contain lowercase letters, numbers, and hyphens. The full resource name of this entry group is projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.
- CREATE_ENTRY_GROUP_BOOLEAN: if you want the pipeline to create the entry group if it doesn't already exist in your project, set this value to true.
- BUCKET_ID: the name of the Cloud Storage bucket to store the metadata import file that is generated by the connector. Each workflow execution creates a new folder.
- ADDITIONAL_CONNECTOR_ARGUMENTS: a list of additional arguments to pass to the connector. For examples, see Develop a custom connector for metadata import. Enclose each argument in double quotation marks, and separate the arguments with commas.
- CONTAINER_IMAGE: the custom container image of the connector hosted in Artifact Registry.
- ENTRY_TYPES: a list of entry types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.
- ASPECT_TYPES: a list of aspect types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.
- Optional: For the NETWORK_TAGS argument, provide a list of network tags.
- Optional: For the NETWORK_URI argument, provide the URI of the VPC network that connects to the data source. If you provide a network, omit the subnetwork argument.
- Optional: For the SUBNETWORK_URI argument, provide the URI of the subnetwork that connects to the data source. If you provide a subnet, omit the network argument.
Initialize Terraform:
terraform init

Validate Terraform with your .tfvars file:

terraform plan --var-file=CONNECTOR_VARIABLES_FILE.tfvars

Replace CONNECTOR_VARIABLES_FILE with the name of your variable definitions file.

Deploy Terraform with your .tfvars file:

terraform apply --var-file=CONNECTOR_VARIABLES_FILE.tfvars

Terraform creates a workflow and a Cloud Scheduler job in the specified project. Workflows runs the pipeline at the schedule that you specify.
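To confirm that the resources were created, a sketch using the gcloud CLI (substitute your own project and location):

gcloud workflows list --project=PROJECT_ID --location=LOCATION_ID
gcloud scheduler jobs list --project=PROJECT_ID --location=LOCATION_ID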
Depending on the amount of metadata that you import, the workflow might take several minutes or longer to run. For more information about how to view the progress, see Access workflow execution results.
After the pipeline has finished running, you can search for the imported metadata in Dataplex Universal Catalog.
View job logs
Use Cloud Logging to view logs for a managed connectivity pipeline. The log payload includes a link to the logs for the Serverless for Apache Spark batch job and the metadata import job, as relevant. For more information, see View workflow logs.
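As a sketch, you can also query these logs from the command line. The filter below assumes the standard monitored resource type for Workflows executions:

gcloud logging read \
    'resource.type="workflows.googleapis.com/Workflow"' \
    --project=PROJECT_ID \
    --limit=20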
Troubleshooting
Use the following troubleshooting suggestions:
- Configure the import job log level for the metadata job to use debug-level logging instead of info-level logging (see the example after this list).
- Review the logs for the Serverless for Apache Spark batch job (for connector runs) and the metadata import job. For more information, see Query Serverless for Apache Spark logs and Query metadata job logs.
- If an entry can't be imported using the pipeline and the error message doesn't provide enough information, try creating a custom entry with the same details, in a test entry group. For more information, see Create a custom entry.
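For example, to turn on debug-level logging, set the import job log level in the workflow's runtime arguments (the IMPORT_JOB_LOG_LEVEL argument shown earlier in this document):

"IMPORT_JOB_LOG_LEVEL": "DEBUG"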
What's next
- About metadata management in Dataplex Universal Catalog
- Develop a custom connector for metadata import