Guest Timothy_Brown Posted December 20, 2022 Posted December 20, 2022 Introduction I recently worked with a customer to develop a demo that showcased an event-driven architecture using multiple Azure technologies that supported a use case where point-of-sale data would be streaming and needed to be transformed to support an API. Additionally, complex transformations needed to be considered and Databricks was required. One of the requirements was to compare multiple streaming and transformation approaches which culminated in Azure Data Explorer (ADX). I developed a two-path demo that shows data streaming through an Event Hub into both ADX directly and Databricks. The end result was the same table layout for both approaches. Data Set To simulate inbound events, I created a simulator in Databricks which uses New York City taxi trip data as its source. This data is readily available and can be scaled to millions of events. It also offers easy options for partitioning and demonstrates well. Data used was from the year 2022 for green and yellow taxis and can be found at TLC Trip Record Data - TLC (nyc.gov). I copied all the parquet files into a container called nytlc->parquet->tripdata->yellow and green. Architecture At first glance, this architecture overlaps technology and many transformations can be accomplished using Azure Data Explorer or Databricks. This is understood. I was tasked with demonstrating through comparison how events could be processed using both techniques allowing the end user to determine which approach worked best based on their requirements. Essentially, which skill sets they have and how complex the transformations are. The simulator was developed in Databricks and consumes New York City Taxi trip data stored in Azure Data Lake sending events to an Azure Event Hub for downstream processing. Approach one demonstrates how Azure Data Explorer consumes events directly from Azure Event Hub and stores them in a database where ADX functions split the data into yellow and green tables. Approach two leverages Databricks which consumes the same simulator events on a different Event Hub consumer group transforming those events into multiple dataframes for persistence in the delta format on Azure Data Lake and also in Azure Data Explorer in three tables ( AllTripData, yellow, and green). This architecture is depicted below. Configuration This blog is not intended to dive into specifics on setting up each piece of technology, but to show case how they interact with each other. However, links are provided as applicable. Create an Azure Event Hub Follow the steps in Azure Quickstart - Create an event hub using the Azure portal - Azure Event Hubs | Microsoft Learn to create an Event Hub. I used the defaults for the namespace. The Event Hub was created with 8 partitions and three Consumer Groups: Databricks-reader Databricks-simulator ADX-reader These consumer groups can be called by any name and is a best practice to allow each consumer to have its own “view” of the event data and process it at its own pace as described at Consumer Groups. Create a Databricks Environment Follow the guidance in Get started: Free trial & setup - Azure Databricks | Microsoft Learn to setup a Databricks environment. I created a single user cluster as shown below. Libraries required: Maven: com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22 azure-event-hubs-spark/structured-streaming-pyspark.md at master · Azure/azure-event-hubs-spark (github.com) Jar: kusto_spark_3_0_2_12_3_1_6_jar_with_dependencies.jar Release v3.0_3.1.6 · Azure/azure-kusto-spark (github.com) Library Links: Using 3rd Party Libraries in Databricks: Apache Spark Packages and Maven Libraries - The Databricks Blog Workspace libraries - Azure Databricks | Microsoft Learn Notebooks: nycExecSim nycInboundEvents nycUtilities There is documentation inside each notebook where applicable. You can import the BlogCode.dbc file. Review each notebook and make changes where applicable. As an example, in the nycUtilities notebook, add your credentials in cmd 3: clientId = <ADD CLIENT ID> clientSecret = <ADD CLIENT SECRET> tenantId = <ADD TENANT ID> Configure Azure Storage Follow the guidance at Create a storage account - Azure Storage | Microsoft Learn to create a storage account along with Azure Data Lake Storage Gen2 Introduction | Microsoft Learn for information. I used two different containers in the same storage account: nytlc streaming However, it does not matter if you use a single container with multiple folders – just define them correctly in the notebooks. I used the nytlc container for the nyc taxi parquet source data and the streaming container for all notebook output. Note: Make sure to give your service principal appropriate access such as Storage Blob Data Contributor. Create Azure Data Explorer Follow the guidance at Quickstart: Create an Azure Data Explorer cluster and database | Microsoft Learn to create an Azure Data Explorer cluster. I like to use Azure Data Explorer for the queries. Create two databases: TripDataDatabricksSource TripDataEventHubSource Both databases have an unlimited retention period and cache period of 31 days although these do not affect the demo. Create the table [ AllTripData ] in both databases using the following command: .create table AllTripData (VendorID: int, tpep_pickup_datetime: datetime, tpep_dropoff_datetime: datetime, passenger_count: int, trip_distance: real, RatecodeID: int, store_and_fwd_flag: string, PULocationID: int, DOLocationID: int, payment_type: int, fare_amount: real, extra: real, mta_tax: real, tip_amount: real, tolls_amount: real, improvement_surcharge: real, total_amount: real, congestion_surcharge: real, airport_fee: int, TaxiType: string, lpep_pickup_datetime: datetime, lpep_dropoff_datetime: datetime, ehail_fee: int ,trip_type: string) Update Policies Modify the update policies for both databases using the following commands: .alter table AllTripData policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:02","MaximumNumberOfItems": "100","MaximumRawDataSizeMB": "300"}' .alter table AllTripData policy streamingingestion '{"IsEnabled": true}' TripDataEventHubSource The next series of commands are only applicable for the TripDataEventHubSource. Create a data connection to ingest events from an Event Hub. Azure Data Explorer Table Creation Create the yellow and green trip data tables: .create table YellowTripData (VendorID: int, tpep_pickup_datetime: datetime, tpep_dropoff_datetime: datetime, passenger_count: int, trip_distance: real, RatecodeID: int, store_and_fwd_flag: string, PULocationID: int, DOLocationID: int, payment_type: int, fare_amount: real, extra: real, mta_tax: real, tip_amount: real, tolls_amount: real, improvement_surcharge: real, total_amount: real, congestion_surcharge: real, airport_fee: int, TaxiType: string, lpep_pickup_datetime: datetime, lpep_dropoff_datetime: datetime, ehail_fee: int ,trip_type: string) .create table GreenTripData (VendorID: int, tpep_pickup_datetime: datetime, tpep_dropoff_datetime: datetime, passenger_count: int, trip_distance: real, RatecodeID: int, store_and_fwd_flag: string, PULocationID: int, DOLocationID: int, payment_type: int, fare_amount: real, extra: real, mta_tax: real, tip_amount: real, tolls_amount: real, improvement_surcharge: real, total_amount: real, congestion_surcharge: real, airport_fee: int, TaxiType: string, lpep_pickup_datetime: datetime, lpep_dropoff_datetime: datetime, ehail_fee: int ,trip_type: string) Function Creation Create two functions: .create-or-alter function with (docstring = 'Append Records where TaxiType = Yellow' , folder = 'UpdatePolicyFunctions') CopyYellowTaxiData() { AllTripData | where TaxiType == "Yellow" } .create-or-alter function with (docstring = 'Append Records where TaxiType = Green' , folder = 'UpdatePolicyFunctions') CopyGreenTaxiData() { AllTripData | where TaxiType == "Green" } Update policies Alter the update policies for the yellow and green tables: .alter table YellowTripData policy update @'[{ "IsEnabled": true, "Source": "AllTripData", "Query": "CopyYellowTaxiData()", "IsTransactional": false, "PropagateIngestionProperties": false}]' .alter table GreenTripData policy update @'[{ "IsEnabled": true, "Source": "AllTripData", "Query": "CopyGreenTaxiData()", "IsTransactional": false, "PropagateIngestionProperties": false}]' Demo Execution Now that everything is configured and you have added your credential information to the notebooks, start the Databricks cluster and execute the nycExecSim notebook to begin sending events to the Event Hub. You should begin to see messages in the overview of the Event Hub: Execute the nycInboundEvents notebook to begin processing events consumed from the Event Hub. Head on over to the Azure Data Explorer UX at Azure Data Explorer and execute this command on the TripDataEventHubSource database. AllTripData | count You should begin to see the record count increase. Execute these commands as well: YellowTripData | count GreenTripData | count Again, you should see the record count increase. To view the records, execute the following command: select * from YellowTripData Continue reading... Quote
Recommended Posts
Join the conversation
You can post now and register later. If you have an account, sign in now to post with your account.