Posted April 5, 2023

Author(s): Arun Sethia is a Program Manager in the Azure HDInsight Customer Success Engineering (CSE) team.

Introduction

On February 27, 2023, HDInsight released Spark 3.1 (containing stability fixes from Spark 3.0.0) as part of HDInsight 5.0. The HDInsight Team is working on upgrading other open-source components in the upcoming release.

In Spark 3.1, the community has added 1700+ improvements, 1680+ fixes, and 130+ new features. In addition, the 3.x line brings performance improvements that will benefit customers significantly. Some of the exciting new features and improvements are:

- Performance enhancements (AQE, Dynamic Partition Pruning, etc.)
- Project Hydrogen - accelerator-aware scheduling
- Redesigned pandas UDFs with type hints
- More Pythonic PySpark SQL exceptions
- Better ANSI SQL compatibility
- DataSource V2 API refactoring and Catalog Plugin APIs
- Hive 3.0 and 3.1 metastore support
- A complete set of join hints
- New built-in data source for binary files
- 15+ new built-in functions
- Native Prometheus monitoring support
- New Structured Streaming UI
- and many more

We can't cover all the features in one blog. In this series of blogs, the HDInsight Team will walk through a few of the benefits of moving from HDInsight 4.0 (Spark 2.4) to HDInsight 5.0 (Spark 3.1) or HDInsight 5.1 (Spark 3.3); the move not only brings our customers to the latest available Apache Spark version but also provides better performance and a lower TCO.

This blog is dedicated to performance enhancements using Dynamic Coalescing Shuffle Partitions from Adaptive Query Execution (AQE); subsequent blogs will focus on Dynamically Switching Join Strategies and Dynamically Optimizing Skew Joins.

Adaptive Query Execution – Dynamic Coalescing Shuffle Partitions

Before we jump into the feature details, let us understand the problem statement with some example code. Here we consider page-hit data points from an online retail portal, where we have the amount of time (in seconds) spent by each click on a given page. The code example is available on git; a minimal sketch is also shown below. The code computes the sum of the time spent on each page.

The typical physical plan for the Spark SQL groupBy(key).agg(sum("field")) is HashAggregate -> Exchange -> HashAggregate. The first HashAggregate is responsible for doing partial aggregation local to the executor. Then the Exchange represents the shuffle, and the second HashAggregate represents the final aggregation (final merge) after the shuffle. The SQL DAG for the above code reflects this structure.

From a job perspective, the query has two stages:

- Stage 0 - reads the data and fills the output exchange with 6 rows (3 for the Home page, 1 each for Search, Catalog, and Checkout)
- Stage 1 - reads data from the output exchange and brings it to the input exchange

Stage 0 has 4 partitions; during the shuffle operation (caused by the wide transformation groupBy), Stage 1 gets 200 partitions (the default value of spark.sql.shuffle.partitions). If we look at Stage 1 in detail, we find that out of the 200 partitions, only 4 hold a few records; the remaining 196 are empty. However, Spark still schedules and monitors tasks for these 196 partitions, even though they are empty.

The immediate solution is to set a smaller value for spark.sql.shuffle.partitions to avoid such a situation. The bigger question is what that number should be. It is hard for developers to predict how many unique keys there will be in order to configure the required number of partitions.
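To make the setup concrete, here is a minimal PySpark sketch of the page-hit aggregation described above. The DataFrame name, column names, and sample values (page_hits, page, time_spent_sec) are illustrative assumptions; the actual example lives in the git repository referenced above.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("page-hit-aggregation").getOrCreate()

# Page-hit data: the page clicked and the time (seconds) spent on that click,
# matching the 6 rows described above (3 for Home, 1 each for Search, Catalog, Checkout).
page_hits = spark.createDataFrame(
    [("Home", 12), ("Home", 5), ("Home", 20),
     ("Search", 7), ("Catalog", 15), ("Checkout", 9)],
    ["page", "time_spent_sec"],
).repartition(4)  # mimic the 4 input partitions of Stage 0

# groupBy(key).agg(sum("field")): HashAggregate -> Exchange -> HashAggregate
total_time_per_page = page_hits.groupBy("page").agg(
    F.sum("time_spent_sec").alias("total_time_sec"))

total_time_per_page.show()

# Without AQE, the shuffle produces spark.sql.shuffle.partitions (200 by default)
# post-shuffle partitions, only 4 of which actually hold data here.
print(total_time_per_page.rdd.getNumPartitions())

Run on a default Spark 3.1 session with AQE disabled, the final print reports 200 partitions, which is exactly the situation described above.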
The other problem is data skewness; each partition may have a different data size (or number of records). This results in a couple of problems:

- Longer total elapsed time - the time taken by the larger partitions is greater than that of the smaller partitions, so the total elapsed time is driven by the largest ones.
- Memory pressure - larger partitions on a few executors result in memory overhead, OOM, and GC-related issues.
- Scheduler overhead and network congestion - too many small partitions result in scheduling overhead, network overhead, and congestion during the read process.

Adaptive Query Execution (AQE) re-optimizes and adjusts the query plan based on runtime statistics collected during query execution, choosing the most efficient query execution plan. Using Spark Adaptive Query Execution (AQE), we can overcome the problems above; it helps by reducing the number of post-shuffle partitions, i.e., by dynamically coalescing shuffle partitions. AQE is disabled by default in Spark SQL 3.1.1 and enabled by default from Spark SQL 3.2.0.

Under the umbrella AQE configuration, Spark SQL uses the following settings for dynamic partition coalescing:

- spark.sql.adaptive.coalescePartitions.initialPartitionNum - the initial number of shuffle partitions before coalescing
- spark.sql.adaptive.coalescePartitions.minPartitionNum - the minimum number of shuffle partitions after coalescing
- spark.sql.adaptive.coalescePartitions.enabled - enables coalescing of contiguous shuffle partitions
- spark.sql.adaptive.advisoryPartitionSizeInBytes - the advisory size in bytes of a shuffle partition

While I am writing this blog, HDInsight 5.0 is available with Spark 3.1.1 and HDInsight 5.1 is coming with Spark 3.3.0. The subsequent code examples are based on HDInsight 5.0 (Spark 3.1.1).

You can enable AQE using Spark configuration.

From a notebook:

%%configure -f
{
  "conf": {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true"
  }
}

From the job code:

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

Or by adding additional properties to the Spark configuration from Ambari.

Let's run the earlier code with AQE enabled on HDInsight 5.0 using Spark 3.1.1 (a minimal sketch follows at the end of this section). The code example is available on git. The physical plan (from Spark UI -> SQL) for this execution contains both the initial plan and an AdaptiveSparkPlan node. While the query is running, the isFinalPlan flag of the corresponding AdaptiveSparkPlan node shows as false; after the query execution is complete, the isFinalPlan flag changes to true. Looking into the stage metrics, we find that the number of partitions has been reduced from 200 to 4 and that the initial plan is skipped in favor of the new optimized plan.

AQE applies to queries that meet the following criteria:

- Adaptive Query Execution can only be used for queries with at least one exchange (Exchange physical operators) or one subquery (SubqueryExpression).
- Adaptive Query Execution is not supported for Structured Streaming.
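Pulling the pieces together, the following sketch re-runs the aggregation with AQE and partition coalescing enabled, reusing the illustrative names from the earlier snippet; the advisory partition size shown is simply the framework default, included here for clarity.

# Enable AQE and dynamic coalescing of shuffle partitions at runtime.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
# Advisory target size per post-shuffle partition (64MB is the default).
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")

total_time_per_page = page_hits.groupBy("page").agg(
    F.sum("time_spent_sec").alias("total_time_sec"))

# The physical plan now contains an AdaptiveSparkPlan node; its isFinalPlan
# flag stays false until the query has finished executing.
total_time_per_page.explain()

total_time_per_page.show()

# With AQE, the 200 initial shuffle partitions are coalesced at runtime;
# the run described above went from 200 down to 4.
print(total_time_per_page.rdd.getNumPartitions())

The same settings can equally be applied up front through %%configure or Ambari, as shown earlier.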
TPC-DS Performance Gains From Spark 3.1 and AQE

A 1 TB TPC-DS benchmark was conducted using HDInsight 4.0 (Spark 2.4) and HDInsight 5.0 (Spark 3.1) with 103 Spark SQL queries. The following Spark AQE configuration was defined for HDInsight 5.0 (Spark 3.1):

- "spark.sql.adaptive.enabled": "true"
- "spark.sql.adaptive.localShuffleReader.enabled": "true"
- "spark.sql.adaptive.coalescePartitions.enabled": "true"
- "spark.sql.adaptive.skewJoin.enabled": "true"

As we can see from the following comparison chart, Spark 3.1 performed roughly 1.8x better than Spark 2.4 in total runtime. There is a clear improvement from Spark 3.x and from enabling AQE: we have seen 20 TPC-DS queries run with more than 1.5x speedup and 35 TPC-DS queries with more than 1.3x. Below is a chart of the top 10 TPC-DS queries with the largest performance improvement from Spark 3.x and AQE.

Summary

Spark 3.x enhancements and features will maximize Spark resource utilization and performance. Therefore, we recommend that our customers start migrating their workloads from Spark 2.4 to Spark 3.x to benefit from Spark 3.x features and enhancements. This blog has covered one of the AQE features, dynamic coalescing; in the coming blogs, we will cover AQE dynamic join strategy switching and skew join optimization.

A few upcoming blogs in this series will cover:
- AQE - Dynamic join strategy switching
- PySpark enhancements and pandas UDFs with type hints
- Monitoring and debugging
- Streaming and extensibility

References
Performance Tuning - Spark 3.0.0 Documentation