Spark performance optimization - Complete reference
By Niraj Zade | 2025 Oct 11 | 3 min read
WORK IN PROGRESS
The intended audience of this note is:
Your operational loop is going to be:
By default, Spark reads a JDBC source through a single query, into a single partition. This is very slow, and also causes a gigantic skew (all of the data sits in one partition).
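For reference, this is roughly what the naive read looks like (a minimal sketch, assuming jdbc_url and props are already defined and reusing the transactions table from the example further below):

# Naive JDBC read: one connection, one query, one partition
df = spark.read.jdbc(
    url=jdbc_url,
    table="transactions",
    properties=props
)

# Everything arrives in a single partition
print(df.rdd.getNumPartitions())  # 1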
Instead, tell the JDBC reader how to parallelize the query.
If the primary key column is of type int, long, or timestamp, then it is quite straightforward:
df = spark.read.jdbc(
    url=jdbc_url,
    table="transactions",
    column="id",  # numeric column used for partitioning
    lowerBound=1,
    upperBound=1000000,
    numPartitions=10,
    properties=props
)
This will end up firing 10 SQL queries in parallel, one per partition:
SELECT * FROM transactions WHERE id < 100001 OR id IS NULL;
SELECT * FROM transactions WHERE id >= 100001 AND id < 200001;
...
SELECT * FROM transactions WHERE id >= 900001;
Note that lowerBound and upperBound only control the stride; rows outside the bounds are not filtered out, they just land in the open-ended first and last partitions.
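A common pattern is to derive the bounds from the table itself before the partitioned read. A minimal sketch, assuming the same jdbc_url and props, and that the source database accepts an AS-aliased subquery:

# Sketch: fetch the real MIN/MAX of the partition column first,
# then feed them in as lowerBound/upperBound
bounds = spark.read.jdbc(
    url=jdbc_url,
    table="(SELECT MIN(id) AS min_id, MAX(id) AS max_id FROM transactions) AS b",
    properties=props
).collect()[0]

df = spark.read.jdbc(
    url=jdbc_url,
    table="transactions",
    column="id",
    lowerBound=bounds["min_id"],
    upperBound=bounds["max_id"],
    numPartitions=10,
    properties=props
)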
If the partitioning column isn't numeric (or you want full control over the splits), supply the partition predicates manually:
predicates = [
    "region = 'NORTH'",
    "region = 'SOUTH'",
    "region = 'EAST'",
    "region = 'WEST'"
]
df = spark.read.jdbc(
    url=jdbc_url,
    table="customers",
    predicates=predicates,
    properties=props
)
Each string in predicates will be used in a WHERE clause as follows:
SELECT * FROM customers WHERE region = 'NORTH';
SELECT * FROM customers WHERE region = 'SOUTH';
SELECT * FROM customers WHERE region = 'EAST';
SELECT * FROM customers WHERE region = 'WEST';
This method puts more load on the database, but the data gets distributed among the Spark job's executors according to the predicates you choose. It is also the only realistic option for columns containing high-cardinality non-numeric datatypes, like UUID strings.
E.g. - split the workload across 4 queries:
num_partitions = 4
predicates = [
    f"MOD(ABS(HASH(user_id)), {num_partitions}) = {i}"
    for i in range(num_partitions)
]

df = spark.read.jdbc(
    url=jdbc_url,
    table="(SELECT * FROM users) AS users_subquery",
    predicates=predicates,
    properties=props
)
This will create the following queries:
SELECT * FROM users WHERE MOD(ABS(HASH(user_id)), 4) = 0;
SELECT * FROM users WHERE MOD(ABS(HASH(user_id)), 4) = 1;
SELECT * FROM users WHERE MOD(ABS(HASH(user_id)), 4) = 2;
SELECT * FROM users WHERE MOD(ABS(HASH(user_id)), 4) = 3;
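Keep in mind that HASH() is not standard SQL, so the predicates have to use whatever hash function the source database actually provides. As a hedged sketch, on PostgreSQL something like hashtext() could play the same role:

# Sketch for PostgreSQL (assumption: hashtext() is available and user_id is text-like).
# hashtext() can return a negative int, so wrap it in ABS() before bucketing.
num_partitions = 4
predicates = [
    f"MOD(ABS(hashtext(user_id::text)), {num_partitions}) = {i}"
    for i in range(num_partitions)
]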
Date-range predicates like the ones below are great for reconciliation runs, backfills, etc.:
predicates = [
    "created_at < '2024-01-01'",
    "created_at >= '2024-01-01' AND created_at < '2024-04-01'",
    "created_at >= '2024-04-01'"
]

df = spark.read.jdbc(
    url=jdbc_url,
    table="(SELECT * FROM users) AS users_subquery",
    predicates=predicates,
    properties=props
)
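If the ranges follow a regular calendar grid, the predicate list can be generated instead of hand-written. A small sketch in plain Python, assuming monthly buckets over created_at for 2024:

from datetime import date

# Sketch: one predicate per month of 2024, plus open-ended buckets
# for everything before and after that year
months = [date(2024, m, 1) for m in range(1, 13)] + [date(2025, 1, 1)]

predicates = ["created_at < '2024-01-01'"]
predicates += [
    f"created_at >= '{start}' AND created_at < '{end}'"
    for start, end in zip(months, months[1:])
]
predicates.append("created_at >= '2025-01-01'")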
Databricks Spark has vectorized Parquet reads enabled by default (spark.sql.parquet.enableVectorizedReader = true). This is a huge win for performance because the reader decodes Parquet data in columnar batches instead of row by row.
However, this can cause problems when reading decimal columns: the vectorized reader doesn't support some decimal encodings and ends up treating those decimal values as binary.
Turn it off if you run into this issue:
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
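If only one read is affected, the setting can also be flipped just around that read and restored afterwards. A minimal sketch (the path is hypothetical):

# Sketch: disable the vectorized reader only around the affected read,
# then restore the previous value
previous = spark.conf.get("spark.sql.parquet.enableVectorizedReader")
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
try:
    df = spark.read.parquet("/path/to/decimal_table")  # hypothetical path
finally:
    spark.conf.set("spark.sql.parquet.enableVectorizedReader", previous)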