FabricEngineer is a comprehensive Python package designed specifically for Microsoft Fabric developers to streamline data transformation workflows and automate complex ETL processes. This package provides enterprise-grade solutions for building robust data pipelines with minimal boilerplate code. In addition, FabricEngineer enables environment-as-code for Microsoft Fabric: create and manage Lakehouses, Warehouses, Variable Libraries, Notebooks, and Data Pipelines programmatically via the Fabric API. Using a template notebook, you can define standardized workspaces and deploy them either directly in Fabric or through your CI/CD pipeline.
Key Features
🚀Silver Layer Data Ingestion Services
Insert-Only Pattern: Efficient data ingestion with support for schema evolution and historization
SCD Type 2 (Slowly Changing Dimensions): Complete implementation of Type 2 SCD with automatic history tracking
Delta Load Support: Optimized incremental data processing with broadcast join capabilities
Schema Evolution: Automatic handling of schema changes with backward compatibility
📊Materialized Lake Views (MLV)
Automated MLV Generation: Create and manage materialized views with SQL generation
Schema-aware Operations: Intelligent handling of schema changes and column evolution
Lakehouse Integration: Seamless integration with Microsoft Fabric Lakehouse architecture
🏗️Environment-as-Code for Microsoft Fabric
Programmatic Provisioning: Create Lakehouses, Warehouses, Variable Libraries, Notebooks, and Data Pipelines via the Fabric API
Workspace Templating: Define standard workspaces with a parameterized template notebook
Flexible Deployment: Deploy directly in Microsoft Fabric or via CI/CD (e.g., GitHub Actions or Azure DevOps)
Repeatable Setups: Consistent, code-driven environments with minimal boilerplate
🔧Advanced Data Engineering Features
Configurable Transformations: Flexible transformation pipelines with custom business logic
Data Quality Controls: Built-in validation and data quality checks
Performance Optimization: Broadcast joins, partition strategies, and optimized query patterns
Comprehensive Logging: Integrated logging and performance monitoring with the built-in TimeLogger
Installation
pip install fabricengineer-py
Quick Start Guide
Prerequisites
Microsoft Fabric workspace with Lakehouse
PySpark environment
Python 3.11+
Usage Examples
Silver Layer Data Ingestion
Insert-Only Pattern
The Insert-Only service is ideal for append-only scenarios where you need to track all changes while maintaining performance.
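Both ingestion examples below reference a set of configuration constants that are expected to be defined in your notebook; they are not shipped with the package. The values below are illustrative assumptions only, and the comments reflect how the parameter names are used in the examples:

```python
# Illustrative configuration for the ingestion examples (assumed values).
NK_COLUMNS = ["id"]                    # Natural-key columns identifying a record
CONSTANT_COLUMNS = []                  # Constant columns passed through unchanged
IS_DELTA_LOAD = False                  # True when processing an incremental (delta) load
DELTA_LOAD_USE_BROADCAST = True        # Broadcast the delta side of joins for small loads
EXCLUDE_COLUMNS_FROM_COMPARING = []    # Columns ignored when comparing for changes
INCLUDE_COLUMNS_AT_COMPARING = []      # Columns explicitly included in change comparison
HISTORIZE = True                       # Keep historized records in the Silver layer
PARTITION_BY_COLUMNS = []              # Partition columns for the destination table
```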
```python
from pyspark.sql import DataFrame, functions as F

from fabricengineer.logging import TimeLogger
from fabricengineer.transform.lakehouse import LakehouseTable
from fabricengineer.transform import SilverIngestionInsertOnlyService


def transform_projects(df: DataFrame, etl) -> DataFrame:
    df = df.withColumn("dtime", F.to_timestamp("dtime"))
    return df


def transform_all(df: DataFrame, etl) -> DataFrame:
    df = df.withColumn("data", F.lit("values"))
    return df


# Initialize performance monitoring
timer = TimeLogger()

# Define table-specific transformations
transformations = {
    "*": transform_all,              # Applied to all tables
    "projects": transform_projects   # Applied only to projects table
}

# Configure source and destination tables
source_table = LakehouseTable(
    lakehouse="BronzeLakehouse",
    schema="schema",
    table="projects"
)
destination_table = LakehouseTable(
    lakehouse="SilverLakehouse",
    schema=source_table.schema,
    table=source_table.table
)

# Initialize and configure the ETL service
etl = SilverIngestionInsertOnlyService()
etl.init(
    spark_=spark,
    notebookutils_=notebookutils,
    source_table=source_table,
    destination_table=destination_table,
    nk_columns=NK_COLUMNS,
    constant_columns=CONSTANT_COLUMNS,
    is_delta_load=IS_DELTA_LOAD,
    delta_load_use_broadcast=DELTA_LOAD_USE_BROADCAST,
    transformations=transformations,
    exclude_comparing_columns=EXCLUDE_COLUMNS_FROM_COMPARING,
    include_comparing_columns=INCLUDE_COLUMNS_AT_COMPARING,
    historize=HISTORIZE,
    partition_by_columns=PARTITION_BY_COLUMNS,
    df_bronze=None,
    create_historized_mlv=True
)

timer.start().log()
etl.run()
timer.stop().log()
```
SCD Type 2 (Slowly Changing Dimensions)
The SCD2 service implements Type 2 Slowly Changing Dimensions with automatic history tracking and current record management.
```python
from pyspark.sql import DataFrame, functions as F

from fabricengineer.logging import TimeLogger
from fabricengineer.transform.lakehouse import LakehouseTable
from fabricengineer.transform import SilverIngestionSCD2Service


def transform_projects(df: DataFrame, etl) -> DataFrame:
    df = df.withColumn("dtime", F.to_timestamp("dtime"))
    return df


def transform_all(df: DataFrame, etl) -> DataFrame:
    df = df.withColumn("data", F.lit("values"))
    return df


# Initialize performance monitoring
timer = TimeLogger()

# Define table-specific transformations
transformations = {
    "*": transform_all,              # Applied to all tables
    "projects": transform_projects   # Applied only to projects table
}

# Configure source and destination tables
source_table = LakehouseTable(
    lakehouse="BronzeLakehouse",
    schema="schema",
    table="projects"
)
destination_table = LakehouseTable(
    lakehouse="SilverLakehouse",
    schema=source_table.schema,
    table=source_table.table
)

# Initialize and configure the ETL service
etl = SilverIngestionSCD2Service()
etl.init(
    spark_=spark,
    notebookutils_=notebookutils,
    source_table=source_table,
    destination_table=destination_table,
    nk_columns=NK_COLUMNS,
    constant_columns=CONSTANT_COLUMNS,
    is_delta_load=IS_DELTA_LOAD,
    delta_load_use_broadcast=DELTA_LOAD_USE_BROADCAST,
    transformations=transformations,
    exclude_comparing_columns=EXCLUDE_COLUMNS_FROM_COMPARING,
    include_comparing_columns=INCLUDE_COLUMNS_AT_COMPARING,
    historize=HISTORIZE,
    partition_by_columns=PARTITION_BY_COLUMNS,
    df_bronze=None
)

timer.start().log()
etl.run()
timer.stop().log()
```
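After etl.run() completes, the destination table can be inspected like any other Lakehouse table. Below is a minimal verification sketch in plain PySpark (not part of fabricengineer); the three-part table name assumes a schema-enabled Lakehouse matching the LakehouseTable configuration above:

```python
# Plain PySpark sanity check after the SCD2 run (illustrative only).
df_silver = spark.table("SilverLakehouse.schema.projects")
print(f"Row count: {df_silver.count()}")

# Inspect the most recently loaded rows (dtime is cast in transform_projects above).
df_silver.orderBy("dtime", ascending=False).show(10, truncate=False)
```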
Materialized Lake Views Management
Prerequisites
Configure a Utils Lakehouse as your default Lakehouse. The generated view SQL code will be saved as .sql.txt files in the lakehouse under /Files/mlv/{lakehouse}/{schema}/{table}.sql.txt.
```python
from fabricengineer.mlv import MaterializedLakeView

# Initialize the Materialized Lake View manager
mlv = MaterializedLakeView(
    lakehouse="SilverBusinessLakehouse",
    schema="schema",
    table="projects"
)
print(mlv.to_dict())

# Define your custom SQL query
sql = """
SELECT
    p.id
    ,p.projectname
    ,p.budget
    ,u.name AS projectlead
FROM dbo.projects p
LEFT JOIN users u
    ON p.projectlead_id = u.id
"""

# Create or replace the materialized view
result = mlv.create_or_replace(sql)
display(result)
```
Environment-as-Code
Manage Lakehouse
```python
from typing import Any

from fabricengineer.api.fabric.workspace.folder.folder import WorkspaceFolder
from fabricengineer.api.fabric.workspace.workspace import Workspace
from fabricengineer.api.fabric.workspace.items import Lakehouse

workspace = Workspace.get_by_name("<WORKSPACE_NAME>")
workspace_id = workspace.item.id

lakehouse = Lakehouse(workspace_id, name="LakehouseName")

# Static methods
lakehouse_by_id = Lakehouse.get_by_id(workspace_id, id="LAKEHOUSE_ID")
lakehouse_by_name = Lakehouse.get_by_name(workspace_id, name="LAKEHOUSE_NAME")
lakehouses = Lakehouse.list(workspace_id)

# Create lakehouse
lakehouse.create()
lakehouse.create_if_not_exists()  # Safe creation (only creates if it does not exist)

# Update lakehouse
lakehouse.update(description="Updated description")

# Fetch current API data
lakehouse.fetch()

# Check exists
if lakehouse.exists():
    pass

# Delete
lakehouse.delete()

# API Fields
id = lakehouse.item.api.id
workspace_id = lakehouse.item.api.workspaceId
display_name = lakehouse.item.api.displayName
# Other API fields are available as lakehouse.item.api.<fieldName>

# Set fields
fields: dict[str, Any] = lakehouse.item.fields
```
Manage WorkspaceFolder
```python
from typing import Any

from fabricengineer.api.fabric.workspace.workspace import Workspace
from fabricengineer.api.fabric.workspace.folder.folder import WorkspaceFolder

workspace = Workspace.get_by_name("<WORKSPACE_NAME>")
workspace_id = workspace.item.id

# Create folders
root_folder = WorkspaceFolder(workspace_id, name="RootFolder")
root_folder.create_if_not_exists()

sub_folder = WorkspaceFolder(workspace_id, name="SubFolder", parent_folder=root_folder)
sub_folder.create_if_not_exists()

# Static methods
folder_by_id = WorkspaceFolder.get_by_id(workspace_id, id="FOLDER_ID")
folder_by_name = WorkspaceFolder.get_by_name(workspace_id, name="RootFolder")
folders = WorkspaceFolder.list(workspace_id)

# Update (rename)
root_folder.update(displayName="RootFolderRenamed")

# Fetch current API data
root_folder.fetch()

# Check exists
if root_folder.exists():
    pass

# Delete
sub_folder.delete()

# API Fields
id = root_folder.item.api.id
workspace_id = root_folder.item.api.workspaceId
display_name = root_folder.item.api.displayName
parent_folder_id = root_folder.item.api.parentFolderId

# Set fields
fields: dict[str, Any] = root_folder.item.fields
```
Manage Warehouse
```python
from typing import Any

from fabricengineer.api.fabric.workspace.workspace import Workspace
from fabricengineer.api.fabric.workspace.folder.folder import WorkspaceFolder
from fabricengineer.api.fabric.workspace.items import Warehouse

workspace = Workspace.get_by_name("<WORKSPACE_NAME>")
workspace_id = workspace.item.id

folder = WorkspaceFolder(workspace_id, name="Warehouses")
folder.create_if_not_exists()

warehouse = Warehouse(
    workspace_id=workspace_id,
    name="WarehouseName",
    description="Description",
    folder=folder,
    collation_type="Latin1_General_100_BIN2_UTF8"
)

# Static methods
warehouse_by_id = Warehouse.get_by_id(workspace_id, id="WAREHOUSE_ID")
warehouse_by_name = Warehouse.get_by_name(workspace_id, name="WarehouseName")
warehouses = Warehouse.list(workspace_id)

# Create
warehouse.create()
warehouse.create_if_not_exists()

# Update
warehouse.update(description="Updated description")

# Fetch
warehouse.fetch()

# Exists
if warehouse.exists():
    pass

# Delete
warehouse.delete()

# API Fields
id = warehouse.item.api.id
workspace_id = warehouse.item.api.workspaceId
display_name = warehouse.item.api.displayName
connection_string = warehouse.item.api.properties.connectionString
collation_type = warehouse.item.api.properties.collationType

# Set fields
fields: dict[str, Any] = warehouse.item.fields
```
Manage Notebook
```python
from typing import Any

from fabricengineer.api.fabric.workspace.workspace import Workspace
from fabricengineer.api.fabric.workspace.folder.folder import WorkspaceFolder
from fabricengineer.api.fabric.workspace.items import (
    Notebook,
    IPYNBNotebookDefinition,
    CopyFabricNotebookDefinition
)

workspace = Workspace.get_by_name("<WORKSPACE_NAME>")
workspace_id = workspace.item.id

folder = WorkspaceFolder(workspace_id, name="Notebooks")
folder.create_if_not_exists()

# Empty notebook
notebook = Notebook(
    workspace_id=workspace_id,
    name="NotebookName",
    description="Description",
    folder=folder
)

# From .ipynb file
ipynb_def = IPYNBNotebookDefinition(ipynb_path="/path/to/notebook.ipynb")
notebook_from_ipynb = Notebook(
    workspace_id=workspace_id,
    name="NotebookFromIpynb",
    description="Description",
    definition=ipynb_def,
    folder=folder
)

# From copy
copy_def = CopyFabricNotebookDefinition("<SOURCE_WORKSPACE_ID>", "<SOURCE_NOTEBOOK_ID>")
notebook_from_copy = Notebook(
    workspace_id=workspace_id,
    name="NotebookFromCopy",
    description="Description",
    definition=copy_def,
    folder=folder
)

# Static methods
nb_by_id = Notebook.get_by_id(workspace_id, id="NOTEBOOK_ID")
nb_by_name = Notebook.get_by_name(workspace_id, name="NotebookName")
notebooks = Notebook.list(workspace_id)

# Create
notebook.create_if_not_exists()

# Update
notebook.update(description="Updated description")

# Fetch
notebook.fetch()

# Exists
if notebook.exists():
    pass

# Delete
notebook.delete()

# API Fields
id = notebook.item.api.id
workspace_id = notebook.item.api.workspaceId
display_name = notebook.item.api.displayName

# Set fields
fields: dict[str, Any] = notebook.item.fields
```
Manage DataPipeline
```python
from typing import Any

from fabricengineer.api.fabric.workspace.workspace import Workspace
from fabricengineer.api.fabric.workspace.folder.folder import WorkspaceFolder
from fabricengineer.api.fabric.workspace.items import (
    DataPipeline,
    ZIPDataPipelineDefinition,
    CopyDataPipelineDefinition
)

workspace = Workspace.get_by_name("<WORKSPACE_NAME>")
workspace_id = workspace.item.id

folder = WorkspaceFolder(workspace_id, name="Pipelines")
folder.create_if_not_exists()

# Empty pipeline
pipeline = DataPipeline(
    workspace_id=workspace_id,
    name="PipelineName",
    description="Description",
    folder=folder
)

# From ZIP definition
zip_def = ZIPDataPipelineDefinition(zip_path="/path/to/pipeline.zip")
pipeline_from_zip = DataPipeline(
    workspace_id=workspace_id,
    name="PipelineFromZip",
    description="Description",
    definition=zip_def,
    folder=folder
)

# From copy
copy_def = CopyDataPipelineDefinition("<SOURCE_WORKSPACE_ID>", "<SOURCE_PIPELINE_ID>")
pipeline_from_copy = DataPipeline(
    workspace_id=workspace_id,
    name="PipelineFromCopy",
    description="Description",
    definition=copy_def,
    folder=folder
)

# Static methods
dp_by_id = DataPipeline.get_by_id(workspace_id, id="PIPELINE_ID")
dp_by_name = DataPipeline.get_by_name(workspace_id, name="PipelineName")
pipelines = DataPipeline.list(workspace_id)

# Create
pipeline.create_if_not_exists()

# Update
pipeline.update(description="Updated description")

# Fetch
pipeline.fetch()

# Exists
if pipeline.exists():
    pass

# Delete
pipeline.delete()

# API Fields
id = pipeline.item.api.id
workspace_id = pipeline.item.api.workspaceId
display_name = pipeline.item.api.displayName

# Set fields
fields: dict[str, Any] = pipeline.item.fields
```
Manage VariableLibrary
```python
from typing import Any

from fabricengineer.api.fabric.workspace.workspace import Workspace
from fabricengineer.api.fabric.workspace.folder.folder import WorkspaceFolder
from fabricengineer.api.fabric.workspace.items import (
    VariableLibrary,
    VariableLibraryDefinition,
    VariableLibraryVariable
)

workspace = Workspace.get_by_name("<WORKSPACE_NAME>")
workspace_id = workspace.item.id

folder = WorkspaceFolder(workspace_id, name="Variables")
folder.create_if_not_exists()

definition = VariableLibraryDefinition(
    ["TEST", "PROD"],
    VariableLibraryVariable(name="ApiUrl", note="", type="String", value="https://api.test"),
    VariableLibraryVariable(name="ApiKey", note="", type="String", value="secret")
)

varlib = VariableLibrary(
    workspace_id=workspace_id,
    name="VariableLibrary",
    description="Description",
    folder=folder,
    definition=definition
)

# Static methods
vl_by_id = VariableLibrary.get_by_id(workspace_id, id="VARIABLE_LIBRARY_ID")
vl_by_name = VariableLibrary.get_by_name(workspace_id, name="VariableLibrary")
varlibs = VariableLibrary.list(workspace_id)

# Create
varlib.create_if_not_exists()

# Update
varlib.update(description="Updated description")

# Fetch
varlib.fetch()

# Exists
if varlib.exists():
    pass

# Delete
varlib.delete()

# API Fields
id = varlib.item.api.id
workspace_id = varlib.item.api.workspaceId
display_name = varlib.item.api.displayName
active_value_set = varlib.item.api.properties.activeValueSetName

# Set fields
fields: dict[str, Any] = varlib.item.fields
```
Manage Workspace
```python
from typing import Any

from fabricengineer.api.fabric.workspace.workspace import Workspace

# Create
ws = Workspace(
    name="MyWorkspace",
    description="New Workspace",
    capacity_id="<CAPACITY_ID>"  # Optional
)
ws.create()
ws.create_if_not_exists()

# Static methods
ws_by_id = Workspace.get_by_id("WORKSPACE_ID")
ws_by_name = Workspace.get_by_name("MyWorkspace")
workspaces = Workspace.list()

# Update
ws.update(description="Updated description")

# Fetch
ws.fetch()

# Exists
if ws.exists():
    pass

# Delete
ws.delete()

# API Fields
id = ws.item.api.id
display_name = ws.item.api.displayName
description = ws.item.api.description
capacity_id = ws.item.api.capacityId
capacity_region = ws.item.api.capacityRegion
one_lake_blob = ws.item.api.oneLakeEndpoints.blobEndpoint
one_lake_dfs = ws.item.api.oneLakeEndpoints.dfsEndpoint

# Set fields
fields: dict[str, Any] = ws.item.fields
```
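Putting the building blocks together, a workspace template notebook can provision a standardized environment in a few lines. The sketch below only uses calls shown in the sections above; the workspace name, folder layout, and item names are placeholders:

```python
from fabricengineer.api.fabric.workspace.workspace import Workspace
from fabricengineer.api.fabric.workspace.folder.folder import WorkspaceFolder
from fabricengineer.api.fabric.workspace.items import Lakehouse, Warehouse

# Create (or reuse) the workspace
ws = Workspace(name="MyStandardWorkspace", description="Standardized workspace")
ws.create_if_not_exists()
workspace_id = ws.item.id  # workspace id, as in the examples above

# Folder for warehouse items
storage_folder = WorkspaceFolder(workspace_id, name="Storage")
storage_folder.create_if_not_exists()

# Medallion lakehouses and a warehouse
bronze = Lakehouse(workspace_id, name="BronzeLakehouse")
silver = Lakehouse(workspace_id, name="SilverLakehouse")
warehouse = Warehouse(workspace_id=workspace_id, name="GoldWarehouse", folder=storage_folder)

for item in (bronze, silver, warehouse):
    item.create_if_not_exists()
```

Run directly in a Fabric notebook or from a CI/CD job, a template like this yields the repeatable, code-driven setups described under Key Features.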
Importing Modules Directly from GitHub
Import specific package modules directly into your Fabric notebooks from GitHub releases:
```python
# Cell 1:
import requests

VERSION = "1.0.0"
url = f"https://raw.githubusercontent.com/enricogoerlitz/fabricengineer-py/refs/tags/{VERSION}/src/fabricengineer/import_module/import_module.py"

resp = requests.get(url)
code = resp.text
exec(code, globals())  # This provides the 'import_module' function

assert code.startswith("import requests")
assert "def import_module" in code

# Cell 2
mlv_module = import_module("transform.mlv", VERSION)
scd2_module = import_module("transform.silver.scd2", VERSION)
insertonly_module = import_module("transform.silver.insertonly", VERSION)

# Cell 3 - Use mlv module
exec(mlv_module, globals())  # Provides MaterializedLakeView class and mlv instance
mlv.init(
    lakehouse="SilverBusinessLakehouse",
    schema="schema",
    table="projects"
)
print(mlv.to_dict())

# Cell 4 - Use scd2 module
exec(scd2_module, globals())  # Provides an instantiated etl object
etl.init(...)
print(str(etl))

# Cell 5 - Use insertonly module
exec(insertonly_module, globals())  # Provides an instantiated etl object
etl.init(...)
print(str(etl))
```
Advanced Features
Performance Optimization
Broadcast Joins: Automatically optimize small table joins (see the sketch after this list)
Partition Strategies: Intelligent partitioning for better query performance
Schema Evolution: Handle schema changes without breaking existing pipelines
Delta Load Processing: Efficient incremental data processing
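These optimizations are configured through the ingestion service parameters shown in the Silver layer examples. Below is a minimal sketch of the performance-related arguments; the values, including the partition column name, are assumptions:

```python
# Performance-related arguments for etl.init(...), as used in the Silver layer
# examples above. Values here are illustrative assumptions.
performance_kwargs = {
    "is_delta_load": True,                  # process only the incremental slice
    "delta_load_use_broadcast": True,       # broadcast the small delta side of joins
    "partition_by_columns": ["load_date"],  # partition the destination table
}

# Combined with the remaining arguments from the examples above:
# etl.init(..., **performance_kwargs)
```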
Data Quality & Validation
Automatic Validation: Built-in checks for data consistency and quality
Type Safety: Comprehensive type annotations for better development experience
Error Handling: Robust error handling and recovery mechanisms
Monitoring & Logging
```python
from fabricengineer.logging import TimeLogger, logger

# Performance monitoring
timer = TimeLogger()
timer.start().log()

# Your ETL operations here
etl.run()

timer.stop().log()

# Custom fabricengineer logging
logger.info("Custom log message")
logger.error("Error occurred during processing")
```