Spark Supernote

By Niraj Zade  |  2025 Oct 16  |  32m read


A collection of spark (and pySpark) knowledge. Partly written because writing is how I learn, and partly because I need a reliable place with the info I need.

Adaptive Query Engine (AQE)

2024-02-20 The Adaptive Query Engine aka AQE was introduced in spark 3.0.0, and has been a major step up in making working with spark easier.

We enable AQE by setting the configuration variable spark.sql.adaptive.enabled to true.

Spark AQE follows the tried and tested way of bringing automation magic into systems:

  1. Create a system with a lot of configurations that lets users fine tune their operations
  2. Create a "meta management layer" that automatically sets and tweaks these parameters during runtime, to make the operation run better without needing user interaction

AQE has ended up automating many critical settings that we set as users (most importantly - spark.sql.shuffle.partitions), and has enabled those settings to automatically dynamically change between stages. This makes the system more hands off and removes guesswork from the system. This is a major game changer.

Understanding the context - problems in spark

When working with spark, there are 2 major problems that lead to out of memory errors (OOM) - improper partitioning and data skew.

You notice these problems in spark very clearly when you work with warehousing engines that don't make you think about these errors (like Snowflake, big query etc).

Now, we cannot directly compare spark framework with warehousing engines like Snowflake and BigQuery. But I want you to understand the larger point - spark makes you think too much about partitions and the execution of your program, and fails too often during execution.

The partition size problem

The main problem is managing partition sizes - in storage, and during execution. If partition sizes grow too large, your executors will throw OOM errors. If partition sizes grow too small, performance degrades.

Managing partition sizes while storing and reading from storage is pretty much solved, and doesn't take much experience to solve it.

However, managing partition sizes during execution is whole another thing. It simply takes a lot of experience to properly control them. Look back at most of the OOM errors and execution problems you have had while working with spark. You'll see that spark.sql.shuffle.partitions has been the cause and solution for majority of the problems.

The skew problem

The real world is skewed. Data is a reflection of the real world. So your data too will be skewed. It is inevitable.

Some examples are:

  • There will always be some products who record disproportionately more sales than other products
  • There will be some days (eg - black friday, big billion day) that lead to disproportionately more sales than other days
  • Social media accounts of famous people will generate disproportionately more engagement data than other normal accounts

This skew ends up coming into your data's partitions. So while some partitions get processed quickly while putting minimal load on their executors, the huge skewed partitions will come in and kill their executor with OOM errors.

Major warehousing engines in the market don't have these problems

If you have worked with warehouses like Snowflake, BigQuery etc, you know how painfree the experience is. You just put in data, partition it on some key, and just execute queries. You don't have to worry about size of partitions, types of joins etc. Large partitions and skews will cause reduced performance, but they won't straight out kill your queries.

These warehouses and their processing engines simply hide away the executors from us. Their query planners and execution engines take care of partitions and skews for us. Working with them is a pleasant & brainless experience. The query might run slow, but it won't fail.

With spark, we simply have to care too much about execution. This causes headaches as well as a high barrier to entry. Databricks is the largest contributor to spark. With Databricks' new offering for business intelligence through Databricks SQL, this high barrier to entry and requirement of execution knowledge has become an even larger problem. Analysts simply want to run queries and get answers. They don't want to deal with this execution OOM mumbo-jumbo. If you have been in the field long enough, you'll quickly realize that majority of the people don't care about performance - this entire field is famously littered with huge un-optimized SQL queries. Most people just want their queries to run, and their reports to come out on time. That's all.

Spark has been taking steps towards reaching the developer comfort of these engines, and making the developers worry less about these execution problems. AQE is a part of these steps.

Currently, AQE targets 3 particular problems:

  1. Partition size across stages problem
  2. Data skew problem
  3. Slow join problem

Partition size across stages problem

During storage, you create partitions and buckets carefully such that they don't overload the executors when read.

But what about during execution? what about the partition size that increases during execution when you perform joins and operations on data?

