
Structured streaming in Synapse Spark



 

Author: Ryan Adams is a Program Manager on the Azure Synapse Customer Success Engineering (CSE) team.

 

 

 

In this post we are going to look at an example of streaming IoT temperature data in Synapse Spark. I have an IoT device that streams temperature data from two sensors to IoT Hub. We’ll use Synapse Spark to process the data and finally write the output to persistent storage. Here is what our architecture will look like:

 

 

 

[Architecture diagram: IoT device → IoT Hub → Synapse Spark → Azure Data Lake Storage]

Prerequisites

 

 

Our focus for this post is streaming the data in Synapse Spark, so having a device that sends data and an IoT Hub to receive it are considered prerequisites. Their setup is not covered in this article.

 

 

 

Step 1 - Create a new notebook

 

 

The first thing you need to do is create a new notebook in Synapse, initialize the variables, and do some setup in the first cell. The setup code is provided below. Go to the Develop Hub and click the ellipsis that appears when you hover your cursor over the Notebooks heading. That will display a drop-down menu where you can select “New notebook”.

 

 

 

[Screenshot: creating a new notebook from the Develop hub]

 

 

 

Step 2 - Setup and configuration

 

 

The code below has comments to help you fill in the names and keys of your own services. You need to set up your source by providing the name of your IoT or Event Hub along with the connection string and consumer group (if you have one). Your destination will be Data Lake, so you need to supply the container and folder path where you want to land the streaming data.

 

 

 

There are two pieces of sensitive information that you do not want to expose in plain text, so you’ll store the storage account key and the Event Hub connection string in Azure Key Vault. You can easily retrieve these with mssparkutils once you create a linked service to your Key Vault.
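As a minimal sketch of that pattern (the linked service name 'YourKeyVaultLinkedService' and the secret names here are placeholders), you can pass the linked service name as an optional third argument to getSecret so the lookup runs through the linked service instead of your own credential:

# mssparkutils is preloaded in Synapse notebooks; the explicit import is only needed in packaged code
from notebookutils import mssparkutils

# Placeholder names -- replace with your own Key Vault, secret, and linked service
kv_name = 'YourKeyVault'
secret_name = 'YourHubConnectionStringSecretName'
kv_linked_service = 'YourKeyVaultLinkedService'

# Retrieve the secret through the Key Vault linked service
iot_cs = mssparkutils.credentials.getSecret(kv_name, secret_name, kv_linked_service)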

# Initialize Variables
storage_account = 'YourStorageAccountName' # This is the storage account where we will write out our data stream
event_hub = 'YourEventHubName' # This is the event hub where we will grab our stream from
consumer_group = 'YourConsumerGroupName' # This is our event hub consumer group in case we add more consumers later
key_vault = 'YourKeyVault' # This is the name of our key vault where we will store our event hub connection string and storage account secret

# Setup access to storage account using Key Vault secret. Last parameter is the secret name
spark.conf.set(f"fs.azure.account.key.{storage_account}.dfs.core.windows.net", mssparkutils.credentials.getSecret(key_vault, "YourKVSecretName"))

# Setup storage locations for all data
ROOT_PATH = f"abfss://YourContainerName@{storage_account}.dfs.core.windows.net/" # Put your container before the @ sign
BRONZE_PATH = ROOT_PATH + "bronze/" # Folder where we land our raw data stream
CHECKPOINT_PATH = ROOT_PATH + "checkpoints/" # Folder to store our checkpoints in case the stream gets broken

# Get Event Hub connection string from Key Vault
IOT_CS = mssparkutils.credentials.getSecret(key_vault, 'YourHubConnectionStringSecretName') # IoT Hub connection string (Event Hub compatible)
# print(IOT_CS)

# Setup Event Hub dictionary with config settings
ehConf = {
  'eventhubs.connectionString': sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(IOT_CS),
  'ehName': event_hub,
  'eventhubs.consumerGroup': consumer_group
}
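One optional addition: by default the connector typically picks up only new events, so if you want each run to replay everything still retained in the hub you can set a starting position in ehConf. The sketch below follows the JSON format the azure-event-hubs-spark connector documents for PySpark; verify the exact keys against the connector version you are using:

import json

# Optional: start from the beginning of the retained stream instead of only new events
startingEventPosition = {
  "offset": "-1",        # "-1" means the start of the stream
  "seqNo": -1,           # ignored when offset is set
  "enqueuedTime": None,  # ignored when offset is set
  "isInclusive": True
}
ehConf["eventhubs.startingPosition"] = json.dumps(startingEventPosition)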

Step 3 - Start fresh and wipe out ADLS

 

 

If you want to run this exercise multiple times, you can delete everything in your ADLS container so each run starts fresh.

# Make sure root path is empty

mssparkutils.fs.rm(ROOT_PATH, True)
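On the very first run the path may not exist yet, so if the delete complains you can wrap it in a guard. A minimal sketch, reusing the ROOT_PATH variable from Step 2:

# Remove the root path if it exists; ignore the error on a brand-new container
try:
    mssparkutils.fs.rm(ROOT_PATH, True)
    print(f"Removed {ROOT_PATH}")
except Exception as e:
    print(f"Nothing to remove at {ROOT_PATH}: {e}")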

Step 4 - Start streaming the data

 

 

Now that you have everything set up and configured, it’s time to read the data stream and then write it out to ADLS. First you create a data frame that uses the Event Hub format and the Event Hub configuration created in Step 2.

 

 

 

The next part gets a little tricky because the entire JSON payload of the stream is stored in a single column called “body”. To handle this, we create a schema that matches the payload and use it to extract the JSON into a final data frame. Last, we write the data out to ADLS in Parquet format.

from pyspark.sql.functions import *
from pyspark.sql.types import *

# Create a DF and load it from the EH stream
df = spark.readStream.format("eventhubs").options(**ehConf).load()

# The stream stores the JSON payload in a single "body" column.
# We need a schema to match and extract the JSON
Schema1 = StructType([StructField("temp1", StringType(), True),
                      StructField("temp2", StringType(), True)
                    ])

# Extract the JSON and enqueued time
dfRaw = df.select(df.body.cast('string'), df.enqueuedTime.alias('ArrivalTime'))
dfJson = dfRaw.select(from_json(dfRaw.body, schema=Schema1).alias("json"), dfRaw.ArrivalTime)
dfFinal = dfJson.select("json.*", dfJson.ArrivalTime)

# Write the stream to ADLS
# We have also enabled checkpoints so if the stream fails we can pick up right where we left off
dfFinal.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .option("checkpointLocation", CHECKPOINT_PATH + "temperature_raw") \
    .start(BRONZE_PATH + "temperature_raw")
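Once start() is called, the query keeps running in the background. The short sketch below uses standard Structured Streaming APIs (nothing beyond what Spark itself provides) to check on active streams and shut them down when you are finished:

# Every active stream in the session can be inspected through spark.streams
for q in spark.streams.active:
    print(q.id, q.status)   # whether the query is initializing or actively processing data
    print(q.lastProgress)   # metrics for the most recent micro-batch

# Stop the streams when you are done
# for q in spark.streams.active:
#     q.stop()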

Step 5 - Read the data streamed to ADLS with Spark Pool

 

 

Now that we have the data streaming to ADLS in Parquet format, we want to read it back and validate the output. You could use the Azure portal or Azure Storage Explorer, but it is much easier to do it right here using Spark in the same notebook.

 

 

 

This part is super easy! We simply configure a few variables for connecting to our ADLS account, read the data into a data frame, and then display the data frame.

# Primary storage info
account_name = 'YourStorageAccountName' # fill in your primary account name
container_name = 'YourContainerName' # fill in your container name
relative_path = 'bronze/temperature_raw/' # fill in your relative folder path

adls_path = f'abfss://{container_name}@{account_name}.dfs.core.windows.net/{relative_path}'
print('Primary storage account path: ' + adls_path)

# Read the Parquet files the stream has landed so far and display them
df_parquet = spark.read.parquet(adls_path)
display(df_parquet)
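If you want to go a step beyond eyeballing the rows, a quick aggregation over the landed data makes a good sanity check. A minimal sketch, assuming the temp1, temp2, and ArrivalTime columns defined in Step 4 (the sensor values arrive as strings, so they are cast to double here):

from pyspark.sql.functions import avg, col, window

# Average temperature per sensor over one-minute windows of arrival time
df_summary = (df_parquet
    .withColumn("temp1", col("temp1").cast("double"))
    .withColumn("temp2", col("temp2").cast("double"))
    .groupBy(window(col("ArrivalTime"), "1 minute"))
    .agg(avg("temp1").alias("avg_temp1"), avg("temp2").alias("avg_temp2"))
    .orderBy("window"))

display(df_summary)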

Conclusion

 

 

Spark pools in Azure Synapse support Spark structured streaming, so you can stream data right in your Synapse workspace, where you can also handle all your other data streams. This makes managing your data estate much easier. You also have the option of four different analytics engines to suit various use cases or user personas.

 

 

 

Our team publishes blogs regularly; you can find all of them here: Azure Synapse Analytics Blog

 

For a deeper understanding of Synapse implementation best practices, check out our Success by Design (SBD) site: Success by design - Azure Synapse Analytics.

 
