Spark performance optimization compendium
By Niraj Zade | 2025 Oct 11 | 4m read
WORK IN PROGRESS
The intended audience of this note is:
Your operational loop is going to be:
By default, Spark reads all of a SQL query's data through a single query, as a single partition. This is very slow, and it also causes a gigantic skew (since there is only one partition).
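For comparison, this is what the default, unpartitioned read looks like. (A minimal sketch; the jdbc_url, props, and table name here are placeholder values, not from the original note.)

# Placeholder connection settings - substitute your own host, driver and credentials
jdbc_url = "jdbc:postgresql://db-host:5432/appdb"
props = {"user": "spark_reader", "password": "...", "driver": "org.postgresql.Driver"}

# Plain JDBC read: no partitioning options, so Spark issues one query
# and loads the whole result set into a single partition
df = spark.read.jdbc(url=jdbc_url, table="transactions", properties=props)
print(df.rdd.getNumPartitions())  # 1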
To avoid this issue, there are mechanisms to tell the JDBC reader how to parallelize the query.
If the primary key column is of type int, long, or timestamp, then it is quite straightforward:
df = spark.read.jdbc(
    url=jdbc_url,
    table="transactions",
    column="id",  # numeric column used for partitioning
    lowerBound=1,
    upperBound=1000000,
    numPartitions=10,
    properties=props,
)
This ends up firing 10 SQL queries in parallel (one per partition), roughly:
SELECT * FROM transactions WHERE id < 100001 OR id IS NULL;
SELECT * FROM transactions WHERE id >= 100001 AND id < 200001;
...
SELECT * FROM transactions WHERE id >= 900001;
Note that lowerBound and upperBound only control the stride; they do not filter rows. Anything below the lower bound lands in the first partition, and anything above the upper bound lands in the last.
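The bounds don't have to be hard-coded. A common pattern (a sketch, not from the original note) is to fetch them with a small pushed-down query first, then feed them into the partitioned read:

# Fetch MIN/MAX of the partition column with a tiny bounds query
bounds = spark.read.jdbc(
    url=jdbc_url,
    table="(SELECT MIN(id) AS lo, MAX(id) AS hi FROM transactions) AS b",
    properties=props,
).collect()[0]

df = spark.read.jdbc(
    url=jdbc_url,
    table="transactions",
    column="id",
    lowerBound=bounds["lo"],
    upperBound=bounds["hi"],
    numPartitions=10,
    properties=props,
)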
When there is no suitable numeric column to range over, the partitioning predicates have to be supplied manually:
predicates = [
    "region = 'NORTH'",
    "region = 'SOUTH'",
    "region = 'EAST'",
    "region = 'WEST'",
]

df = spark.read.jdbc(
    url=jdbc_url,
    table="customers",
    predicates=predicates,
    properties=props,
)
Each string in predicates will be used in a WHERE clause as follows:
SELECT * FROM customers WHERE region = 'NORTH';
SELECT * FROM customers WHERE region = 'SOUTH';
SELECT * FROM customers WHERE region = 'EAST';
SELECT * FROM customers WHERE region = 'WEST';
This method puts load on the database (one full query per predicate), but the data is distributed evenly across the Spark job's executors, as long as the predicate groups are of similar size. It is also the only realistic method for columns containing high-cardinality non-numeric datatypes, like UUID strings.
E.g. split the workload across 4 queries by hashing the key (the exact hash and modulo functions available depend on the source database):
num_partitions = 4
predicates = [
    f"MOD(ABS(HASH(user_id)), {num_partitions}) = {i}"
    for i in range(num_partitions)
]

df = spark.read.jdbc(
    url=jdbc_url,
    table="(SELECT * FROM users) AS users_subquery",
    predicates=predicates,
    properties=props,
)
This will create the following queries:
SELECT * FROM users WHERE MOD(ABS(HASH(user_id)), 4) = 0;
SELECT * FROM users WHERE MOD(ABS(HASH(user_id)), 4) = 1;
SELECT * FROM users WHERE MOD(ABS(HASH(user_id)), 4) = 2;
SELECT * FROM users WHERE MOD(ABS(HASH(user_id)), 4) = 3;
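Whichever predicate scheme is used, it is worth verifying that rows actually spread out evenly across partitions. A minimal check (not from the original note) using spark_partition_id:

from pyspark.sql.functions import spark_partition_id

# Count rows per partition to spot skew after the parallel read
df.groupBy(spark_partition_id().alias("partition")).count().orderBy("partition").show()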
Date-range predicates are also useful, e.g. for reconciliation runs:
predicates = [
    "created_at < '2024-01-01'",
    "created_at >= '2024-01-01' AND created_at < '2024-04-01'",
    "created_at >= '2024-04-01'",
]

df = spark.read.jdbc(
    url=jdbc_url,
    table="(SELECT * FROM users) AS users_subquery",
    predicates=predicates,
    properties=props,
)
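When there are many date ranges, the predicate list can be generated instead of hand-written. A minimal sketch, assuming the same created_at column; the monthly_predicates helper is hypothetical, not part of Spark:

from datetime import date

def monthly_predicates(start: date, end: date, column: str = "created_at"):
    # One predicate per calendar month between start (inclusive) and end (exclusive)
    predicates = []
    current = start
    while current < end:
        nxt = date(current.year + current.month // 12, current.month % 12 + 1, 1)
        upper = min(nxt, end)
        predicates.append(f"{column} >= '{current}' AND {column} < '{upper}'")
        current = upper
    return predicates

predicates = monthly_predicates(date(2024, 1, 1), date(2024, 7, 1))
# ["created_at >= '2024-01-01' AND created_at < '2024-02-01'", ...]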
Common maintenance checks for Delta tables:
- Analyze the table's constituent files: DESCRIBE DETAIL tbl
- Stale statistics: ANALYZE TABLE tbl COMPUTE STATISTICS
- Small file problem: OPTIMIZE tbl
- Data layout for file-skipping: cluster on some stable column (enable Liquid clustering, or do Z-ordering), and aim for file sizes of 128 MB to 1 GB
- Vacuum up history: VACUUM tbl
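These maintenance commands can also be run from a notebook or a scheduled job through spark.sql. A sketch with placeholder table and clustering-column names:

# Placeholder names - substitute your own table and clustering column
table = "main.sales.transactions"

spark.sql(f"DESCRIBE DETAIL {table}").show(truncate=False)   # inspect file count and sizes
spark.sql(f"ANALYZE TABLE {table} COMPUTE STATISTICS")       # refresh table statistics
spark.sql(f"OPTIMIZE {table} ZORDER BY (transaction_date)")  # compact small files (drop ZORDER BY if the table uses liquid clustering)
spark.sql(f"VACUUM {table}")                                 # remove unreferenced files (default retention applies)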
Databricks Spark has vectorized Parquet reads enabled by default (spark.sql.parquet.enableVectorizedReader = true). This gives a huge boost in Parquet parsing performance, but it also causes problems when reading decimal columns, because the vectorized reader doesn't support decimals and starts treating the decimal values as binary.
Turn it off if this issue is encountered:
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")