During execution, if the partition size grows too large, you'll again get OOM errors. So now, you tune the partition size by explicitly telling spark to cut up partitions. You do this using the spark.sql.shuffle.partitions setting.

You look at the stage with the most amount of data and set spark.sql.shuffle.partitions to a large enough number such that even that largest stage will run without OOM errors. But this makes other stages with smaller volume of data suffer from the "too small partitions" problem. This kills performance.

What you actually wanted was the option to set partition size on a stage by stage basis. Not set the same number of partitions across all stages.

This is where Coalescing Post Shuffle Partitions helps. With this setting, spark.sql.shuffle.partitions goes from being a number that is strictly applied to all stages, to simply becoming an upper bound of number of partitions.

Suppose spark.sql.shuffle.partitions was set to 1000. But aqe looks at the stats and sees that after the shuffle, the partitions have become too small, and actually work best with 100 partitions. In this case, after the shuffle, AQE will coalesce the partitions to form larger partitions.

In this way, AQE avoids the "many tiny partitions problem"

Coalescing Post Shuffle Partitions

Spark AQE avoids the small partition problem by coalescing partitions after shuffle to form a larger partitions. You practically have to stop worrying about spark.sql.shuffle.partitions.

This is enabled when both spark.sql.adaptive.enabled and spark.sql.adaptive.coalescePartitions.enabled are set to true.

Just set a large enough initial number of shuffle partitions in spark.sql.adaptive.coalescePartitions.initialPartitionNum, and spark will pick proper partition number at runtime. No need to set a specific shuffle partition number through spark.sql.shuffle.partitions to fit our dataset. This simplifies the tuning of shuffle partition numbers when running queries.

When enabled, AQE will coalesce shuffle partitions to the target size set in spark.sql.adaptive.advisoryPartitionSizeInBytes.


NOTE:

However, by default spark AQE will not try to achieve partition sizes equal to spark.sql.adaptive.advisoryPartitionSizeInBytes. Instead, it will try to maximize parallelism and avoid performance regression when enabling adaptive query execution.

This default behaviour is because spark.sql.adaptive.coalescePartitions.parallelismFirst is set to true by default. It is recommended to set it to false and turn off this behaviour.

When spark.sql.adaptive.coalescePartitions.parallelismFirst is true, spark ignores the target size in spark.sql.adaptive.advisoryPartitionSizeInBytes while coalescing, and only respects the minimum partition size specified by spark.sql.adaptive.coalescePartitions.minPartitionSize.

Why this behaviour? Because coalescing will reduce the number of partitions, so it will reduce the number of parallel tasks, so it will cause a reduction in the job's execution speed. The spark devs don't want your job execution speed to suffer when you turn on AQE. That's why this default behaviour maintains job parallelism and execution speed. Then you come in and deliberately turn this behaviour off and let AQE coalesce and increase the partition sizes to try to achieve the partition size specified in spark.sql.adaptive.advisoryPartitionSizeInBytes.

Another configuration variable you need to know is spark.sql.adaptive.coalescePartitions.initialPartitionNum. This variable sets the initial number of shuffle partitions before coalescing. If it is not set, spark uses the value in spark.sql.shuffle.partitions

