Develop a custom connector for metadata import

This document provides a reference template to build a custom connector for extracting metadata from third-party sources, such as MySQL, SQL Server, and Oracle. You can use this connector to import metadata into Dataplex Universal Catalog through a managed connectivity pipeline. An example Python connector for Oracle Database Express Edition (XE) is included as a starting point. You can also develop connectors using Java, Scala, or R.

How connectors work

A connector extracts metadata from a third-party data source, transforms the metadata to Dataplex Universal Catalog ImportItem format, and generates metadata import files that can be imported by Dataplex Universal Catalog.

The connector is a part of a managed connectivity pipeline. A managed connectivity pipeline is an orchestrated workflow that you use to import Dataplex Universal Catalog metadata. The managed connectivity pipeline runs the connector and performs other tasks in the import workflow, such as running a metadata import job and capturing logs.

The managed connectivity pipeline runs the connector by using a Google Cloud Serverless for Apache Spark batch job. Serverless for Apache Spark provides a serverless Spark execution environment. Although you can build a connector that doesn't use Spark, we recommend that you use Spark because it can improve the performance of your connector.

Connector requirements

The connector has the following requirements:

  • The connector must be an Artifact Registry image that can be run on Serverless for Apache Spark.
  • The connector must generate metadata files in a format that can be imported by a Dataplex Universal Catalog metadata import job (the metadataJobs.create API method). For detailed requirements, see Metadata import file.
  • The connector must accept the following command-line arguments to receive information from the pipeline:

    Command-line argument    Value that pipeline provides
    target_project_id        PROJECT_ID
    target_location_id       REGION
    target_entry_group_id    ENTRY_GROUP_ID
    output_bucket            CLOUD_STORAGE_BUCKET_ID
    output_folder            FOLDER_ID

    The connector uses these arguments to generate metadata in a target entry group projects/PROJECT_ID/locations/REGION/entryGroups/ENTRY_GROUP_ID, and to write to a Cloud Storage bucket gs://CLOUD_STORAGE_BUCKET_ID/FOLDER_ID. Each execution of the pipeline creates a new folder FOLDER_ID in bucket CLOUD_STORAGE_BUCKET_ID. The connector should write metadata import files to this folder.

The pipeline templates support PySpark connectors. The templates assume that the driver (mainPythonFileUri) is a local file on the connector image named main.py. You can modify the pipeline templates for other scenarios, such as a Spark connector, a different driver URI, or other options.

Here's how you use PySpark to create an import item in the metadata import file.

"""PySpark schemas for the data."""
from pyspark.sql.types import ArrayType, MapType, StringType, StructField, StructType

entry_source_schema = StructType([
    StructField("display_name", StringType()),
    StructField("source", StringType())
])

aspect_schema = MapType(
    StringType(),
    StructType([
        StructField("aspect_type", StringType()),
        StructField("data", StructType([]))
    ])
)

entry_schema = StructType([
    StructField("name", StringType()),
    StructField("entry_type", StringType()),
    StructField("fully_qualified_name", StringType()),
    StructField("parent_entry", StringType()),
    StructField("entry_source", entry_source_schema),
    StructField("aspects", aspect_schema)
])

import_item_schema = StructType([
    StructField("entry", entry_schema),
    StructField("aspect_keys", ArrayType(StringType())),
    StructField("update_mask", ArrayType(StringType()))
])

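For example, the following is a minimal sketch of how these schemas might be used. It assumes the schema definitions above are in scope; the entry values are placeholders, not output from a real source.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ImportItemExample").getOrCreate()

# A single placeholder import item that matches import_item_schema.
example_item = {
    "entry": {
        "name": ("projects/example-project/locations/us-central1/"
                 "entryGroups/oracle-prod/entries/localhost@1521"),
        "entry_type": ("projects/example-project/locations/us-central1/"
                       "entryTypes/oracle-instance"),
        "fully_qualified_name": "oracle:`localhost:1521`",
        "parent_entry": "",
        "entry_source": {"display_name": "localhost:1521", "source": "oracle"},
        "aspects": {},
    },
    "aspect_keys": ["example-project.us-central1.oracle-instance"],
    "update_mask": ["aspects"],
}

df = spark.createDataFrame([example_item], schema=import_item_schema)
# Each row serializes to one line of the metadata import file.
print(df.toJSON().collect())
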
Before you begin

This guide assumes that you're familiar with Python and PySpark.

Review the following information:

Do the following things. Create all resources in the same Google Cloud location.

  1. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  2. Verify that billing is enabled for your Google Cloud project.

  3. Enable the Dataplex, Dataproc, Workflows, and Artifact Registry APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable dataplex.googleapis.com dataproc.googleapis.com workflows.googleapis.com artifactregistry.googleapis.com
  4. Install the Google Cloud CLI.

  5. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  6. To initialize the gcloud CLI, run the following command:

    gcloud init
  7. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/resourcemanager.projectCreator, roles/billing.projectManager, roles/serviceusage.admin, roles/iam.serviceAccountCreator, roles/iam.securityAdmin, roles/storage.admin, roles/artifactregistry.writer, roles/dataplex.entryGroupOwner, roles/dataplex.entryOwner, roles/dataplex.aspectTypeOwner

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: Your project ID.
    • USER_IDENTIFIER: The identifier for your user account. For example, myemail@example.com.
    • ROLE: The IAM role that you grant to your user account.
  8. Set up authentication:

    1. Ensure that you have the Create Service Accounts IAM role (roles/iam.serviceAccountCreator) and the Project IAM Admin role (roles/resourcemanager.projectIamAdmin). Learn how to grant roles.
    2. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    3. Grant the roles/owner IAM role to the service account:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/owner

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      Note: The --role flag affects which resources the service account can access in your project. You can revoke these roles or grant additional roles later. In production environments, do not grant the Owner, Editor, or Viewer roles. Instead, grant a predefined role or custom role that meets your needs.
  9. Create a Cloud Storage bucket to store the metadata import files.
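    For example, you might create the bucket by using the Google Cloud CLI. This is a sketch; the bucket name and location are placeholders, and the bucket must be in the same location as your other resources.

      gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION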

  10. Create the following metadata resources in the same project.

    Note: This document assumes that all the resources are in the same project for simplicity. If you want to create resources in different projects, you can adjust the sample code.

    For example values, see the Example metadata resources for an Oracle source section of this document.

    1. Create an entry group.
    2. Create custom aspect types for the entries that you want to import. Use the naming convention SOURCE-ENTITY_TO_IMPORT.

      For example, for an Oracle database, create an aspect type named oracle-database.

      Optionally, you can create additional aspect types to store other information.

    3. Create custom entry types for the resources that you want to import, and assign the relevant aspect types to them. Use the naming convention SOURCE-ENTITY_TO_IMPORT.

      For example, for an Oracle database, create an entry type named oracle-database. Link it to the aspect type that is named oracle-database.

  11. Ensure that your third-party source is accessible from your Google Cloud project. For more information, see Serverless for Apache Spark network configuration.

Create a basic Python connector

The example basic Python connector creates top-level entries for an Oracle data source by using the Dataplex Universal Catalog client library classes. Then, you provide the values for the entry fields.

The connector creates a metadata import file with the following entries:

  • An instance entry, with entry type projects/PROJECT_ID/locations/LOCATION/entryTypes/oracle-instance. This entry represents an Oracle Database XE system.
  • A database entry, which represents a database inside the Oracle Database XE system.
Note: This basic example lets you create the metadata import file without a connection to Oracle Database XE. To extract metadata in production, you need to set up a connection to your data source so that the connector can read data from it.

To build a basic Python connector, do the following:

  1. Clone the cloud-dataplex repository.

  2. Set up a local environment. We recommend that you use a virtual environment.

    mkdir venv
    python -m venv venv/
    source venv/bin/activate

    Use the active or maintenance versions of Python. Python versions 3.7 and later are supported.

  3. Create a Python project.

  4. Install requirements:

    pip install -r requirements.txt

    The following requirements are installed:

    google-cloud-dataplex==2.2.2
    google-cloud-storage
    google-cloud-secret-manager
  5. Add a main.py pipeline file at the root of the project.

    from src import bootstrap

    if __name__ == '__main__':
        bootstrap.run()

    When deploying your code to Serverless for Apache Spark, the main.py file serves as the entry point for execution. We recommend that you minimize the amount of information that is stored in the main.py file; use this file to call functions and classes that are defined within your connector, such as those in the src/bootstrap.py module.

  6. Create a src folder to store the majority of the logic for your connector.

  7. Update the src/cmd_reader.py file with a Python class to accept command-line arguments. You can use the argparse module to do this.

    """Command line reader."""
    import argparse


    def read_args():
        """Reads arguments from the command line."""
        parser = argparse.ArgumentParser()

        # Dataplex arguments
        parser.add_argument("--target_project_id", type=str, required=True,
            help="The name of the target Google Cloud project to import the metadata into.")
        parser.add_argument("--target_location_id", type=str, required=True,
            help="The target Google Cloud location where the metadata will be imported into.")
        parser.add_argument("--target_entry_group_id", type=str, required=True,
            help="The ID of the entry group to import metadata into. "
                 "The metadata will be imported into entry group with the following "
                 "full resource name: projects/${target_project_id}/"
                 "locations/${target_location_id}/entryGroups/${target_entry_group_id}.")

        # Oracle arguments
        parser.add_argument("--host_port", type=str, required=True,
            help="Oracle host and port number separated by the colon (:).")
        parser.add_argument("--user", type=str, required=True, help="Oracle User.")
        parser.add_argument("--password-secret", type=str, required=True,
            help="Secret resource name in the Secret Manager for the Oracle password.")
        parser.add_argument("--database", type=str, required=True,
            help="Source Oracle database.")

        # Google Cloud Storage arguments
        # It is assumed that the bucket is in the same region as the entry group
        parser.add_argument("--output_bucket", type=str, required=True,
            help="The Cloud Storage bucket to write the generated metadata import file.")
        parser.add_argument("--output_folder", type=str, required=True,
            help="A folder in the Cloud Storage bucket, to write the generated metadata import files.")

        return vars(parser.parse_known_args()[0])

    In production environments, we recommend that you store the password in Secret Manager.
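    Because the bootstrap code shown later in this guide calls secret_manager.get_password, you also need a src/secret_manager.py module. That file isn't shown in this document; the following is an illustrative sketch, assuming the --password-secret argument is the secret's resource name (projects/PROJECT/secrets/SECRET) and that the latest version holds the Oracle password.

    """Reads the database password from Secret Manager (illustrative sketch)."""
    from google.cloud import secretmanager


    def get_password(secret_resource_name: str) -> str:
        """Fetches the latest version of the given secret and decodes it."""
        client = secretmanager.SecretManagerServiceClient()
        response = client.access_secret_version(
            name=f"{secret_resource_name}/versions/latest"
        )
        return response.payload.data.decode("UTF-8")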

  8. Update the src/constants.py file with code to create constants.

    """Constants that are used in the different files."""
    import enum

    SOURCE_TYPE = "oracle"

    # Symbols for replacement
    FORBIDDEN = "#"
    ALLOWED = "!"


    class EntryType(enum.Enum):
        """Types of Oracle entries."""
        INSTANCE: str = "projects/{project}/locations/{location}/entryTypes/oracle-instance"
        DATABASE: str = "projects/{project}/locations/{location}/entryTypes/oracle-database"
        DB_SCHEMA: str = "projects/{project}/locations/{location}/entryTypes/oracle-schema"
        TABLE: str = "projects/{project}/locations/{location}/entryTypes/oracle-table"
        VIEW: str = "projects/{project}/locations/{location}/entryTypes/oracle-view"
  9. Update the src/name_builder.py file with methods to build the metadata resources that you want the connector to create for your Oracle resources. Use the conventions that are described in the Example metadata resources for an Oracle source section of this document.

    """Builds Dataplex hierarchy identifiers."""
    from typing import Dict
    from src.constants import EntryType, SOURCE_TYPE

    # Oracle cluster users start with C## prefix, but Dataplex doesn't accept #.
    # In that case in names it is changed to C!!, and escaped with backticks in FQNs
    FORBIDDEN_SYMBOL = "#"
    ALLOWED_SYMBOL = "!"


    def create_fqn(config: Dict[str, str], entry_type: EntryType,
                   schema_name: str = "", table_name: str = ""):
        """Creates a fully qualified name or Dataplex v1 hierarchy name."""
        if FORBIDDEN_SYMBOL in schema_name:
            schema_name = f"`{schema_name}`"

        if entry_type == EntryType.INSTANCE:
            # Requires backticks to escape column
            return f"{SOURCE_TYPE}:`{config['host_port']}`"
        if entry_type == EntryType.DATABASE:
            instance = create_fqn(config, EntryType.INSTANCE)
            return f"{instance}.{config['database']}"
        if entry_type == EntryType.DB_SCHEMA:
            database = create_fqn(config, EntryType.DATABASE)
            return f"{database}.{schema_name}"
        if entry_type in [EntryType.TABLE, EntryType.VIEW]:
            database = create_fqn(config, EntryType.DATABASE)
            return f"{database}.{schema_name}.{table_name}"
        return ""


    def create_name(config: Dict[str, str], entry_type: EntryType,
                    schema_name: str = "", table_name: str = ""):
        """Creates a Dataplex v2 hierarchy name."""
        if FORBIDDEN_SYMBOL in schema_name:
            schema_name = schema_name.replace(FORBIDDEN_SYMBOL, ALLOWED_SYMBOL)
        if entry_type == EntryType.INSTANCE:
            name_prefix = (
                f"projects/{config['target_project_id']}/"
                f"locations/{config['target_location_id']}/"
                f"entryGroups/{config['target_entry_group_id']}/"
                f"entries/"
            )
            return name_prefix + config["host_port"].replace(":", "@")
        if entry_type == EntryType.DATABASE:
            instance = create_name(config, EntryType.INSTANCE)
            return f"{instance}/databases/{config['database']}"
        if entry_type == EntryType.DB_SCHEMA:
            database = create_name(config, EntryType.DATABASE)
            return f"{database}/database_schemas/{schema_name}"
        if entry_type == EntryType.TABLE:
            db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
            return f"{db_schema}/tables/{table_name}"
        if entry_type == EntryType.VIEW:
            db_schema = create_name(config, EntryType.DB_SCHEMA, schema_name)
            return f"{db_schema}/views/{table_name}"
        return ""


    def create_parent_name(config: Dict[str, str], entry_type: EntryType,
                           parent_name: str = ""):
        """Generates a Dataplex v2 name of the parent."""
        if entry_type == EntryType.DATABASE:
            return create_name(config, EntryType.INSTANCE)
        if entry_type == EntryType.DB_SCHEMA:
            return create_name(config, EntryType.DATABASE)
        if entry_type == EntryType.TABLE:
            return create_name(config, EntryType.DB_SCHEMA, parent_name)
        return ""


    def create_entry_aspect_name(config: Dict[str, str], entry_type: EntryType):
        """Generates an entry aspect name."""
        last_segment = entry_type.value.split("/")[-1]
        return f"{config['target_project_id']}.{config['target_location_id']}.{last_segment}"

    Because the name_builder.py file is used for both the Python core code and the PySpark core code, we recommend that you write the methods as pure functions, instead of as members of a class.

  10. Update the src/top_entry_builder.py file with code to fill the top-level entries with data.

    """Non-Spark approach for building the entries."""
    import dataclasses
    import json
    from typing import List, Dict

    import proto
    from google.cloud import dataplex_v1

    from src.constants import EntryType
    from src import name_builder as nb


    @dataclasses.dataclass(slots=True)
    class ImportItem:
        """A template class for Import API."""
        entry: dataplex_v1.Entry = dataclasses.field(default_factory=dataplex_v1.Entry)
        aspect_keys: List[str] = dataclasses.field(default_factory=list)
        update_mask: List[str] = dataclasses.field(default_factory=list)


    def _dict_factory(data: object):
        """Factory function required for converting Entry dataclass to dict."""

        def convert(obj: object):
            if isinstance(obj, proto.Message):
                return proto.Message.to_dict(obj)
            return obj

        return dict((k, convert(v)) for k, v in data)


    def _create_entry(config: Dict[str, str], entry_type: EntryType):
        """Creates an entry based on a Dataplex library."""
        entry = dataplex_v1.Entry()
        entry.name = nb.create_name(config, entry_type)
        entry.entry_type = entry_type.value.format(
            project=config["target_project_id"], location=config["target_location_id"])
        entry.fully_qualified_name = nb.create_fqn(config, entry_type)
        entry.parent_entry = nb.create_parent_name(config, entry_type)

        aspect_key = nb.create_entry_aspect_name(config, entry_type)

        # Add mandatory aspect
        entry_aspect = dataplex_v1.Aspect()
        entry_aspect.aspect_type = aspect_key
        entry_aspect.data = {}
        entry.aspects[aspect_key] = entry_aspect

        return entry


    def _entry_to_import_item(entry: dataplex_v1.Entry):
        """Packs entry to import item, accepted by the API,"""
        import_item = ImportItem()
        import_item.entry = entry
        import_item.aspect_keys = list(entry.aspects.keys())
        import_item.update_mask = "aspects"
        return import_item


    def create(config, entry_type: EntryType):
        """Creates an entry, packs it to Import Item and converts to json."""
        import_item = _entry_to_import_item(_create_entry(config, entry_type))
        return json.dumps(dataclasses.asdict(import_item, dict_factory=_dict_factory))
  11. Update the src/bootstrap.py file with code to generate the metadata import file and run the connector.

    """The entrypoint of a pipeline."""
    from typing import Dict

    from src.constants import EntryType
    from src import cmd_reader
    from src import secret_manager
    from src import entry_builder
    from src import gcs_uploader
    from src import top_entry_builder
    from src.oracle_connector import OracleConnector

    FILENAME = "output.jsonl"


    def write_jsonl(output_file, json_strings):
        """Writes a list of string to the file in JSONL format."""
        # For simplicity, dataset is written into the one file. But it is not
        # mandatory, and the order doesn't matter for Import API.
        # The PySpark itself could dump entries into many smaller JSONL files.
        # Due to performance, it's recommended to dump to many smaller files.
        for string in json_strings:
            output_file.write(string + "\n")


    def process_dataset(
        connector: OracleConnector,
        config: Dict[str, str],
        schema_name: str,
        entry_type: EntryType,
    ):
        """Builds dataset and converts it to jsonl."""
        df_raw = connector.get_dataset(schema_name, entry_type)
        df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
        return df.toJSON().collect()


    def run():
        """Runs a pipeline."""
        config = cmd_reader.read_args()
        config["password"] = secret_manager.get_password(config["password_secret"])
        connector = OracleConnector(config)

        with open(FILENAME, "w", encoding="utf-8") as file:
            # Write top entries that don't require connection to the database
            file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
            file.writelines("\n")
            file.writelines(top_entry_builder.create(config, EntryType.DATABASE))

            # Get schemas, write them and collect to the list
            df_raw_schemas = connector.get_db_schemas()
            schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
            schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
            write_jsonl(file, schemas_json)

            # Ingest tables and views for every schema in a list
            for schema in schemas:
                print(f"Processing tables for {schema}")
                tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                write_jsonl(file, tables_json)
                print(f"Processing views for {schema}")
                views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                write_jsonl(file, views_json)

        gcs_uploader.upload(config, FILENAME)
  12. Run the code locally.

    A metadata import file named output.jsonl is returned. The file has two lines, each representing an import item. The managed connectivity pipeline reads this file when running the metadata import job.
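    For example, you might run the connector locally with a command similar to the following, passing the arguments that src/cmd_reader.py defines. All of the argument values here are illustrative; replace them with values for your own project, entry group, secret, and bucket.

    python main.py \
        --target_project_id=example-project \
        --target_location_id=us-central1 \
        --target_entry_group_id=oracle-prod \
        --host_port=localhost:1521 \
        --user=system \
        --password-secret=projects/example-project/secrets/oracle-password \
        --database=xe \
        --output_bucket=example-bucket \
        --output_folder=oracle-metadata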

  13. Optional: Extend the previous example to use the Dataplex Universal Catalog client library classes to create import items for tables, schemas, and views. You can also run the Python example on Serverless for Apache Spark.

    We recommend that you create a connector that uses Spark (and runs on Serverless for Apache Spark), because it can improve the performance of your connector.

Create a PySpark connector

This example is based on the PySpark DataFrame API. You can install PySpark SQL and run it locally before running on Serverless for Apache Spark. If you install and run PySpark locally, install the PySpark library by using pip, but you don't need to install a local Spark cluster.

For performance reasons, this example doesn't use predefined classes from the PySpark library. Instead, the example creates DataFrames, converts the DataFrames into JSON entries, and then writes the output into a metadata import file in JSON Lines format that can be imported into Dataplex Universal Catalog.

To build a connector using PySpark, do the following:

  1. Clone the cloud-dataplex repository.

  2. Install PySpark:

    pip install pyspark
  3. Install requirements:

    pip install -r requirements.txt

    The following requirements are installed:

    google-cloud-dataplex==2.2.2
    google-cloud-storage
    google-cloud-secret-manager
  4. Update the oracle_connector.py file with code to read data from an Oracle data source and return DataFrames.

    """Reads Oracle using PySpark."""
    from typing import Dict

    from pyspark.sql import SparkSession, DataFrame

    from src.constants import EntryType

    SPARK_JAR_PATH = "/opt/spark/jars/ojdbc11.jar"


    class OracleConnector:
        """Reads data from Oracle and returns Spark Dataframes."""

        def __init__(self, config: Dict[str, str]):
            # PySpark entrypoint
            self._spark = SparkSession.builder.appName("OracleIngestor") \
                .config("spark.jars", SPARK_JAR_PATH) \
                .getOrCreate()

            self._config = config
            self._url = f"jdbc:oracle:thin:@{config['host_port']}:{config['database']}"

        def _execute(self, query: str) -> DataFrame:
            """A generic method to execute any query."""
            return self._spark.read.format("jdbc") \
                .option("driver", "oracle.jdbc.OracleDriver") \
                .option("url", self._url) \
                .option("query", query) \
                .option("user", self._config["user"]) \
                .option("password", self._config["password"]) \
                .load()

        def get_db_schemas(self) -> DataFrame:
            """In Oracle, schemas are usernames."""
            query = "SELECT username FROM dba_users"
            return self._execute(query)

        def _get_columns(self, schema_name: str, object_type: str) -> str:
            """Gets a list of columns in tables or views in a batch."""
            # Every line here is a column that belongs to the table or to the view.
            # This SQL gets data from ALL the tables in a given schema.
            return (f"SELECT col.TABLE_NAME, col.COLUMN_NAME, "
                    f"col.DATA_TYPE, col.NULLABLE "
                    f"FROM all_tab_columns col "
                    f"INNER JOIN DBA_OBJECTS tab "
                    f"ON tab.OBJECT_NAME = col.TABLE_NAME "
                    f"WHERE tab.OWNER = '{schema_name}' "
                    f"AND tab.OBJECT_TYPE = '{object_type}'")

        def get_dataset(self, schema_name: str, entry_type: EntryType):
            """Gets data for a table or a view."""
            # Dataset means that these entities can contain end user data.
            short_type = entry_type.name  # table or view, or the title of enum value
            query = self._get_columns(schema_name, short_type)
            return self._execute(query)

    Add SQL queries to return the metadata that you want to import. The queries need to return the following information:

    • Database schemas
    • Tables that belong to these schemas
    • Columns that belong to these tables, including the column name, column data type, and whether the column is nullable or required

    All of the columns of all the tables and views are stored in the same system table. You can select columns with the _get_columns method. Depending on the parameters that you provide, you can select columns for the tables or for the views separately.

    Note the following:

    • In Oracle, a database schema is owned by a database user and has the same name as that user.
    • Schema objects are logical structures that are created by users. Objects such as tables or indexes can hold data, and objects like views or synonyms consist of only a definition.
    • The ojdbc11.jar file contains the Oracle JDBC driver.
  5. Update the src/entry_builder.py file with shared methods for applying Spark transformations.

    """Creates entries with PySpark."""
    import pyspark.sql.functions as F
    from pyspark.sql.types import StringType

    from src.constants import EntryType, SOURCE_TYPE
    from src import name_builder as nb


    @F.udf(returnType=StringType())
    def choose_metadata_type_udf(data_type: str):
        """Choose the metadata type based on Oracle native type."""
        if data_type.startswith("NUMBER") or data_type in ["FLOAT", "LONG"]:
            return "NUMBER"
        if data_type.startswith("VARCHAR") or data_type.startswith("NVARCHAR2"):
            return "STRING"
        if data_type == "DATE":
            return "DATETIME"
        return "OTHER"


    def create_entry_source(column):
        """Create Entry Source segment."""
        return F.named_struct(F.lit("display_name"), column,
                              F.lit("system"), F.lit(SOURCE_TYPE))


    def create_entry_aspect(entry_aspect_name):
        """Create aspect with general information (usually it is empty)."""
        return F.create_map(
            F.lit(entry_aspect_name),
            F.named_struct(
                F.lit("aspect_type"), F.lit(entry_aspect_name),
                F.lit("data"), F.create_map()
            )
        )


    def convert_to_import_items(df, aspect_keys):
        """Convert entries to import items."""
        entry_columns = ["name", "fully_qualified_name", "parent_entry",
                         "entry_source", "aspects", "entry_type"]

        # Puts entry to "entry" key, a list of keys from aspects in "aspects_keys"
        # and "aspects" string in "update_mask"
        return df.withColumn("entry", F.struct(entry_columns)) \
          .withColumn("aspect_keys", F.array([F.lit(key) for key in aspect_keys])) \
          .withColumn("update_mask", F.array(F.lit("aspects"))) \
          .drop(*entry_columns)


    def build_schemas(config, df_raw_schemas):
        """Create a dataframe with database schemas from the list of usernames.

        Args:
            df_raw_schemas - a dataframe with only one column called USERNAME
        Returns:
            A dataframe with Dataplex-readable schemas.
        """
        entry_type = EntryType.DB_SCHEMA
        entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)

        # For schema, parent name is the name of the database
        parent_name = nb.create_parent_name(config, entry_type)

        # Create user-defined functions.
        create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type, x),
                                StringType())
        create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type, x),
                               StringType())

        # Fills the missed project and location into the entry type string
        full_entry_type = entry_type.value.format(
            project=config["target_project_id"],
            location=config["target_location_id"])

        # Converts a list of schema names to the Dataplex-compatible form
        column = F.col("USERNAME")
        df = df_raw_schemas.withColumn("name", create_name_udf(column)) \
          .withColumn("fully_qualified_name", create_fqn_udf(column)) \
          .withColumn("parent_entry", F.lit(parent_name)) \
          .withColumn("entry_type", F.lit(full_entry_type)) \
          .withColumn("entry_source", create_entry_source(column)) \
          .withColumn("aspects", create_entry_aspect(entry_aspect_name)) \
          .drop(column)

        df = convert_to_import_items(df, [entry_aspect_name])
        return df


    def build_dataset(config, df_raw, db_schema, entry_type):
        """Build table entries from a flat list of columns.

        Args:
            df_raw - a plain dataframe with TABLE_NAME, COLUMN_NAME, DATA_TYPE,
                     and NULLABLE columns
            db_schema - parent database schema
            entry_type - entry type: table or view
        Returns:
            A dataframe with Dataplex-readable data of tables or views.
        """
        schema_key = "dataplex-types.global.schema"

        # The transformation below does the following:
        # 1. Alters NULLABLE content from Y/N to NULLABLE/REQUIRED
        # 2. Renames NULLABLE to mode
        # 3. Renames DATA_TYPE to dataType
        # 4. Creates metadataType column based on dataType column
        # 5. Renames COLUMN_NAME to name
        df = df_raw \
          .withColumn("mode", F.when(F.col("NULLABLE") == 'Y', "NULLABLE").otherwise("REQUIRED")) \
          .drop("NULLABLE") \
          .withColumnRenamed("DATA_TYPE", "dataType") \
          .withColumn("metadataType", choose_metadata_type_udf("dataType")) \
          .withColumnRenamed("COLUMN_NAME", "name")

        # The transformation below aggregates fields, denormalizing the table.
        # TABLE_NAME becomes a top-level field, and the rest is put into
        # the array type called "fields"
        aspect_columns = ["name", "mode", "dataType", "metadataType"]
        df = df.withColumn("columns", F.struct(aspect_columns)) \
          .groupby('TABLE_NAME') \
          .agg(F.collect_list("columns").alias("fields"))

        # Create a nested structure called aspects.
        # Fields become a part of a `schema` struct.
        # There is also an entry_aspect that repeats entry_type as aspect_type
        entry_aspect_name = nb.create_entry_aspect_name(config, entry_type)
        df = df.withColumn("schema",
                           F.create_map(F.lit(schema_key),
                                        F.named_struct(
                                            F.lit("aspect_type"), F.lit(schema_key),
                                            F.lit("data"), F.create_map(
                                                F.lit("fields"), F.col("fields"))))) \
          .withColumn("entry_aspect", create_entry_aspect(entry_aspect_name)) \
          .drop("fields")

        # Merge separate aspect columns into the one map called 'aspects'
        df = df.select(F.col("TABLE_NAME"),
                       F.map_concat("schema", "entry_aspect").alias("aspects"))

        # Define user-defined functions to fill the general information
        # and hierarchy names
        create_name_udf = F.udf(lambda x: nb.create_name(config, entry_type, db_schema, x),
                                StringType())
        create_fqn_udf = F.udf(lambda x: nb.create_fqn(config, entry_type, db_schema, x),
                               StringType())
        parent_name = nb.create_parent_name(config, entry_type, db_schema)
        full_entry_type = entry_type.value.format(
            project=config["target_project_id"],
            location=config["target_location_id"])

        # Fill the top-level fields
        column = F.col("TABLE_NAME")
        df = df.withColumn("name", create_name_udf(column)) \
          .withColumn("fully_qualified_name", create_fqn_udf(column)) \
          .withColumn("entry_type", F.lit(full_entry_type)) \
          .withColumn("parent_entry", F.lit(parent_name)) \
          .withColumn("entry_source", create_entry_source(column)) \
          .drop(column)

        df = convert_to_import_items(df, [schema_key, entry_aspect_name])
        return df

    Note the following:

    • The methods build the metadata resources that the connector creates for your Oracle resources. Use the conventions that are described in the Example metadata resources for an Oracle source section of this document.
    • The convert_to_import_items method applies to schemas, tables, and views. Ensure that the output of the connector is one or more import items that can be processed by the metadataJobs.create method, not individual entries.
    • Even in a view, the column is called TABLE_NAME.
  6. Update the bootstrap.py file with code to generate the metadata import file and run the connector.

    """The entrypoint of a pipeline."""
    from typing import Dict

    from src.constants import EntryType
    from src import cmd_reader
    from src import secret_manager
    from src import entry_builder
    from src import gcs_uploader
    from src import top_entry_builder
    from src.oracle_connector import OracleConnector

    FILENAME = "output.jsonl"


    def write_jsonl(output_file, json_strings):
        """Writes a list of string to the file in JSONL format."""
        # For simplicity, dataset is written into the one file. But it is not
        # mandatory, and the order doesn't matter for Import API.
        # The PySpark itself could dump entries into many smaller JSONL files.
        # Due to performance, it's recommended to dump to many smaller files.
        for string in json_strings:
            output_file.write(string + "\n")


    def process_dataset(
        connector: OracleConnector,
        config: Dict[str, str],
        schema_name: str,
        entry_type: EntryType,
    ):
        """Builds dataset and converts it to jsonl."""
        df_raw = connector.get_dataset(schema_name, entry_type)
        df = entry_builder.build_dataset(config, df_raw, schema_name, entry_type)
        return df.toJSON().collect()


    def run():
        """Runs a pipeline."""
        config = cmd_reader.read_args()
        config["password"] = secret_manager.get_password(config["password_secret"])
        connector = OracleConnector(config)

        with open(FILENAME, "w", encoding="utf-8") as file:
            # Write top entries that don't require connection to the database
            file.writelines(top_entry_builder.create(config, EntryType.INSTANCE))
            file.writelines("\n")
            file.writelines(top_entry_builder.create(config, EntryType.DATABASE))

            # Get schemas, write them and collect to the list
            df_raw_schemas = connector.get_db_schemas()
            schemas = [schema.USERNAME for schema in df_raw_schemas.select("USERNAME").collect()]
            schemas_json = entry_builder.build_schemas(config, df_raw_schemas).toJSON().collect()
            write_jsonl(file, schemas_json)

            # Ingest tables and views for every schema in a list
            for schema in schemas:
                print(f"Processing tables for {schema}")
                tables_json = process_dataset(connector, config, schema, EntryType.TABLE)
                write_jsonl(file, tables_json)
                print(f"Processing views for {schema}")
                views_json = process_dataset(connector, config, schema, EntryType.VIEW)
                write_jsonl(file, views_json)

        gcs_uploader.upload(config, FILENAME)

    This example saves the metadata import file as a single JSON Lines file. You can use PySpark tools like the DataFrameWriter class to output batches of JSON in parallel, as sketched after these notes.

    The connector can write entries to the metadata import file in any order.
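    The following is a minimal sketch of that approach, not part of the reference connector. It assumes a DataFrame whose rows already match the import item structure, and that the job can write directly to Cloud Storage (the Cloud Storage connector is available in the Serverless for Apache Spark runtime).

    from pyspark.sql import DataFrame


    def write_import_items(df: DataFrame, bucket: str, folder: str, subdir: str) -> None:
        """Writes import items as multiple JSON Lines files in parallel."""
        # Each partition becomes its own part-*.json file, which keeps
        # individual metadata import files small.
        output_path = f"gs://{bucket}/{folder}/{subdir}"
        df.write.mode("overwrite").json(output_path)

    Because the metadata import job's source_storage_uri already points at the output folder, the job picks up all of the files written to that folder.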

  7. Update the gcs_uploader.py file with code to upload the metadata import file to a Cloud Storage bucket.

    """Sends files to GCP storage."""
    from typing import Dict
    from google.cloud import storage


    def upload(config: Dict[str, str], filename: str):
        """Uploads a file to GCP bucket."""
        client = storage.Client()
        bucket = client.get_bucket(config["output_bucket"])
        folder = config["output_folder"]

        blob = bucket.blob(f"{folder}/{filename}")
        blob.upload_from_filename(filename)
  8. Build the connector image.

    If your connector contains multiple files, or if you want to use libraries that aren't included in the default Docker image, you must use a custom container. Serverless for Apache Spark runs workloads within Docker containers. Create a custom Docker image of the connector and store the image in Artifact Registry. Serverless for Apache Spark reads the image from Artifact Registry.

    Note: If the connector contains only one file, you can send the job to Serverless for Apache Spark directly. Skip this step.

    1. Create a Dockerfile:

      FROM debian:11-slim

      ENV DEBIAN_FRONTEND=noninteractive

      RUN apt update && apt install -y procps tini
      RUN apt install -y wget

      ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
      RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
      COPY ojdbc11.jar "${SPARK_EXTRA_JARS_DIR}"

      ENV CONDA_HOME=/opt/miniconda3
      ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
      ENV PATH=${CONDA_HOME}/bin:${PATH}

      RUN wget https://repo.anaconda.com/miniconda/Miniconda3-py311_24.9.2-0-Linux-x86_64.sh
      RUN bash Miniconda3-py311_24.9.2-0-Linux-x86_64.sh -b -p /opt/miniconda3 \
        && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
        && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
        && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
        && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict

      RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
          && ${CONDA_HOME}/bin/mamba install \
            conda \
            google-cloud-dataproc \
            google-cloud-logging \
            google-cloud-monitoring \
            google-cloud-storage

      RUN apt update && apt install -y git
      COPY requirements.txt .
      RUN python -m pip install -r requirements.txt

      ENV PYTHONPATH=/opt/python/packages
      RUN mkdir -p "${PYTHONPATH}/src/"
      COPY src/ "${PYTHONPATH}/src/"
      COPY main.py .

      RUN groupadd -g 1099 spark
      RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
      USER spark

      Use Conda as your package manager. Serverless for Apache Spark mounts pyspark into the container at runtime, so you don't need to install PySpark dependencies in your custom container image.

    2. Build the custom container image and push it to Artifact Registry.

      #!/bin/bash

      IMAGE=oracle-pyspark:0.0.1
      PROJECT=<PROJECT_ID>

      REPO_IMAGE=us-central1-docker.pkg.dev/${PROJECT}/docker-repo/oracle-pyspark

      docker build -t "${IMAGE}" .

      # Tag and push to GCP container registry
      gcloud config set project ${PROJECT}
      gcloud auth configure-docker us-central1-docker.pkg.dev
      docker tag "${IMAGE}" "${REPO_IMAGE}"
      docker push "${REPO_IMAGE}"

      Because one image can have multiple names, you can use the Docker tag to assign an alias to the image.

  9. Run the connector on Serverless for Apache Spark. To submit a PySpark batch job using the custom container image, run the gcloud dataproc batches submit pyspark command.

    gcloud dataproc batches submit pyspark main.py --project=PROJECT \
        --region=REGION --batch=BATCH_ID \
        --container-image=CUSTOM_CONTAINER_IMAGE \
        --service-account=SERVICE_ACCOUNT_NAME \
        --jars=PATH_TO_JAR_FILES \
        --properties=PYSPARK_PROPERTIES \
        --PIPELINE_ARGUMENTS

    Note the following:

    • The JAR files are drivers for Spark. To read from Oracle, MySQL, or Postgres, you must provide Apache Spark a specific package. The package can be located in Cloud Storage or inside the container. If the JAR file is inside the container, the path is similar to file:///path/to/file/driver.jar. In this example, the path to the JAR file is /opt/spark/jars/.
    • PIPELINE_ARGUMENTS are the command-line arguments for the connector.

    The connector extracts metadata from the Oracle database, generates a metadata import file, and saves the metadata import file to a Cloud Storage bucket.

  10. To manually import the metadata in the metadata import file into Dataplex Universal Catalog, run a metadata job. Use the metadataJobs.create method.

    1. In the command line, add environment variables and create an alias for the curl command.

      PROJECT_ID=PROJECT
      LOCATION_ID=LOCATION
      DATAPLEX_API=dataplex.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID

      alias gcurl='curl -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json"'
    2. Call the API method, passing the entry types and aspect types that you want to import.

      gcurl https://${DATAPLEX_API}/metadataJobs?metadata_job_id="JOB_ID" -d "$(cat <<EOF
      {
        "type": "IMPORT",
        "import_spec": {
          "source_storage_uri": "gs://BUCKET/FOLDER/",
          "entry_sync_mode": "FULL",
          "aspect_sync_mode": "INCREMENTAL",
          "scope": {
            "entry_groups": ["projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP_ID"],
            "entry_types": [
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table",
              "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"
            ],
            "aspect_types": [
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance",
              "projects/dataplex-types/locations/global/aspectTypes/schema",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table",
              "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"
            ]
          }
        }
      }
      EOF
      )"

      The schema aspect type is a global aspect type that is defined by Dataplex Universal Catalog.

      Note that the format that you use for aspect type names when calling the API method is different from the format that you use in the connector code. For example, the API method references projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table, whereas the metadata import file uses the aspect key PROJECT.LOCATION.oracle-table.

    3. Optional: Use Cloud Logging to view logs for the metadata job. For more information, see Monitor Dataplex Universal Catalog logs.

Set up pipeline orchestration

The previous sections showed how to build an example connector and run theconnector manually.

In a production environment, you run the connector as part of a managed connectivity pipeline, by using an orchestration platform like Workflows.

  1. To run a managed connectivity pipeline with the example connector, follow the steps to import metadata using Workflows. Do the following:

    • Create the workflow in the same Google Cloud location as theconnector.
    • In the workflow definition file, update the submit_pyspark_extract_job function with the following code to extract data from the Oracle database using the connector that you created.

      - submit_pyspark_extract_job:
          call: http.post
          args:
            url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
            auth:
              type: OAuth2
              scopes: "https://www.googleapis.com/auth/cloud-platform"
            headers:
              Content-Type: "application/json"
            query:
              batchId: ${WORKFLOW_ID}
            body:
              pysparkBatch:
                mainPythonFileUri: file:///main.py
                jars: file:///opt/spark/jars/ojdbc11.jar
                args:
                  - ${"--host_port=" + args.ORACLE_HOST_PORT}
                  - ${"--user=" + args.ORACLE_USER}
                  - ${"--password=" + args.ORACLE_PASSWORD}
                  - ${"--database=" + args.ORACLE_DATABASE}
                  - ${"--project=" + args.TARGET_PROJECT_ID}
                  - ${"--location=" + args.CLOUD_REGION}
                  - ${"--entry_group=" + args.TARGET_ENTRY_GROUP_ID}
                  - ${"--bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                  - ${"--folder=" + WORKFLOW_ID}
              runtimeConfig:
                version: "2.0"
                containerImage: "us-central1-docker.pkg.dev/PROJECT/REPOSITORY/oracle-pyspark"
              environmentConfig:
                executionConfig:
                  serviceAccount: ${args.SERVICE_ACCOUNT}
          result: RESPONSE_MESSAGE
    • In the workflow definition file, update the submit_import_job function with the following code to import the entries. The function calls the metadataJobs.create API method to run a metadata import job.

      - submit_import_job:
          call: http.post
          args:
            url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
            auth:
              type: OAuth2
              scopes: "https://www.googleapis.com/auth/cloud-platform"
            body:
              type: IMPORT
              import_spec:
                source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
                entry_sync_mode: FULL
                aspect_sync_mode: INCREMENTAL
                scope:
                  entry_groups:
                    - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                  entry_types:
                    - "projects/PROJECT/locations/LOCATION/entryTypes/oracle-instance"
                    - "projects/PROJECT/locations/LOCATION/entryTypes/oracle-database"
                    - "projects/PROJECT/locations/LOCATION/entryTypes/oracle-schema"
                    - "projects/PROJECT/locations/LOCATION/entryTypes/oracle-table"
                    - "projects/PROJECT/locations/LOCATION/entryTypes/oracle-view"
                  aspect_types:
                    - "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-instance"
                    - "projects/dataplex-types/locations/global/aspectTypes/schema"
                    - "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-database"
                    - "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-schema"
                    - "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-table"
                    - "projects/PROJECT/locations/LOCATION/aspectTypes/oracle-view"
          result: IMPORT_JOB_RESPONSE

      Provide the same entry types and aspect types that you included when you called the API method manually. Note that there isn't a comma at the end of each string.

    • When you execute the workflow, provide the following runtime arguments:

      {
        "CLOUD_REGION": "us-central1",
        "ORACLE_USER": "system",
        "ORACLE_HOST_PORT": "x.x.x.x:1521",
        "ORACLE_DATABASE": "xe",
        "ADDITIONAL_CONNECTOR_ARGS": []
      }
  2. Optional: Use Cloud Logging to view logs for the managed connectivity pipeline. The log payload includes a link to the logs for the Serverless for Apache Spark batch job and the metadata import job, as relevant. For more information, see View workflow logs.

  3. Optional: To improve the security, performance, and functionality of your managed connectivity pipeline, consider doing the following things:

    1. Use Secret Managerto store the credentials for your third-party data source.
    2. Use PySpark to write the JSON Lines output into multiple metadata importfiles in parallel.
    3. Use a prefix to split big files (more than 100 MB) into smaller files.
    4. Add more custom aspects that capture additional business and technical metadata from your source.

Example metadata resources for an Oracle source

The example connector extracts metadata from an Oracle database and maps the metadata to corresponding Dataplex Universal Catalog metadata resources.

Hierarchy considerations

Every system in Dataplex Universal Catalog has a root entry that is the parent entry for the system. Usually the root entry has an instance entry type. The following table shows the example hierarchy of entry types and aspect types for an Oracle system. For example, the oracle-database entry type is linked to an aspect type that is also named oracle-database.

Entry type ID      Description                        Linked aspect type ID
oracle-instance    The root of the imported system.   oracle-instance
oracle-database    The Oracle database.               oracle-database
oracle-schema      The database schema.               oracle-schema
oracle-table       A table.                           oracle-table, schema
oracle-view        A view.                            oracle-view, schema

The schema aspect type is a global aspect type that is defined by Dataplex Universal Catalog. It contains a description of the fields in a table, view, or other entity that has columns. The oracle-schema custom aspect type contains the name of the Oracle database schema.

Note:
  • You can use a different name for the root entry if it's relevant for your system. For example, you can name the root entry for an Apache Kafka system as cluster.
  • Depending on the system, you might have only schemas or databases. For many relational database management systems, "schema" and "database" are synonyms. To reduce the number of entry types in the hierarchy, consider choosing one term to use for all similar resources.

Example import item fields

The connector should use the following conventions for Oracle resources.

  • Fully qualified names: fully qualified names for Oracle resources use the following naming template. Forbidden characters are escaped with backticks.

    Resource    Template                                        Example
    Instance    SOURCE:ADDRESS                                  oracle:`localhost:1521` or oracle:`myinstance.com`
                (Use the host and port number or the domain name of the system.)
    Database    SOURCE:ADDRESS.DATABASE                         oracle:`localhost:1521`.xe
    Schema      SOURCE:ADDRESS.DATABASE.SCHEMA                  oracle:`localhost:1521`.xe.sys
    Table       SOURCE:ADDRESS.DATABASE.SCHEMA.TABLE_NAME       oracle:`localhost:1521`.xe.sys.orders
    View        SOURCE:ADDRESS.DATABASE.SCHEMA.VIEW_NAME        oracle:`localhost:1521`.xe.sys.orders_view
  • Entry names or entry IDs: entries for Oracle resources use the following naming template. Forbidden characters are replaced with a permitted character. Resources use the prefix projects/PROJECT/locations/LOCATION/entryGroups/ENTRY_GROUP/entries.

    Instance
      Template: PREFIX/HOST_PORT
      Example: projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521
    Database
      Template: PREFIX/HOST_PORT/databases/DATABASE
      Example: projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe
    Schema
      Template: PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA
      Example: projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys
    Table
      Template: PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/tables/TABLE
      Example: projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/tables/orders
    View
      Template: PREFIX/HOST_PORT/databases/DATABASE/database_schemas/SCHEMA/views/VIEW
      Example: projects/example-project/locations/us-central1/entryGroups/oracle-prod/entries/10.1.1.1@1521/databases/xe/database_schemas/sys/views/orders_view
  • Parent entries: if an entry isn't a root entry for the system, the entry can have a parent entry field that describes its position in the hierarchy. The field should contain the name of the parent entry. We recommend that you generate this value.

    The following table shows the parent entries for Oracle resources.

    Entry       Parent entry
    Instance    "" (empty string)
    Database    Instance name
    Schema      Database name
    Table       Schema name
    View        Schema name
  • Aspect map: the aspect map must contain at least one aspect that describes the entity to import. Here's an example aspect map for an Oracle table.

    "example-project.us-central1.oracle-table": {
      "aspect_type": "example-project.us-central1.oracle-table",
      "path": "",
      "data": {}
    },

    You can find predefined aspect types (like schema) that define the table or view structure in the dataplex-types project, in the global location.

  • Aspect keys: aspect keys use the naming format PROJECT.LOCATION.ASPECT_TYPE. The following table shows example aspect keys for Oracle resources.

    Entry       Example aspect key
    Instance    example-project.us-central1.oracle-instance
    Database    example-project.us-central1.oracle-database
    Schema      example-project.us-central1.oracle-schema
    Table       example-project.us-central1.oracle-table
    View        example-project.us-central1.oracle-view

What's next
