PySpark API reference

May 23, 2023

The original reference isn't fast enough for my use

Collection <- Current phase


Note: the Spark API has a lot of baggage. There is always more than one way to do the same thing.

However, there are two distinct styles of getting something done:

1. Directly using a function and setting its arguments
2. Chaining functions and progressively setting options

Example:

# direct
df ="/data/data.csv", nullValue="undefined")
df ="/data/data.csv", schema=df.schema, format="csv", nullValue="undefined", header=True)
# progressive
df ="nullValue", "undefined").format("csv").load("/data/data.csv")

# direct
df.write.csv("/data/data.csv", mode="overwrite")
# progressive
df.write.mode("overwrite").format("csv").save("/data/data.csv")
I prefer the direct way of defining the operation. Since we are simply setting arguments of a function, autocomplete works very well with it. And it is more Pythonic.

In the progressive method, we are mostly passing strings (especially with the option() method, where both the key and the value are strings). This invites mistakes and demands a lot of memorization.

You can see the idioms that creep in when a framework written in one language is operated through another.

Look at vs You can see that csv() is a sharp, dedicated function, while load() is designed to handle all data formats internally. The load() function has a distinct Java/Scala flavor to it (the factory style of programming).

Also, the progressive style of chaining functions has a distinctive functional-programming feel to it. We don't usually write Python in this style.


Read CSV into df

A particular CSV, or all CSVs in a directory

df ="/dir/path_with_many_csv/single_file.csv") # single
df ="/dir/path_with_many_csv/") # all

Arguments:
* delimiter=","
* inferSchema=True (default False)
* header=True (read first line as col names)
* nullValue='unknown' (treat the given string as null)

# Read with user-defined schema
from pyspark.sql.types import StructType, StringType, IntegerType

schema = StructType().add("name", StringType(), True).add("age", IntegerType(), True)

df ="path_to_csv", schema=schema)


Write df to csv

df.write.csv("/dir/path_with_many_csv/", mode="<mode>")

Where mode is one of:
* overwrite
* append
* ignore
* error


Table into dataframe

df ="", table="", properties={})

# properties are JDBC connection arguments
# example:
properties = {"user": "SYSTEM", "password": "mypassword"}

How to parallelize? From the documentation: "Partitions of the table will be retrieved in parallel if either column or predicates is specified. lowerBound, upperBound and numPartitions is needed when column is specified."

Dataframe into table

<Insert example when I have the time>

Params:
* table (str)
* mode
* properties (dict)


Read JSON into df

Similar to CSV, it depends on the path you specify:
* Entire directory? Give the dir path
* Single file? Give the file path
* Multiple files? Give the file paths as an array

df ="/dir/path_with_many_json/single_file.json") # single
df ="/dir/path_with_many_json/") # all

JSON Lines is assumed by default: one JSON record per line, never spanning multiple lines.

For one-record-per-file JSON, with the data spread over multiple lines, set the argument multiLine=True.

JSON will need a user-specified schema, to maintain your sanity.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True),
])
df ="<your path>", schema=schema)

Write json file:

Writes in JSON lines format

df.write.json(path="/path/to/output/zipcodes.json", mode="overwrite")

Modes:
* append
* overwrite
* ignore
* error or errorifexists



Read parquet into df

df ="<path to dir, file, files>")

Notable parameters:
* mergeSchema (boolean)
* recursiveFileLookup
* modifiedBefore
* modifiedAfter



Notable parameters:
* partitionBy (str, list of str) - partition by column name(s)


The parquet API's options are surprisingly limited.


Notable parameters:
* mergeSchema (bool)



Notable params:
* mode (str, optional): append, overwrite, ignore, error/errorifexists
* partitionBy (str or list) - names of cols to partition by



Reads every line in the text file as a new row in the dataframe.

Suppose file.txt looks like this:



The dataframe must have only one column of string type. Every row becomes a new line in the output file.


Other params:
* lineSep (str)


option() and options()

These are weird. They are used to set arguments (options) of both read and write functions.

# you can either directly set the nullValue argument of the function:
df ="/data/", nullValue="undefined")
# or set arguments via option(), through the chained functions style of coding
df ="nullValue", "undefined").format("csv").load("/data/data.csv")

These are just two different styles of programming.

I don't like using options, and prefer to stick to directly setting arguments in functions. (Since we give strings in options, autocomplete doesn't work for them, which leads to mistakes)

More on writing



You might have noticed it at the end of chained write functions. It specifies the path to save into (file or directory).


df.write.format("json").mode("overwrite").save("/data/data.json")

In the above example, the mode and format are set using functions in the chain.

You can also set them directly within save():"/data/data.json", format="json", mode="overwrite")

Other arguments:
* partitionBy (list)
* **options - other string options

This "more than one way of doing things" makes Spark a pain to deal with.

Partition on a dataframe col while writing

Uses the partitionBy() function:

df.write.partitionBy("<col>").csv("<path>")


The argument cols can be a string or a list of strings




Acts the same way as options. Either set mode as an argument to a function, or set it via the mode() function.

# you can either directly set the mode argument of the function:
df.write.csv("/data/data.csv", mode="overwrite")
# or set mode through the chained functions style of coding
df.write.mode("overwrite").csv("/data/data.csv")



