May 23, 2023
The original reference isn't fast enough for my use
Collection <- Current phase
Re-organization
Editing
Polishing
Note: the spark api has a lot of baggage. There is always more than one way to do the same thing.
However, there are two distinct styles of getting something done:
1. Directly using a function and setting its arguments
2. Chaining functions and progressively setting options
Example:
# READING A FILE
# direct
df = spark.read.csv(path="/data/data.csv", nullValue="undefined")
df = spark.read.load(path="/data/data.csv", schema=df.schema, format="csv", nullValue="undefined", header=True)
# progressive
df = spark.read.option("nullValue", "undefined").format('csv').load("/data/data.csv")
# WRITING A FILE
# direct
df.write.csv("/data/data.csv", mode="overwrite")
# progressive
df.write.mode("overwrite").format("csv").save("/data/data.csv")
I prefer to use the direct way of defining the operation. As we are simply setting arguments to a function, autocomplete works very well with it. And it is pythonic.
In the indirect method, we are setting strings most of the time (especially when using the option()/options() methods, where everything in the chain is a string). This leads to mistakes, and also demands a lot of memorization.
You can see the quirks that creep in when a framework written in one language is operated through another.
Look at spark.read.csv() vs spark.read.load(). You can see that csv() is a sharp, dedicated function, while load() is designed to handle all data formats internally. The load() function has a distinct java/scala style to it (the factory style of programming).
Also, the progressive style of chaining functions has a distinctly functional-programming feel to it. We don't usually write python in this style.
Read csv into df
A particular CSV, or all CSVs in a directory
df = spark.read.csv("/dir/path_with_many_csv/single_file.csv") # single
df = spark.read.csv("/dir/path_with_many_csv/") # all
arguments:
* delimiter="," (the keyword argument in csv() is sep)
* inferSchema=True (default False)
* header=True (read first line as col names)
* nullValue='unknown' (consider a string as null)
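A quick sketch combining a few of these (the path and null marker are made-up placeholders):
df = spark.read.csv(
    path="/data/data.csv",   # placeholder path
    sep=",",                 # field delimiter ("delimiter" is the equivalent option key)
    inferSchema=True,        # scan the data to guess column types (default False)
    header=True,             # read first line as column names
    nullValue="unknown",     # treat this string as null
)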
# Read with user defined schema
from pyspark.sql.types import StructType, StringType, IntegerType

schema = StructType().add("name", StringType(), True).add("age", IntegerType(), True)
df = spark.read.csv("path_to_csv", schema=schema)
Write df to csv
df.write.csv("/dir/path_with_many_csv/", mode="<mode>")
Where mode =
* overwrite
* append
* ignore
* error
Table into dataframe
df = spark.read.jdbc(url="", table="", properties={})
# properties are jdbc connection arguments
# example:
properties = {'user': 'SYSTEM', 'password': 'mypassword'}
How to parallelize?
From the documentation: Partitions of the table will be retrieved in parallel if either column or predicates is specified. lowerBound, upperBound and numPartitions is needed when column is specified.
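A sketch of a parallel read over a numeric column (the url, table, bounds and credentials below are made up):
df = spark.read.jdbc(
    url="jdbc:postgresql://host:5432/db",   # placeholder connection url
    table="tblPeople",                      # placeholder table
    column="id",                            # numeric column to split on
    lowerBound=1,
    upperBound=100000,
    numPartitions=4,                        # read in 4 parallel partitions
    properties={"user": "SYSTEM", "password": "mypassword"},
)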
Dataframe into table
<Insert example when I have the time>
params:
* table (str)
* mode
* properties (dict)
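A minimal sketch in the meantime, assuming the write goes over jdbc (url, table and credentials are placeholders):
df.write.jdbc(
    url="jdbc:postgresql://host:5432/db",   # placeholder connection url
    table="tblPeople",                      # placeholder table name
    mode="append",
    properties={"user": "SYSTEM", "password": "mypassword"},
)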
Read json into df
Similar to csv, it depends on the path you specify:
* Entire directory? give the dir path
* Single file? give the file path
* Multiple files? give the file paths as an array
df = spark.read.json(path="/dir/path_with_many_csv/single_file.csv") # single
df = spark.read.json(path="/dir/path_with_many_csv/") # all
JSON Lines works by default. This data is never multiline (only one json record per line).
For one-record-per-file json, with the data spread over multiple lines, set the argument multiLine=True.
JSON will need a user-specified schema, to maintain your sanity.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True),
])
df = spark.read.json(path="<your path>", schema=schema)
Write json file:
Writes in JSON lines format
df.write.json(path="/path/to/output/zipcodes.json", mode="overwrite")
Modes:
* append
* overwrite
* ignore
* error or errorifexists
Orc
Read
df = spark.read.orc(path="<path to dir, file, files>")
Notable parameters:
* mergeSchema (boolean)
* recursiveFileLookup
* modifiedBefore
* modifiedAfter
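For example (made-up path):
# merge schemas across files, and pick up files in nested directories
df = spark.read.orc(path="/data/orc_dir/", mergeSchema=True, recursiveFileLookup=True)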
Write
df.write.orc(path='<path>')
Notable parameters:
* partitionBy (str, list of str) - partition by column name(s)
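For example (made-up path and column):
# one sub-directory per distinct value of "name"
df.write.orc(path="/data/orc_out/", mode="overwrite", partitionBy="name")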
Parquet
The parquet api's options are surprisingly limited.
Read
spark.read.parquet(path='/data/data.parquet')
Notable parameters:
* mergeSchema (bool)
Write
df.write.parquet(path='/data/data.parquet')
Notable params:
* mode (str, optional) {append, overwrite, ignore, error/errorifexists}
* partitionBy (str or list) - names of cols to partition by
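For example (made-up path and columns):
df.write.parquet(
    path="/data/data.parquet",
    mode="overwrite",
    partitionBy=["year", "month"],   # partition by year, then month within each year
)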
Text
Read
Reads every line in the text file as a new row in the dataframe.
Suppose file.txt looks like this:
alphabets
a
b
c
# df.schema here is the schema of an existing dataframe with a single string column named "alphabets"
spark.read.schema(df.schema).text("/data/file.txt").sort("alphabets").show()
Write
The dataframe must have only one col of string type. Every row becomes a new line in the output file.
df.write.mode("overwrite").text(path="/data/data.txt")
other params:
* lineSep (str)
The option() and options() methods are weird. They are used to set arguments (options) of both read and write functions.
# you can either directly set the nullvalue argument of the function:
spark.read.csv(path="/data/", nullValue="undefined")
# or set arguments via options, through the chained functions style of coding
spark.read.option("nullValue", "undefined").format('parquet').load("/data/data.parquet")
These are just two different styles of programming.
I don't like using options, and prefer to stick to directly setting arguments in functions. (Since we give strings in options, autocomplete doesn't work for them, which leads to mistakes)
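There is also options() (plural), which sets several of them in one call; keys and values are still strings:
# same csv read as above, with several options set at once
spark.read.options(nullValue="undefined", header="true").format("csv").load("/data/data.csv")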
You might have noticed save() at the end of chained write functions. It is where we specify the path to save into (file or directory).
df.write.mode("overwrite").format("json").save(path="/data/data.json")
In the above example, the mode and format are set using functions in the chain.
You can also set them directly within save()
df.write.save(path="/data/data.json", format="json", mode="overwrite")
Other arguments:
* partitionBy (list)
* **options (dict) - other string options
This "more than one way of doing things" makes spark a pain to deal with.
Partitioning the output uses the partitionBy() function.
df.write.partitionBy(cols="name").mode("overwrite").format("parquet").save("/data/data.parquet")
The argument cols can be a string or a list of strings.
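For example, a list partitions by the first column, then the second within it (the columns here are placeholders):
df.write.partitionBy(["year", "month"]).mode("overwrite").format("parquet").save("/data/data.parquet")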
mode() acts the same way as options. Either set mode as an argument to a function, or set it via the mode() function.
# you can either directly set the mode argument of the function:
df.write.parquet("/data/data.parquet", mode="overwrite")
# or set mode through the chained functions style of coding
df.write.mode("overwrite").format("parquet").save("/data/data.parquet")
Load is a generic function to read various file formats into a dataframe.
df = spark.read.load(path="/data/data.csv", schema=df.schema, format="csv", nullValue="undefined", header=True)
Prefer directly using the dedicated function, eg spark.read.csv() instead of spark.read.load(format="csv").
Read a table into a dataframe
spark.read.table(tableName='people').show()
The table used in this function is one registered via a spark sql function like df.createOrReplaceTempView('people').
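Putting the two together:
# register the dataframe as a temp view, then read it back as a table
df.createOrReplaceTempView("people")
spark.read.table(tableName="people").show()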
write.bucketBy(2, "name").mode("overwrite").saveAsTable("bucketed_table")
Params:
* numBuckets (int)
* col (str, list, tuple)
* cols (str). Keep this empty if a list is already provided in col
Use sortBy() to sort rows in a bucket
write.bucketBy(2, "name").sortBy(col="age").saveAsTable("bucketed_table")
parameters:
* col (str, list of str)
* cols (str, list of str)
insertInto() works only when (schema of dataframe) = (schema of table).
df.selectExpr("age AS col1", "name AS col2").write.insertInto(tableName="tblPeople")
Params:
* overwrite (bool). Default is false. Overwrites existing data when true.
df.write.saveAsTable(name="tblA")
Other params:
* format (str)
* mode (str)
* partitionBy (str/list)
* **options (dict)
Note: In append mode, if the table already exists, saveAsTable() doesn't need the dataframe's schema to be the same as the table's schema. It will find the correct column positions through the column names, and insert the data into the table.
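For example (assuming tblA already exists and its column names match df's):
# columns are matched by name, not position, in append mode
df.write.saveAsTable(name="tblA", mode="append")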
The documentation of table stuff SUCKS. Maybe because it is legacy stuff.
The methods below are on DataFrameWriterV2, which you get from df.writeTo(table).
create
create new table from contents of dataframe. The new table’s schema, partition layout, properties, and other configuration will be based on the configuration set on this writer.
replace
Replace an existing table with the contents of the data frame.
The existing table’s schema, partition layout, properties, and other configuration will be replaced with the contents of the data frame and the configuration set on this writer.
createOrReplace
Create a new table or replace an existing table with the contents of the data frame.
append
Append the contents of the data frame to the output table.
overwrite
Overwrite rows matching the given filter condition with the contents of the data frame in the output table.
overwritePartitions
Overwrite all partition for which the data frame contains at least one row with the contents of the data frame in the output table.
tableProperty
[https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriterV2.tableProperty.html](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriterV2.tableProperty.html)
partitionedBy
Partition the output table created by create, createOrReplace, or replace using the given columns or transforms.
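A sketch of the v2 writer (the table name, timestamp column and table property are placeholders, and the target catalog must support v2 writes):
from pyspark.sql.functions import years

(df.writeTo("catalog.db.tbl")           # placeholder catalog table name
    .partitionedBy(years("ts_col"))     # partition transform on a timestamp column
    .tableProperty("created.by", "me")  # illustrative property key/value
    .createOrReplace())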
# set app name (shown in spark ui)
SparkSession.builder.appName("Your App Name")
# set master url
SparkSession.builder.master("local")
# or
SparkSession.builder.master("spark://master:7077")
# enable hive support (persistent hive metastore, hive serdes, hive UDFs)
SparkSession.builder.enableHiveSupport()
# Set spark remote url
SparkSession.builder.remote("sc://localhost")
# set some config options
from pyspark.conf import SparkConf
SparkSession.builder.config(conf=SparkConf())
# will be applied to sparksessions created or fetched via this builder
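Putting the builder pieces together (app name and config key are placeholders):
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("Your App Name")
         .master("local[*]")
         .config("spark.sql.shuffle.partitions", "8")   # any config key/value
         .getOrCreate())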
Checks if a valid global SparkSession exists.
* Yes? returns it. Also applies config options specified in the builder to the returned SparkSession.
* No? creates a new SparkSession and sets it as the global default.
session_1 = SparkSession.builder.getOrCreate()
# can set its config
session_1.conf.set("key_1", "value_1")
# can get its config
session_1.conf.get("key_1")
active_session = SparkSession.getActiveSession()
The new session has separate SQLConf, registered temp views and UDFs. However, the SparkContext and table cache are shared.
spark.newSession()
Sets spark/hadoop configs relevant to Spark SQL. When fetching a config key's value, it defaults to the value set in the underlying SparkContext (if it is already set there).
spark.conf.set("key", "value")
Catalog object, through which we can do operations on the underlying db, tables, functions, etc. (create, drop, alter, query, and so on).
spark.catalog
# eg: list tables
spark.catalog.listTables()
A DF is created via a SparkSession.
spark.createDataFrame([('Alice', 1)]).collect()
Can create with explicit schema
from pyspark.sql.types import *
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)])
df3 = spark.createDataFrame(rdd, schema)  # rdd is an existing RDD of (name, age) tuples
And can create from various sources
# DF from tuple
spark.createDataFrame([('Alice', 1)]).collect()
# df from list of dicts
d = [{'name': 'Alice', 'age': 1}]
spark.createDataFrame(d).collect()
# from RDD
rdd = spark.sparkContext.parallelize([('Alice', 1)])
df = spark.createDataFrame(rdd, ['name', 'age'])
# from row instances
from pyspark.sql import Row
Person = Row('name', 'age')
person = rdd.map(lambda r: Person(*r))
df2 = spark.createDataFrame(person)
# from pandas dataframe
spark.createDataFrame(df.toPandas())
# from RDD with schema in DDL formatted string
spark.createDataFrame(rdd, "a: string, b: int").collect()
# single-column df: the schema can also be just a type name
rdd = rdd.map(lambda row: row[1])
spark.createDataFrame(rdd, "int").collect()
Creates a DataFrame with a single pyspark.sql.types.LongType column named id, containing elements in a range from start to end (exclusive) with step value step.
spark.range(start=1, end=7, step=2).show()
# +---+
# | id|
# +---+
# | 1|
# | 3|
# | 5|
# +---+
Returns a DataFrameReader, which can be used to read data in as a DataFrame.
spark.read
# <...DataFrameReader object ...>
This is the interface through which we read things in. Eg: spark.read.format('json').load("/data/data.json").show()
Read data as a streaming DataFrame
spark.readStream.format("rate").load()
# write stream to console, and stop streaming query after 3 seconds
q = df.writeStream.format("console").start()
time.sleep(3)
q.stop()
I was just having fun, and the fun turned out to be a viable career. Computers are amazing.