Details of configuration variables:

  • spark.sql.adaptive.coalescePartitions.enabled
    • Default value = true
    • Available from spark 3.0.0 onward (All currently suported Datbricks runtimes)
  • spark.sql.adaptive.coalescePartitions.parallelismFirst
    • Default value = true (recommended to setting it to false)
    • Available from spark 3.2.0 onward (Databricks runtime 10.4 LTS onward)
  • `spark.sql.adaptive.coalescePartitions.minPartitionSize
    • Default value = 1mb
    • Available from spark 3.2.0 onward (Databricks runtime 10.4 LTS onward)
    • Sets the minimum shuffle partitions after coalescing. Can be at most 20% of spark.sql.adaptive.advisoryPartitionSizeInBytes. Used when target size is ignored by setting spark.sql.adaptive.coalescePartitions.parallelismFirst to true
  • spark.sql.adaptive.coalescePartitions.initialPartitionNum
    • Default value= none ( (spark will use the value set in spark.sql.shuffle.partitions, whose default value is 200)
    • Available from spark 3.0.0 onward (All currently suported Datbricks runtimes)
    • When not set, spark will use the value set in spark.sql.shuffle.partitions (whose default value is 200)
  • spark.sql.adaptive.advisoryPartitionSizeInBytes
    • Default value = 64mb
    • Available from spark 3.0.0 onward (All currently suported Datbricks runtimes)

Data skew problem

Data skew is a major unavoidable problem. The few skewed partitions will cause OOM in their executors.

Until AQE, there were 2 solutions for this problem:

Solution 1. Increase executor sizes

This isn't a feasible solution. Suppose all your partitions need 1gb of ram to execute, but one skewed partition needs 4gb to execute. Now, while using spark, you cannot make a single larger 4gb executor and tell spark to use this larger executor to process the skewed partition. While using spark, you'll have to increase the ram of ALL executors to 4gb. Now, all partitions along with the skewed partition will be processed successfully. But all the other normal sized partition only needed 1gb of ram. So, 3gb of ram in all executors got wasted.

Hardware is expensive. So this is not a solution.

Solution 2. Manually split partitions with salting

(If you don't know salting, there are many articles and videos explaining it on the internet.)

This is the solution used by everyone. If the partition on current column is causing skews, we create an synthetic column, and make spark re-partition the data by shuffling on the combination of older partition key column and synthetic column. In this way, we break the skewed partition into smaller and more evenly distributed chunks.


With AQE, we simply don't have to worry about this anymore.

When enabled, AQE will automatically split skewed partitions when they cross the thresholds you set in configuration. No need to increase hardware resources. No need to figure out salting.

This setting is enabled by setting spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled to true. When partition splitting is enabled, the partitions will be split to achieve the target size specified in spark.sql.adaptive.advisoryPartitionSizeInBytes.

A partition will be merged during splitting if its size is smaller than spark.sql.adaptive.advisoryPartitionSizeInBytes multiplied by spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor

Example - the default value of spark.sql.adaptive.advisoryPartitionSizeInBytes is 64mb, and the default value of spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor is 0.2. So, if a partition's size is less then 12.8mb (64mb x 0.2), it will be merged to form a larger partition.

Details of configuration variables:

  • spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled
    • Default value = true
    • Available from spark 3.2.0 onward (Databricks runtime 10.4 LTS onward)
  • spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor
    • Default value = 0.2
    • Available from spark 3.3.0 onward (Databricks runtime 11.3 LTS onward)

Slow join problem

AQE will try to upgrade slow join strategies to faster join strategies.

AQE particularly tries to avoid Shuffle Sort Merge join strategy by converting it into a variant of the Hash join strategies - Broadcast Hash join or Shuffle Hash join

Why? because shuffle sort merge join sorts both sides before the merge operation. This is computationally very expensive.

Converting Shuffle Sort Merge join to Broadcast Hash join

If any join side is smaller than size specified in spark.sql.adaptive.autoBroadcastJoinThreshold, spark will convert it into Broadcast Hash join.

There is also an option to read shuffle files locally within the node itself, to save network traffic. This is done using the spark.sql.adaptive.localShuffleReader.enabled variable.

More details of configuration variables:

  • spark.sql.adaptive.autoBroadcastJoinThreshold
    • Default value = none (AQE will use the value in spark.sql.autoBroadcastJoinThreshold)
    • Available from spark 3.2.0 onward (Databricks runtime 10.4 LTS onward)
    • Set the maximum size in bytes
    • Broadcasting can be disabled by setting value to -1.
  • spark.sql.adaptive.localShuffleReader.enabled
    • Default value = true
    • Available from spark 3.0.0 onward (All currently supported Databricks runtimes)
    • When spark.sql.adaptive.enabled is true and spark.sql.adaptive.localShuffleReader.enabled is true, spark will use the local shuffle reader when shuffle partitioning is not needed.

Converting Shuffle Sort Merge join to Shuffle Hash Join

If all post shuffle partitions are smaller than a threshold (spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold), AQE will convert the Shuffle Sort Merge join to Shuffle Hash join.

Spark AQE will convert Shuffle Sort Merge join to Shuffle Hash Join irrespective of the value of spark.sql.join.preferSortMergeJoin when:

  1. spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold is greater than spark.sql.adaptive.advisoryPartitionSizeInBytes
  2. all partition sizes are smaller than spark.sql.adaptive.advisoryPartitionSizeInBytes

More details of configuration variables:

  • spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold
    • Default value = 0
    • Available from spark 3.2.0 onward (Databricks runtime 10.4 LTS onward)
    • Sets the maximum partition size in bytes that is allowed to build local hash map (for the join algorithm)

Some interesting history behind sort merge joins and shuffle hash join

According to Spark Issue 11675, the shuffle hash join was removed from spark 1.6. The reason cited was -

... I think we should just standardize on sort merge join for large joins for now, and create better implementations of hash joins if needed in the future

Then in spark 2.0, shuffle hash join was added back into to spark, according to Spark issue 13977. The reason cited was:

ShuffledHashJoin is still useful when:

1) any partition of the build side could fit in memory

2) the build side is much smaller than stream side, the building hash table on smaller side should be faster than sorting the bigger side.

Source - Stackoverflow

Resources

Join strategies

2024/01/22

Join strategies are part of the fundamental knowledge required when working with any data management and processing engine. This is foundational knowledge - understand it for one engine is like understanding it for all engines.

This note explains the join strategies in spark, and how spark chooses a join strategy.

Basics you should know - Types of joins

Joins can be classified in the following ways:

  1. Type of join based on join conditions
    1. Equi join
    2. Non Equi Join
  2. Type of join based on set operations
    1. INNER JOIN
    2. LEFT JOIN / RIGHT JOIN
    3. CROSS JOIN
    4. LEFT SEMI JOIN / RIGHT SEMI JOIN
    5. OUTER JOIN

Types of joins based on join condition

There are 2 types of join conditions:

  1. Equi join
  2. Non-equi join

    The type of join condition decides if a join algorithm is applicable to the join operation or not.

Equi Join

In an equi-join, the join condition is a strict equality condition.

Examples: - table1.column = table2.column

SELECT *
FROM table1, table2
WHERE table1.column = table2.column;

Non-equi Join

In a non-equi join, the join condition is not a strict equality condition.

Examples:

  • table1.column > table2.column
  • table1.column < table2.column
  • table1.column >= table2.column
  • table1.column <= table2.column
  • table1.column != table2.column
  • table1.column BETWEEN table2.column1 AND table2.column2
SELECT *
FROM table1, table2
WHERE table1.column <= table2.column;

Types of joins based on set operation

These are the normal set operations that we do in our joins. These operations decide if a join algorithm is applicable to a particular join type or not.

  • INNER JOIN
  • LEFT JOIN / RIGHT JOIN
  • CROSS JOIN
  • LEFT SEMI JOIN / RIGHT SEMI JOIN
  • OUTER JOIN

Overview of available join strategies

The spark join process

The spark join process is simple:

  1. Collect relevant rows needed for the join operation within the relevant executors
  2. Apply the algorithm to join rows in the dataframe
  3. Save the joined rows' data into a dataframe

The first step - collecting relevant data at one place (within executors) - can be done in 2 ways:

  1. If the dataframe size is smaller than a threshold, spark will broadcast the dataframe to all executors
  2. If the dataframe size is greater than a threshold, spark will perform a shuffle

For the second step - joining the rows - there are 3 algorithms available in the spark engine:

  1. Hash joins
  2. Nested loop joins
  3. Sort merge join

These join algorithms are standard in the data processing world, and are in almost all relational database engines (eg - PostgreSQL).

Join strategies

So now, we have - 2 ways to collect relevant data into the executors - 3 algorithms to perform joins.

Through combinations of these two, there are 5 join strategies available in spark:

  1. Hash joins (2 variants - shuffle or broadcast):
    1. Broadcast Hash Join
    2. Shuffle Hash Join
  2. Nested loop joins (2 variants - shuffle or broadcast):
    1. Broadcast Nested Loop Join
    2. Shuffle and Replicate Nested Loop Join
  3. Sort merge join:
    1. Shuffle Sort Merge Join

Hash Joins

Wikipedia article on the algorithm - Wikipedia - Hash Join

Spark engine calculates hash table of the join column(s) of one of the dataframes (usually the smaller dataframe) and stores it in memory. Then it loops over rows in the other dataframe, calculates hash of the current row, and matches this calculated hash with hashes of the other dataframe stored in memory. If both hashes match, the rows are joined and added to the result dataframe.

Since the hashtable of one of the dataframes has to be stored in memory, hash join algorithms will always use more memory than nested loop join algorithms.

Broadcast Hash join - BROADCAST

The hinted dataframe is broadcasted to all worker nodes, and the join is performed locally on each node.

Used when a small dataframe is being joined to another dataframe. The small dataframe will be broadcasted to all executors for joining.

Typical use case is when joining a dimension table to a fact table - We usually broadcast the dimension table to all executors, and then join it to the fact table within executors.

You can hint spark to use broadcast hash join using any one of the following hints -

  • BROADCAST - preferred
  • BROADCASTJOIN
  • MAPJOIN

Example:

# hint Braodcast Hash Join
joined_df = fact_sales_df.join(country_dim_df.hint("BROADCAST"), on="country_id", how="inner")

# explain physical plan
joined_df.explain()

Output:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [country_id#231, id#229L, customer_name#230, sales_amount#232L, country_name#255]
   +- BroadcastHashJoin [country_id#231], [country_id#254], Inner, BuildRight, false
      :- Filter isnotnull(country_id#231)
      :  +- Scan ExistingRDD[id#229L,customer_name#230,country_id#231,sales_amount#232L]
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [plan_id=1130]
         +- Filter isnotnull(country_id#254)
            +- Scan ExistingRDD[country_id#254,country_name#255]

Shuffle hash join - SHUFFLE_HASH

It redistributes both dataframes' data across the partitions using a hash function on the join key, so that matching keys end up in the same partition. And then, the join is performed locally on each node.

You can hint spark to use broadcast hash join using the "SHUFFLE_HASH" hint.

Example:

# hint Broadcast Shuffle Join
joined_df = fact_sales_df.join(country_dim_df.hint("SHUFFLE_HASH"), on="country_id", how="inner")

# explain physical plan
joined_df.explain()

Output:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [country_id#231, id#229L, customer_name#230, sales_amount#232L, country_name#255]
   +- ShuffledHashJoin [country_id#231], [country_id#254], Inner, BuildRight
      :- Exchange hashpartitioning(country_id#231, 200), ENSURE_REQUIREMENTS, [plan_id=1251]
      :  +- Filter isnotnull(country_id#231)
      :     +- Scan ExistingRDD[id#229L,customer_name#230,country_id#231,sales_amount#232L]
      +- Exchange hashpartitioning(country_id#254, 200), ENSURE_REQUIREMENTS, [plan_id=1252]
         +- Filter isnotnull(country_id#254)
            +- Scan ExistingRDD[country_id#254,country_name#255]

Nested Loop joins

Wikipedia article on the algorithm - Nested Loop Join

This is a Cartesian product join, and is also the simplest join algorithm to understand.

To find matching rows to join - the algorithm iterates through rows of the outer table, and compares it to every row of the inner table. If the join condition is satisfied, the rows are joined and added to the result dataframe.

Suppose the outer table has n rows and inner table has m rows, then this join has O(n*m) time complexity. So, it is a very very expensive join.

Broadcast nested loop join

There is no hint for this join type, as it is the default join strategy used by spark (when no other strategies are applicable).

From comments in the spark source code:

If the join is an equi join:

//5. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
//      other choice.

If the join is a non-equi join:

//   3. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
//      other choice. It broadcasts the smaller side for inner and full joins, broadcasts the
//      left side for right join, and broadcasts right side for left join.

Shuffle-and-Replicate Nested Loop Join - SHUFFLE_REPLACE_NL

You can hint spark to use broadcast hash join using the "SHUFFLE_REPLICATE_NL" hint.

Example:

# hint Shuffle and Replicate Nested Loop Join
joined_df = fact_sales_df.join(country_dim_df.hint("SHUFFLE_REPLICATE_NL"), on="country_id", how="inner")

# explain physical plan
joined_df.explain()

Output:

== Physical Plan ==
*(3) Project [country_id#231, id#229L, customer_name#230, sales_amount#232L, country_name#255]
+- CartesianProduct (country_id#231 = country_id#254)
   :- *(1) Filter isnotnull(country_id#231)
   :  +- *(1) Scan ExistingRDD[id#229L,customer_name#230,country_id#231,sales_amount#232L]
   +- *(2) Filter isnotnull(country_id#254)
      +- *(2) Scan ExistingRDD[country_id#254,country_name#255]

Sort Merge join

In this algorithm, both dataframes are sorted based on the join key, and then merged if the join condition is satisfied. The merged rows are added to the result dataframe.

This algorithm only works with equi-joins.

Shuffle sort merge join - MERGE

This strategy is efficient when both DataFrames are large and can be sorted without shuffling. To avoid the shuffle phase, make sure the data is bucketed so that the shuffle phase is avoided.

You can hint spark to use broadcast hash join using any one of the following hints -

  • MERGE - preferred
  • SHUFFLE_MERGE
  • MERGEJOIN

Example:

# hint Shuffle Sort Merge Join
joined_df = fact_sales_df.join(country_dim_df.hint("MERGE"), on="country_id", how="inner")

# explain physical plan
joined_df.explain()

Output:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [country_id#231, id#229L, customer_name#230, sales_amount#232L, country_name#255]
   +- SortMergeJoin [country_id#231], [country_id#254], Inner
      :- Sort [country_id#231 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(country_id#231, 200), ENSURE_REQUIREMENTS, [plan_id=2187]
      :     +- Filter isnotnull(country_id#231)
      :        +- Scan ExistingRDD[id#229L,customer_name#230,country_id#231,sales_amount#232L]
      +- Sort [country_id#254 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(country_id#254, 200), ENSURE_REQUIREMENTS, [plan_id=2188]
            +- Filter isnotnull(country_id#254)
               +- Scan ExistingRDD[country_id#254,country_name#255]

How does spark choose a join strategy?

Spark has a straightforward, predictable way of choosing a join strategy. It goes through a decision tree to choose the join strategy to use.

If the join is an equi-join

If join hints are provided - check hints in this order and pick the first hint that matches:

  • Broadcast hash join hint (BROADCAST , BROADCASTJOIN, MAPJOIN) - If both sizes have broadcast hints, broadcast the smaller side.
  • Sort merge Join (MERGE) - Pick if join keys are sortable.
  • Shuffle hash join (SHUFFLE_HASH) - If both sides have shuffle hash hints, choose smaller side as the build side.
  • Shuffle replicate nested loop join (SHUFFLE_REPLACE_NL) - Pick if the join type is inner-like.

If no hint matches or if no hint was provided by the programmer - check conditions in this order and pick the the join strategy of the first condition that is satisfied:

  1. If one side is small enough to broadcast, and broadcast hash join is supported, pick Broadcast Hash join.
    • If both sides are small, broadcast the smaller side.
  2. If one side is small enough to build a local hashmap, and that side is much smaller than the other side, and spark.sql.join.preferSortMergeJoin is false, pick Shuffle Hash join.
  3. If join keys are sortable, pick Sort Merge join.
  4. If the join type is inner like, pick cartesian product join.

No condition was satisfied? pick Broadcast Nested Loop join. It may throw Out of memory error, but there is no other choice.

If the join is a non-equi join

If join hints are provided - check hints in this order and pick the first hint that matches:

  • Broadcast hash join hint (BROADCAST , BROADCASTJOIN, MAPJOIN) - If both sizes have broadcast hints, broadcast the smaller side.
  • Shuffle replicate nested loop join (SHUFFLE_REPLACE_NL) - Pick if the join type is inner-like.

If no hint matches or if no hint was provided by the programmer - check conditions in this order and pick the the join strategy of the first condition that is satisfied:

  1. If one side is small enough to broadcast, and broadcast hash join is supported, pick Broadcast Hash join.
    1. If only left side is broadcast-able and it's left join, or only right side is broadcast-able and it's right join, skip this rule.
    2. If both sides are small, broadcast the smaller side for inner and full joins, broadcast the left side for right join, and broadcast right side for left join.
  2. If the join type is inner like, pick cartesian product join.

No condition was satisfied? pick Broadcast Nested Loop join. It may throw Out of memory error, but there is no other choice.

Comments from the source code

Commends in the spark source code describe this process of choosing join strategy. (Read the comments in the apply() function inside SparkStrategies.scala - Source code link)

Here are the comments:

Equi join

  // If it is an equi-join, we first look at the join hints w.r.t. the following order:
  //   1. broadcast hint: pick broadcast hash join if the join type is supported. If both sides
  //      have the broadcast hints, choose the smaller side (based on stats) to broadcast.
  //   2. sort merge hint: pick sort merge join if join keys are sortable.
  //   3. shuffle hash hint: We pick shuffle hash join if the join type is supported. If both
  //      sides have the shuffle hash hints, choose the smaller side (based on stats) as the
  //      build side.
  //   4. shuffle replicate NL hint: pick cartesian product if join type is inner like.
  //
  // If there is no hint or the hints are not applicable, we follow these rules one by one:
  //   1. Pick broadcast hash join if one side is small enough to broadcast, and the join type
  //      is supported. If both sides are small, choose the smaller side (based on stats)
  //      to broadcast.
  //   2. Pick shuffle hash join if one side is small enough to build local hash map, and is
  //      much smaller than the other side, and `spark.sql.join.preferSortMergeJoin` is false.
  //   3. Pick sort merge join if the join keys are sortable.
  //   4. Pick cartesian product if join type is inner like.
  //   5. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
  //      other choice.

Non-equi join

// If it is not an equi-join, we first look at the join hints w.r.t. the following order:
//   1. broadcast hint: pick broadcast nested loop join. If both sides have the broadcast
//      hints, choose the smaller side (based on stats) to broadcast for inner and full joins,
//      choose the left side for right join, and choose right side for left join.
//   2. shuffle replicate NL hint: pick cartesian product if join type is inner like.
//
// If there is no hint or the hints are not applicable, we follow these rules one by one:
//   1. Pick broadcast nested loop join if one side is small enough to broadcast. If only left
//      side is broadcast-able and it's left join, or only right side is broadcast-able and
//      it's right join, we skip this rule. If both sides are small, broadcasts the smaller
//      side for inner and full joins, broadcasts the left side for right join, and broadcasts
//      right side for left join.
//   2. Pick cartesian product if join type is inner like.
//   3. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
//      other choice. It broadcasts the smaller side for inner and full joins, broadcasts the
//      left side for right join, and broadcasts right side for left join.

Notes

Note 1

If your spark version is less then 3.0.0, only broadcast hash join's hints are supported. Only spark version 3.0.0 and later support all 4 join hints.

Note 2

The hint() function is not case sensitive. So, you can give the join hint in any case - uppercase, lowecase, mixed case..

For example - all these are valid hints, recognized by spark:

  • hint('mErGe')
  • hint('merge')
  • hint('MERGE')

Other Articles