This blog is co-authored by Abhinav Singh, Sr. Data Engineer (Cognizant), and Channa Wijesinghe, Data & AI Capability Lead (Cognizant).

This blog post provides insights into best practices for optimizing performance in data ingestion and transformation. You will learn how to efficiently use the available resources, compute capacity, and workload management features of an Azure Synapse Analytics workspace. While the solution discussed here was built for a healthcare industry customer, the optimization techniques apply to other industries as well.

The scenario

A new healthcare mandate, the 'Transparency in Coverage' rule, states that "Healthcare Plan Providers and Issuers needed to post pricing information for covered items and services in a publicly accessible place." Health plan price transparency helps consumers know the cost of a covered item or service before receiving care, and the published data can be used by third parties, such as researchers and app developers, to help consumers understand the costs associated with their health care.

Providers and issuers hold enormous amounts of data in machine-readable files (MRF) for their covered services. To stay compliant with the mandate, they were required to process this data and generate extracts ready to be posted on a website. A solution was initially built on-premises to meet the mandate, but it faced several challenges in scaling the computational requirements for processing terabytes of data. To mitigate the challenges of the on-premises solution, the following objectives were identified:

- A modernized and scalable solution that can expand storage and compute capacity with minimal to no human intervention.
- An infrastructure designed to process large amounts of data (40-50 terabytes).
- Optimized performance, so that the end-to-end enrichment process completes within hours.

Solution components

To ensure scalability, reduced cost, and reduced time spent on infrastructure provisioning and management, migration of the on-premises solution to Azure was proposed. Azure Synapse Analytics was the platform of choice; it helped us build an autonomous, well-integrated, one-stop-shop solution with scalable compute capacity and the following out-of-the-box features:

- Azure Data Lake Storage (Gen2) – to store terabytes of MRF JSON files in storage blob containers, ready for data enrichment.
- Synapse Pipelines – to orchestrate data movement and transformation activities, such as executing business rules and copying data, along with effortless connectivity using linked services.
- Apache Spark Pool – using Spark as the underlying execution engine for blazing fast, in-memory computing helped us validate MRF files that would otherwise take hours to read, load, and validate using SQL string functions such as IsJson().
- Dedicated SQL Pool – to leverage features such as flexible, scalable compute capacity on demand, and the larger tempdb storage and cache required to run large queries (a minimal scaling sketch follows this list).
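As an illustration of the on-demand compute scaling mentioned in the last bullet, the service level of a dedicated SQL pool can be changed with a single T-SQL statement run against the master database of the logical server. This is a minimal sketch, not part of the project's actual deployment; the pool name and DWU target below are hypothetical placeholders.

```sql
-- Minimal sketch: scale a dedicated SQL pool up before a heavy load window.
-- Run against the master database; 'MrfDedicatedPool' and 'DW1000c' are
-- hypothetical placeholders, not values used in this project.
ALTER DATABASE MrfDedicatedPool
MODIFY (SERVICE_OBJECTIVE = 'DW1000c');

-- Optionally confirm the current service objective once scaling completes.
SELECT db.name, ds.service_objective
FROM sys.database_service_objectives AS ds
JOIN sys.databases AS db ON ds.database_id = db.database_id;
```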
The diagram below illustrates the reference architecture of the Azure Synapse Analytics implementation used to extract, load, transform, and enrich MRF files:

End-to-end process flow

1. Extract source data from the on-prem Oracle database and load it into a SQL database installed on an Azure Virtual Machine (Azure SQL VM); the MRF utility tool generates a large number of JSON files from this data.
2. Copy the JSON files generated by the MRF utility tool from an Azure File Share to Azure Data Lake Storage Gen2.
3. Validate, flatten, and parse the JSON files, and load them into the Synapse Dedicated SQL Pool.
4. Extract enriched data from the Azure SQL VM and load it into the Synapse Dedicated SQL Pool.
5. Perform data preparation to join the raw JSON data and the raw enriched data in stage tables in the Dedicated SQL Pool before applying enrichment rules.
6. Execute the enrichment rules on the staged data layer and load the enriched data into curated tables.
7. Use the curated table data to generate the final extract files and reports.
8. Compress the extract files before posting them on publicly accessible websites.

Once the solution was deployed, intensive performance testing was done. The results indicated a significant performance decrease as soon as the volume of MRF data increased from gigabytes (GB) to terabytes (TB). Challenges were identified during large-volume data ingestion (of flattened JSON files into the dedicated SQL pool) and during the data enrichment process.

Optimization techniques

The following techniques were implemented to optimize the data loading and transformation process.

Workload management using resource classes (RC): Under the Synapse workspace monitoring section, metrics for CPU utilization, tempdb storage consumption, and memory utilization were analyzed. The metrics indicated only 25% utilization of the overall available resources. By default, the resource class allocated to a user account in a Dedicated SQL Pool is smallrc, which limits each request to 25% of the available resources. A user account assigned to the xlargerc resource class was created to utilize 70% of the available resources (see the sketch at the end of this section). Learn more about workload management using resource classes.

Database optimization

Using Heap vs. CCI tables: Initially, a large volume of data was ingested from ADLS into database tables using Synapse Pipelines. By default, tables in a dedicated SQL pool are created as clustered columnstore index (CCI) tables. In CCI tables, incoming data must be indexed as it is ingested, which causes data shuffling across the distributed nodes to index the data correctly. Data shuffling leads to higher usage of the available resources, such as compute capacity and tempdb storage. Loading raw data into heap tables proved to be an effective approach, improving resource consumption and reducing processing time from 11 hours to 2 hours. For further data processing and transformations, CTAS (CREATE TABLE AS SELECT) was used to create CCI tables from these heap tables with de-duplicated and selective columns only (also illustrated in the sketch at the end of this section). This change reduced processing time from 16 hours to 2 hours and was very cost-effective. Read more about heap and CCI tables.

Splitting data into multiple tables: Originally, raw data from the parsed JSON files was loaded into a single table in the dedicated pool's database. Ingesting flattened JSON files into a single table with many columns was time-consuming; these files had elements nested up to four objects deep per array, and each data point generated a large number of columns along with billions of records needed for cross-referential integrity with its parent object. To address this, four separate heap tables were created to ingest the data for each JSON object separately, linked by foreign key constraints. This helped reduce data redundancy and data volume.
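The following T-SQL is a minimal sketch of the resource-class and heap-then-CTAS ideas described above: grant a loading user a larger resource class, land raw data in a round-robin heap table, and then use CTAS to build a hash-distributed CCI table carrying only the columns needed downstream. All user, table, and column names are hypothetical placeholders, not the project's actual objects.

```sql
-- 1) Workload management: move the loading user from the default smallrc
--    to xlargerc so a single request can use a larger share of resources.
--    'LoaderUser' is a hypothetical user name.
EXEC sp_addrolemember 'xlargerc', 'LoaderUser';

-- 2) Land raw, flattened JSON records in a round-robin heap table:
--    fast to load because no columnstore indexing or shuffling happens
--    at ingestion time.
CREATE TABLE dbo.RawInNetworkRates_Heap
(
    ReportingEntity NVARCHAR(200),
    BillingCode     NVARCHAR(50),
    NegotiatedRate  DECIMAL(18, 2),
    LoadDate        DATE
)
WITH (HEAP, DISTRIBUTION = ROUND_ROBIN);

-- 3) CTAS the data into a CCI table for transformations, keeping only the
--    columns needed for enrichment and choosing an explicit hash key.
CREATE TABLE dbo.InNetworkRates_CCI
WITH
(
    CLUSTERED COLUMNSTORE INDEX,
    DISTRIBUTION = HASH(BillingCode)
)
AS
SELECT BillingCode,
       NegotiatedRate
FROM dbo.RawInNetworkRates_Heap;
```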
Data distribution column: Having the right distribution column is vital to avoid data movement operations such as Shuffle. Uneven data distribution across nodes, as well as incompatible joins or CASE statements, causes data shuffling. To balance parallel processing, select a distribution column that: (a) has many unique values, (b) does not have NULLs, or has few NULLs, and (c) is not a date column. To mitigate uneven data distribution, a new derived column was created as the distribution column for the tables that were missing a suitable one. This helped distribute the data evenly across all 60 distributions, thereby increasing query performance. Learn more about distributed tables.

CTAS vs. Truncate and Select Into: 'Truncate and Select Into' creates, by default, a round-robin distributed CCI table, which is not always optimal when dealing with large volumes of data. These CCI tables were causing tempdb to balloon because of uneven data distribution and data shuffling. CTAS provides full control over the table's distribution key and organization. Once CTAS was used consistently to load fresh data, tempdb storage stopped ballooning (which had been crashing the pool) and performance improved.

Table statistics: The query optimizer uses table statistics to choose the plan with the lowest cost. Automatic statistics can be enabled and are then used to generate optimized query plans; however, automatically created statistics can miss key columns used in transformations and joins. Manually creating statistics on the columns that are actively used in joins ensures the best query optimization. Statistics were created on all CTAS tables after creating and loading them, and statistics on CCI tables were updated on the JOIN columns (see the sketch after this section). This further reduced data processing time for the enrichment process. Learn more about creating and updating statistics.

Table indexes on heap tables: Heap tables are not recommended for transformations. However, there were scenarios where heap tables were joined to create a new CCI staging table; in such cases, non-clustered indexes were created on the joined columns. It is important to understand that CCI tables are already indexed and do not require additional indexes (the index can be rebuilt if necessary); clustered or non-clustered indexes are only created on heap tables. Learn more about indexes on dedicated SQL pool tables.

ELT optimization

Removing unnecessary columns: Instead of performing joins on large heap tables, CCI tables were created with only the columns required for further processing. The CTAS technique was used with well-chosen distribution key(s), taking advantage of minimal transaction logging and reducing data volume by carrying fewer columns across billions of records.

Removing unnecessary rows: Duplicate records were removed from both sides of a join before joining. There are various ways to filter duplicate records using windowing functions such as RANK(), DENSE_RANK(), and ROW_NUMBER(), depending on the dataset. After intensive data analysis, different methods were used to filter the datasets (see the sketch after this section).

Custom batches: A batch process was implemented to divide the data to be ingested into the raw data layer and load it in batches. The billions of records produced from flattening the JSON files were evenly distributed by dividing the total record count into batches. A control table was created to hold the count of records to be ingested per batch, processed in sequence. For example, with 1 billion records to ingest, 10 batch rows in the control table would each process 100 million rows, and the 10 batches would be processed one by one, loading all 1 billion records into the target table (a sketch of this control-table loop follows below).
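As an illustration of the row-filtering, statistics, and heap-index points above, the sketch below de-duplicates a heap table with ROW_NUMBER() inside a CTAS, creates statistics on the join column, and adds a non-clustered index to a heap table that still participates in joins. It builds on the hypothetical placeholder tables from the earlier sketch; none of the names come from the project itself.

```sql
-- De-duplicate while building the staging CCI table: keep one row per
-- business key using ROW_NUMBER(). Names are hypothetical placeholders.
CREATE TABLE dbo.StageInNetworkRates
WITH
(
    CLUSTERED COLUMNSTORE INDEX,
    DISTRIBUTION = HASH(BillingCode)
)
AS
SELECT BillingCode,
       NegotiatedRate
FROM
(
    SELECT BillingCode,
           NegotiatedRate,
           ROW_NUMBER() OVER (PARTITION BY BillingCode, NegotiatedRate
                              ORDER BY LoadDate DESC) AS rn
    FROM dbo.RawInNetworkRates_Heap
) AS d
WHERE rn = 1;

-- Manually create statistics on the column used in downstream joins so the
-- optimizer has accurate estimates, then keep them current after big loads.
CREATE STATISTICS st_Stage_BillingCode
    ON dbo.StageInNetworkRates (BillingCode);
UPDATE STATISTICS dbo.StageInNetworkRates;

-- When a heap table must still be joined directly, a non-clustered index
-- on the join column helps the join without converting the table to CCI.
CREATE INDEX ix_RawRates_BillingCode
    ON dbo.RawInNetworkRates_Heap (BillingCode);
```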
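A minimal sketch of the control-table batching pattern is shown below: a control table holds one row per batch with an ID range, and a WHILE loop inserts each slice in turn. The table names, column names, and batch boundaries are hypothetical placeholders under the assumption that the source and target tables already exist; this is not the project's actual procedure.

```sql
-- Hypothetical control table: one row per batch with the ID range to load.
CREATE TABLE dbo.BatchControl
(
    BatchId INT         NOT NULL,
    StartId BIGINT      NOT NULL,
    EndId   BIGINT      NOT NULL,
    Status  VARCHAR(20) NOT NULL  -- 'PENDING' or 'DONE'
)
WITH (HEAP, DISTRIBUTION = ROUND_ROBIN);

-- Process pending batches one by one so no single insert has to move the
-- full billion-row set at once. dbo.RawFlattenedJson (source) and
-- dbo.RawJsonTarget (target) are assumed to exist and are placeholders.
DECLARE @BatchId INT, @StartId BIGINT, @EndId BIGINT;

WHILE EXISTS (SELECT 1 FROM dbo.BatchControl WHERE Status = 'PENDING')
BEGIN
    -- Pick the next pending batch in order.
    SELECT TOP 1 @BatchId = BatchId, @StartId = StartId, @EndId = EndId
    FROM dbo.BatchControl
    WHERE Status = 'PENDING'
    ORDER BY BatchId;

    -- Load only the records that fall inside this batch's ID range.
    INSERT INTO dbo.RawJsonTarget (RecordId, BillingCode, NegotiatedRate)
    SELECT RecordId, BillingCode, NegotiatedRate
    FROM dbo.RawFlattenedJson
    WHERE RecordId BETWEEN @StartId AND @EndId;

    -- Mark the batch as done so the loop moves on to the next one.
    UPDATE dbo.BatchControl
    SET Status = 'DONE'
    WHERE BatchId = @BatchId;
END;
```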
This custom batch process made good use of the distributed data infrastructure without exponentially expanding the compute resources, keeping costs minimal.

Avoid calling sub-queries: Sub-queries in SQL stored procedures use tempdb storage for their intermediate calculations, which leads to tempdb ballooning and can crash the SQL pool. We reviewed the business rules for the enrichment process and removed all possible sub-queries from the stored procedures.

Results

After implementing the key performance optimization techniques for the Synapse dedicated pool, we reduced the total processing time from 59 hours to 5 hours, an overall performance improvement of roughly 12x. The total operational time for the end-to-end process was reduced from 65 hours to 8.5 hours. The performance comparison statistics are shown below.

Performance comparison before and after implementing the optimization techniques:

| S.No. | Activity | Data volume | Run 1 (before optimization) | Run 2 (after optimization) |
|---|---|---|---|---|
| 1 | Extract source data from the on-prem Oracle database and load into the Azure SQL Virtual Machine | 713 tables | 1 hour | 26 mins |
| 2 | Validate, flatten, parse JSON files, and load into Synapse Dedicated SQL Pool (Raw) | 3 TB | 11 hrs | 2 hrs |
| 3 | Extract enrichment data from the Azure SQL VM and load into Synapse Dedicated SQL Pool (Raw) | 31 tables | 16 mins | 15 mins |
| 4 | Data preparation (Staging) | 1.6 billion records | 16 hrs 21 mins | 2 hrs 21 mins |
| 5 | Enrichment rules (Curated) | 127 billion records | 31 hrs 48 mins | 45 mins |
| 6 | Generate final extract (and compress) | 70 GB | 5 hrs | 2 hrs 38 mins |

Conclusion

By choosing Azure Synapse Analytics, you can implement a modern, scalable, and flexible solution that acts as a one-stop shop for all your data processing needs. To handle enormous data loads, it is important to understand how data is distributed across nodes at different compute capacities and how workload management functions within Synapse Analytics. Our solution was deployed within tight delivery timelines, and the customer is pleased with its overall implementation and performance.