Metadata Driven Pipelines for Microsoft Fabric – Part 2, Data Warehouse Style

Microsoft Fabric provides both Data Lakehouse and Data Warehouse platforms for Data Analytics. In a separate post, I illustrated a Metadata Driven Pipeline pattern for Microsoft Fabric following the medallion architecture with Fabric Data Lakehouses used for both the Bronze and Gold layers and SQL views over tables for the Silver layer. Fabric Data Lakehouse is perfect for landing data into the Bronze layer. And if I can host my star schema in a Fabric Data Lakehouse, why consider Fabric Data Warehouse as the Gold layer? Because Fabric Data Warehouse has some features that may be a better fit for your organization.

 

 

 

Consider Fabric Data Warehouse for the Gold layer when:

 

 

In this blog post, I will provide an overview of a metadata driven pipeline in Microsoft Fabric that follows the medallion architecture with a Data Warehouse serving as the Gold layer. The intent is not to provide a full tutorial on building metadata driven pipelines or on Microsoft Fabric; rather, it is to show you some of the newer features of Fabric and give you some ideas for implementing metadata driven pipelines in Fabric.

 

 

 

Metadata driven architecture for Fabric with Bronze Lakehouse and Gold Data Warehouse

 

 

[Diagram: metadata driven architecture with Bronze Lakehouse, Silver views, and Gold Data Warehouse]

 

If you read my original post about Metadata Driven Pipelines for Fabric, the diagram above may look familiar. That’s because steps 1 – 7 are essentially the same as in the original post: they load data into the Bronze Fabric Lakehouse and create the SQL views for the Silver layer. If you read the first article and want to jump ahead, take a quick look at step 1 to see the additional pipeline configuration table for loading the Gold Data Warehouse, then jump to step 8 and continue on to review loading the Gold Data Warehouse and connecting to it from Power BI. If you haven’t read the original post or want a refresher, follow the outline below for each step in the diagram:

 

 

 

1 - Define pipeline configuration tables

 

 

Tables are defined that contain the configuration for each type of data load: one table for loading from the source SQL database to the Bronze Fabric Lakehouse, and a second for loading from the Bronze Fabric Lakehouse/Silver views to the Gold Data Warehouse. Each table contains a row for each source/destination combination and includes fields such as source table name, source schema, date key, start date, and load type (full or incremental). The tables also contain fields for pipeline run results, such as the number of rows inserted, the number of rows updated, the load status, and the max table transaction date; these are updated after each table is loaded.
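As a rough illustration only, a Source-to-Bronze configuration table might be shaped something like the sketch below. All table and column names here are hypothetical; the screenshots that follow show the actual configuration tables used in this solution.

-- Hypothetical sketch of a Source-to-Bronze configuration table
CREATE TABLE etl.SourceToBronzeConfig
(
    SourceSchema        VARCHAR(50),     -- e.g. 'Sales'
    SourceTableName     VARCHAR(100),    -- e.g. 'Invoices'
    DateKeyColumn       VARCHAR(100),    -- date column used for incremental filtering
    StartDate           DATETIME2(3),    -- watermark: load rows on or after this date
    LoadType            VARCHAR(20),     -- 'full' or 'incremental'
    RowsRead            BIGINT,          -- run results below are written back after each load
    RowsInserted        BIGINT,
    RowsUpdated         BIGINT,
    LoadStatus          VARCHAR(20),
    MaxTransactionDate  DATETIME2(3)
);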

 

 

 

The table below shows the configuration table for loading from a Source SQL database to the Bronze Lakehouse:

 

[Screenshot: Source-to-Bronze Lakehouse configuration table]

 

 

 

The table below shows the configuration table for loading from the Bronze Lakehouse to the Gold Warehouse:

 

[Screenshot: Bronze Lakehouse-to-Gold Warehouse configuration table]

 

 

 

2 - Get configuration details for table loads from Source to Bronze Lakehouse

 

 

Below is what the final Orchestrator pipeline will look like, with the relevant steps from the architecture diagram above indicated:

 

[Screenshot: orchestrator pipeline with the relevant architecture steps labeled]

 

The orchestrator pipeline contains a Lookup activity on the Source to Bronze configuration table to get the list of tables to load from the source to the Bronze Lakehouse.
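The Lookup's query is just a plain SELECT against the configuration table. A minimal sketch, assuming the hypothetical table from step 1:

-- Hypothetical Lookup query; table and column names are illustrative
SELECT SourceSchema, SourceTableName, DateKeyColumn, StartDate, LoadType
FROM etl.SourceToBronzeConfig;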

 

[Screenshot: Lookup activity over the Source-to-Bronze configuration table]

 

3 - Call child pipeline to load data from Source to Bronze Lakehouse

 

 

For each table defined in the Lookup activity, call a child pipeline to load the data from Source to Bronze Lakehouse, passing in the configuration detail from the lookup.

 

 

 

For Each activity:

 

[Screenshot: ForEach activity settings]

 

Child pipeline to load from Source to Bronze Lakehouse:

 

[Screenshot: child pipeline that loads from Source to Bronze Lakehouse]

 

 

 

4 - Copy Data from Source to Bronze Lakehouse

 

 

This pipeline includes a step that sets a variable called datepredicate. A selection predicate based on date is needed for incremental loads from the source, or when you want to load just a subset of the data. Building it in a variable first simplifies the creation of the SQL source query string in the subsequent Copy Data activity.
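To make this concrete, here is a hedged sketch of what the generated source query might look like once the datepredicate variable is applied; the schema, table, date column, and watermark value all come from the configuration row and are purely illustrative here.

-- Full load: no predicate, the whole table is selected
SELECT * FROM Sales.Invoices;

-- Incremental load: the datepredicate variable appends a date filter
-- built from the date key and the StartDate watermark in the config row
SELECT * FROM Sales.Invoices
WHERE LastEditedWhen >= '2023-05-01T00:00:00';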

 

 

 

[Screenshot: Set Variable activity for datepredicate]

 

 

 

 

 

If the load type setting from the configuration table is a full load, run a Copy Data activity from the source to the Bronze Lakehouse Delta Lake table.

 

 

 

Full load Copy Data Source settings:

 

[Screenshot: full load Copy Data source settings]

 

 

 

Full Load Copy Data Destination Settings:

 

[Screenshot: full load Copy Data destination settings]

 

 

 

If the load type setting from the configuration table is an incremental load, run a Copy Data activity from the source to the Bronze Lakehouse, but with the destination set to a Parquet file.

 

 

 

Incremental load Copy Data source settings:

 

[Screenshot: incremental load Copy Data source settings]

 

 

 

Incremental load Copy Data destination settings:

 

[Screenshot: incremental load Copy Data destination settings]

 

5 - Call Notebook for incremental load merge

 

 

For incremental loads only, call a Spark Notebook to merge the incremental data to the Bronze Delta Lake table.

 

[Screenshot: Notebook activity for the incremental merge]

 

 

 

 

 

Create or Merge to Deltalake Notebook below:

 

 

 

 

 

from delta.tables import *
from pyspark.sql.functions import *

# Parameters for this table load (passed in from the pipeline)
lakehousePath = "abfss://yourpathhere"
tableName = "Invoices"
tableKey = "InvoiceID"
tableKey2 = None
dateColumn = "LastEditedWhen"

deltaTablePath = f"{lakehousePath}/Tables/{tableName}"
parquetFilePath = f"{lakehousePath}/Files/incremental/{tableName}/{tableName}.parquet"

# Read the incremental extract written by the Copy Data activity
df2 = spark.read.parquet(parquetFilePath)

# Build the merge condition from one or two key columns
if tableKey2 is None:
    mergeKeyExpr = f"t.{tableKey} = s.{tableKey}"
else:
    mergeKeyExpr = f"t.{tableKey} = s.{tableKey} AND t.{tableKey2} = s.{tableKey2}"

# Check if the table already exists; if it does, do an upsert and capture how many
# rows were inserted and updated; if it does not, create it and capture how many
# rows were written
if DeltaTable.isDeltaTable(spark, deltaTablePath):
    deltaTable = DeltaTable.forPath(spark, deltaTablePath)
    deltaTable.alias("t").merge(
        df2.alias("s"),
        mergeKeyExpr
    ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
    history = deltaTable.history(1).select("operationMetrics")
    operationMetrics = history.collect()[0]["operationMetrics"]
    numInserted = operationMetrics["numTargetRowsInserted"]
    numUpdated = operationMetrics["numTargetRowsUpdated"]
else:
    df2.write.format("delta").save(deltaTablePath)
    deltaTable = DeltaTable.forPath(spark, deltaTablePath)
    history = deltaTable.history(1).select("operationMetrics")
    operationMetrics = history.collect()[0]["operationMetrics"]
    numInserted = operationMetrics["numOutputRows"]  # a first-time write reports numOutputRows
    numUpdated = 0

# Get the latest date loaded into the table - this will be used for watermarking;
# return the max date, the number of rows inserted, and the number updated
df3 = spark.read.format("delta").load(deltaTablePath)
maxdate = df3.agg(max(dateColumn)).collect()[0][0]
maxdate_str = maxdate.strftime("%Y-%m-%d %H:%M:%S")

result = "maxdate=" + maxdate_str + "|numInserted=" + str(numInserted) + "|numUpdated=" + str(numUpdated)
mssparkutils.notebook.exit(str(result))

 

 

 

 

 

 

 

 

 

Return the number of rows inserted, updated and max date from the notebook results and store them in pipeline variables.

 

 

 

6 - Save pipeline run results to configuration table

 

 

For each table loaded, update the configuration table with the load details, such as the number of rows read, inserted, and updated, taken from the pipeline variables or the Copy Data output. What is especially critical for the incremental load is to update the start date configuration with the max transaction date loaded, which is returned by the Create or Merge to Deltalake notebook. On the next run, this value is used to retrieve only the records from the source that are greater than or equal to the max datetime of the data loaded in this run.
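The write-back itself can be a simple UPDATE of the configuration row. A sketch, assuming the hypothetical configuration table from step 1 and parameter values supplied from the pipeline variables:

-- Hypothetical write-back of run results; names and parameters are illustrative
UPDATE etl.SourceToBronzeConfig
SET RowsInserted       = @numInserted,
    RowsUpdated        = @numUpdated,
    LoadStatus         = 'Succeeded',
    MaxTransactionDate = @maxDate,
    StartDate          = @maxDate       -- the next run loads rows on or after this watermark
WHERE SourceSchema    = @sourceSchema
  AND SourceTableName = @sourceTableName;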

 

[Screenshot: activity that writes the load results back to the configuration table]

 

7 - Leverage SQL Views over Bronze Lakehouse tables for Silver layer

 

 

SQL views are defined over the Bronze Lakehouse tables. These views will be the source for loading the Gold Warehouse tables from the Bronze Lakehouse tables.

 

 

 

While SQL views are supported in the Lakehouse SQL Endpoint, they are created and accessible only via the SQL Endpoint, which means they are not available to us in a Data Factory Copy Data activity; the Copy Data activity only leverages the Lakehouse endpoint at this time. Views are, however, accessible from the Fabric Data Warehouse in a Copy Data activity, so the views are created in a Fabric Data Warehouse. All of the Data Warehouse views reference Bronze Lakehouse tables, and there is no data movement between the Bronze and Silver layers. Below is a view created and used in this solution:

 

 

 

[Screenshot: Silver view definition in the Data Warehouse]
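For reference, a Silver view of this kind is just a T-SQL view in the Warehouse that queries the Bronze Lakehouse tables cross-database through the Lakehouse SQL endpoint. The sketch below is illustrative only: the Lakehouse name, the join, and the column mappings are assumptions, not the exact view shown above.

-- Hypothetical Silver view over Bronze Lakehouse tables (names are assumptions)
CREATE VIEW [silver].[vInvoicedSales]
AS
SELECT  il.InvoiceID,
        il.InvoiceLineID,
        i.InvoiceDate,
        i.CustomerID,
        il.StockItemID,
        i.SalespersonPersonID,
        il.ExtendedPrice,
        il.Quantity,
        il.LineProfit     AS GrossProfit,
        il.TaxAmount,
        il.LastEditedWhen AS LastUpdated
FROM    [BronzeLakehouse].[dbo].[InvoiceLines] AS il
JOIN    [BronzeLakehouse].[dbo].[Invoices]     AS i
          ON i.InvoiceID = il.InvoiceID;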

 

 

 

8 - Get configuration details to load tables from Silver Views/Bronze Lakehouse to Gold Data Warehouse

 

 

With the tables loaded in the Bronze Lakehouse and the views defined in the Silver layer, we can load the tables to the Gold Data Warehouse. In the orchestrator pipeline, do a Lookup on the configuration table to get the details for each Gold table load:

 

[Screenshot: Lookup activity over the Gold Warehouse configuration table]

 

 

 

9 - Call child pipeline to load data from Silver Views/Bronze Lakehouse to Gold Data Warehouse

 

 

For each table configuration returned from the Lookup activity, call a child pipeline to load the data from the Silver views over the Bronze Lakehouse tables to the Gold Data Warehouse, passing in the configuration detail from the lookup.

 

 

 

[Screenshot: ForEach activity for the Gold Warehouse loads]

 

 

 

 

 

Child pipeline to load from Bronze Lakehouse to Gold Warehouse:

 

 

 

[Screenshot: child pipeline that loads from Bronze Lakehouse to Gold Warehouse]

 

 

 

If the load type from the configuration table is a full load, run a Copy Data activity from the Silver Warehouse view to the Gold Warehouse table.
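Because the Silver view already shapes the data, the generated source query for the full load can be as simple as selecting everything from the view (illustrative sketch; the actual settings are in the screenshots below):

-- Illustrative full-load source query against the Silver view
SELECT * FROM [silver].[vInvoicedSales];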

 

 

 

Full load Copy Data Source settings:

 

 

 

[Screenshot: full load Copy Data source settings]

 

 

 

 

 

Full Load Copy Data Destination Settings:

 

 

 

[Screenshot: full load Copy Data destination settings]

 

 

 

 

 

10 - Call SQL Stored procedure for incremental load to Gold Data Warehouse tables

 

 

For incremental loads, SQL stored procedures are called to insert or update rows in the Gold Data Warehouse tables. The stored procedure to invoke is part of the Gold Warehouse pipeline configuration table and is defined for each table that is incrementally loaded. With the Lakehouse-as-Gold architecture, it was fairly simple to drive incremental loads from metadata with a single Spark notebook; T-SQL is not quite as flexible, so I opted to create a separate stored procedure for each incremental load.

 

 

 

Here's an example stored procedure:

 

 

 

 

 

CREATE PROC [Gold].[incrLoadInvoicedSales]
    @StartDate DATETIME,
    @EndDate   DATETIME
AS
BEGIN

    SET NOCOUNT ON;

    DECLARE @UpdateCount INT, @InsertCount INT;

    -- Default the load window: start from the latest date already in the target,
    -- and leave the upper bound open if no end date was supplied
    IF @StartDate IS NULL
    BEGIN
        SELECT @StartDate = ISNULL(MAX(LastUpdated), '2013-01-01')
        FROM [jhFTAFabricWarehouse].[Gold].[invoicedSales];
    END;

    IF @EndDate IS NULL
    BEGIN
        SET @EndDate = '9999-12-31';
    END;

    -- Update existing rows that changed within the window
    UPDATE target
    SET target.InvoiceDate         = source.InvoiceDate,
        target.CustomerID          = source.CustomerID,
        target.StockItemID         = source.StockItemID,
        target.SalespersonPersonID = source.SalespersonPersonID,
        target.ExtendedPrice       = source.ExtendedPrice,
        target.Quantity            = source.Quantity,
        target.GrossProfit         = source.GrossProfit,
        target.TaxAmount           = source.TaxAmount,
        target.LastUpdated         = source.LastUpdated
    FROM [jhFTAFabricWarehouse].[Gold].[invoicedSales] AS target
    INNER JOIN [jhFTAFabricWarehouse].[silver].[vInvoicedSales] AS source
        ON (target.InvoiceID = source.InvoiceID AND target.InvoiceLineID = source.InvoiceLineID)
    WHERE source.LastUpdated BETWEEN @StartDate AND @EndDate;

    SELECT @UpdateCount = @@ROWCOUNT;

    -- Insert rows in the window that do not yet exist in the target
    INSERT INTO [jhFTAFabricWarehouse].[Gold].[invoicedSales] (InvoiceID, InvoiceLineID, InvoiceDate, CustomerID, StockItemID,
        SalespersonPersonID, ExtendedPrice, Quantity, GrossProfit, TaxAmount, LastUpdated)
    SELECT source.InvoiceID, source.InvoiceLineID, source.InvoiceDate, source.CustomerID, source.StockItemID,
        source.SalespersonPersonID, source.ExtendedPrice, source.Quantity, source.GrossProfit, source.TaxAmount, source.LastUpdated
    FROM [jhFTAFabricWarehouse].[silver].[vInvoicedSales] AS source
    LEFT JOIN [jhFTAFabricWarehouse].[Gold].[invoicedSales] AS target
        ON (target.InvoiceID = source.InvoiceID AND target.InvoiceLineID = source.InvoiceLineID)
    WHERE target.InvoiceID IS NULL AND target.InvoiceLineID IS NULL
      AND source.LastUpdated BETWEEN @StartDate AND @EndDate;

    SELECT @InsertCount = @@ROWCOUNT;

    -- Return the counts and the watermark so the pipeline can record them in the config table
    SELECT @UpdateCount AS UpdateCount, @InsertCount AS InsertCount, @StartDate AS MaxDate;

END

 

 

 

 

 

In the pipeline, a Lookup activity calls the stored procedure to insert or update the Gold table and returns the number of rows inserted, the number updated, and the max date:
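The statement issued by the Lookup is essentially an EXEC of the procedure named in the configuration row, with the StartDate watermark passed in; the values below are illustrative.

-- Illustrative call; the procedure name and @StartDate come from the Gold config row
EXEC [Gold].[incrLoadInvoicedSales]
     @StartDate = '2023-05-01 00:00:00',
     @EndDate   = NULL;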

 

 

 

[Screenshot: Lookup activity that calls the incremental load stored procedure]

 

 

 

 

 

Output from Lookup Activity which returns counts and max date:

 

 

 

[Screenshot: Lookup activity output showing the counts and max date]

 

11 - Save pipeline run results to configuration table

 

 

Like step 6 for the Load Source to Bronze pipeline, update the Gold configuration table with the load details such as the number of rows read, inserted and updated. Again, it is critical to update the start date configuration with the max transaction date for incremental loads.

 

[Screenshot: activity that writes the load results back to the Gold configuration table]

 

 

 

12 - Create Fabric Dataset

 

 

The Fabric Data Warehouse’s default dataset storage mode is DirectQuery for all tables and cannot be changed to Import. And, at this time, we cannot create a new dataset in the Fabric workspace with Import storage mode:

 

 

 

[Screenshot: dataset options available in the Fabric workspace]

 

 

 

In order to use Import storage mode, we need to create the Power BI dataset in Power BI Desktop. Note that import is currently available only by connecting to the SQL endpoint of the Fabric Data Warehouse or Data Lakehouse.

 

 

 

In Power BI Desktop, connect to the OneLake datahub, choose the Fabric Data Warehouse, and switch the Connect type to Connect to SQL Endpoint:

 

[Screenshot: OneLake data hub connection with Connect to SQL endpoint selected]

 

 

 

Choose the tables in the Gold schema:

 

 

 

[Screenshot: selecting the tables in the Gold schema]

 

 

 

Specify Import for connection settings:

 

[Screenshot: Import selected in the connection settings]

 

 

 

Now model your dataset just like any other Power BI dataset.

 

 

 

[Screenshot: dataset model view in Power BI Desktop]

 

 

 

Like we did for the Fabric Lakehouse Bronze transaction tables and Fabric Data Warehouse Gold fact tables, we can configure Power BI fact tables to load only new or changed data via incremental refresh:

 

 

 

Define RangeStart and RangeEnd parameters in Power Query and use them to filter the rows of your fact tables.

 

[Screenshot: RangeStart and RangeEnd filter applied in Power Query]

 

 

 

In the Power BI Desktop table pane, right-click the fact table name and choose Incremental refresh:

 

[Screenshot: Incremental refresh option on the fact table]

 

 

 

Define the settings for the incremental refresh:

 

[Screenshot: incremental refresh settings]

 

 

 

13 – Publish Power BI dataset to Fabric Workspace

 

 

Save and publish the dataset to the Fabric workspace:

 

 

 

[Screenshot: publishing the dataset to the Fabric workspace]

 

 

 

 

 

Go to the dataset settings in the Fabric service to schedule the dataset refresh:

 

 

 

 

[Screenshot: scheduled refresh settings for the dataset]

 

 

 

 

 

14 - Create Power BI Reports

 

 

Finally, create your Power BI reports over the new dataset in the Fabric service or in Power BI Desktop. Below I used the auto-create report feature to get a head start:

 

 

 

[Screenshot: auto-created report]

 

 

 

[Screenshot: Power BI report over the Gold dataset]

 

 

 

I hope you all have been having as much fun with Microsoft Fabric as I have! To learn more, check out the additional resources below:

 

https://learn.microsoft.com/en-us/fabric/get-started/

 

What is a lakehouse? - Microsoft Fabric

 

Data Factory in Microsoft Fabric documentation - Microsoft Fabric

 

Data Warehousing in Microsoft Fabric - Microsoft Fabric

 

Fabric decision guide - lakehouse or data warehouse - Microsoft Fabric

 

Microsoft Fabric: Lakehouse vs Warehouse video | James Serra's Blog

 

Microsoft Fabric – Microsoft Fabric – Criteria to make decision – Part 3

 
