Ingest streaming data using Azure Event Hub and Azure Databricks


Posted by Timothy_Brown

Introduction


I recently worked with a customer to develop a demo showcasing an event-driven architecture built on multiple Azure technologies. The use case: point-of-sale data arriving as a stream that needed to be transformed to support an API. Complex transformations also had to be accommodated, which made Databricks a requirement.

 

 

 

One of the requirements was to compare multiple streaming and transformation approaches, with the data ultimately landing in Azure Data Explorer (ADX). I developed a two-path demo that streams data through an Event Hub into both ADX directly and Databricks, with the end result being the same table layout for both approaches.

 

 

 

Data Set


To simulate inbound events, I created a simulator in Databricks that uses New York City taxi trip data as its source. This data is readily available, can be scaled to millions of events, offers easy options for partitioning, and demonstrates well. I used 2022 data for yellow and green taxis, which can be found at TLC Trip Record Data - TLC (nyc.gov). I copied all the parquet files into a container named nytlc, under parquet/tripdata/yellow and parquet/tripdata/green.
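The simulator code itself lives in the notebooks, but the core idea, turning a TLC trip record into a JSON event tagged with its taxi type, can be sketched in a few lines. The helper name and sample fields below are illustrative, not taken from the notebooks:

```python
import json

# Illustrative sketch: convert one TLC trip record (a dict) into the JSON
# event body a simulator would send to the Event Hub. The TaxiType tag is
# what the downstream ADX functions and the Databricks job split on.
def make_trip_event(row: dict, taxi_type: str) -> str:
    event = dict(row)                # copy the trip columns as-is
    event["TaxiType"] = taxi_type    # "Yellow" or "Green"
    return json.dumps(event, default=str)

sample = {"VendorID": 2, "trip_distance": 3.4, "fare_amount": 14.5}
body = make_trip_event(sample, "Yellow")
```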

 

[Screenshot: parquet files copied into the nytlc container]

 

 

 

Architecture


At first glance, this architecture overlaps technologies: many of the transformations could be accomplished with either Azure Data Explorer or Databricks. That overlap is intentional. I was tasked with demonstrating, through comparison, how events could be processed using both techniques, allowing the end user to determine which approach worked best for their requirements: essentially, which skill sets they have and how complex their transformations are.

 

 

 

The simulator was developed in Databricks; it consumes New York City taxi trip data stored in Azure Data Lake and sends events to an Azure Event Hub for downstream processing. Approach one demonstrates how Azure Data Explorer consumes events directly from the Event Hub and stores them in a database, where ADX functions split the data into yellow and green tables.

 

Approach two leverages Databricks, which consumes the same simulator events on a different Event Hub consumer group and transforms them into multiple dataframes for persistence in Delta format on Azure Data Lake, as well as in Azure Data Explorer across three tables (AllTripData, YellowTripData, and GreenTripData). This architecture is depicted below.
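Stripped of the Spark and Azure plumbing, the fan-out logic of approach two reduces to: decode each event, land it in AllTripData, and filter yellow and green copies. A pure-Python sketch of that idea, with lists standing in for the dataframes:

```python
import json

# Pure-Python stand-in for the Databricks fan-out: every decoded event goes
# to AllTripData; the yellow and green collections are filtered copies.
def route_events(raw_events):
    all_trips, yellow, green = [], [], []
    for raw in raw_events:
        event = json.loads(raw)
        all_trips.append(event)
        if event.get("TaxiType") == "Yellow":
            yellow.append(event)
        elif event.get("TaxiType") == "Green":
            green.append(event)
    return all_trips, yellow, green

events = [json.dumps({"TaxiType": t}) for t in ["Yellow", "Green", "Yellow"]]
all_trips, yellow, green = route_events(events)
# all_trips holds 3 events, yellow 2, green 1
```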

 

 

 

[Architecture diagram: simulator in Databricks, events through Azure Event Hub, path one into ADX directly, path two through Databricks into Delta on Azure Data Lake and ADX]

 

 

 

Configuration


This blog is not intended to dive into the specifics of setting up each piece of technology, but to showcase how the pieces interact with each other. Links are provided where applicable.

 

 

 

Create an Azure Event Hub


Follow the steps in Azure Quickstart - Create an event hub using the Azure portal - Azure Event Hubs | Microsoft Learn to create an Event Hub. I used the defaults for the namespace. The Event Hub was created with 8 partitions and three Consumer Groups:

 

  • Databricks-reader
  • Databricks-simulator
  • ADX-reader

 

 

 

These consumer groups can be given any name. Using a dedicated consumer group per consumer is a best practice: it allows each consumer to have its own “view” of the event data and process it at its own pace, as described at Consumer Groups.
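Conceptually, a consumer group is an independent cursor over the same partitioned event log, which is why the ADX-reader and Databricks-reader groups never interfere with each other. A toy model of just that idea (not the Event Hubs SDK):

```python
# Toy model of consumer groups: one shared log per partition, and one
# independent offset per (consumer group, partition). Reading in one group
# never moves another group's cursor.
class EventHubModel:
    def __init__(self, partitions=8):
        self.log = {p: [] for p in range(partitions)}
        self.offsets = {}  # (group, partition) -> next index to read

    def send(self, partition, event):
        self.log[partition].append(event)

    def receive(self, group, partition):
        i = self.offsets.get((group, partition), 0)
        batch = self.log[partition][i:]
        self.offsets[(group, partition)] = len(self.log[partition])
        return batch

hub = EventHubModel()
hub.send(0, "trip-1")
hub.send(0, "trip-2")
adx = hub.receive("ADX-reader", 0)          # both events
dbx = hub.receive("Databricks-reader", 0)   # the same two events, own cursor
```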

 

 

 

Create a Databricks Environment


Follow the guidance in Get started: Free trial & setup - Azure Databricks | Microsoft Learn to set up a Databricks environment. I created a single-user cluster as shown below.

 

 

 

[Screenshot: single-user Databricks cluster configuration]

 

 

 

Libraries required:

 

Library Links:


Notebooks:

 

  • nycExecSim
  • nycInboundEvents
  • nycUtilities

 

There is documentation inside each notebook where applicable. You can import the BlogCode.dbc file; review each notebook and adjust it as needed. For example, in the nycUtilities notebook, add your credentials in cmd 3:


clientId = <ADD CLIENT ID>
clientSecret = <ADD CLIENT SECRET>
tenantId = <ADD TENANT ID>
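For context, these three values typically feed the standard Hadoop ABFS OAuth settings used for ADLS Gen2 access from Spark. A sketch of those settings follows; the storage account name is a placeholder, and the exact wiring inside nycUtilities may differ. In Databricks, each key/value pair would be applied with spark.conf.set(key, value):

```python
# Sketch of the standard Hadoop ABFS OAuth settings a service principal
# feeds into for ADLS Gen2 access. "mystorageaccount" is a placeholder.
def adls_oauth_conf(account, client_id, client_secret, tenant_id):
    suffix = f"{account}.dfs.core.windows.net"
    return {
        f"fs.azure.account.auth.type.{suffix}": "OAuth",
        f"fs.azure.account.oauth.provider.type.{suffix}":
            "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        f"fs.azure.account.oauth2.client.id.{suffix}": client_id,
        f"fs.azure.account.oauth2.client.secret.{suffix}": client_secret,
        f"fs.azure.account.oauth2.client.endpoint.{suffix}":
            f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
    }

conf = adls_oauth_conf("mystorageaccount", "<client-id>",
                       "<client-secret>", "<tenant-id>")
```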


Configure Azure Storage


Follow the guidance at Create a storage account - Azure Storage | Microsoft Learn to create a storage account, and see Azure Data Lake Storage Gen2 Introduction | Microsoft Learn for background.

 

 

 

I used two different containers in the same storage account:

 

  • nytlc
  • streaming

 

 

 

However, it does not matter if you use a single container with multiple folders; just define the paths correctly in the notebooks. I used the nytlc container for the NYC taxi parquet source data and the streaming container for all notebook output.
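Whichever layout you pick, the notebooks address storage through abfss:// URIs that follow one fixed pattern. A tiny illustrative helper (the helper and the account name are hypothetical) makes the convention explicit:

```python
# Hypothetical helper: build an ADLS Gen2 abfss:// URI from a container,
# a storage account, and folder segments. The account name is a placeholder.
def abfss_path(container: str, account: str, *parts: str) -> str:
    return (f"abfss://{container}@{account}.dfs.core.windows.net/"
            + "/".join(parts))

src = abfss_path("nytlc", "mystorageaccount", "parquet", "tripdata", "yellow")
# abfss://nytlc@mystorageaccount.dfs.core.windows.net/parquet/tripdata/yellow
```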

 

Note: Make sure to give your service principal appropriate access such as Storage Blob Data Contributor.

 

 

 

Create Azure Data Explorer


Follow the guidance at Quickstart: Create an Azure Data Explorer cluster and database | Microsoft Learn to create an Azure Data Explorer cluster.

 

I like to use the Azure Data Explorer web UI for the queries.

 

 

 

Create two databases:

 

  • TripDataDatabricksSource
  • TripDataEventHubSource

 

Both databases were created with an unlimited retention period and a 31-day cache period, although these settings do not affect the demo.

 

 

 

Create the table AllTripData in both databases using the following command:


.create table AllTripData (
    VendorID: int,
    tpep_pickup_datetime: datetime,
    tpep_dropoff_datetime: datetime,
    passenger_count: int,
    trip_distance: real,
    RatecodeID: int,
    store_and_fwd_flag: string,
    PULocationID: int,
    DOLocationID: int,
    payment_type: int,
    fare_amount: real,
    extra: real,
    mta_tax: real,
    tip_amount: real,
    tolls_amount: real,
    improvement_surcharge: real,
    total_amount: real,
    congestion_surcharge: real,
    airport_fee: int,
    TaxiType: string,
    lpep_pickup_datetime: datetime,
    lpep_dropoff_datetime: datetime,
    ehail_fee: int,
    trip_type: string)


Update Policies


Modify the update policies for both databases using the following commands:


.alter table AllTripData policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:02","MaximumNumberOfItems": "100","MaximumRawDataSizeMB": "300"}'

 

.alter table AllTripData policy streamingingestion '{"IsEnabled": true}'
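The ingestionbatching policy above seals a batch as soon as the first of its three limits is reached: 2 seconds, 100 items, or 300 MB of raw data. The rule is simple enough to state directly; the following is a conceptual model only, not ADX's actual implementation:

```python
# Conceptual model of ADX ingestion batching: a batch is sealed when ANY of
# the three thresholds from the policy above is reached first. Defaults here
# mirror the policy values used in this demo.
def batch_is_sealed(elapsed_seconds, item_count, raw_size_mb,
                    max_seconds=2, max_items=100, max_size_mb=300):
    return (elapsed_seconds >= max_seconds
            or item_count >= max_items
            or raw_size_mb >= max_size_mb)

batch_is_sealed(0.5, 100, 10)   # True: the item limit was hit first
batch_is_sealed(0.5, 10, 10)    # False: no limit reached yet
```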


TripDataEventHubSource


The next series of commands applies only to the TripDataEventHubSource database.

 

 

 

Create a data connection in Azure Data Explorer to ingest events from the Event Hub:

 

 

 

[Screenshot: Azure Data Explorer data connection to the Event Hub]


Table Creation


Create the yellow and green trip data tables:


.create table YellowTripData (
    VendorID: int,
    tpep_pickup_datetime: datetime,
    tpep_dropoff_datetime: datetime,
    passenger_count: int,
    trip_distance: real,
    RatecodeID: int,
    store_and_fwd_flag: string,
    PULocationID: int,
    DOLocationID: int,
    payment_type: int,
    fare_amount: real,
    extra: real,
    mta_tax: real,
    tip_amount: real,
    tolls_amount: real,
    improvement_surcharge: real,
    total_amount: real,
    congestion_surcharge: real,
    airport_fee: int,
    TaxiType: string,
    lpep_pickup_datetime: datetime,
    lpep_dropoff_datetime: datetime,
    ehail_fee: int,
    trip_type: string)


.create table GreenTripData (
    VendorID: int,
    tpep_pickup_datetime: datetime,
    tpep_dropoff_datetime: datetime,
    passenger_count: int,
    trip_distance: real,
    RatecodeID: int,
    store_and_fwd_flag: string,
    PULocationID: int,
    DOLocationID: int,
    payment_type: int,
    fare_amount: real,
    extra: real,
    mta_tax: real,
    tip_amount: real,
    tolls_amount: real,
    improvement_surcharge: real,
    total_amount: real,
    congestion_surcharge: real,
    airport_fee: int,
    TaxiType: string,
    lpep_pickup_datetime: datetime,
    lpep_dropoff_datetime: datetime,
    ehail_fee: int,
    trip_type: string)


Function Creation

 

 

Create two functions:


.create-or-alter function with (docstring = 'Append Records where TaxiType = Yellow', folder = 'UpdatePolicyFunctions')
CopyYellowTaxiData()
{
    AllTripData
    | where TaxiType == "Yellow"
}


.create-or-alter function with (docstring = 'Append Records where TaxiType = Green', folder = 'UpdatePolicyFunctions')
CopyGreenTaxiData()
{
    AllTripData
    | where TaxiType == "Green"
}


Update Policies

 

 

Alter the update policies for the yellow and green tables:


.alter table YellowTripData policy update

@'[{ "IsEnabled": true, "Source": "AllTripData", "Query": "CopyYellowTaxiData()", "IsTransactional": false, "PropagateIngestionProperties": false}]'


.alter table GreenTripData policy update

@'[{ "IsEnabled": true, "Source": "AllTripData", "Query": "CopyGreenTaxiData()", "IsTransactional": false, "PropagateIngestionProperties": false}]'
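Taken together, these update policies mean that every batch ingested into AllTripData is immediately re-run through the two filter functions, populating the yellow and green tables as a side effect. A pure-Python model of that behavior:

```python
# Pure-Python model of the ADX update policies: each ingestion into the
# source table re-runs the filter functions over the newly ingested rows
# and appends the matches to the target tables.
tables = {"AllTripData": [], "YellowTripData": [], "GreenTripData": []}

def ingest_all_trip_data(rows):
    tables["AllTripData"].extend(rows)
    # Update policies fire per ingestion batch, over the new rows only.
    tables["YellowTripData"].extend(
        r for r in rows if r["TaxiType"] == "Yellow")
    tables["GreenTripData"].extend(
        r for r in rows if r["TaxiType"] == "Green")

ingest_all_trip_data([{"TaxiType": "Yellow"}, {"TaxiType": "Green"},
                      {"TaxiType": "Yellow"}])
```

Note that with "IsTransactional": false, as configured above, a failure in the derived tables does not roll back the source ingestion; the model above ignores that nuance.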


Demo Execution


Now that everything is configured and you have added your credential information to the notebooks, start the Databricks cluster and execute the nycExecSim notebook to begin sending events to the Event Hub. You should begin to see messages in the Event Hub overview:

[Screenshot: Event Hub overview showing incoming messages]

 

 

 

Execute the nycInboundEvents notebook to begin processing events consumed from the Event Hub.

 

 

 

Head on over to the Azure Data Explorer web UI and execute this command against the TripDataEventHubSource database.


AllTripData

| count


You should begin to see the record count increase.

 

 

 

Execute these commands as well:


YellowTripData

| count

 

GreenTripData

| count


Again, you should see the record count increase.

 

 

 

To view the records, execute the following command:


YellowTripData
| take 100

 
