
Author: Arun Sethia is a Program Manager on the Azure HDInsight Customer Success Engineering (CSE) team.

 

Introduction

 

 

On February 27, 2023, HDInsight released Spark 3.1 (which includes stability fixes on top of Spark 3.0.0) as part of HDInsight 5.0. The HDInsight team is working on upgrading other open-source components in an upcoming release.

 

 

 

In Spark 3.1, the community added 1,700+ improvements, 1,680+ fixes, and 130+ new features. Spark 3.x also brings performance improvements that benefit customers significantly. Some of the notable new features and improvements are:

 

 

 

  • Performance enhancements - Adaptive Query Execution (AQE), Dynamic Partition Pruning, etc.
  • Project Hydrogen - accelerator-aware scheduling
  • Redesigned pandas UDFs with type hints
  • More Pythonic PySpark SQL exceptions
  • Better ANSI SQL compatibility
  • DataSource V2 API refactoring and Catalog Plugin APIs
  • Hive 3.0 and 3.1 metastore support
  • A complete set of join hints
  • A new built-in data source for binary files
  • 15+ new built-in functions
  • Native Prometheus monitoring support
  • New Structured Streaming UI
  • and many more.

 


 

We can't cover all of these features in one blog. In this series of blogs, the HDInsight team will highlight the benefits of moving from HDInsight 4.0 (Spark 2.4) to HDInsight 5.0 (Spark 3.1) or HDInsight 5.1 (Spark 3.3); the upgrade not only brings our customers to the latest available Apache Spark version, it also delivers better performance and a lower TCO.

 

 

 

This blog is dedicated to the performance enhancement from Adaptive Query Execution (AQE) dynamically coalescing shuffle partitions; subsequent blogs will focus on dynamically switching join strategies and dynamically optimizing skew joins.

 

Adaptive Query Execution – Dynamic Coalescing Shuffle Partitions

 

 

 

 

Before we jump into the feature details, let us understand the problem statement through some example code:

 

 

 

Here we consider page-hit data points from an online retail portal, where each record captures the amount of time (in seconds) spent by a click on a given page. The code example is available on Git.

 

 

 

The following code computes the sum of time spent on each page. The typical physical plan for Spark SQL groupBy(key).agg(sum("field")) is HashAggregate -> Exchange -> HashAggregate. The first HashAggregate performs partial aggregation locally on each executor, the Exchange represents the shuffle, and the second HashAggregate performs the final aggregation (final merge) after the shuffle.
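Since the full example lives in the linked Git repository, here is a minimal PySpark sketch of the same idea; the column names (page, time_spent_sec) and the sample values are illustrative assumptions, not the repository's code:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.appName("page-hit-aggregation").getOrCreate()

    # Hypothetical page-hit data: page name and time spent (seconds) per click.
    page_hits = spark.createDataFrame(
        [("Home", 12), ("Home", 5), ("Home", 9),
         ("Search", 7), ("Catalog", 15), ("Checkout", 20)],
        ["page", "time_spent_sec"],
    ).repartition(4)  # mirrors the 4 input partitions discussed below

    # groupBy(key).agg(sum(field)) -> HashAggregate -> Exchange -> HashAggregate
    agg_df = page_hits.groupBy("page").agg(F.sum("time_spent_sec").alias("total_time_sec"))
    agg_df.show()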

 

 

 


 

 

 

The SQL DAG for the above code is as follows:

 

[Screenshot: Spark UI SQL DAG for the aggregation query]

 

 

 

From a job perspective, it has two stages:

 

  • Stage 0 – reads the data and fills the output exchange with 6 rows (3 for the Home page and 1 each for the Search, Catalog, and Checkout pages)
  • Stage 1 – reads the rows from the output exchange and brings them into its input exchange.

 

[Screenshots: Spark UI stage details for Stage 0 and Stage 1]

 

Stage 0 has 4 partitions; during the shuffle operation (caused by the wide transformation groupBy), Stage 1 gets 200 partitions (the default value of spark.sql.shuffle.partitions).

 

 

 

[Screenshot: Spark UI showing Stage 1 with 200 shuffle partitions]

 

 

 

If we drill into Stage 1, we find that out of the 200 partitions, only 4 contain any records; the remaining 196 are empty. However, Spark still schedules and monitors tasks for these 196 partitions, even though they are empty.
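One way to see this from code is to count the records in each partition of the aggregated DataFrame (a minimal sketch, reusing the hypothetical agg_df from the sketch above; glom() gathers each partition's rows into a list):

    # Count the records per shuffle partition; with the hypothetical data,
    # only 4 of the 200 entries are non-zero.
    partition_sizes = agg_df.rdd.glom().map(len).collect()
    print("total partitions:", len(partition_sizes))
    print("non-empty partitions:", sum(1 for n in partition_sizes if n > 0))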

 

 

 

[Screenshot: Stage 1 detail showing only 4 non-empty partitions out of 200]

 

The immediate solution is to set a smaller value for spark.sql.shuffle.partitions to avoid such a situation. The bigger question is what that number should be. It is hard for developers to predict how many unique keys there will be and therefore how many partitions are required.
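For reference, this is how the value would be set manually (a sketch; the value 8 is an arbitrary guess, which is exactly the difficulty):

    # Must be set before the shuffle-producing action runs; 8 is a guess, not a recommendation.
    spark.conf.set("spark.sql.shuffle.partitions", "8")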

 

The other problem is data skew; each partition may have a different data size (or number of records). This results in several problems:

 

  • Longer total elapsed time – tasks that process larger partitions take longer than those that process smaller ones, so the slowest tasks drive up the total elapsed time.
  • Larger partitions on a few executors result in memory overhead, OOM errors, and GC-related issues.
  • Scheduler overhead and network congestion – too many small partitions result in scheduling overhead, network overhead, and congestion while the shuffled data is read.

 

Adaptive Query Execution (AQE) re-optimizes and adjusts the query plan based on runtime statistics collected during query execution to choose the most efficient execution plan. Using AQE, we can overcome these problems; it reduces the number of post-shuffle partitions by dynamically coalescing shuffle partitions.

 

 

 

AQE is disabled by default in Spark SQL 3.1.1 and enabled by default starting with Spark SQL 3.2.0. Under the umbrella AQE configuration spark.sql.adaptive.enabled, Spark SQL uses the following settings for dynamic coalescing of shuffle partitions (a brief tuning sketch follows the list):

 

  • spark.sql.adaptive.coalescePartitions.initialPartitionNum - the initial number of shuffle partitions before coalescing
  • spark.sql.adaptive.coalescePartitions.minPartitionNum - the minimum number of shuffle partitions after coalescing
  • spark.sql.adaptive.coalescePartitions.enabled - enables coalescing of contiguous shuffle partitions
  • spark.sql.adaptive.advisoryPartitionSizeInBytes - the advisory (target) size, in bytes, of shuffle partitions during adaptive optimization
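A minimal sketch of setting these alongside the umbrella flag (the specific values are illustrative assumptions, not recommendations):

    # Illustrative values only; tune them for your workload.
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
    spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "200")
    spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", "4")
    spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")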

 

At the time of writing, HDInsight 5.0 is available with Spark 3.1.1 and HDInsight 5.1 is coming with Spark 3.3.0. The subsequent code examples are based on HDInsight 5.0 (Spark 3.1.1).

 

 

 

You can enable AQE using Spark configuration:

 

  • From the notebook:
    %%configure -f
    {
      "conf": {
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true"
      }
    }
  • From the job code:
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
  • Or by adding the properties to the Spark configuration from Ambari:
    [Screenshot: custom Spark configuration properties in Ambari]

 

Let's run the earlier code with AQE enabled, on HDInsight 5.0 using Spark 3.1.1. The code example is available on Git.
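A minimal sketch of the rerun (reusing the hypothetical page_hits DataFrame from the earlier sketch):

    # Enable AQE and dynamic coalescing, then rerun the same aggregation.
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

    agg_df = page_hits.groupBy("page").agg(F.sum("time_spent_sec").alias("total_time_sec"))
    agg_df.show()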

 

 

 


 

The physical plan (from Spark UI -> SQL) for this execution contains both the initial plan and an AdaptiveSparkPlan node. While the query is running, the isFinalPlan flag of the AdaptiveSparkPlan node shows as false, as shown below.
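The same flag can be inspected from code (a sketch, again using the hypothetical agg_df):

    # Before the query finishes, the printed plan is rooted at
    # "AdaptiveSparkPlan isFinalPlan=false".
    agg_df.explain()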

 

 

 

[Screenshot: physical plan with AdaptiveSparkPlan, isFinalPlan=false]

 

After the query execution is complete, the isFinalPlan flag changes to true.

 

[Screenshot: physical plan with AdaptiveSparkPlan, isFinalPlan=true]

 

 

 


 

Looking into the stage metrics, we can see that the number of partitions has been reduced from 200 to 4 and that the initial plan has been replaced by the new, optimized plan.

 

[Screenshot: stage metrics showing shuffle partitions reduced from 200 to 4]

 

 

 

AQE applies to queries that meet the following criteria:

 

  • Adaptive Query Execution can only be used for queries with at least one exchange (Exchange physical operator) or one subquery (SubqueryExpression)
  • Adaptive Query Execution is not supported for Structured Streaming

TPC-DS Performance Gains From Spark 3.1 and AQE

 

 

A 1 TB TPC-DS benchmark was conducted on HDInsight 4.0 (Spark 2.4) and HDInsight 5.0 (Spark 3.1) with 103 Spark SQL queries. The following Spark AQE configuration was used for HDInsight 5.0 (Spark 3.1):

 

 

 

"spark.sql.adaptive.enabled": "true"

"spark.sql.adaptive.localShuffleReader.enabled": "true"

"spark.sql.adaptive.coalescePartitions.enabled": "true"

"spark.sql.adaptive.skewJoin.enabled": "true"

 

 

 

As the following comparison chart shows, Spark 3.1 performed roughly 1.8x better than Spark 2.4 in total runtime, demonstrating the improvement from moving to Spark 3.x and enabling AQE.

 

[Chart: total TPC-DS runtime, Spark 2.4 (HDInsight 4.0) vs. Spark 3.1 (HDInsight 5.0)]

 

We observed 20 TPC-DS queries with more than 1.5x speedup and 35 TPC-DS queries with more than 1.3x; below is a chart of the top 10 TPC-DS queries with the largest performance improvement from Spark 3.x and AQE.

 

[Chart: top 10 TPC-DS queries with the largest speedup on Spark 3.1 with AQE]

 

Summary

 

 

Spark 3.x enhancements and features improve Spark resource utilization and performance. Therefore, we recommend that our customers start migrating their workloads from Spark 2.4 to Spark 3.x to benefit from these features and enhancements.

 

 

 

This blog covered one of the AQE features, dynamic coalescing of shuffle partitions; in coming blogs, we will cover AQE dynamic join strategy switching and skew join optimization.

 

 

 

A few upcoming blogs in this series will cover:

 

- AQE - Dynamic join strategy switching

 

- PySpark Enhancements and pandas UDFs with type hints

 

- Monitoring and Debugging

 

- Streaming and extensibility

 

References

 

 

Performance Tuning - Spark 3.0.0 Documentation

 
