Develop a custom connector for metadata import Stay organized with collections Save and categorize content based on your preferences.
This document provides a reference template to build a custom connector forextracting metadata from third-party sources, such as MySQL, SQL Server, andOracle. You can use this connector to import metadata into Dataplex Universal Catalogthrough amanaged connectivity pipeline.An example Python connector for Oracle Database Express Edition (XE) is includedas a starting point. You can also develop connectors using Java, Scala, or R.
How connectors work
A connector extracts metadata from a third-party data source, transforms themetadata to Dataplex Universal CatalogImportItem format, and generatesmetadata import files that can be imported by Dataplex Universal Catalog.
The connector is a part of a managed connectivity pipeline. A managedconnectivity pipeline is an orchestrated workflow that you use to importDataplex Universal Catalog metadata. The managed connectivity pipelineruns the connector and performs other tasks in the import workflow, such asrunning a metadata import job and capturing logs.
The managed connectivity pipeline runs the connector by using aGoogle Cloud Serverless for Apache Sparkbatch job. Serverless for Apache Spark provides aserverless Spark execution environment. Although you can build a connector thatdoesn't use Spark, we recommend that you use Spark because it can improve theperformance of your connector.
Connector requirements
The connector has the following requirements:
- The connector must be an Artifact Registry image that can be run onServerless for Apache Spark.
- The connector must generate metadata files in a format that can be importedby a Dataplex Universal Catalog metadata import job (the
metadataJobs.createAPI method). For detailed requirements, seeMetadata import file. The connector must accept the following command-line arguments to receiveinformation from the pipeline:
Command-line argument Value that pipeline provides target_project_idPROJECT_ID target_location_idREGION target_entry_group_idENTRY_GROUP_ID output_bucketCLOUD_STORAGE_BUCKET_ID output_folderFOLDER_ID The connector uses these arguments to generate metadata in a target entry group
projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID,and to write to a Cloud Storage bucketgs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID.Each execution of the pipeline creates a new folderFOLDER_ID in bucketCLOUD_STORAGE_BUCKET_ID. The connector should writemetadata import files to this folder.
The pipeline templates support PySpark connectors. The templates assume that the driver(mainPythonFileUri)is a local file on the connector image namedmain.py. You can modify thepipeline templates for other scenarios, such as a Spark connector, a differentdriver URI, or other options.
Here's how you use PySpark to create an import item in the metadata importfile.
"""PySpark schemas for the data."""entry_source_schema=StructType([StructField("display_name",StringType()),StructField("source",StringType())])aspect_schema=MapType(StringType(),StructType([StructField("aspect_type",StringType()),StructField("data",StructType([]))]))entry_schema=StructType([StructField("name",StringType()),StructField("entry_type",StringType()),StructField("fully_qualified_name",StringType()),StructField("parent_entry",StringType()),StructField("entry_source",entry_source_schema),StructField("aspects",aspect_schema)])import_item_schema=StructType([StructField("entry",entry_schema),StructField("aspect_keys",ArrayType(StringType())),StructField("update_mask",ArrayType(StringType()))])Before you begin
This guide assumes that you're familiar with Python and PySpark.
Review the following information:
- Dataplex Universal Catalog metadata concepts
- Documentation aboutmetadata import jobs
Do the following things. Create all resources in the same Google Cloudlocation.
Create or select a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
- Create a project: To create a project, you need the Project Creator role (
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission.Learn how to grant roles.
Create a Google Cloud project:
gcloud projects createPROJECT_ID
Replace
PROJECT_IDwith a name for the Google Cloud project you are creating.Select the Google Cloud project that you created:
gcloud config set projectPROJECT_ID
Replace
PROJECT_IDwith your Google Cloud project name.
Verify that billing is enabled for your Google Cloud project.
Enable the Dataplex, Dataproc, Workflows, and Artifact Registry APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission.Learn how to grant roles.gcloudservicesenabledataplex.googleapis.com
dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com Install the Google Cloud CLI.
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
Toinitialize the gcloud CLI, run the following command:
gcloudinit
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwnergcloudprojectsadd-iam-policy-bindingPROJECT_ID--member="user:USER_IDENTIFIER"--role=ROLE
Replace the following:
PROJECT_ID: Your project ID.USER_IDENTIFIER: The identifier for your user account. For example,myemail@example.com.ROLE: The IAM role that you grant to your user account.
Set up authentication:
- Ensure that you have the Create Service Accounts IAM role (
roles/iam.serviceAccountCreator) and the Project IAM Admin role (roles/resourcemanager.projectIamAdmin).Learn how to grant roles. Create the service account:
gcloudiamservice-accountscreateSERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAMEwith a name for the service account.Grant the
roles/ownerIAM role to the service account:gcloudprojectsadd-iam-policy-bindingPROJECT_ID--member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com"--role=roles/owner
Replace the following:
SERVICE_ACCOUNT_NAME: the name of the service accountPROJECT_ID: the project ID where you created the service account
--roleflag affects which resources the service account can access in your project. You can revoke these roles or grant additional roles later. In production environments, do not grant the Owner, Editor, or Viewer roles. Instead, grant apredefined role orcustom role that meets your needs.
- Ensure that you have the Create Service Accounts IAM role (
Create a Cloud Storage bucket to store the metadata import files.
Create the following metadata resources in the same project.
Note: This document assumes all the resources are in the same project for simplicity. If you want to create resources different projects, you can adjust the sample code.For example values, see theExample metadata resources for an Oracle source section of this document.
- Create an entry group.
Create custom aspect types for the entries that you want to import. Use the naming convention
SOURCE-ENTITY_TO_IMPORT.For example, for an Oracle database, create an aspect type named
oracle-database.Optionally, you can create additional aspect types to store other information.
Create custom entry types for the resources that you want to import, and assign the relevant aspect types to them. Use the naming convention
SOURCE-ENTITY_TO_IMPORT.For example, for an Oracle database, create an entry type named
oracle-database. Link it to the aspect type that is namedoracle-database.
- Ensure that your third-party source is accessible from your Google Cloud project. For more information, seeServerless for Apache Spark network configuration.
Create a basic Python connector
The example basic Python connector creates top-level entries for anOracle data source by using the Dataplex Universal Catalog client library classes.Then, you provide the values for the entry fields.
The connector creates a metadata import file with the following entries:
- An
instanceentry, with entry typeprojects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance.This entry represents an Oracle Database XE system. - A
databaseentry, which represents a database inside the Oracle Database XEsystem.
To build a basic Python connector, do the following:
Clone the
cloud-dataplexrepository.Set up a local environment. We recommend that you use a virtual environment.
mkdir venvpython -m venv venv/source venv/bin/activateUse theactiveormaintenanceversions of Python. Python versions 3.7 and later are supported.
Create a Python project.
Install requirements:
pip install -r requirements.txtThe following requirements are installed:
google-cloud-dataplex==2.2.2google-cloud-storagegoogle-cloud-secret-managerAdd a
main.pypipeline file on the root of the project.fromsrcimportbootstrapif__name__=='__main__':bootstrap.run()When deploying your code to Serverless for Apache Spark, the
main.pyfile serves as the entry point for execution. We recommend that you minimizethe amount of information that is stored in themain.pyfile; use thisfile to call functions and classes that are defined within your connector,such as thesrc/bootstap.pyclass.Create a
srcfolder to store the majority of the logic for your connector.Update the
src/cmd_reader.pyfile with a Python class to accept command-linearguments. You can use theargeparsemodule to do this."""Command line reader."""importargparsedefread_args():"""Reads arguments from the command line."""parser=argparse.ArgumentParser()# Dataplex argumentsparser.add_argument("--target_project_id",type=str,required=True,help="The name of the target Google Cloud project to import the metadata into.")parser.add_argument("--target_location_id",type=str,required=True,help="The target Google Cloud location where the metadata will be imported into.")parser.add_argument("--target_entry_group_id",type=str,required=True,help="The ID of the entry group to import metadata into. ""The metadata will be imported into entry group with the following""full resource name: projects/${target_project_id}/""locations/${target_location_id}/entryGroups/${target_entry_group_id}.")# Oracle argumentsparser.add_argument("--host_port",type=str,required=True,help="Oracle host and port number separated by the colon (:).")parser.add_argument("--user",type=str,required=True,help="Oracle User.")parser.add_argument("--password-secret",type=str,required=True,help="Secret resource name in the Secret Manager for the Oracle password.")parser.add_argument("--database",type=str,required=True,help="Source Oracle database.")# Google Cloud Storage arguments# It is assumed that the bucket is in the same region as the entry groupparser.add_argument("--output_bucket",type=str,required=True,help="The Cloud Storage bucket to write the generated metadata import file.")parser.add_argument("--output_folder",type=str,required=True,help="A folder in the Cloud Storage bucket, to write the generated metadata import files.")returnvars(parser.parse_known_args()[0])In production environments, we recommend that you store the password inSecret Manager.
Update the
src/constants.pyfile with code to create constants."""Constants that are used in the different files."""importenumSOURCE_TYPE="oracle"# Symbols for replacementFORBIDDEN="#"ALLOWED="!"classEntryType(enum.Enum):"""Types of Oracle entries."""INSTANCE:str="projects/{project}/locations/{location}/entryTypes/oracle-instance"DATABASE:str="projects/{project}/locations/{location}/entryTypes/oracle-database"DB_SCHEMA:str="projects/{project}/locations/{location}/entryTypes/oracle-schema"TABLE:str="projects/{project}/locations/{location}/entryTypes/oracle-table"VIEW:str="projects/{project}/locations/{location}/entryTypes/oracle-view"Update the
src/name_builder.pyfile with methods to build themetadata resources that you want the connector to createfor your Oracle resources. Use the conventions that are described in theExample metadata resources for an Oracle sourcesection of this document."""Builds Dataplex hierarchy identifiers."""fromtypingimportDictfromsrc.constantsimportEntryType,SOURCE_TYPE# Oracle cluster users start with C## prefix, but Dataplex doesn't accept #.# In that case in names it is changed to C!!, and escaped with backticks in FQNsFORBIDDEN_SYMBOL="#"ALLOWED_SYMBOL="!"defcreate_fqn(config:Dict[str,str],entry_type:EntryType,schema_name:str="",table_name:str=""):"""Creates a fully qualified name or Dataplex v1 hierarchy name."""ifFORBIDDEN_SYMBOLinschema_name:schema_name=f"`{schema_name}`"ifentry_type==EntryType.INSTANCE:# Requires backticks to escape columnreturnf"{SOURCE_TYPE}:`{config['host_port']}`"ifentry_type==EntryType.DATABASE:instance=create_fqn(config,EntryType.INSTANCE)returnf"{instance}.{config['database']}"ifentry_type==EntryType.DB_SCHEMA:database=create_fqn(config,EntryType.DATABASE)returnf"{database}.{schema_name}"ifentry_typein[EntryType.TABLE,EntryType.VIEW]:database=create_fqn(config,EntryType.DATABASE)returnf"{database}.{schema_name}.{table_name}"return""defcreate_name(config:Dict[str,str],entry_type:EntryType,schema_name:str="",table_name:str=""):"""Creates a Dataplex v2 hierarchy name."""ifFORBIDDEN_SYMBOLinschema_name:schema_name=schema_name.replace(FORBIDDEN_SYMBOL,ALLOWED_SYMBOL)ifentry_type==EntryType.INSTANCE:name_prefix=(f"projects/{config['target_project_id']}/"f"locations/{config['target_location_id']}/"f"entryGroups/{config['target_entry_group_id']}/"f"entries/")returnname_prefix+config["host_port"].replace(":","@")ifentry_type==EntryType.DATABASE:instance=create_name(config,EntryType.INSTANCE)returnf"{instance}/databases/{config['database']}"ifentry_type==EntryType.DB_SCHEMA:database=create_name(config,EntryType.DATABASE)returnf"{database}/database_schemas/{schema_name}"ifentry_type==EntryType.TABLE:db_schema=create_name(config,EntryType.DB_SCHEMA,schema_name)returnf"{db_schema}/tables/{table_name}"ifentry_type==EntryType.VIEW:db_schema=create_name(config,EntryType.DB_SCHEMA,schema_name)returnf"{db_schema}/views/{table_name}"return""defcreate_parent_name(config:Dict[str,str],entry_type:EntryType,parent_name:str=""):"""Generates a Dataplex v2 name of the parent."""ifentry_type==EntryType.DATABASE:returncreate_name(config,EntryType.INSTANCE)ifentry_type==EntryType.DB_SCHEMA:returncreate_name(config,EntryType.DATABASE)ifentry_type==EntryType.TABLE:returncreate_name(config,EntryType.DB_SCHEMA,parent_name)return""defcreate_entry_aspect_name(config:Dict[str,str],entry_type:EntryType):"""Generates an entry aspect name."""last_segment=entry_type.value.split("/")[-1]returnf"{config['target_project_id']}.{config['target_location_id']}.{last_segment}"Because the
name_builder.pyfile is used for both the Python core code andthe PySpark core code, we recommend that you write the methods as purefunctions, instead of as members of a class.Update the
src/top_entry_builder.pyfile with code tofill the top-level entries with data."""Non-Spark approach for building the entries."""importdataclassesimportjsonfromtypingimportList,Dictimportprotofromgoogle.cloudimportdataplex_v1fromsrc.constantsimportEntryTypefromsrcimportname_builderasnb@dataclasses.dataclass(slots=True)classImportItem:"""A template class for Import API."""entry:dataplex_v1.Entry=dataclasses.field(default_factory=dataplex_v1.Entry)aspect_keys:List[str]=dataclasses.field(default_factory=list)update_mask:List[str]=dataclasses.field(default_factory=list)def_dict_factory(data:object):"""Factory function required for converting Entry dataclass to dict."""defconvert(obj:object):ifisinstance(obj,proto.Message):returnproto.Message.to_dict(obj)returnobjreturndict((k,convert(v))fork,vindata)def_create_entry(config:Dict[str,str],entry_type:EntryType):"""Creates an entry based on a Dataplex library."""entry=dataplex_v1.Entry()entry.name=nb.create_name(config,entry_type)entry.entry_type=entry_type.value.format(project=config["target_project_id"],location=config["target_location_id"])entry.fully_qualified_name=nb.create_fqn(config,entry_type)entry.parent_entry=nb.create_parent_name(config,entry_type)aspect_key=nb.create_entry_aspect_name(config,entry_type)# Add mandatory aspectentry_aspect=dataplex_v1.Aspect()entry_aspect.aspect_type=aspect_keyentry_aspect.data={}entry.aspects[aspect_key]=entry_aspectreturnentrydef_entry_to_import_item(entry:dataplex_v1.Entry):"""Packs entry to import item, accepted by the API,"""import_item=ImportItem()import_item.entry=entryimport_item.aspect_keys=list(entry.aspects.keys())import_item.update_mask="aspects"returnimport_itemdefcreate(config,entry_type:EntryType):"""Creates an entry, packs it to Import Item and converts to json."""import_item=_entry_to_import_item(_create_entry(config,entry_type))returnjson.dumps(dataclasses.asdict(import_item,dict_factory=_dict_factory))Update the
src/bootstrap.pyfile with code togenerate the metadata import file and run the connector."""The entrypoint of a pipeline."""fromtypingimportDictfromsrc.constantsimportEntryTypefromsrcimportcmd_readerfromsrcimportsecret_managerfromsrcimportentry_builderfromsrcimportgcs_uploaderfromsrcimporttop_entry_builderfromsrc.oracle_connectorimportOracleConnectorFILENAME="output.jsonl"defwrite_jsonl(output_file,json_strings):"""Writes a list of string to the file in JSONL format."""# For simplicity, dataset is written into the one file. But it is not# mandatory, and the order doesn't matter for Import API.# The PySpark itself could dump entries into many smaller JSONL files.# Due to performance, it's recommended to dump to many smaller files.forstringinjson_strings:output_file.write(string+"\n")defprocess_dataset(connector:OracleConnector,config:Dict[str,str],schema_name:str,entry_type:EntryType,):"""Builds dataset and converts it to jsonl."""df_raw=connector.get_dataset(schema_name,entry_type)df=entry_builder.build_dataset(config,df_raw,schema_name,entry_type)returndf.toJSON().collect()defrun():"""Runs a pipeline."""config=cmd_reader.read_args()config["password"]=secret_manager.get_password(config["password_secret"])connector=OracleConnector(config)withopen(FILENAME,"w",encoding="utf-8")asfile:# Write top entries that don't require connection to the databasefile.writelines(top_entry_builder.create(config,EntryType.INSTANCE))file.writelines("\n")file.writelines(top_entry_builder.create(config,EntryType.DATABASE))# Get schemas, write them and collect to the listdf_raw_schemas=connector.get_db_schemas()schemas=[schema.USERNAMEforschemaindf_raw_schemas.select("USERNAME").collect()]schemas_json=entry_builder.build_schemas(config,df_raw_schemas).toJSON().collect()write_jsonl(file,schemas_json)# Ingest tables and views for every schema in a listforschemainschemas:print(f"Processing tables for{schema}")tables_json=process_dataset(connector,config,schema,EntryType.TABLE)write_jsonl(file,tables_json)print(f"Processing views for{schema}")views_json=process_dataset(connector,config,schema,EntryType.VIEW)write_jsonl(file,views_json)gcs_uploader.upload(config,FILENAME)Run the code locally.
A metadata import file named
output.jsonlis returned. The file has twolines, each representing an import item. The managed connectivity pipelinereads this file when running the metadata import job.Optional: Extend the previous example to use the Dataplex Universal Catalogclient library classes to create import items for tables, schemas, and views.You can also run the Python example on Serverless for Apache Spark.
We recommend that you create a connector that uses Spark (and runson Serverless for Apache Spark), because it can improve theperformance of your connector.
Create a PySpark connector
This example is based on thePySpark DataFrame API. You can install PySpark SQL and run it locally beforerunning on Serverless for Apache Spark. If you install and run PySparklocally, install the PySpark library by using pip, but you don't need to installa local Spark cluster.
For performance reasons, this example doesn't use predefined classes from thePySpark library. Instead, the example creates DataFrames, converts theDataFrames into JSON entries, and then writes the output into a metadata importfile in JSON Lines format that can be imported into Dataplex Universal Catalog.
To build a connector using PySpark, do the following:
Clone the
cloud-dataplexrepository.Install PySpark:
pipinstallpysparkInstall requirements:
pip install -r requirements.txtThe following requirements are installed:
google-cloud-dataplex==2.2.2google-cloud-storagegoogle-cloud-secret-managerUpdate the
oracle_connector.pyfile with code to read data from anOracle data source and return DataFrames."""Reads Oracle using PySpark."""fromtypingimportDictfrompyspark.sqlimportSparkSession,DataFramefromsrc.constantsimportEntryTypeSPARK_JAR_PATH="/opt/spark/jars/ojdbc11.jar"classOracleConnector:"""Reads data from Oracle and returns Spark Dataframes."""def__init__(self,config:Dict[str,str]):# PySpark entrypointself._spark=SparkSession.builder.appName("OracleIngestor") \.config("spark.jars",SPARK_JAR_PATH) \.getOrCreate()self._config=configself._url=f"jdbc:oracle:thin:@{config['host_port']}:{config['database']}"def_execute(self,query:str)->DataFrame:"""A generic method to execute any query."""returnself._spark.read.format("jdbc") \.option("driver","oracle.jdbc.OracleDriver") \.option("url",self._url) \.option("query",query) \.option("user",self._config["user"]) \.option("password",self._config["password"]) \.load()defget_db_schemas(self)->DataFrame:"""In Oracle, schemas are usernames."""query="SELECT username FROM dba_users"returnself._execute(query)def_get_columns(self,schema_name:str,object_type:str)->str:"""Gets a list of columns in tables or views in a batch."""# Every line here is a column that belongs to the table or to the view.# This SQL gets data from ALL the tables in a given schema.return(f"SELECT col.TABLE_NAME, col.COLUMN_NAME, "f"col.DATA_TYPE, col.NULLABLE "f"FROM all_tab_columns col "f"INNER JOIN DBA_OBJECTS tab "f"ON tab.OBJECT_NAME = col.TABLE_NAME "f"WHERE tab.OWNER = '{schema_name}' "f"AND tab.OBJECT_TYPE = '{object_type}'")defget_dataset(self,schema_name:str,entry_type:EntryType):"""Gets data for a table or a view."""# Dataset means that these entities can contain end user data.short_type=entry_type.name# table or view, or the title of enum valuequery=self._get_columns(schema_name,short_type)returnself._execute(query)Add SQL queries to return the metadata that you want to import. The queriesneed to return the following information:
- Database schemas
- Tables that belong to these schemas
- Columns that belong to these tables, including the column name, columndata type, and whether the column is nullable or required
All of the columns of all the tables and views are stored in the samesystem table. You can select columns with the
_get_columnsmethod.Depending on the parameters that you provide, you can select columns forthe tables or for the views separately.Note the following:
- In Oracle, a database schema is owned by a database user and has the samename as that user.
- Schema objects are logical structures that are created by users. Objectssuch as tables or indexes can hold data, and objects like views orsynonyms consist of only a definition.
- The
ojdbc11.jarfile contains theOracle JDBC driver.
Update the
src/entry_builder.pyfile with sharedmethods for applying Spark transformations."""Creates entries with PySpark."""importpyspark.sql.functionsasFfrompyspark.sql.typesimportStringTypefromsrc.constantsimportEntryType,SOURCE_TYPEfromsrcimportname_builderasnb@F.udf(returnType=StringType())defchoose_metadata_type_udf(data_type:str):"""Choose the metadata type based on Oracle native type."""ifdata_type.startswith("NUMBER")ordata_typein["FLOAT","LONG"]:return"NUMBER"ifdata_type.startswith("VARCHAR")ordata_type.startswith("NVARCHAR2"):return"STRING"ifdata_type=="DATE":return"DATETIME"return"OTHER"defcreate_entry_source(column):"""Create Entry Source segment."""returnF.named_struct(F.lit("display_name"),column,F.lit("system"),F.lit(SOURCE_TYPE))defcreate_entry_aspect(entry_aspect_name):"""Create aspect with general information (usually it is empty)."""returnF.create_map(F.lit(entry_aspect_name),F.named_struct(F.lit("aspect_type"),F.lit(entry_aspect_name),F.lit("data"),F.create_map()))defconvert_to_import_items(df,aspect_keys):"""Convert entries to import items."""entry_columns=["name","fully_qualified_name","parent_entry","entry_source","aspects","entry_type"]# Puts entry to "entry" key, a list of keys from aspects in "aspects_keys"# and "aspects" string in "update_mask"returndf.withColumn("entry",F.struct(entry_columns)) \.withColumn("aspect_keys",F.array([F.lit(key)forkeyinaspect_keys])) \.withColumn("update_mask",F.array(F.lit("aspects"))) \.drop(*entry_columns)defbuild_schemas(config,df_raw_schemas):"""Create a dataframe with database schemas from the list of usernames. Args: df_raw_schemas - a dataframe with only one column called USERNAME Returns: A dataframe with Dataplex-readable schemas. """entry_type=EntryType.DB_SCHEMAentry_aspect_name=nb.create_entry_aspect_name(config,entry_type)# For schema, parent name is the name of the databaseparent_name=nb.create_parent_name(config,entry_type)# Create user-defined function.create_name_udf=F.udf(lambdax:nb.create_name(config,entry_type,x),StringType())create_fqn_udf=F.udf(lambdax:nb.create_fqn(config,entry_type,x),StringType())# Fills the missed project and location into the entry type stringfull_entry_type=entry_type.value.format(project=config["target_project_id"],location=config["target_location_id"])# Converts a list of schema names to the Dataplex-compatible formcolumn=F.col("USERNAME")df=df_raw_schemas.withColumn("name",create_name_udf(column)) \.withColumn("fully_qualified_name",create_fqn_udf(column)) \.withColumn("parent_entry",F.lit(parent_name)) \.withColumn("entry_type",F.lit(full_entry_type)) \.withColumn("entry_source",create_entry_source(column)) \.withColumn("aspects",create_entry_aspect(entry_aspect_name)) \.drop(column)df=convert_to_import_items(df,[entry_aspect_name])returndfdefbuild_dataset(config,df_raw,db_schema,entry_type):"""Build table entries from a flat list of columns. Args: df_raw - a plain dataframe with TABLE_NAME, COLUMN_NAME, DATA_TYPE, and NULLABLE columns db_schema - parent database schema entry_type - entry type: table or view Returns: A dataframe with Dataplex-readable data of tables of views. """schema_key="dataplex-types.global.schema"# The transformation below does the following# 1. Alters NULLABLE content from Y/N to NULLABLE/REQUIRED# 2. Renames NULLABLE to mode# 3. Renames DATA_TYPE to dataType# 4. Creates metadataType column based on dataType column# 5. Renames COLUMN_NAME to namedf=df_raw \.withColumn("mode",F.when(F.col("NULLABLE")=='Y',"NULLABLE").otherwise("REQUIRED")) \.drop("NULLABLE") \.withColumnRenamed("DATA_TYPE","dataType") \.withColumn("metadataType",choose_metadata_type_udf("dataType")) \.withColumnRenamed("COLUMN_NAME","name")# The transformation below aggregate fields, denormalizing the table# TABLE_NAME becomes top-level filed, and the rest is put into# the array type called "fields"aspect_columns=["name","mode","dataType","metadataType"]df=df.withColumn("columns",F.struct(aspect_columns))\.groupby('TABLE_NAME') \.agg(F.collect_list("columns").alias("fields"))# Create nested structured called aspects.# Fields are becoming a part of a `schema` struct# There is also an entry_aspect that is repeats entry_type as aspect_typeentry_aspect_name=nb.create_entry_aspect_name(config,entry_type)df=df.withColumn("schema",F.create_map(F.lit(schema_key),F.named_struct(F.lit("aspect_type"),F.lit(schema_key),F.lit("data"),F.create_map(F.lit("fields"),F.col("fields")))))\.withColumn("entry_aspect",create_entry_aspect(entry_aspect_name)) \.drop("fields")# Merge separate aspect columns into the one map called 'aspects'df=df.select(F.col("TABLE_NAME"),F.map_concat("schema","entry_aspect").alias("aspects"))# Define user-defined functions to fill the general information# and hierarchy namescreate_name_udf=F.udf(lambdax:nb.create_name(config,entry_type,db_schema,x),StringType())create_fqn_udf=F.udf(lambdax:nb.create_fqn(config,entry_type,db_schema,x),StringType())parent_name=nb.create_parent_name(config,entry_type,db_schema)full_entry_type=entry_type.value.format(project=config["target_project_id"],location=config["target_location_id"])# Fill the top-level fieldscolumn=F.col("TABLE_NAME")df=df.withColumn("name",create_name_udf(column)) \.withColumn("fully_qualified_name",create_fqn_udf(column)) \.withColumn("entry_type",F.lit(full_entry_type)) \.withColumn("parent_entry",F.lit(parent_name)) \.withColumn("entry_source",create_entry_source(column)) \.drop(column)df=convert_to_import_items(df,[schema_key,entry_aspect_name])returndfNote the following:
- The methods build themetadata resources that the connectorcreates for your Oracle resources. Use the conventions that are described in theExample metadata resources for an Oracle sourcesection of this document.
- The
convert_to_import_itemsmethod applies to schemas, tables, andviews. Ensure that the output of the connector is one or more import items thatcan be processed by themetadataJobs.createmethod,not individual entries. - Even in a view, the column is called
TABLE_NAME.
Update the
bootstrap.pyfile with code togenerate the metadata import file and run the connector."""The entrypoint of a pipeline."""fromtypingimportDictfromsrc.constantsimportEntryTypefromsrcimportcmd_readerfromsrcimportsecret_managerfromsrcimportentry_builderfromsrcimportgcs_uploaderfromsrcimporttop_entry_builderfromsrc.oracle_connectorimportOracleConnectorFILENAME="output.jsonl"defwrite_jsonl(output_file,json_strings):"""Writes a list of string to the file in JSONL format."""# For simplicity, dataset is written into the one file. But it is not# mandatory, and the order doesn't matter for Import API.# The PySpark itself could dump entries into many smaller JSONL files.# Due to performance, it's recommended to dump to many smaller files.forstringinjson_strings:output_file.write(string+"\n")defprocess_dataset(connector:OracleConnector,config:Dict[str,str],schema_name:str,entry_type:EntryType,):"""Builds dataset and converts it to jsonl."""df_raw=connector.get_dataset(schema_name,entry_type)df=entry_builder.build_dataset(config,df_raw,schema_name,entry_type)returndf.toJSON().collect()defrun():"""Runs a pipeline."""config=cmd_reader.read_args()config["password"]=secret_manager.get_password(config["password_secret"])connector=OracleConnector(config)withopen(FILENAME,"w",encoding="utf-8")asfile:# Write top entries that don't require connection to the databasefile.writelines(top_entry_builder.create(config,EntryType.INSTANCE))file.writelines("\n")file.writelines(top_entry_builder.create(config,EntryType.DATABASE))# Get schemas, write them and collect to the listdf_raw_schemas=connector.get_db_schemas()schemas=[schema.USERNAMEforschemaindf_raw_schemas.select("USERNAME").collect()]schemas_json=entry_builder.build_schemas(config,df_raw_schemas).toJSON().collect()write_jsonl(file,schemas_json)# Ingest tables and views for every schema in a listforschemainschemas:print(f"Processing tables for{schema}")tables_json=process_dataset(connector,config,schema,EntryType.TABLE)write_jsonl(file,tables_json)print(f"Processing views for{schema}")views_json=process_dataset(connector,config,schema,EntryType.VIEW)write_jsonl(file,views_json)gcs_uploader.upload(config,FILENAME)This example saves the metadata import file as a single JSON Lines file. Youcan use PySpark tools like the
DataFrameWriterclass to output batches ofJSON in parallel.The connector can write entries to the metadata import file in any order.
Update the
gcs_uploader.pyfile with code to upload the metadata importfile to a Cloud Storage bucket."""Sends files to GCP storage."""fromtypingimportDictfromgoogle.cloudimportstoragedefupload(config:Dict[str,str],filename:str):"""Uploads a file to GCP bucket."""client=storage.Client()bucket=client.get_bucket(config["output_bucket"])folder=config["output_folder"]blob=bucket.blob(f"{folder}/{filename}")blob.upload_from_filename(filename)Build the connector image.
If your connector contains multiple files, or if you want to uselibraries that aren't included in the default Docker image, you must use acustom container.Serverless for Apache Spark runs workloads within Dockercontainers. Create a custom Docker image of the connector and store theimage inArtifact Registry.Serverless for Apache Spark reads the image from Artifact Registry.
Note: If the connector contains only one file, you can send the job toServerless for Apache Spark directly. Skip this step.Create a Dockerfile:
FROMdebian:11-slimENVDEBIAN_FRONTEND=noninteractiveRUNaptupdate &&aptinstall-yprocpstiniRUNaptinstall-ywgetENVSPARK_EXTRA_JARS_DIR=/opt/spark/jars/RUNmkdir-p"${SPARK_EXTRA_JARS_DIR}"COPYojdbc11.jar"${SPARK_EXTRA_JARS_DIR}"ENVCONDA_HOME=/opt/miniconda3ENVPYSPARK_PYTHON=${CONDA_HOME}/bin/pythonENVPATH=${CONDA_HOME}/bin:${PATH}RUNwgethttps://repo.anaconda.com/miniconda/Miniconda3-py311_24.9.2-0-Linux-x86_64.shRUNbashMiniconda3-py310_23.3.1-0-Linux-x86_64.sh-b-p/opt/miniconda3\ &&${CONDA_HOME}/bin/condaconfig--system--setalways_yesTrue\ &&${CONDA_HOME}/bin/condaconfig--system--setauto_update_condaFalse\ &&${CONDA_HOME}/bin/condaconfig--system--prependchannelsconda-forge\ &&${CONDA_HOME}/bin/condaconfig--system--setchannel_prioritystrictRUN${CONDA_HOME}/bin/condainstallmamba-nbase-cconda-forge\ &&${CONDA_HOME}/bin/mambainstall\conda\google-cloud-dataproc\google-cloud-logging\google-cloud-monitoring\google-cloud-storageRUNaptupdate &&aptinstall-ygitCOPYrequirements.txt.RUNpython-mpipinstall-rrequirements.txtENVPYTHONPATH=/opt/python/packagesRUNmkdir-p"${PYTHONPATH}/src/"COPYsrc/"${PYTHONPATH}/src/"COPYmain.py.RUNgroupadd-g1099sparkRUNuseradd-u1099-g1099-d/home/spark-msparkUSERsparkUse Conda as your package manager. Serverless for Apache Sparkmounts
pysparkinto the container at runtime, so youdon't need to install PySpark dependencies in your custom container image.Build the custom container image and push it to Artifact Registry.
#!/bin/bashIMAGE=oracle-pyspark:0.0.1PROJECT=<PROJECT_ID>REPO_IMAGE=us-central1-docker.pkg.dev/${PROJECT}/docker-repo/oracle-pysparkdockerbuild-t"${IMAGE}".# Tag and push to GCP container registrygcloudconfigsetproject${PROJECT}gcloudauthconfigure-dockerus-central1-docker.pkg.devdockertag"${IMAGE}""${REPO_IMAGE}"dockerpush"${REPO_IMAGE}"Because one image can have multiple names, you can use the Docker tag toassign an alias to the image.
Run the connector on Serverless for Apache Spark.To submit a PySpark batch job using the custom container image, run the
gcloud dataproc batches submit pysparkcommand.gcloud dataproc batches submit pyspark main.py --project=PROJECT \ --region=REGION --batch=BATCH_ID \ --container-image=CUSTOM_CONTAINER_IMAGE \ --service-account=SERVICE_ACCOUNT_NAME \ --jars=PATH_TO_JAR_FILES \ --properties=PYSPARK_PROPERTIES \ --PIPELINE_ARGUMENTSNote the following:
- The JAR files are drivers for Spark. To read from Oracle, MySQL, orPostgres, you must provide Apache Spark a specific package. The packagecan be located in Cloud Storage or inside the container. If theJAR file is inside the container, the path is similar to
file:///path/to/file/driver.jar. In this example, the path to theJAR file is/opt/spark/jars/. - PIPELINE_ARGUMENTS are the command-line argumentsfor the connector.
The connector extracts metadata from the Oracle database, generates ametadata import file, and saves the metadata import file to aCloud Storage bucket.
- The JAR files are drivers for Spark. To read from Oracle, MySQL, orPostgres, you must provide Apache Spark a specific package. The packagecan be located in Cloud Storage or inside the container. If theJAR file is inside the container, the path is similar to
To manually import the metadata in the metadata import file intoDataplex Universal Catalog, run a metadata job. Use the
metadataJobs.createmethod.In the command line, add environment variables and create an alias forthe curl command.
PROJECT_ID=PROJECTLOCATION_ID=LOCATIONDATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_IDaliasgcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'Call the API method, passing the entry types and aspect types that youwant to import.
gcurlhttps://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID"-d"$(cat<<EOF{"type":"IMPORT","import_spec":{"source_storage_uri":"gs://BUCKET/FOLDER/","entry_sync_mode":"FULL","aspect_sync_mode":"INCREMENTAL","scope":{"entry_groups":["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"],"entry_types":["projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance","projects/PROJECT/locations/LOCATION/entryTypes/oracle-database","projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema","projects/PROJECT/locations/LOCATION/entryTypes/oracle-table","projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"],"aspect_types":["projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance","projects/dataplex-types/locations/global/aspectTypes/schema","projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database","projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema","projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table","projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"],},},}EOF)"The
schemaaspect type is a global aspect type that is defined byDataplex Universal Catalog.Note that the format that you use for aspect type names when calling theAPI method is different from the format that you use in the connectorcode.
Optional: Use Cloud Logging to view logs for the metadata job. Formore information, seeMonitor Dataplex Universal Catalog logs.
Set up pipeline orchestration
The previous sections showed how to build an example connector and run theconnector manually.
In a production environment, you run the connector as part of a managedconnectivity pipeline, by using an orchestration platform likeWorkflows.
To run a managed connectivity pipeline with the example connector, follow thesteps to import metadata using Workflows.Do these things:
- Create the workflow in the same Google Cloud location as theconnector.
In the workflow definition file, update the
submit_pyspark_extract_jobfunction with the following code to extract data from the Oracle databaseusing the connector that you created.-submit_pyspark_extract_job:call:http.postargs:url:${"https://dataproc.googleapis.com/v1/projects/"+args.TARGET_PROJECT_ID+"/locations/"+args.CLOUD_REGION+"/batches"}auth:type:OAuth2scopes:"https://www.googleapis.com/auth/cloud-platform"headers:Content-Type:"application/json"query:batchId:${WORKFLOW_ID}body:pysparkBatch:mainPythonFileUri:file:///main.pyjars:file:///opt/spark/jars/ojdbc11.jarargs:-${"--host_port="+args.ORACLE_HOST_PORT}-${"--user="+args.ORACLE_USER}-${"--password="+args.ORACLE_PASSWORD}-${"--database="+args.ORACE_DATABASE}-${"--project="+args.TARGET_PROJECT_ID}-${"--location="+args.CLOUD_REGION}-${"--entry_group="+args.TARGET_ENTRY_GROUP_ID}-${"--bucket="+args.CLOUD_STORAGE_BUCKET_ID}-${"--folder="+WORKFLOW_ID}runtimeConfig:version:"2.0"containerImage:"us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark"environmentConfig:executionConfig:serviceAccount:${args.SERVICE_ACCOUNT}result:RESPONSE_MESSAGEIn the workflow definition file, update the
submit_import_jobfunctionwith the following code to import the entries. The function calls themetadataJobs.createAPI method to run a metadata import job.-submit_import_job:call:http.postargs:url:${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID +"/locations/" + args.CLOUD_REGION +"/metadataJobs?metadata_job_id=" + WORKFLOW_ID}auth:type:OAuth2scopes:"https://www.googleapis.com/auth/cloud-platform"body:type:IMPORTimport_spec:source_storage_uri:${"gs://" + args.CLOUD_STORAGE_BUCKET_ID +"/" + WORKFLOW_ID +"/"}entry_sync_mode:FULLaspect_sync_mode:INCREMENTALscope:entry_groups:-${"projects/" + args.TARGET_PROJECT_ID +"/locations/" + args.CLOUD_REGION +"/entryGroups/"+args.TARGET_ENTRY_GROUP_ID}entry_types:-"projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance"-"projects/PROJECT/locations/LOCATION/entryTypes/oracle-database"-"projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema"-"projects/PROJECT/locations/LOCATION/entryTypes/oracle-table"-"projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"aspect_types:-"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance"-"projects/dataplex-types/locations/global/aspectTypes/schema"-"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database"-"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema"-"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table"-"projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"result:IMPORT_JOB_RESPONSEProvide the same entry types and aspect types that you included when youcalled the API method manually. Note that there isn't a comma at the endof each string.
When you execute the workflow, provide the following runtime arguments:
{"CLOUD_REGION":"us-central1","ORACLE_USER":"system","ORACLE_HOST_PORT":"x.x.x.x:1521","ORACLE_DATABASE":"xe","ADDITIONAL_CONNECTOR_ARGS":[],}
Optional: Use Cloud Logging to view logs for the managed connectivitypipeline. The log payload includes a link to the logs for theServerless for Apache Spark batch job and the metadata import job,as relevant. For more information, seeView workflow logs.
Optional: To improve the security, performance, and functionality of yourmanaged connectivity pipeline, consider doing the following things:
- Use Secret Managerto store the credentials for your third-party data source.
- Use PySpark to write the JSON Lines output into multiple metadata importfiles in parallel.
- Use a prefix to split big files (more than 100 MB) into smallerfiles.
- Add more custom aspects that capture additional business and technicalmetadata from your source.
Example metadata resources for an Oracle source
The example connector extracts metadata from an Oracle database and maps themetadata to corresponding Dataplex Universal Catalog metadata resources.
Hierarchy considerations
Every system in Dataplex Universal Catalog has a root entry that is the parententry for the system. Usually the root entry has aninstance entry type.The following table shows the example hierarchy of entry types and aspect typesfor an Oracle system. For example, theoracle-database entry type is linked toan aspect type that is also namedoracle-database.
| Entry type ID | Description | Linked aspect type ID |
|---|---|---|
oracle-instance | The root of the imported system. | oracle-instance |
oracle-database | The Oracle database. | oracle-database |
oracle-schema | The database schema. | oracle-schema |
oracle-table | A table. |
|
oracle-view | A view. |
|
Theschema aspect type is a global aspect type that is defined byDataplex Universal Catalog. It contains a description of the fields in a table,view, or other entity that has columns. Theoracle-schema custom aspect typecontains the name of the Oracle database schema.
- You can use a different name for the root entry if it's relevant for your system. For example, you can name the root entry for an Apache Kafka system as
cluster. - Depending on the system, you might have only schemas or databases. For many relational database management systems, "schema" and "database" are synonyms. To reduce the number of entry types in the hierarchy, consider choosing one term to use for all similar resources.
Example import item fields
The connector should use the following conventions for Oracle resources.
Fully qualified names: fully qualified names for Oracle resources use the following naming template. Forbidden characters are escaped with backticks.
Resource Template Example Instance SOURCE:ADDRESSUse the host and port number or the domain name of the system.
oracle:`localhost:1521`ororacle:`myinstance.com`Database SOURCE:ADDRESS.DATABASEoracle:`localhost:1521`.xeSchema SOURCE:ADDRESS.DATABASE.SCHEMAoracle:`localhost:1521`.xe.sysTable SOURCE:ADDRESS.DATABASE.SCHEMA.TABLE_NAMEoracle:`localhost:1521`.xe.sys.ordersView SOURCE:ADDRESS.DATABASE.SCHEMA.VIEW_NAMEoracle:`localhost:1521`.xe.sys.orders_viewEntry names or entry IDs: entries for Oracle resources use the following naming template. Forbidden characters are replaced with a permitted character. Resources use the prefix
projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries.Resource Template Example Instance PREFIX/HOST_PORTprojects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521Database PREFIX/HOST_PORT/databases/DATABASEprojects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xeSchema PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMAprojects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sysTable PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/tables/TABLEprojects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/ordersView PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/views/VIEWprojects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_viewParent entries: if an entry isn't a root entry for the system, the entry can have a parent entry field that describes its position in the hierarchy. The field should contain the name of the parent entry. We recommend that you generate this value.
The following table shows the parent entries for Oracle resources.
Entry Parent entry Instance ""(empty string)Database Instance name Schema Database name Table Schema name View Schema name Aspect map: the aspect map must contain at least one aspect that describes the entity to import. Here's an example aspect map for an Oracle table.
"example-project.us-central1.oracle-table":{"aspect_type":"example-project.us-central1.oracle-table","path":"","data":{}},
You can find predefined aspect types (like
schema) that define the table or view structure in thedataplex-typesproject, in thegloballocation.Aspect keys: aspect keys use the naming formatPROJECT.LOCATION.ASPECT_TYPE. The following table shows example aspect keys for Oracle resources.
Entry Example aspect key Instance example-project.us-central1.oracle-instanceDatabase example-project.us-central1.oracle-databaseSchema example-project.us-central1.oracle-schemaTable example-project.us-central1.oracle-tableView example-project.us-central1.oracle-view
What's next
Except as otherwise noted, the content of this page is licensed under theCreative Commons Attribution 4.0 License, and code samples are licensed under theApache 2.0 License. For details, see theGoogle Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
Last updated 2026-02-19 UTC.