Spark performance optimization compendium

By Niraj Zade  |  2025 Oct 11  |  5m read


This note is a constant work in progress

This note will be useful to you if you are:

  1. A revising/updating their knowledge (me!)
  2. A junior data engineers building up a vocabulary of solutions to common problems

How to approach performance optimization?

Your operational loop is going to be:

  1. Determine the SLA (time budget for the pipelie's execution)
  2. Build pipeline
  3. Measure metrics
  4. Find out bottlenecks
  5. Reduce time lost to bottlenecks
  6. Repeat steps 3 to5 until SLA is met with a good margin

Speed up reads

Slow JDBC query based ingestion

By default, spark will read all SQL query data through a single query (as a single partition). This will be very slow (as there is no parallelization), and also cause a gigantic skew (as it will form only one partition).

To avoid this issue, there are mechanisms to tell the JDBC reader how to parallelize the query.

Parallelize on a numeric column

If the primary key column is if type int, long, timestamp, then it is quite straightforward:

df = spark.read.jdbc(
    url=jdbc_url,
    table="transactions",
    column="id",             # numeric column used for partitioning
    lowerBound=1,
    upperBound=1000000,
    numPartitions=10,
    properties=props
)

This will end up firing 4 SQL queries in parallel

SELECT * FROM transactions WHERE id >= 1 AND id < 100001;
SELECT * FROM transactions WHERE id >= 100001 AND id < 200001;
...
SELECT * FROM transactions WHERE id >= 900001 AND id <= 1000000;

Parallelize on a string column

The predicates will have to be supplied manually

predicates = [
    "region = 'NORTH'",
    "region = 'SOUTH'",
    "region = 'EAST'",
    "region = 'WEST'"
]

df = spark.read.jdbc(
    url=jdbc_url,
    table="customers",
    predicates=predicates,
    properties=props
)

Each string in predicates will be used in a WHERE clause as follows:

SELECT * FROM customers WHERE region = 'NORTH';
SELECT * FROM customers WHERE region = 'SOUTH';
SELECT * FROM customers WHERE region = 'EAST';
SELECT * FROM customers WHERE region = 'WEST';

Parallelize using a hashing function

This method will put load on the db, but the data will be evenly distributed among the spark job's executors. But this is the only realistic method for dealing with columns containing high cardinality non-numeric datatypes like UUID strings etc.

Eg - Split the workload across 4 queries:

num_partitions = 4
predicates = [f"MOD(ABS(HASH(user_id)), {num_partitions}) = {i}" for i in range(num_partitions)]

df = spark.read.jdbc(
    url=jdbc_url,
    table="(SELECT * FROM users) AS users_subquery",
    predicates=predicates,
    properties=props
)

This will create the following queries:

SELECT * FROM users WHERE MOD(ABS(HASH(user_id)), 8) = 0;
SELECT * FROM users WHERE MOD(ABS(HASH(user_id)), 8) = 1;
SELECT * FROM users WHERE MOD(ABS(HASH(user_id)), 8) = 2;
SELECT * FROM users WHERE MOD(ABS(HASH(user_id)), 8) = 4;

Parallelize on timestamps

This is great for reconciliation runs etc

predicates = [
    "created_at < '2024-01-01'",
    "created_at >= '2024-01-01' AND created_at < '2024-04-01'",
    "created_at >= '2024-04-01'"
]
df = spark.read.jdbc(
    url=jdbc_url,
    table="(SELECT * FROM users) AS users_subquery",
    predicates=predicates,
    properties=props
)

Slow transformations

Slow joins

(There are more details in the spark join strategies note)

One table is small enough to fit into memory

Perform a broadcast hash join (usually by broadcasting a dimension table)

# Force spark to send small_df to all executors
broadcast_join = big_df.join(broadcast(small_df), big_df.big_id == small_df.small_id, "inner")

Medium-sized datasets

When the dataset's hash can fit into memory, use the shuffle hash join.

(the hash table is used for comparison. So it must fit into the memory.)

hash_join = big_df.hint("shuffle_hash").join(
    small_df.hint("shuffle_hash"),
    big_df.big_id == small_df.small_id,
    "inner"
)

Delta lake optimization

Diagnosis and solutions for common problems

Analyze constituent files with - DESCRIBE DETAIL tbl

Stale table statistics

Re-analyze table, to refresh statistics using ANALYZE tbl COMPUTE STATISTICS

Small file problem

Enable auto-compaction, or manually perform compaction by running OPTIMIZE tbl

Set up a data layout for file-skipping

  • Cluster on some stable column
  • Enable Liquid clustering, or do Z-ordering
  • Aim for file file sizes of 128mb-1gb

Vacuum up history

To remove stale files (reduces storage usage)

  1. Review with DESCRIBE HISTORY
  2. Then run vacuum with VACUUM tbl

Random issues

Parquet specific issues

Parquet vectorized reader issue with decimal columns (upto spark 3.1)

Databricks spark has vectorized reads enabled by default spark.sql.parquet.enableVectorizedReader = True. This leads to a huge boost in parquet parsing performance, but it also causes problems when reading decimal columns. This is because vectorized reader doesn't support decimal, and starts treating decimal values as binary.

Turn it off if this issue is encountered:

park.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

Other Articles

© Niraj Zade 2025 - Website, Linkedin
Website was autogenerated on 2025-11-14
Whoever owns storage, owns computing | Silicon is this generation's atomics