Jump to content

Process Azure Event Hubs data using Azure Data Factory Mapping Data Flows


Recommended Posts

Guest Inder Rana
Posted

Background

 

 

The purpose of this blog post is to share an architectural pattern which I worked on with a customer team to process Azure Event Hubs data using Azure Data Factory Mapping Data Flows. Its more common to process Azure Event Hubs Streams using one of the Stream processing services like Azure Stream Analytics, Azure Functions or Apache Spark with Azure Databricks but using Azure Data Factory in more of a batch fashion is a perfectly valid pattern for certain use cases so thought it would a good idea to document for the benefit broader community.

 

 

 

What will you learn?

 

 

I will share my thoughts on what use cases would be a good fit for this architectural pattern as well as point out a few technical details (which are often overlooked) for implementing this pattern correctly.

 

 

 

Note: I am using the name Azure Data Factory in this blog post but it is applicable to Synapse Pipelines as well.

 

 

 

Reference Architecture

 

 

The following diagram shows the reference architecture and two main things to highlight in the architecture:

 

  1. Event Hub Capture feature is used to automatically save data to ADLS Gen2. This is the easiest method to load data into ADLS Gen2 from Azure Event Hubs because it is simple configuration on Azure Event Hubs (time frequency and volume of data) and does not require any coding. You can read more on Azure Event Hubs Capture feature in the public docs - Azure Event Hubs Capture Overview and Azure Event Hubs Capture Quickstart.
  2. Azure Data Factory Mapping Data Flow pipeline is triggered on schedule to process the data from Storage and load to Azure Synapse SQL Dedicated Pool as well as ADLS Gen2 in parquet format. The Mapping Data Flows capability in Azure Data Factory is used to build no-code transformations which under the hood translate to Apache Spark hence very scalable.

 

 

 

largevv2px999.jpg.bd3b6087217cba6a7085edaa3bf53720.jpg

 

 

 

Use Cases - When is this a good fit?

 

 

The basic premise is that it might make sense for producers to write to Azure Event Hubs but not every consumer needs to consume that data in near-real time. Following are some of the reasons you might choose to adopt this pattern over others:

 

  • Cost Savings - In cloud environments you are paying for usage so if the cost of running a Stream Processing engine 24 X 7 is much more than the value drawn from that near real time processing you might want to consider this architectural pattern to save costs. In lots of reporting use cases it might just be fine to run a pipeline to process new data once every hour, even when Streaming engine is used to process data from Event Hubs I have observed that Streaming job is run on a schedule rather than continuously to save costs so not something really new.
  • Code Free Implementation - The code free implementation method of Azure Data Factory and lack of experience in streaming technologies for technical teams where there is very little value to draw from near real time processing could be another reason to choose this pattern.
  • Multiple Consumers - Lots of times in organization there are multiple consumers for same Azure Event Hub stream and some of them really need to process in near real time and should not be held back to non-stream solutions because a subset of consumers don't have that need.

  • Future Proofing Solution - You could be planning for near future where you want to modernize to streaming patterns but that's not the primary goal and you are trying to meet some immediate timelines (real world budget and timeline constraints), going this way you will be future proofing yourself so that not everything needs to be thrown away in the next iteration of your release.

 

There could be many different factors to decide the scheduled frequency for Azure Data Factory and it can be run every few minutes but my usual guidance would be to use this pattern when you can afford to run every hour or every few hours, maybe even every 15 minutes but not less than that.

 

Implementation Details

 

 

The following diagram shows the sample Mapping Data Flow I have implemented where source is ADLS Gen2 with data from Azure Event Hubs Capture and there are two sinks for processed data - Synapse SQL Dedicated Pool and Parquet in ADLS Gen2. The reason to have two sinks is to demonstrate some important concepts often overlooked but are essential for a correct implementation.

 

 

 

largevv2px999.jpg.d147b363d8cf52b73bcd091588b1af7b.jpg

 

 

 