load() is a generic function to read various file formats into a dataframe.

df ="/data/data.csv", schema=df.schema, format="csv", nullValue="undefined", header=True)

Prefer directly using the dedicated function, e.g. instead of"csv").load(...).


Read a table into a dataframe'people').show()

The table used in this function is one registered via a Spark SQL function like df.createOrReplaceTempView('people').

Write bucketBy()

df.write.bucketBy(2, "name").mode("overwrite").saveAsTable("bucketed_table")

Params:
* numBuckets (int)
* col (str, list, tuple)
* cols (str). Keep this empty if a list is already provided in col


Use sortBy() to sort rows in a bucket

df.write.bucketBy(2, "name").sortBy(col="age").saveAsTable("bucketed_table")

Parameters:
* col (str, list of str)
* cols (str, list of str)

Insert dataframe into a table

Works only when (schema of dataframe) = (schema of table)

df.selectExpr("age AS col1", "name AS col2").write.insertInto(tableName="tblPeople")

Params:
* overwrite (bool). Default is False. Overwrites existing data when True.

Save into a table

df.write.saveAsTable("tblPeople")

Other params:
* format (str)
* mode (str)
* partitionBy (str/list)
* **options

Note: In append mode, if the table already exists, saveAsTable() doesn't need the dataframe's schema to be the same as the table's schema. It finds the correct column positions through the column names, and inserts the data into the table.

Table stuff

The documentation of table stuff SUCKS. Maybe because it is legacy stuff



Create a new table from the contents of the dataframe. The new table's schema, partition layout, properties, and other configuration will be based on the configuration set on this writer.



Replace an existing table with the contents of the dataframe.

The existing table’s schema, partition layout, properties, and other configuration will be replaced with the contents of the data frame and the configuration set on this writer.



Create a new table or replace an existing table with the contents of the dataframe.




Overwrite rows matching the given filter condition with the contents of the dataframe in the output table.



Overwrite all partitions for which the dataframe contains at least one row with the contents of the dataframe in the output table.





Building a SparkSession

# set app name (shown in spark ui)
SparkSession.builder.appName("Your App Name")
# set master url
SparkSession.builder.master("local[4]")
# or
SparkSession.builder.master("spark://master:7077")
# set spark remote url
SparkSession.builder.remote("sc://localhost")
# set some config options
from pyspark.conf import SparkConf
conf = SparkConf().set("key", "value")
SparkSession.builder.config(conf=conf)
# configs will be applied to sparksessions created or fetched via this builder



Checks if a valid global SparkSession exists.
* Yes? Returns it. Also applies config options specified in the builder to the returned SparkSession.
* No? Creates a new SparkSession and sets it as the global default.

session_1 = SparkSession.builder.getOrCreate()
# can set its config
session_1.conf.set("key_1", "value_1")
# can get its config
session_1.conf.get("key_1")

Get currently active session

active_session = SparkSession.getActiveSession()

Create a new session

The new session has a separate SQLConf, registered temp views, and UDFs. However, the SparkContext and table cache are shared.


Set session config at runtime

Sets spark/hadoop configs relevant to Spark SQL. When fetching a config key's value, it defaults to the value set in the underlying SparkContext (if one is set).

spark.conf.set("key", "value")


Catalog object, through which we can do operations on the underlying databases, tables, functions etc. (create, drop, alter, query):

# eg: list tables
spark.catalog.listTables()

Create dataframe

A DF is created via a SparkSession

spark.createDataFrame([('Alice', 1)]).collect()

Can create with explicit schema

from pyspark.sql.types import *
schema = StructType([
   StructField("name", StringType(), True),
   StructField("age", IntegerType(), True)])
df3 = spark.createDataFrame(rdd, schema)

And can create from various sources

# DF from tuple
spark.createDataFrame([('Alice', 1)]).collect()

# df from list of dicts
d = [{'name': 'Alice', 'age': 1}]
spark.createDataFrame(d).collect()

# from RDD
rdd = spark.sparkContext.parallelize([('Alice', 1)])
df = spark.createDataFrame(rdd, ['name', 'age'])

# from row instances
from pyspark.sql import Row
Person = Row('name', 'age')
person = r: Person(*r))
df2 = spark.createDataFrame(person)

# from pandas dataframe
spark.createDataFrame(df.toPandas()).collect()

# from RDD with schema in DDL formatted string
spark.createDataFrame(rdd, "a: string, b: int").collect()
rdd = row: row[1])
spark.createDataFrame(rdd, "int").collect()


Creates a DataFrame with a single pyspark.sql.types.LongType column named id, containing elements in a range from start to end (exclusive) with step value step.

spark.range(start=1, end=7, step=2).show()
# +---+
# | id|
# +---+
# |  1|
# |  3|
# |  5|
# +---+


Returns a DataFrameReader that can be used to read data in as a DataFrame.
# <...DataFrameReader object ...>

This is the interface through which we read in stuff. Eg:'json').load(d).show()



Read data in as a streaming DataFrame.

# write stream to console, and stop the streaming query after 3 seconds
q = df.writeStream.format("console").start()
import time
time.sleep(3)
q.stop()

I was just having fun, and the fun turned out to be a viable career. Computers are amazing.