Microsoft Fabric provides both Data Lakehouse and Data Warehouse platforms for data analytics. In a separate post, I illustrated a metadata driven pipeline pattern for Microsoft Fabric following the medallion architecture, with Fabric Data Lakehouses used for both the Bronze and Gold layers and SQL views over tables for the Silver layer.

Fabric Data Lakehouse is perfect for landing data into the Bronze layer. And if I can host my star schema in a Fabric Data Lakehouse, why consider Fabric Data Warehouse as the Gold layer? Because Fabric Data Warehouse has some features that may be a better fit for your organization. Consider Fabric Data Warehouse for the Gold layer when:

- Support for multi-table transactions is needed
- You are migrating stored procedures for ETL from other SQL platforms
- T-SQL operations such as UPDATE, INSERT, DELETE, or CREATE TABLE are needed, especially for migration scenarios
- Your workload has many “trickle transactions” (updates to a few rows), which generally perform better in SQL than in Spark for large tables
- Power BI dataset Import storage mode is desired, either because of features currently not supported in Direct Lake or for very complex DAX calculations
- Developers want to use existing SQL skills

In this blog post, I will provide an overview of a metadata driven pipeline in Microsoft Fabric that follows the medallion architecture with a Data Warehouse serving as the Gold layer. The intent is not to provide a full tutorial on building metadata driven pipelines or on Microsoft Fabric; rather, it is to show you some new features of Fabric and give you some ideas for implementing metadata driven pipelines in Fabric.

Metadata driven architecture for Fabric with Bronze Lakehouse and Gold Data Warehouse

If you read my original post about Metadata Driven Pipelines for Fabric, the diagram above may look familiar. That’s because steps 1 - 7 are essentially the same as in the original post and entail loading data into the Bronze Fabric Lakehouse and creating SQL views. If you read the first article and want to jump ahead, take a quick look at step 1 to see the different pipeline configuration table for loading to the Gold Data Warehouse, then jump to step 8 to review loading the Gold Data Warehouse and connecting to it from Power BI. If you haven’t read the original post or want a refresher, follow the outline below for each step in the diagram.

1 - Define pipeline configuration tables

Tables are defined that contain the configuration for each type of data load: one table for loading from the source SQL database to the Bronze Fabric Lakehouse, and a second table for loading from the Bronze Fabric Lakehouse/Silver views to the Gold Data Warehouse. Each table contains a row for each source/destination combination and includes fields such as source table name, source schema, date key, start date, and load type (full or incremental). The tables also contain fields for pipeline run results, such as the number of rows inserted and updated, status, and max table transaction date, which are updated after each table is loaded.
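To make this concrete, the sketch below shows what the Source to Bronze configuration table might look like. The table name, column names, and data types are assumptions for illustration only, and it presumes the configuration tables live somewhere T-SQL DDL is available (for example, a Fabric Warehouse); the actual tables used in this solution are shown in the screenshots that follow.

```sql
-- Hypothetical sketch of a Source to Bronze pipeline configuration table.
-- Column names and types are assumptions; the real table is shown in the screenshot below.
CREATE TABLE [config].[SourceToBronzeLoad]
(
    SourceSchema       VARCHAR(128) NOT NULL,  -- schema of the source table
    SourceTableName    VARCHAR(128) NOT NULL,  -- table to copy into the Bronze Lakehouse
    DateKey            VARCHAR(128) NULL,      -- date column used to build incremental predicates
    StartDate          DATETIME2(6) NULL,      -- watermark: max transaction date from the last run
    LoadType           VARCHAR(20)  NOT NULL,  -- 'full' or 'incremental'
    -- Pipeline run results, updated after each table is loaded
    RowsRead           BIGINT       NULL,
    RowsInserted       BIGINT       NULL,
    RowsUpdated        BIGINT       NULL,
    LoadStatus         VARCHAR(50)  NULL,
    MaxTransactionDate DATETIME2(6) NULL
);
```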
The table below shows the configuration table for loading from a source SQL database to the Bronze Lakehouse:

The table below shows the configuration table for loading from the Bronze Lakehouse to the Gold Warehouse:

2 - Get configuration details for table loads from Source to Bronze Lakehouse

Below is what the final orchestrator pipeline will look like, with the relevant steps from the architecture diagram above indicated:

The orchestrator pipeline contains a Lookup activity on the Source to Bronze configuration table to get the list of tables to load from the source to Bronze.

3 - Call child pipeline to load data from Source to Bronze Lakehouse

For each table returned by the Lookup activity, call a child pipeline to load the data from the source to the Bronze Lakehouse, passing in the configuration detail from the lookup.

For Each activity:

Child pipeline to load from Source to Bronze Lakehouse:

4 - Copy data from Source to Bronze Lakehouse

A step to set a variable called datepredicate is part of this pipeline. A selection predicate based on a date is needed for incremental loads from the source, or if you want to load just a subset of the data. This simplifies the creation of the SQL source query string in the subsequent Copy Data activity (see the sketch at the end of this step).

If the load type setting from the configuration table is a full load, do a Copy Data activity from the source to a Bronze Lakehouse Delta Lake table.

Full load Copy Data source settings:

Full load Copy Data destination settings:

If the load type setting from the configuration table is an incremental load, do a Copy Data activity from the source to the Bronze Lakehouse, but set the destination to a Parquet file.

Incremental load Copy Data source settings:

Incremental load Copy Data destination settings:
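For illustration, with the WideWorldImporters Invoices table used in the notebook below, the source query built from the date predicate for an incremental load might end up looking something like this. This is a sketch only; the schema name and the exact predicate expression are assumptions.

```sql
-- Hypothetical example of the source query the Copy Data activity might issue
-- for an incremental load. The predicate comes from the configuration table's
-- date key and start date; a full load would simply omit the WHERE clause.
SELECT *
FROM [Sales].[Invoices]
WHERE [LastEditedWhen] >= '2023-08-01 00:00:00';
```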
5 - Call Notebook for incremental load merge

For incremental loads only, call a Spark Notebook to merge the incremental data into the Bronze Delta Lake table.

Create or Merge to Deltalake notebook below:

```python
from delta.tables import *
from pyspark.sql.functions import *

lakehousePath = "abfss://yourpathhere"
tableName = "Invoices"
tableKey = "InvoiceID"
tableKey2 = None
dateColumn = "LastEditedWhen"

deltaTablePath = f"{lakehousePath}/Tables/{tableName}"
parquetFilePath = f"{lakehousePath}/Files/incremental/{tableName}/{tableName}.parquet"

# Read the incremental data landed as Parquet by the Copy Data activity
df2 = spark.read.parquet(parquetFilePath)

# Build the merge key expression from one or two key columns
if tableKey2 is None:
    mergeKeyExpr = f"t.{tableKey} = s.{tableKey}"
else:
    mergeKeyExpr = f"t.{tableKey} = s.{tableKey} AND t.{tableKey2} = s.{tableKey2}"

# Check if the table already exists; if it does, do an upsert and capture how many rows
# were inserted and updated; if it does not, create it and capture how many rows were written
if DeltaTable.isDeltaTable(spark, deltaTablePath):
    deltaTable = DeltaTable.forPath(spark, deltaTablePath)
    deltaTable.alias("t").merge(
        df2.alias("s"),
        mergeKeyExpr
    ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
    history = deltaTable.history(1).select("operationMetrics")
    operationMetrics = history.collect()[0]["operationMetrics"]
    numInserted = operationMetrics["numTargetRowsInserted"]
    numUpdated = operationMetrics["numTargetRowsUpdated"]
else:
    df2.write.format("delta").save(deltaTablePath)
    deltaTable = DeltaTable.forPath(spark, deltaTablePath)
    history = deltaTable.history(1).select("operationMetrics")
    operationMetrics = history.collect()[0]["operationMetrics"]
    numInserted = operationMetrics["numOutputRows"]  # write operations report numOutputRows
    numUpdated = 0

# Get the latest date loaded into the table - this will be used for watermarking;
# return the max date, the number of rows inserted, and the number updated
df3 = spark.read.format("delta").load(deltaTablePath)
maxdate = df3.agg(max(dateColumn)).collect()[0][0]
maxdate_str = maxdate.strftime("%Y-%m-%d %H:%M:%S")

result = "maxdate=" + maxdate_str + "|numInserted=" + str(numInserted) + "|numUpdated=" + str(numUpdated)
mssparkutils.notebook.exit(str(result))
```

Return the number of rows inserted, the number updated, and the max date from the notebook results and store them in pipeline variables.

6 - Save pipeline run results to configuration table

For each table loaded, update the configuration table with the load details, such as the number of rows read, inserted, and updated, taken from the variables or the Copy Data output. What is especially critical for the incremental load is to update the start date configuration with the max transaction date loaded, which is returned by the Create or Merge to Deltalake notebook. On the next run, this is used to retrieve only those records from the source that are greater than or equal to the max datetime of the data loaded in this run.

7 - Leverage SQL views over Bronze Lakehouse tables for the Silver layer

SQL views are defined over the Bronze Lakehouse tables. These views will be the source for loading the Gold Warehouse tables from the Bronze Lakehouse tables. While SQL views are supported in the Lakehouse SQL endpoint, they are created and accessible only via that endpoint, which means they are not available to us in a Data Factory Copy Data activity; the Copy Data activity only leverages the Lakehouse endpoint at this time. However, views in the Fabric Data Warehouse are accessible in a Copy Data activity, so the views are created in a Fabric Data Warehouse. All Data Warehouse views reference Lakehouse Bronze tables, and there is no data movement between Bronze and Silver.
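For illustration, a Silver view of this kind might look roughly like the sketch below. The view name and column list follow the stored procedure shown in step 10, but the Bronze Lakehouse name, the join, and the GrossProfit and LastUpdated expressions are assumptions.

```sql
-- Hypothetical sketch of a Silver view created in the Fabric Data Warehouse that reads
-- directly from Bronze Lakehouse tables via three-part (cross-database) names.
-- [BronzeLakehouse] is a placeholder; the join and expressions are assumptions.
CREATE VIEW [silver].[vInvoicedSales]
AS
SELECT
    il.InvoiceID,
    il.InvoiceLineID,
    i.InvoiceDate,
    i.CustomerID,
    il.StockItemID,
    i.SalespersonPersonID,
    il.ExtendedPrice,
    il.Quantity,
    il.LineProfit      AS GrossProfit,
    il.TaxAmount,
    il.LastEditedWhen  AS LastUpdated
FROM [BronzeLakehouse].[dbo].[Invoices] AS i
JOIN [BronzeLakehouse].[dbo].[InvoiceLines] AS il
    ON il.InvoiceID = i.InvoiceID;
```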
Below is a view created and used in this solution:

8 - Get configuration details to load tables from Silver views/Bronze Lakehouse to Gold Data Warehouse

With the tables loaded in the Bronze Lakehouse and the views defined in the Silver layer, we can load the tables into the Gold Data Warehouse. In the orchestrator pipeline, do a Lookup on the configuration table to get the details for each Gold table load:

9 - Call child pipeline to load data from Silver views/Bronze Lakehouse to Gold Data Warehouse

For each table configuration returned from the Lookup activity, call a child pipeline to load the data from the Silver views over the Bronze Lakehouse tables to the Gold Data Warehouse, passing in the configuration detail from the lookup.

Child pipeline to load from Bronze Lakehouse to Gold Warehouse:

If the load type from the configuration table is a full load, do a Copy Data activity from the Silver Warehouse view to the Gold Warehouse table.

Full load Copy Data source settings:

Full load Copy Data destination settings:

10 - Call SQL stored procedure for incremental load to Gold Data Warehouse tables

For incremental loads, SQL stored procedures are called to insert or update rows in the Gold Data Warehouse table. The stored procedure to invoke is part of the Gold Warehouse pipeline configuration table and is defined for each table that is incrementally loaded. With the Delta Lakehouse as Gold architecture, it was fairly simple to define metadata driven attributes for incremental loads with a single Spark notebook. SQL is not quite as flexible, so I opted to create a separate stored procedure for each incremental load. Here's an example stored procedure:

```sql
CREATE PROC [Gold].[incrLoadInvoicedSales]
    @StartDate DATETIME,
    @EndDate   DATETIME
AS
BEGIN
    SET NOCOUNT ON;

    DECLARE @UpdateCount INT, @InsertCount INT;

    -- Default the window: start from the latest date already in the Gold table,
    -- and end at the maximum supported date
    IF @StartDate IS NULL
    BEGIN
        SELECT @StartDate = ISNULL(MAX(LastUpdated), '2013-01-01')
        FROM [jhFTAFabricWarehouse].[Gold].[invoicedSales];
    END;

    IF @EndDate IS NULL
    BEGIN
        SET @EndDate = '9999-12-31';
    END;

    -- Update existing rows that changed within the window
    UPDATE target
    SET target.InvoiceDate = source.InvoiceDate,
        target.CustomerID = source.CustomerID,
        target.StockItemID = source.StockItemID,
        target.SalespersonPersonID = source.SalespersonPersonID,
        target.ExtendedPrice = source.ExtendedPrice,
        target.Quantity = source.Quantity,
        target.GrossProfit = source.GrossProfit,
        target.TaxAmount = source.TaxAmount,
        target.LastUpdated = source.LastUpdated
    FROM [jhFTAFabricWarehouse].[Gold].[invoicedSales] AS target
    INNER JOIN [jhFTAFabricWarehouse].[silver].[vInvoicedSales] AS source
        ON (target.InvoiceID = source.InvoiceID AND target.InvoiceLineID = source.InvoiceLineID)
    WHERE source.LastUpdated BETWEEN @StartDate AND @EndDate;

    SELECT @UpdateCount = @@ROWCOUNT;

    -- Insert new rows that do not yet exist in the Gold table
    INSERT INTO [jhFTAFabricWarehouse].[Gold].[invoicedSales]
        (InvoiceID, InvoiceLineID, InvoiceDate, CustomerID, StockItemID, SalespersonPersonID,
         ExtendedPrice, Quantity, GrossProfit, TaxAmount, LastUpdated)
    SELECT source.InvoiceID, source.InvoiceLineID, source.InvoiceDate, source.CustomerID,
           source.StockItemID, source.SalespersonPersonID, source.ExtendedPrice, source.Quantity,
           source.GrossProfit, source.TaxAmount, source.LastUpdated
    FROM [jhFTAFabricWarehouse].[silver].[vInvoicedSales] AS source
    LEFT JOIN [jhFTAFabricWarehouse].[Gold].[invoicedSales] AS target
        ON (target.InvoiceID = source.InvoiceID AND target.InvoiceLineID = source.InvoiceLineID)
    WHERE target.InvoiceID IS NULL
      AND target.InvoiceLineID IS NULL
      AND source.LastUpdated BETWEEN @StartDate AND @EndDate;

    SELECT @InsertCount = @@ROWCOUNT;

    -- Return the counts and the start date used, which the pipeline captures
    SELECT @UpdateCount AS UpdateCount,
           @InsertCount AS InsertCount,
           @StartDate   AS MaxDate;
END
```

In the pipeline, a Lookup activity calls the stored procedure to insert or update the Gold table and returns the number of rows inserted, updated, and the max date:

Output from the Lookup activity, which returns the counts and max date:

11 - Save pipeline run results to configuration table

As in step 6 for the Source to Bronze load pipeline, update the Gold configuration table with the load details, such as the number of rows read, inserted, and updated. Again, it is critical to update the start date configuration with the max transaction date for incremental loads.

12 - Create Fabric dataset

The Fabric Data Warehouse’s default dataset storage mode is DirectQuery for all tables and cannot be changed to Import. And, at this time, we cannot create a new dataset in the Fabric workspace with Import storage mode:

In order to use Import storage mode, we need to create the Power BI dataset in Power BI Desktop. Note that Import is currently available only by connecting to the SQL endpoint of the Fabric Data Warehouse or Data Lakehouse. In Power BI Desktop, connect to the OneLake data hub, choose the Fabric Data Warehouse, and switch the connection type to Connect to SQL endpoint:

Choose the tables in the Gold schema:

Specify Import for the connection settings:

Now model your dataset just like any other Power BI dataset. Like we did for the Fabric Lakehouse Bronze transaction tables and Fabric Data Warehouse Gold fact tables, we can configure Power BI fact tables to load only new or changed data via incremental refresh:

Define RangeStart and RangeEnd parameters in Power Query. Use the parameters to filter the rows of the fact tables. In the Power BI Desktop table pane, right-click on the fact table name and choose Incremental refresh:

Define the settings for the incremental refresh:

13 - Publish Power BI dataset to Fabric workspace

Save and publish the dataset to the Fabric workspace:

Go to the dataset settings in the Fabric service to schedule the dataset refresh:

14 - Create Power BI reports

Finally, create your Power BI reports over the new dataset in the Fabric tenant or in Power BI Desktop. Below I used the Auto Report feature to get a head start:

I hope you all have been having as much fun with Microsoft Fabric as I have! To learn more, check out the additional resources below:

https://learn.microsoft.com/en-us/fabric/get-started/
What is a lakehouse? - Microsoft Fabric
Data Factory in Microsoft Fabric documentation - Microsoft Fabric
Data Warehousing in Microsoft Fabric - Microsoft Fabric
Fabric decision guide - lakehouse or data warehouse - Microsoft Fabric
Microsoft Fabric: Lakehouse vs Warehouse video | James Serra's Blog
Microsoft Fabric – Criteria to make decision – Part 3
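The call that invokes this procedure might look something like the sketch below; the actual parameter values are built dynamically from the Gold configuration table, and the date shown is only a placeholder.

```sql
-- Hypothetical invocation of the incremental load procedure.
-- @StartDate would normally come from the configuration table's watermark
-- (the max transaction date of the previous run); @EndDate can be left NULL.
EXEC [Gold].[incrLoadInvoicedSales]
    @StartDate = '2023-08-01 00:00:00',
    @EndDate   = NULL;
```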