Summary of the steps executed in the Data Flow:

 

  • Define Source
  • Parse the Body Column which has the real data
  • First sink is Azure Synapse SQL Dedicated Pool, a few transformations are implemented on this path before data is saved to SQL Dedicated Pool
  • Second sink is ADLS Gen2 where data is saved in parquet format

 

 

 

With overview of the Data Flow implementation out of the way let's get into some details.

 

 

 

Identifying what is new since last execution of Pipeline and needs to be processed

 

When batch processes are run on schedule there needs to be a way to identify what is new and needs to be processed since last execution of the processing pipeline. Mapping Data Flows make this really easy for ADLS Gen2 Storage Connector, there are two methods for this in the Source Settings:

1. After Completion Setting - If you prefer to move the processed files to archive folder you can select Move for the setting (deletion is also an option). In this method Azure Event Hub landing folder in Storage serves as a temporary staging area for ADF Pipeline to pick new files for processing

2. Change Data Capture - If you don't care about the processed files to be deleted or moved you can enable Change Data Capture setting and in this case pointer to what's new for processing is automatically handled by service.

 

 

 

In my sample implementation (1) above was used and the relevant settings for Source are shown in the screenshots below.

 

largevv2px999.thumb.jpg.78004845465b7df5c21056a3b19bde7a.jpg

 

 

 

 

 

Duplicate and Upsert Handling

 

Azure Event Hubs at-least once delivery characteristic is very often overlooked, in simpler worlds this means Azure Event Hub Capture might persist same event (or message) to ADLS Gen2 multiple times hence leading to duplicates in the final sink if the processing logic doesn't handle the scenario appropriately. I have run into situations in the past where such issues get bubbled up as bugs with Azure Event Hubs but that's not the case at all as it is clearly documented and expected behavior for the ingestion service. It is the responsibility of the processing engine to have an idempotent implementation to handle duplicates appropriately. The sample Data Flow used for reference in this blog post has two sinks - one for Synapse SQL and the other one for ADLS Gen2 (parquet files). The additional transformations on the Synapse SQL sink path ensures that there are no duplicates in the sink but the ADLS Gen2 sink has duplicates. You can read more on the Idempotency and at-least once delivery characteristics on the following article - Resilient design guidance for Event Hubs and Functions - Azure Architecture Center | Microsoft Learn (it refers to Azure Functions but the same concepts carry over).

 

 

 

Mapping Data Flows makes it pretty easy to remove duplicates using Aggregate Transformation provided there is a key column in the data which can be used as identifier for records.

 

 

 

largevv2px999.jpg.6fe93e19b080dc674e9060bc53490c58.jpglargevv2px999.thumb.jpg.7cf93460f6da300191c5739d6b8387db.jpg

 

 

 

Lastly, it would also be a good idea to have Upsert implemented for the loads to Azure Synapse SQL Pool in case same record is processed again or maybe update records are also pushed through Azure Event Hubs. The AlterRow Transformation activity can be used with Synapse SQL Pool to enable Upserts provided a key column is available in the data to identify rows. The implementation requires AlterRow transformation is added before Synapse SQL Sink. The following screenshots show the settings for both AlterRow and Synapse SQL Sink.

 

 

 

largevv2px999.jpg.f81da61f6b8b57f908409949406e2dec.jpglargevv2px999.thumb.jpg.0ece05b7bfeb597adee3f0930c8ab65f.jpg

 

 

 

Note: Enabling the Staging is another recommended setting for optimizing for performance of loads to Synapse SQL Pool - Sink performance and best practices in mapping data flow - Azure Data Factory & Azure Synapse | Microsoft Learn

 

Continue reading...

Join the conversation

You can post now and register later. If you have an account, sign in now to post with your account.

Guest
Reply to this topic...

×   Pasted as rich text.   Paste as plain text instead

  Only 75 emoji are allowed.

×   Your link has been automatically embedded.   Display as a link instead

×   Your previous content has been restored.   Clear editor

×   You cannot paste images directly. Upload or insert images from URL.

×
×
  • Create New...