K
KapilSamant
Introduction
If you work in data engineering, you may have encountered the term "Medallion Architecture." This design pattern organizes data within a Lakehouse into distinct layers to facilitate efficient processing and analysis. Read more about it here. This is also a recommended design approach for Microsoft Fabric. To make a Lakehouse usable, data must pass through several layers: Bronze, Silver, and Gold. Each layer focus on progressively enhancing data cleanliness and quality. In this article, we will specifically explore how to build the bronze layer using real-time data streaming from existing PostgreSQL databases. This approach enables real-time analytics and supports AI applications by providing a real time, raw, and unprocessed data.
Image source - What is a Medallion Architecture?
What is Bronze Layer?
This layer is often referred to as the Raw Zone, where data is stored in its original format and structure. According to the common definition, the data in this layer is typically append-only and immutable, but this can be misunderstood. While the intention is to preserve the original data as it was ingested, this does not mean that there will be no deletions or updates. Instead, if deletions or updates occur, the original values are preserved as older versions. This approach ensures that historical data remains accessible and unaltered. Delta Lake is commonly used to manage this data, as they support versioning and maintain a complete history of changes
PostgreSQL as the source for Bronze Layer
Imagine you have multiple PostgreSQL databases running different applications and you want to integrate their data into a Delta Lake. You have a couple of options to achieve this. The first approach involves creating a Copy activity that extracts data from individual tables and stores it in Delta tables. However, this method requires a watermark column to track changes or necessitates full data reloads each time, which can be inefficient.
The second approach involves setting up Change Data Capture in PostgreSQL to capture and stream data changes continuously. This method allows for real-time data synchronization and efficient updates to OneLake. In this blog, we will explore a proof of concept for implementing this CDC-based approach.
How to utilize PostgreSQL logical decoding, Wal2json and Fabric Delta Lake to create a continuously replicating bronze layer?
We will be utilizing PostgreSQL logical replication, Wal2Json plugin and PySpark to capture and apply the changes to delta lake. In PostgreSQL, logical replication is a method used to replicate data changes from one PostgreSQL instance to another or to a different system. Wal2json is a PostgreSQL output plugin for logical replication that converts Write-Ahead Log (WAL) changes into JSON format.
Setup on Azure PostgreSQL
Change following server parameters by logging into Azure portal and navigating to “Server parameters” of the PostgreSQL service.
Parameter Name | Value |
wal_level | logical |
max_replication_slots | >0 (e.g. 4 or 8 ) |
max_wal_senders | >0 (e.g. 4 or 8 ) |
- Create publication for all the tables. Publication is a feature in logical replication that allows you to define which tables' changes should be streamed to subscribers.
CREATE PUBLICATION cdc_publication FOR ALL TABLES;
-
create a replication slot with wal2json as plugin name. A slot represents a stream of changes that can be replayed to a client in the order they were made on the origin server. Each slot streams a sequence of changes from a single database. Note - Wal2json plugin is pre-installed in Azure PostgreSQL
SELECT * FROM pg_create_logical_replication_slot('cdc_slot', 'wal2json');
- You can test if the replication is running by updating some test data and running following command.
SELECT * FROM pg_logical_slot_get_changes('cdc_slot', NULL, NULL,'include-xids', 'true', 'include-timestamp', 'true')
- Now that you have tested the replication, let's look at the output format. Following are the key components of wal2jobs output followed by an example.
Attribute Value xid The transaction ID. timestamp The timestamp when the transaction was committed. kind
Type of operation (insert, update, delete).
schema The schema of the table. table The name of the table where the change occurred. columnnames An array of column names affected by the change. columntypes An array of column data types corresponding to columnnames. columnvalues An array of new values for the columns (present for insert and update operations). oldkeys An object containing the primary key or unique key values before the change (present for update and delete operations).
Code:For INSERT statement { "xid": 8362757, "timestamp": "2024-08-01 15:09:34.086064+05:30", "change": [ { "kind": "insert", "schema": "public", "table": "employees_synapse_test", "columnnames": [ "EMPLOYEE_ID", "FIRST_NAME", "LAST_NAME", "EMAIL", "PHONE_NUMBER", "HIRE_DATE", "JOB_ID", "SALARY", "COMMISSION_PCT", "MANAGER_ID", "DEPARTMENT_ID" ], "columntypes": [ "numeric(10,0)", "text", "text", "text", "text", "timestamp without time zone", "text", "numeric(8,2)", "numeric(2,2)", "numeric(6,0)", "numeric(4,0)" ], "columnvalues": [ 327, "3275FIRST NAME111", "3275LAST NAME", "3275EMAIL3275EMAIL", "3275", "2024-07-31 00:00:00", "IT_PROG", 32750, 0, 100, 60 ] } ] } For UPDATE statement { "xid": 8362759, "timestamp": "2024-08-01 15:09:37.228446+05:30", "change": [ { "kind": "update", "schema": "public", "table": "employees_synapse_test", "columnnames": [ "EMPLOYEE_ID", "FIRST_NAME", "LAST_NAME", "EMAIL", "PHONE_NUMBER", "HIRE_DATE", "JOB_ID", "SALARY", "COMMISSION_PCT", "MANAGER_ID", "DEPARTMENT_ID" ], "columntypes": [ "numeric(10,0)", "text", "text", "text", "text", "timestamp without time zone", "text", "numeric(8,2)", "numeric(2,2)", "numeric(6,0)", "numeric(4,0)" ], "columnvalues": [ 100, "Third1111", "BLOB", "SKING", "515.123.4567", "2024-08-01 00:00:00", "AD_PRES", 24000, null, null, 90 ], "oldkeys": { "keynames": [ "EMPLOYEE_ID" ], "keytypes": [ "numeric(10,0)" ], "keyvalues": [ 100 ] } } ] } For DELETE statement { "xid": 8362756, "timestamp": "2024-08-01 15:09:29.552539+05:30", "change": [ { "kind": "delete", "schema": "public", "table": "employees_synapse_test", "oldkeys": { "keynames": [ "EMPLOYEE_ID" ], "keytypes": [ "numeric(10,0)" ], "keyvalues": [ 327 ] } } ] }
- Create OneLake in Fabric. For detailed instruction see this.
Create a delta table with initial load of the data using Spark.
Code:# PostgreSQL connection details jdbc_url = "jdbc:postgresql://your_postgres_db.postgres.database.azure.com:5432/postgres" jdbc_properties = { "user": "postgres", "driver": "org.postgresql.Driver" } # Read data from PostgreSQL employees table employee_df = spark.read.jdbc(url=jdbc_url, table="employees", properties=jdbc_properties) # Define the path for the Delta table in ADLS delta_table_path = "abfss://your_container@your_storage_account.dfs.core.windows.net/delta/employees" # Write DataFrame to Delta table employee_df.write.format("delta").mode("overwrite").save(delta_table_path) delta_df = spark.read.format("delta").load(delta_table_path) delta_df.show()
- Now running the following code continuously will keep the data in delta lake in sync with the primary PostgreSQL database.
Code:import json from pyspark.sql import SparkSession from pyspark.sql.functions import lit from delta.tables import DeltaTable import pandas as pd # PostgreSQL connection details jdbc_url = "jdbc:postgresql://your_postgres_db.postgres.database.azure.com:5432/postgres" jdbc_properties = { "user": "postgres", "driver": "org.postgresql.Driver" } #Delta table details delta_table_path = "abfss://your_container@your_storage_account.dfs.core.windows.net/delta/employees" delta_table = DeltaTable.forPath(spark, delta_table_path) delta_df = spark.read.format("delta").load(delta_table_path) schema = delta_df.schema loop cdc_df = spark.read.jdbc(url=jdbc_url, table="(SELECT data FROM pg_logical_slot_get_changes('cdc_slot', NULL, NULL, 'include-xids', 'true', 'include-timestamp', 'true')) as cdc", properties=jdbc_properties) cdc_array = cdc_df.collect() for i in cdc_array: print(i) changedData = json.loads(i['data'])['change'][0] print(changedData) schema = changedData['schema'] table = changedData['table'] DMLtype = changedData['kind'] if DMLtype == "insert" or DMLtype == "update": column_names = changedData['columnnames'] column_values = changedData['columnvalues'] source_data = {col: [val] for col, val in zip(column_names, column_values)} print(source_data) change_df = spark.createDataFrame(pd.DataFrame(source_data)) if DMLtype == "insert": change_df.write.format("delta").mode("append").save(delta_table_path) if DMLtype == "update": old_keys = changedData['oldkeys'] condition = " AND ".join( [f"target.{key} = source.{key}" for key in old_keys['keynames']] ) print(condition) delta_table.alias("target").merge( change_df.alias("source"), condition ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute() if DMLtype == "delete": condition = " AND ".join([ f"{key} = '{value}'" for key, value in zip(changedData["oldkeys"]["keynames"], changedData["oldkeys"]["keyvalues"]) ]) delta_table.delete(condition) end loop
Conclusion
In conclusion, building the Bronze layer of the Medallion Architecture using wal2json from PostgreSQL as the source to Fabric OneLake provides a robust and scalable approach for handling raw data ingestion. This setup leverages PostgreSQL's logical replication capabilities to capture and stream changes in real-time, ensuring that the data lake remains up-to-date with the latest transactional data.
Implementing this architecture ensures that the foundational layer is well-structured and becomes a solid layer for next layers while also supporting real-time analytics, advanced data processing and AI applications.
By adopting this strategy, organizations can achieve greater data consistency, reduce latency in data processing, and enhance the overall efficiency of their data pipelines.
References
Implement medallion lakehouse architecture in Fabric - Microsoft Fabric
What is the medallion lakehouse architecture? - Azure Databricks
Eventhouse OneLake Availability is Generally Available | Microsoft Fabric Blog | Microsoft Fabric
Lakehouse and Delta tables - Microsoft Fabric
Feedback and suggestions
If you have feedback or suggestions for improving this data migration asset, please send an email to Database Platform Engineering Team.
Continue reading...