


enricogoerlitz/fabricengineer-py



Description

FabricEngineer is a comprehensive Python package designed specifically for Microsoft Fabric developers to streamline data transformation workflows and automate complex ETL processes. This package provides enterprise-grade solutions for building robust data pipelines with minimal boilerplate code. In addition, FabricEngineer enables environment-as-code for Microsoft Fabric: create and manage Lakehouses, Warehouses, Variable Libraries, Notebooks, and Data Pipelines programmatically via the Fabric API. Using a template notebook, you can define standardized workspaces and deploy them either directly in Fabric or through your CI/CD pipeline.

Key Features

🚀 Silver Layer Data Ingestion Services

  • Insert-Only Pattern: Efficient data ingestion with support for schema evolution and historization
  • SCD Type 2 (Slowly Changing Dimensions): Complete implementation of Type 2 SCD with automatic history tracking
  • Delta Load Support: Optimized incremental data processing with broadcast join capabilities
  • Schema Evolution: Automatic handling of schema changes with backward compatibility

📊 Materialized Lake Views (MLV)

  • Automated MLV Generation: Create and manage materialized views with SQL generation
  • Schema-aware Operations: Intelligent handling of schema changes and column evolution
  • Lakehouse Integration: Seamless integration with Microsoft Fabric Lakehouse architecture

🏗️ Environment-as-Code for Microsoft Fabric

  • Programmatic Provisioning: Create Lakehouses, Warehouses, Variable Libraries, Notebooks, and Data Pipelines via the Fabric API
  • Workspace Templating: Define standard workspaces with a parameterized template notebook
  • Flexible Deployment: Deploy directly in Microsoft Fabric or via CI/CD (e.g., GitHub Actions or Azure DevOps)
  • Repeatable Setups: Consistent, code-driven environments with minimal boilerplate (see the sketch below)
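
As a quick illustration, here is a minimal sketch of a code-driven setup (not a complete template; workspace and item names are illustrative, and the individual item APIs are documented in the Environment-as-Code section further below):

from fabricengineer.api.fabric.workspace.workspace import Workspace
from fabricengineer.api.fabric.workspace.items import Lakehouse
from fabricengineer.api.fabric.workspace.create_pipeline import WorkspaceItemCreationPipeline

# Create (or reuse) a workspace, then provision its items in one pipeline run
ws = Workspace(name="MyStandardWorkspace", description="Created from code")
ws.create_if_not_exists()
workspace_id = ws.item.api.id  # Workspace id from the API response (see "Manage Workspace" below)

items = [
    Lakehouse(workspace_id, name="BronzeLakehouse"),
    Lakehouse(workspace_id, name="SilverLakehouse"),
]
result = WorkspaceItemCreationPipeline(items).run(in_parallel=True)
print(result)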

🔧 Advanced Data Engineering Features

  • Configurable Transformations: Flexible transformation pipelines with custom business logic
  • Data Quality Controls: Built-in validation and data quality checks
  • Performance Optimization: Broadcast joins, partition strategies, and optimized query patterns
  • Comprehensive Logging: Integrated logging and performance monitoring with the built-in TimeLogger and logger utilities

Installation

pip install fabricengineer-py
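
When working directly inside a Microsoft Fabric notebook, the package can also be installed inline for the current session (a minimal sketch using the standard notebook magic; the package name follows the pip command above):

# Install into the current Fabric notebook session
%pip install fabricengineer-py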

Quick Start Guide

Prerequisites

  • Microsoft Fabric workspace with Lakehouse
  • PySpark environment
  • Python 3.11+

Usage Examples

Silver Layer Data Ingestion

Insert-Only Pattern

The Insert-Only service is ideal for append-only scenarios where you need to track all changes while maintaining performance.

from pyspark.sql import DataFrame, functions as F

from fabricengineer.logging import TimeLogger
from fabricengineer.transform.lakehouse import LakehouseTable
from fabricengineer.transform import SilverIngestionInsertOnlyService


def transform_projects(df: DataFrame, etl) -> DataFrame:
    df = df.withColumn("dtime", F.to_timestamp("dtime"))
    return df


def transform_all(df: DataFrame, etl) -> DataFrame:
    df = df.withColumn("data", F.lit("values"))
    return df


# Initialize performance monitoring
timer = TimeLogger()

# Define table-specific transformations
transformations = {
    "*": transform_all,              # Applied to all tables
    "projects": transform_projects   # Applied only to the projects table
}

# Configure source and destination tables
source_table = LakehouseTable(
    lakehouse="BronzeLakehouse",
    schema="schema",
    table="projects"
)
destination_table = LakehouseTable(
    lakehouse="SilverLakehouse",
    schema=source_table.schema,
    table=source_table.table
)

# Initialize and configure the ETL service
etl = SilverIngestionInsertOnlyService()
etl.init(
    spark_=spark,
    notebookutils_=notebookutils,
    source_table=source_table,
    destination_table=destination_table,
    nk_columns=NK_COLUMNS,
    constant_columns=CONSTANT_COLUMNS,
    is_delta_load=IS_DELTA_LOAD,
    delta_load_use_broadcast=DELTA_LOAD_USE_BROADCAST,
    transformations=transformations,
    exclude_comparing_columns=EXCLUDE_COLUMNS_FROM_COMPARING,
    include_comparing_columns=INCLUDE_COLUMNS_AT_COMPARING,
    historize=HISTORIZE,
    partition_by_columns=PARTITION_BY_COLUMNS,
    df_bronze=None,
    create_historized_mlv=True
)

timer.start().log()
etl.run()
timer.stop().log()
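
The example references configuration constants (NK_COLUMNS, IS_DELTA_LOAD, etc.) that are expected to be defined earlier in the notebook, while spark and notebookutils are provided by the Fabric notebook runtime. A minimal sketch with illustrative values (the names mirror the parameters above; the values are assumptions, not package defaults):

# Illustrative configuration for the ingestion examples (values are assumptions)
NK_COLUMNS = ["id"]                    # Natural-key columns used to match source and destination rows
CONSTANT_COLUMNS = []                  # Columns carried over unchanged
IS_DELTA_LOAD = False                  # True when the bronze extract is incremental
DELTA_LOAD_USE_BROADCAST = True        # Broadcast the (small) delta DataFrame during joins
EXCLUDE_COLUMNS_FROM_COMPARING = []    # Columns ignored when detecting changes
INCLUDE_COLUMNS_AT_COMPARING = []      # Explicit columns to compare (empty = compare all)
HISTORIZE = True                       # Keep history of changed records
PARTITION_BY_COLUMNS = []              # Optional partition columns for the destination table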

SCD Type 2 (Slowly Changing Dimensions)

The SCD2 service implements Type 2 Slowly Changing Dimensions with automatic history tracking and current record management.

from pyspark.sql import DataFrame, functions as F

from fabricengineer.logging import TimeLogger
from fabricengineer.transform.lakehouse import LakehouseTable
from fabricengineer.transform import SilverIngestionSCD2Service


def transform_projects(df: DataFrame, etl) -> DataFrame:
    df = df.withColumn("dtime", F.to_timestamp("dtime"))
    return df


def transform_all(df: DataFrame, etl) -> DataFrame:
    df = df.withColumn("data", F.lit("values"))
    return df


# Initialize performance monitoring
timer = TimeLogger()

# Define table-specific transformations
transformations = {
    "*": transform_all,              # Applied to all tables
    "projects": transform_projects   # Applied only to the projects table
}

# Configure source and destination tables
source_table = LakehouseTable(
    lakehouse="BronzeLakehouse",
    schema="schema",
    table="projects"
)
destination_table = LakehouseTable(
    lakehouse="SilverLakehouse",
    schema=source_table.schema,
    table=source_table.table
)

# Initialize and configure the ETL service
etl = SilverIngestionSCD2Service()
etl.init(
    spark_=spark,
    notebookutils_=notebookutils,
    source_table=source_table,
    destination_table=destination_table,
    nk_columns=NK_COLUMNS,
    constant_columns=CONSTANT_COLUMNS,
    is_delta_load=IS_DELTA_LOAD,
    delta_load_use_broadcast=DELTA_LOAD_USE_BROADCAST,
    transformations=transformations,
    exclude_comparing_columns=EXCLUDE_COLUMNS_FROM_COMPARING,
    include_comparing_columns=INCLUDE_COLUMNS_AT_COMPARING,
    historize=HISTORIZE,
    partition_by_columns=PARTITION_BY_COLUMNS,
    df_bronze=None
)

timer.start().log()
etl.run()
timer.stop().log()

Materialized Lake Views Management

Prerequisites

Configure a Utils Lakehouse as your default Lakehouse. The generated view SQL code will be saved as .sql.txt files in the Lakehouse under /Files/mlv/{lakehouse}/{schema}/{table}.sql.txt.

from fabricengineer.mlv import MaterializedLakeView

# Initialize the Materialized Lake View manager
mlv = MaterializedLakeView(
    lakehouse="SilverBusinessLakehouse",
    schema="schema",
    table="projects"
)
print(mlv.to_dict())

# Define your custom SQL query
sql = """
SELECT
    p.id
    ,p.projectname
    ,p.budget
    ,u.name AS projectlead
FROM dbo.projects p
LEFT JOIN users u
    ON p.projectlead_id = u.id
"""

# Create or replace the materialized view
result = mlv.create_or_replace(sql)
display(result)
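
To inspect the generated SQL after a run, you can list the files written to the default (Utils) Lakehouse at the path described in the prerequisites above (a sketch; assumes a Fabric notebook where notebookutils is available):

# List the generated .sql.txt files for this lakehouse/schema
for file_info in notebookutils.fs.ls("Files/mlv/SilverBusinessLakehouse/schema/"):
    print(file_info.name, file_info.path)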

Environment-as-Code

Manage Lakehouse

from typing import Any

from fabricengineer.api.fabric.workspace.workspace import Workspace
from fabricengineer.api.fabric.workspace.items import Lakehouse

workspace = Workspace.get_by_name("<WORKSPACE_NAME>")
workspace_id = workspace.item.id

lakehouse = Lakehouse(workspace_id, name="LakehouseName")

# Static methods
lakehouse_by_id = Lakehouse.get_by_id(workspace_id, id="LAKEHOUSE_ID")
lakehouse_by_name = Lakehouse.get_by_name(workspace_id, name="LAKEHOUSE_NAME")
lakehouses = Lakehouse.list(workspace_id)

# Create lakehouse
lakehouse.create()
lakehouse.create_if_not_exists()  # Creates only if it does not already exist

# Update lakehouse
lakehouse.update(description="Updated description")

# Fetch current API data
lakehouse.fetch()

# Check exists
if lakehouse.exists():
    pass

# Delete
lakehouse.delete()

# API fields
id = lakehouse.item.api.id
workspace_id = lakehouse.item.api.workspaceId
display_name = lakehouse.item.api.displayName
# Further API fields are available as attributes of lakehouse.item.api

# Set fields
fields: dict[str, Any] = lakehouse.item.fields

Manage WorkspaceFolder

from typing import Any

from fabricengineer.api.fabric.workspace.workspace import Workspace
from fabricengineer.api.fabric.workspace.folder.folder import WorkspaceFolder

workspace = Workspace.get_by_name("<WORKSPACE_NAME>")
workspace_id = workspace.item.id

# Create folders
root_folder = WorkspaceFolder(workspace_id, name="RootFolder")
root_folder.create_if_not_exists()

sub_folder = WorkspaceFolder(workspace_id, name="SubFolder", parent_folder=root_folder)
sub_folder.create_if_not_exists()

# Static methods
folder_by_id = WorkspaceFolder.get_by_id(workspace_id, id="FOLDER_ID")
folder_by_name = WorkspaceFolder.get_by_name(workspace_id, name="RootFolder")
folders = WorkspaceFolder.list(workspace_id)

# Update (rename)
root_folder.update(displayName="RootFolderRenamed")

# Fetch current API data
root_folder.fetch()

# Check exists
if root_folder.exists():
    pass

# Delete
sub_folder.delete()

# API fields
id = root_folder.item.api.id
workspace_id = root_folder.item.api.workspaceId
display_name = root_folder.item.api.displayName
parent_folder_id = root_folder.item.api.parentFolderId

# Set fields
fields: dict[str, Any] = root_folder.item.fields

Manage Warehouse

from typing import Any

from fabricengineer.api.fabric.workspace.workspace import Workspace
from fabricengineer.api.fabric.workspace.folder.folder import WorkspaceFolder
from fabricengineer.api.fabric.workspace.items import Warehouse

workspace = Workspace.get_by_name("<WORKSPACE_NAME>")
workspace_id = workspace.item.id

folder = WorkspaceFolder(workspace_id, name="Warehouses")
folder.create_if_not_exists()

warehouse = Warehouse(
    workspace_id=workspace_id,
    name="WarehouseName",
    description="Description",
    folder=folder,
    collation_type="Latin1_General_100_BIN2_UTF8"
)

# Static methods
warehouse_by_id = Warehouse.get_by_id(workspace_id, id="WAREHOUSE_ID")
warehouse_by_name = Warehouse.get_by_name(workspace_id, name="WarehouseName")
warehouses = Warehouse.list(workspace_id)

# Create
warehouse.create()
warehouse.create_if_not_exists()

# Update
warehouse.update(description="Updated description")

# Fetch
warehouse.fetch()

# Exists
if warehouse.exists():
    pass

# Delete
warehouse.delete()

# API fields
id = warehouse.item.api.id
workspace_id = warehouse.item.api.workspaceId
display_name = warehouse.item.api.displayName
connection_string = warehouse.item.api.properties.connectionString
collation_type = warehouse.item.api.properties.collationType

# Set fields
fields: dict[str, Any] = warehouse.item.fields

Manage Notebook

from typing import Any

from fabricengineer.api.fabric.workspace.workspace import Workspace
from fabricengineer.api.fabric.workspace.folder.folder import WorkspaceFolder
from fabricengineer.api.fabric.workspace.items import (
    Notebook,
    IPYNBNotebookDefinition,
    CopyFabricNotebookDefinition
)

workspace = Workspace.get_by_name("<WORKSPACE_NAME>")
workspace_id = workspace.item.id

folder = WorkspaceFolder(workspace_id, name="Notebooks")
folder.create_if_not_exists()

# Empty notebook
notebook = Notebook(
    workspace_id=workspace_id,
    name="NotebookName",
    description="Description",
    folder=folder
)

# From .ipynb file
ipynb_def = IPYNBNotebookDefinition(ipynb_path="/path/to/notebook.ipynb")
notebook_from_ipynb = Notebook(
    workspace_id=workspace_id,
    name="NotebookFromIpynb",
    description="Description",
    definition=ipynb_def,
    folder=folder
)

# From copy
copy_def = CopyFabricNotebookDefinition("<SOURCE_WORKSPACE_ID>", "<SOURCE_NOTEBOOK_ID>")
notebook_from_copy = Notebook(
    workspace_id=workspace_id,
    name="NotebookFromCopy",
    description="Description",
    definition=copy_def,
    folder=folder
)

# Static methods
nb_by_id = Notebook.get_by_id(workspace_id, id="NOTEBOOK_ID")
nb_by_name = Notebook.get_by_name(workspace_id, name="NotebookName")
notebooks = Notebook.list(workspace_id)

# Create
notebook.create_if_not_exists()

# Update
notebook.update(description="Updated description")

# Fetch
notebook.fetch()

# Exists
if notebook.exists():
    pass

# Delete
notebook.delete()

# API fields
id = notebook.item.api.id
workspace_id = notebook.item.api.workspaceId
display_name = notebook.item.api.displayName

# Set fields
fields: dict[str, Any] = notebook.item.fields

Manage DataPipeline

from typing import Any

from fabricengineer.api.fabric.workspace.workspace import Workspace
from fabricengineer.api.fabric.workspace.folder.folder import WorkspaceFolder
from fabricengineer.api.fabric.workspace.items import (
    DataPipeline,
    ZIPDataPipelineDefinition,
    CopyDataPipelineDefinition
)

workspace = Workspace.get_by_name("<WORKSPACE_NAME>")
workspace_id = workspace.item.id

folder = WorkspaceFolder(workspace_id, name="Pipelines")
folder.create_if_not_exists()

# Empty pipeline
pipeline = DataPipeline(
    workspace_id=workspace_id,
    name="PipelineName",
    description="Description",
    folder=folder
)

# From ZIP definition
zip_def = ZIPDataPipelineDefinition(zip_path="/path/to/pipeline.zip")
pipeline_from_zip = DataPipeline(
    workspace_id=workspace_id,
    name="PipelineFromZip",
    description="Description",
    definition=zip_def,
    folder=folder
)

# From copy
copy_def = CopyDataPipelineDefinition("<SOURCE_WORKSPACE_ID>", "<SOURCE_PIPELINE_ID>")
pipeline_from_copy = DataPipeline(
    workspace_id=workspace_id,
    name="PipelineFromCopy",
    description="Description",
    definition=copy_def,
    folder=folder
)

# Static methods
dp_by_id = DataPipeline.get_by_id(workspace_id, id="PIPELINE_ID")
dp_by_name = DataPipeline.get_by_name(workspace_id, name="PipelineName")
pipelines = DataPipeline.list(workspace_id)

# Create
pipeline.create_if_not_exists()

# Update
pipeline.update(description="Updated description")

# Fetch
pipeline.fetch()

# Exists
if pipeline.exists():
    pass

# Delete
pipeline.delete()

# API fields
id = pipeline.item.api.id
workspace_id = pipeline.item.api.workspaceId
display_name = pipeline.item.api.displayName

# Set fields
fields: dict[str, Any] = pipeline.item.fields

Manage VariableLibrary

from typing import Any

from fabricengineer.api.fabric.workspace.workspace import Workspace
from fabricengineer.api.fabric.workspace.folder.folder import WorkspaceFolder
from fabricengineer.api.fabric.workspace.items import (
    VariableLibrary,
    VariableLibraryDefinition,
    VariableLibraryVariable
)

workspace = Workspace.get_by_name("<WORKSPACE_NAME>")
workspace_id = workspace.item.id

folder = WorkspaceFolder(workspace_id, name="Variables")
folder.create_if_not_exists()

definition = VariableLibraryDefinition(
    ["TEST", "PROD"],
    VariableLibraryVariable(name="ApiUrl", note="", type="String", value="https://api.test"),
    VariableLibraryVariable(name="ApiKey", note="", type="String", value="secret")
)

varlib = VariableLibrary(
    workspace_id=workspace_id,
    name="VariableLibrary",
    description="Description",
    folder=folder,
    definition=definition
)

# Static methods
vl_by_id = VariableLibrary.get_by_id(workspace_id, id="VARIABLE_LIBRARY_ID")
vl_by_name = VariableLibrary.get_by_name(workspace_id, name="VariableLibrary")
varlibs = VariableLibrary.list(workspace_id)

# Create
varlib.create_if_not_exists()

# Update
varlib.update(description="Updated description")

# Fetch
varlib.fetch()

# Exists
if varlib.exists():
    pass

# Delete
varlib.delete()

# API fields
id = varlib.item.api.id
workspace_id = varlib.item.api.workspaceId
display_name = varlib.item.api.displayName
active_value_set = varlib.item.api.properties.activeValueSetName

# Set fields
fields: dict[str, Any] = varlib.item.fields

Manage Workspace

from typing import Any

from fabricengineer.api.fabric.workspace.workspace import Workspace

# Create
ws = Workspace(
    name="MyWorkspace",
    description="New Workspace",
    capacity_id="<CAPACITY_ID>"  # Optional
)
ws.create()
ws.create_if_not_exists()

# Static methods
ws_by_id = Workspace.get_by_id("WORKSPACE_ID")
ws_by_name = Workspace.get_by_name("MyWorkspace")
workspaces = Workspace.list()

# Update
ws.update(description="Updated description")

# Fetch
ws.fetch()

# Exists
if ws.exists():
    pass

# Delete
ws.delete()

# API fields
id = ws.item.api.id
display_name = ws.item.api.displayName
description = ws.item.api.description
capacity_id = ws.item.api.capacityId
capacity_region = ws.item.api.capacityRegion
one_lake_blob = ws.item.api.oneLakeEndpoints.blobEndpoint
one_lake_dfs = ws.item.api.oneLakeEndpoints.dfsEndpoint

# Set fields
fields: dict[str, Any] = ws.item.fields

Use WorkspaceItemCreationPipeline

from fabricengineer.api.fabric.workspace.folder.folder import WorkspaceFolder
from fabricengineer.api.fabric.workspace.items import (
    DataPipeline,
    CopyDataPipelineDefinition,
    ZIPDataPipelineDefinition,
    Notebook,
    CopyFabricNotebookDefinition,
    IPYNBNotebookDefinition,
    VariableLibrary,
    VariableLibraryDefinition,
    VariableLibraryVariable,
    Warehouse,
    Lakehouse
)
from fabricengineer.api.fabric.workspace.create_pipeline import (
    WorkspaceItemCreationPipeline,
    PipelineItemStatus
)

workspace_id = "<WORKSPACE_ID>"

# Folders
root_folder = WorkspaceFolder(workspace_id, name="RootFolder")
sub_folder = WorkspaceFolder(workspace_id, name="SubFolder", parent_folder=root_folder)

# DataPipeline
data_pipeline_empty = DataPipeline(
    workspace_id=workspace_id,
    name="PipelineEmpty",
    description="Description",
    folder=None
)

zip_path = "/path/to/pipeline.zip"
zip_definition = ZIPDataPipelineDefinition(zip_path=zip_path)
data_pipeline_from_zip = DataPipeline(
    workspace_id=workspace_id,
    name="PipelineFromZip",
    description="Description",
    definition=zip_definition,
    folder=root_folder
)

copy_data_pipeline_definition = CopyDataPipelineDefinition("<WORKSPACE_ID>", "<PIPELINE_ID>")
data_pipeline_copy = DataPipeline(
    workspace_id=workspace_id,
    name="PipelineFromCopy",
    description="Description",
    definition=copy_data_pipeline_definition,
    folder=sub_folder
)

# Lakehouse
lakehouse = Lakehouse(
    workspace_id=workspace_id,
    name="LakehouseName",
    description="Description",
    folder=root_folder
)

# Notebook
notebook_empty = Notebook(
    workspace_id=workspace_id,
    name="NotebookEmpty",
    description="Description",
    folder=None
)

ipynb_path = "/path/to/notebook.ipynb"
ipynb_notebook_definition = IPYNBNotebookDefinition(ipynb_path=ipynb_path)
notebook_from_ipynb = Notebook(
    workspace_id=workspace_id,
    name="NotebookFromIpynb",
    description="Description",
    definition=ipynb_notebook_definition,
    folder=None
)

copy_notebook_definition = CopyFabricNotebookDefinition("<WORKSPACE_ID>", "<NOTEBOOK_ID>")
notebook_from_copy = Notebook(
    workspace_id=workspace_id,
    name="NotebookFromCopy",
    description="Description",
    definition=copy_notebook_definition,
    folder=None
)

# VariableLibrary
varlib_definition = VariableLibraryDefinition(
    ["TEST", "PROD"],
    VariableLibraryVariable(name="Variable1", note="", type="String", value="blub-default"),
    VariableLibraryVariable(name="Variable2", note="", type="String", value="blab-default")
)
variable_library = VariableLibrary(
    workspace_id=workspace_id,
    name="VariableLibrary",
    definition=varlib_definition
)

# Warehouse
warehouse = Warehouse(
    workspace_id=workspace_id,
    name="WarehouseName",
    description="Description",
    folder=root_folder
)

# Create and execute the WorkspaceItemCreationPipeline
pipeline = WorkspaceItemCreationPipeline([
    root_folder,
    sub_folder,
    data_pipeline_empty,
    data_pipeline_from_zip,
    data_pipeline_copy,
    notebook_empty,
    notebook_from_ipynb,
    notebook_from_copy,
    variable_library,
    lakehouse,
    warehouse
])
result = pipeline.run(in_parallel=True)
print(result)

Remote Module Import for Fabric Notebooks

Import specific package modules directly into your Fabric notebooks from GitHub releases:

# Cell 1: Bootstrap the import_module helper from a GitHub release
import requests

VERSION = "1.0.0"
url = f"https://raw.githubusercontent.com/enricogoerlitz/fabricengineer-py/refs/tags/{VERSION}/src/fabricengineer/import_module/import_module.py"

resp = requests.get(url)
code = resp.text
exec(code, globals())  # This provides the 'import_module' function

assert code.startswith("import requests")
assert "def import_module" in code

# Cell 2: Fetch the module sources
mlv_module = import_module("transform.mlv", VERSION)
scd2_module = import_module("transform.silver.scd2", VERSION)
insertonly_module = import_module("transform.silver.insertonly", VERSION)

# Cell 3 - Use mlv module
exec(mlv_module, globals())  # Provides the MaterializedLakeView class and an 'mlv' instance

mlv.init(
    lakehouse="SilverBusinessLakehouse",
    schema="schema",
    table="projects"
)
print(mlv.to_dict())

# Cell 4 - Use scd2 module
exec(scd2_module, globals())  # Provides an instantiated 'etl' object

etl.init(...)
print(str(etl))

# Cell 5 - Use insertonly module
exec(insertonly_module, globals())  # Provides an instantiated 'etl' object

etl.init(...)
print(str(etl))

Advanced Features

Performance Optimization

  • Broadcast Joins: Automatically optimize small table joins (see the sketch after this list)
  • Partition Strategies: Intelligent partitioning for better query performance
  • Schema Evolution: Handle schema changes without breaking existing pipelines
  • Delta Load Processing: Efficient incremental data processing
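
For context, the broadcast optimization corresponds to the standard PySpark broadcast-join hint. A minimal, generic sketch of the concept (not the package's internal implementation; table names are illustrative and spark is the Fabric notebook session):

from pyspark.sql import functions as F

# Hint that the small (delta) DataFrame fits in memory, so Spark ships it to all
# executors instead of shuffling the large table.
large_df = spark.read.table("SilverLakehouse.schema.projects")
small_df = spark.read.table("BronzeLakehouse.schema.projects")  # e.g. an incremental extract

joined = large_df.join(F.broadcast(small_df), on="id", how="left")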

Data Quality & Validation

  • Automatic Validation: Built-in checks for data consistency and quality
  • Type Safety: Comprehensive type annotations for better development experience
  • Error Handling: Robust error handling and recovery mechanisms

Monitoring & Logging

from fabricengineer.logging import TimeLogger, logger

# Performance monitoring
timer = TimeLogger()
timer.start().log()

# Your ETL operations here
etl.run()

timer.stop().log()

# Custom fabricengineer logging
logger.info("Custom log message")
logger.error("Error occurred during processing")
