Performance Optimization - Spark join strategies
By Niraj Zade | 2024 Jan 22 | 16 min read
Join strategies are part of the fundamental knowledge required when working with any data management and processing engine. This is foundational knowledge - understanding it for one engine is like understanding it for all engines.
This note explains the join strategies in spark, and how spark chooses a join strategy.
Joins can be classified in the following ways:
There are 2 types of join conditions:
- Equi join
- Non-equi join
The type of join condition decides if a join algorithm is applicable to the join operation or not.
In an equi-join, the join condition is a strict equality condition.
Examples:
- table1.column = table2.column
SELECT *
FROM table1, table2
WHERE table1.column = table2.column;
In a non-equi join, the join condition is not a strict equality condition.
Examples:
table1.column > table2.column
table1.column < table2.column
table1.column >= table2.column
table1.column <= table2.column
table1.column != table2.column
table1.column BETWEEN table2.column1 AND table2.column2
SELECT *
FROM table1, table2
WHERE table1.column <= table2.column;
These are the normal set operations that we do in our joins. These operations decide if a join algorithm is applicable to a particular join type or not.
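The spark join process is simple:
- Collect the relevant data at one place (within executors)
- Join the rows
The first step - collecting relevant data at one place (within executors) - can be done in 2 ways:
- Shuffle
- Broadcast
For the second step - joining the rows - there are 3 algorithms available in the spark engine:
- Hash join
- Nested loop join
- Sort merge join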
These join algorithms are standard in the data processing world, and are in almost all relational database engines (eg - PostgreSQL).
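So now, we have:
- 2 ways to collect relevant data into the executors
- 3 algorithms to perform joins
Through combinations of these two, there are 5 join strategies available in spark:
- Broadcast hash join
- Shuffle hash join
- Broadcast nested loop join
- Shuffle and replicate nested loop join (cartesian product)
- Shuffle sort merge join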
Wikipedia article on the algorithm - Hash Join
The spark engine builds a hash table on the join column(s) of one of the dataframes (usually the smaller dataframe) and stores it in memory. Then it loops over the rows of the other dataframe, calculates the hash of the current row's join column(s), and looks that hash up in the hash table stored in memory. If a matching entry is found and the join keys are equal, the rows are joined and added to the result dataframe.
Since the hashtable of one of the dataframes has to be stored in memory, hash join algorithms will always use more memory than nested loop join algorithms.
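The build and probe phases described above can be sketched in plain Python. This is only an illustration of the algorithm on lists of dicts, not spark's implementation. The function name `hash_join` and the sample rows are made up for this example.

```python
from collections import defaultdict


def hash_join(build_rows, probe_rows, key):
    """Inner equi-join of two lists of dicts on a shared key column."""
    # Build phase: hash table over the build side (ideally the smaller one).
    table = defaultdict(list)
    for row in build_rows:
        table[row[key]].append(row)

    # Probe phase: one hash lookup per row of the other side.
    joined = []
    for row in probe_rows:
        for match in table.get(row[key], []):
            joined.append({**row, **match})
    return joined


# Hypothetical sample data, loosely modelled on the dataframes used in this note.
countries = [
    {"country_id": "IN", "country_name": "India"},
    {"country_id": "US", "country_name": "USA"},
]
sales = [
    {"id": 1, "country_id": "IN", "sales_amount": 100},
    {"id": 2, "country_id": "US", "sales_amount": 250},
    {"id": 3, "country_id": "FR", "sales_amount": 75},  # no matching country
]

result = hash_join(countries, sales, "country_id")
```

The whole build side sits in `table` while the probe side is streamed row by row, which is where the extra memory usage comes from.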
BROADCAST
The hinted dataframe is broadcast to all worker nodes, and the join is performed locally on each node.
Used when a small dataframe is being joined to another dataframe. The small dataframe will be broadcast to all executors for joining.
Typical use case is when joining a dimension table to a fact table - We usually broadcast the dimension table to all executors, and then join it to the fact table within executors.
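A rough sketch of the idea in plain Python: the large (fact) side stays where it is, split into partitions, and every partition gets the full copy of the small (dimension) side. The function name, the partition layout and the sample rows are assumptions made for this illustration. Real spark executors are simulated here by a simple loop, and the sketch assumes that the dimension keys are unique.

```python
def broadcast_hash_join(fact_partitions, dim_rows, key):
    """Join each fact partition locally against a full copy of the small side."""
    # "Broadcast": build the lookup once. Every partition sees the same full copy.
    # Assumption: each key appears at most once in the dimension rows.
    dim_by_key = {row[key]: row for row in dim_rows}

    joined_partitions = []
    for partition in fact_partitions:
        # Local join: fact rows never leave their partition.
        joined_partitions.append(
            [{**row, **dim_by_key[row[key]]} for row in partition if row[key] in dim_by_key]
        )
    return joined_partitions


# Hypothetical sample data: a fact table in 2 partitions, and a small dimension table.
fact_partitions = [
    [{"id": 1, "country_id": "IN"}, {"id": 2, "country_id": "US"}],
    [{"id": 3, "country_id": "IN"}],
]
country_dim = [
    {"country_id": "IN", "country_name": "India"},
    {"country_id": "US", "country_name": "USA"},
]

joined = broadcast_hash_join(fact_partitions, country_dim, "country_id")
```

Notice that the fact rows are not moved between partitions at all. Only the small side is copied, which is why this strategy is attractive when one side is small.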
You can hint spark to use broadcast hash join using any one of the following hints -
- BROADCAST - preferred
- BROADCASTJOIN
- MAPJOIN
Example:
# hint Broadcast Hash Join
joined_df = fact_sales_df.join(country_dim_df.hint("BROADCAST"), on="country_id", how="inner")
# explain physical plan
joined_df.explain()
Output:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [country_id#231, id#229L, customer_name#230, sales_amount#232L, country_name#255]
   +- BroadcastHashJoin [country_id#231], [country_id#254], Inner, BuildRight, false
      :- Filter isnotnull(country_id#231)
      :  +- Scan ExistingRDD[id#229L,customer_name#230,country_id#231,sales_amount#232L]
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [plan_id=1130]
         +- Filter isnotnull(country_id#254)
            +- Scan ExistingRDD[country_id#254,country_name#255]
SHUFFLE_HASH
It redistributes both dataframes' data across the partitions using a hash function on the join key, so that matching keys end up in the same partition. And then, the join is performed locally on each node.
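The shuffle step can be sketched in plain Python as hash partitioning on the join key, followed by a local hash join per partition. This is a simplified single-process illustration, not spark's implementation. The helper names `shuffle` and `shuffle_hash_join`, the partition count and the sample rows are assumptions made for this example.

```python
from collections import defaultdict


def shuffle(rows, key, num_partitions):
    """Send every row to the partition chosen by the hash of its join key."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[hash(row[key]) % num_partitions].append(row)
    return partitions


def shuffle_hash_join(left_rows, right_rows, key, num_partitions=4):
    """Inner equi-join: shuffle both sides, then hash join partition by partition."""
    left_parts = shuffle(left_rows, key, num_partitions)
    right_parts = shuffle(right_rows, key, num_partitions)

    joined = []
    # Partition i of the left side only ever meets partition i of the right side,
    # because equal keys always hash to the same partition number.
    for left_part, right_part in zip(left_parts, right_parts):
        table = defaultdict(list)
        for row in right_part:  # build side
            table[row[key]].append(row)
        for row in left_part:  # probe side
            for match in table.get(row[key], []):
                joined.append({**row, **match})
    return joined


# Hypothetical sample data.
sales = [
    {"id": 1, "country_id": "IN"},
    {"id": 2, "country_id": "US"},
    {"id": 3, "country_id": "FR"},  # no matching country
]
countries = [
    {"country_id": "IN", "country_name": "India"},
    {"country_id": "US", "country_name": "USA"},
]

result = shuffle_hash_join(sales, countries, "country_id")
```

Unlike the broadcast approach, both sides are moved here, but each partition only has to hold a hash table for its own slice of the build side.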
You can hint spark to use shuffle hash join using the "SHUFFLE_HASH" hint.
Example:
# hint Shuffle Hash Join
joined_df = fact_sales_df.join(country_dim_df.hint("SHUFFLE_HASH"), on="country_id", how="inner")
# explain physical plan
joined_df.explain()
Output:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [country_id#231, id#229L, customer_name#230, sales_amount#232L, country_name#255]
   +- ShuffledHashJoin [country_id#231], [country_id#254], Inner, BuildRight
      :- Exchange hashpartitioning(country_id#231, 200), ENSURE_REQUIREMENTS, [plan_id=1251]
      :  +- Filter isnotnull(country_id#231)
      :     +- Scan ExistingRDD[id#229L,customer_name#230,country_id#231,sales_amount#232L]
      +- Exchange hashpartitioning(country_id#254, 200), ENSURE_REQUIREMENTS, [plan_id=1252]
         +- Filter isnotnull(country_id#254)
            +- Scan ExistingRDD[country_id#254,country_name#255]
Wikipedia article on the algorithm - Nested Loop Join
This is a Cartesian product join, and is also the simplest join algorithm to understand.
To find matching rows to join - the algorithm iterates through rows of the outer table, and compares it to every row of the inner table. If the join condition is satisfied, the rows are joined and added to the result dataframe.
Suppose the outer table has n rows and the inner table has m rows, then this join has O(n*m) time complexity. So, it is a very expensive join.
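A minimal plain-Python sketch of the algorithm, using a non-equi (range) condition to show that any join condition works here. The function name `nested_loop_join` and the sample rows are made up for this illustration. It is not spark's implementation.

```python
def nested_loop_join(outer_rows, inner_rows, condition):
    """Inner join on an arbitrary condition: compare every outer row to every inner row."""
    joined = []
    for outer in outer_rows:  # n iterations
        for inner in inner_rows:  # m iterations for each outer row -> n*m comparisons
            if condition(outer, inner):
                joined.append({**outer, **inner})
    return joined


# Hypothetical sample data: assign each order to an amount range.
orders = [
    {"order_id": 1, "amount": 50},
    {"order_id": 2, "amount": 500},
]
tiers = [
    {"tier": "small", "low": 0, "high": 99},
    {"tier": "large", "low": 100, "high": 1000},
]

# Equivalent to: orders.amount BETWEEN tiers.low AND tiers.high
result = nested_loop_join(
    orders,
    tiers,
    lambda order, tier: tier["low"] <= order["amount"] <= tier["high"],
)
```

Since the condition is just a function that is evaluated for every pair of rows, there is nothing to hash or sort on. That generality is what the O(n*m) cost pays for.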
There is no hint for this join type, as it is the default join strategy used by spark (when no other strategies are applicable).
From comments in the spark source code:
If the join is an equi join:
// 5. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
// other choice.
If the join is a non-equi join:
// 3. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
// other choice. It broadcasts the smaller side for inner and full joins, broadcasts the
// left side for right join, and broadcasts right side for left join.
SHUFFLE_REPLICATE_NL
You can hint spark to use shuffle and replicate nested loop join using the "SHUFFLE_REPLICATE_NL" hint.
Example:
# hint Shuffle and Replicate Nested Loop Join
joined_df = fact_sales_df.join(country_dim_df.hint("SHUFFLE_REPLICATE_NL"), on="country_id", how="inner")
# explain physical plan
joined_df.explain()
Output:
== Physical Plan ==
*(3) Project [country_id#231, id#229L, customer_name#230, sales_amount#232L, country_name#255]
+- CartesianProduct (country_id#231 = country_id#254)
   :- *(1) Filter isnotnull(country_id#231)
   :  +- *(1) Scan ExistingRDD[id#229L,customer_name#230,country_id#231,sales_amount#232L]
   +- *(2) Filter isnotnull(country_id#254)
      +- *(2) Scan ExistingRDD[country_id#254,country_name#255]
In this algorithm, both dataframes are sorted based on the join key, and then merged if the join condition is satisfied. The merged rows are added to the result dataframe.
This algorithm only works with equi-joins.
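The sort and merge phases can be sketched in plain Python with two cursors moving over the sorted inputs. This is only an illustration of the algorithm for an inner equi-join, not spark's implementation. The function name `sort_merge_join` and the sample rows are made up for this example.

```python
def sort_merge_join(left_rows, right_rows, key):
    """Inner equi-join: sort both sides on the key, then merge with two cursors."""
    left = sorted(left_rows, key=lambda row: row[key])
    right = sorted(right_rows, key=lambda row: row[key])

    joined = []
    i = j = 0
    while i < len(left) and j < len(right):
        left_key, right_key = left[i][key], right[j][key]
        if left_key < right_key:
            i += 1  # left row has no partner, move on
        elif left_key > right_key:
            j += 1  # right row has no partner, move on
        else:
            # Find the run of right rows that share this key.
            j_end = j
            while j_end < len(right) and right[j_end][key] == left_key:
                j_end += 1
            # Pair every left row with this key against the whole run.
            while i < len(left) and left[i][key] == left_key:
                for right_row in right[j:j_end]:
                    joined.append({**left[i], **right_row})
                i += 1
            j = j_end
    return joined


# Hypothetical sample data.
sales = [
    {"id": 1, "country_id": "IN"},
    {"id": 2, "country_id": "US"},
    {"id": 3, "country_id": "FR"},  # no matching country
    {"id": 4, "country_id": "IN"},
]
countries = [
    {"country_id": "US", "country_name": "USA"},
    {"country_id": "IN", "country_name": "India"},
]

result = sort_merge_join(sales, countries, "country_id")
```

The merge step relies on comparing keys with `<` and `>` to decide which cursor to advance. A condition such as `!=` gives no such ordering to follow, which is one way to see why the algorithm is limited to equi-joins on sortable keys.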
MERGE
This strategy is efficient when both DataFrames are large and can be sorted without shuffling. To avoid the shuffle phase, make sure the data is bucketed on the join key.
You can hint spark to use shuffle sort merge join using any one of the following hints -
- MERGE - preferred
- SHUFFLE_MERGE
- MERGEJOIN
Example:
# hint Shuffle Sort Merge Join
joined_df = fact_sales_df.join(country_dim_df.hint("MERGE"), on="country_id", how="inner")
# explain physical plan
joined_df.explain()
Output:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [country_id#231, id#229L, customer_name#230, sales_amount#232L, country_name#255]
   +- SortMergeJoin [country_id#231], [country_id#254], Inner
      :- Sort [country_id#231 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(country_id#231, 200), ENSURE_REQUIREMENTS, [plan_id=2187]
      :     +- Filter isnotnull(country_id#231)
      :        +- Scan ExistingRDD[id#229L,customer_name#230,country_id#231,sales_amount#232L]
      +- Sort [country_id#254 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(country_id#254, 200), ENSURE_REQUIREMENTS, [plan_id=2188]
            +- Filter isnotnull(country_id#254)
               +- Scan ExistingRDD[country_id#254,country_name#255]
Spark has a straightforward, predictable way of choosing a join strategy. It goes through a decision tree to choose the join strategy to use.
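If the join is an equi join:
If join hints are provided - check hints in this order and pick the first hint that matches:
1. Broadcast hint (BROADCAST, BROADCASTJOIN, MAPJOIN) - Pick broadcast hash join if the join type is supported. If both sides have broadcast hints, broadcast the smaller side.
2. Sort merge hint (MERGE) - Pick sort merge join if join keys are sortable.
3. Shuffle hash hint (SHUFFLE_HASH) - Pick shuffle hash join if the join type is supported. If both sides have shuffle hash hints, choose the smaller side as the build side.
4. Shuffle replicate NL hint (SHUFFLE_REPLICATE_NL) - Pick cartesian product if the join type is inner-like.
If no hint matches or if no hint was provided by the programmer - check conditions in this order and pick the join strategy of the first condition that is satisfied:
1. If one side is small enough to broadcast, and the join type is supported, pick Broadcast Hash join.
2. If one side is small enough to build a local hash map, is much smaller than the other side, and spark.sql.join.preferSortMergeJoin is false, pick Shuffle Hash join.
3. If the join keys are sortable, pick Sort Merge join.
4. If the join type is inner-like, pick Cartesian Product join.
No condition was satisfied? Pick Broadcast Nested Loop join. It may throw an out of memory error, but there is no other choice.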
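If the join is a non-equi join:
If join hints are provided - check hints in this order and pick the first hint that matches:
1. Broadcast hint (BROADCAST, BROADCASTJOIN, MAPJOIN) - Pick broadcast nested loop join. If both sides have broadcast hints, broadcast the smaller side (for inner and full joins).
2. Shuffle replicate NL hint (SHUFFLE_REPLICATE_NL) - Pick cartesian product if the join type is inner-like.
If no hint matches or if no hint was provided by the programmer - check conditions in this order and pick the join strategy of the first condition that is satisfied:
1. If one side is small enough to broadcast, pick Broadcast Nested Loop join.
2. If the join type is inner-like, pick Cartesian Product join.
No condition was satisfied? Pick Broadcast Nested Loop join. It may throw an out of memory error, but there is no other choice.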
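The two decision trees above can be written down as a small plain-Python function. This is a simplified sketch for reasoning about the order of checks, not spark's actual code or API. The function name, its parameters and the returned strings are all made up for this illustration. It also skips some details, such as the "join type is supported" checks and the choice of which side to broadcast.

```python
BROADCAST_HINTS = {"BROADCAST", "BROADCASTJOIN", "MAPJOIN"}
MERGE_HINTS = {"MERGE", "SHUFFLE_MERGE", "MERGEJOIN"}


def choose_join_strategy(is_equi_join, hints=(), inner_like=True, keys_sortable=True,
                         can_broadcast=False, can_build_local_hash_map=False,
                         prefer_sort_merge_join=True):
    """Return the name of the join strategy picked by the (simplified) decision tree."""
    hints = {hint.upper() for hint in hints}  # hint names are not case sensitive

    if is_equi_join:
        # Step 1: hints, checked in priority order.
        if hints & BROADCAST_HINTS:
            return "broadcast hash join"
        if hints & MERGE_HINTS and keys_sortable:
            return "sort merge join"
        if "SHUFFLE_HASH" in hints:
            return "shuffle hash join"
        if "SHUFFLE_REPLICATE_NL" in hints and inner_like:
            return "cartesian product"
        # Step 2: no hint, or no applicable hint. Check the rules one by one.
        if can_broadcast:
            return "broadcast hash join"
        if can_build_local_hash_map and not prefer_sort_merge_join:
            return "shuffle hash join"
        if keys_sortable:
            return "sort merge join"
        if inner_like:
            return "cartesian product"
        return "broadcast nested loop join"

    # Non-equi join: only the nested loop based strategies are available.
    if hints & BROADCAST_HINTS:
        return "broadcast nested loop join"
    if "SHUFFLE_REPLICATE_NL" in hints and inner_like:
        return "cartesian product"
    if can_broadcast:
        return "broadcast nested loop join"
    if inner_like:
        return "cartesian product"
    return "broadcast nested loop join"


# A few hypothetical scenarios.
no_hint_small_side = choose_join_strategy(True, can_broadcast=True)
merge_hint_wins = choose_join_strategy(True, hints=["merge"], can_broadcast=True)
non_equi_outer = choose_join_strategy(False, inner_like=False)
```

In the second scenario the merge hint is applied even though one side is small enough to broadcast, because hints are checked before the size based rules.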
Comments in the spark source code describe this process of choosing the join strategy. (Read the comments in the apply() function inside SparkStrategies.scala - Source code link)
Here are the comments:
Equi join
// If it is an equi-join, we first look at the join hints w.r.t. the following order:
// 1. broadcast hint: pick broadcast hash join if the join type is supported. If both sides
// have the broadcast hints, choose the smaller side (based on stats) to broadcast.
// 2. sort merge hint: pick sort merge join if join keys are sortable.
// 3. shuffle hash hint: We pick shuffle hash join if the join type is supported. If both
// sides have the shuffle hash hints, choose the smaller side (based on stats) as the
// build side.
// 4. shuffle replicate NL hint: pick cartesian product if join type is inner like.
//
// If there is no hint or the hints are not applicable, we follow these rules one by one:
// 1. Pick broadcast hash join if one side is small enough to broadcast, and the join type
// is supported. If both sides are small, choose the smaller side (based on stats)
// to broadcast.
// 2. Pick shuffle hash join if one side is small enough to build local hash map, and is
// much smaller than the other side, and `spark.sql.join.preferSortMergeJoin` is false.
// 3. Pick sort merge join if the join keys are sortable.
// 4. Pick cartesian product if join type is inner like.
// 5. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
// other choice.
Non-equi join
// If it is not an equi-join, we first look at the join hints w.r.t. the following order:
// 1. broadcast hint: pick broadcast nested loop join. If both sides have the broadcast
// hints, choose the smaller side (based on stats) to broadcast for inner and full joins,
// choose the left side for right join, and choose right side for left join.
// 2. shuffle replicate NL hint: pick cartesian product if join type is inner like.
//
// If there is no hint or the hints are not applicable, we follow these rules one by one:
// 1. Pick broadcast nested loop join if one side is small enough to broadcast. If only left
// side is broadcast-able and it's left join, or only right side is broadcast-able and
// it's right join, we skip this rule. If both sides are small, broadcasts the smaller
// side for inner and full joins, broadcasts the left side for right join, and broadcasts
// right side for left join.
// 2. Pick cartesian product if join type is inner like.
// 3. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
// other choice. It broadcasts the smaller side for inner and full joins, broadcasts the
// left side for right join, and broadcasts right side for left join.
If your spark version is lower than 3.0.0, only the broadcast hash join hints are supported. Only spark version 3.0.0 and later support all 4 join hints.
The hint() function is not case sensitive. So, you can give the join hint in any case - uppercase, lowercase, or mixed case.
For example - all these are valid hints, recognized by spark:
hint('mErGe')
hint('merge')
hint('MERGE')