- Notifications
You must be signed in to change notification settings - Fork181
🐍 Quick reference guide to common patterns & functions in PySpark.
License
NotificationsYou must be signed in to change notification settings
kevinschaich/pyspark-cheatsheet
Folders and files
Name | Name | Last commit message | Last commit date | |
---|---|---|---|---|
Repository files navigation
A quick reference guide to the most commonly used patterns and functions in PySpark SQL.
- Quickstart
- Basics
- Common Patterns
- String Operations
- Number Operations
- Date & Timestamp Operations
- Array Operations
- Struct Operations
- Aggregation Operations
- Advanced Operations
- Useful Functions / Transformations
If you can't find what you're looking for, check out the PySpark Official Documentation and add it here!
Install on macOS:
brew install apache-spark && pip install pyspark
Create your first DataFrame:
from pyspark.sql import SparkSession

# Reuse the active session if one exists, otherwise start a new one
spark = SparkSession.builder.getOrCreate()

# I/O options: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/io.html
df = spark.read.csv('/path/to/your/input/file')
import json

# Show a preview
df.show()

# Show preview of first / last n rows
df.head(5)
df.tail(5)

# Show preview as JSON (WARNING: in-memory)
df = df.limit(10)  # optional
print(json.dumps([row.asDict(recursive=True) for row in df.collect()], indent=2))

# Limit actual DataFrame to n rows (non-deterministic)
df = df.limit(5)

# Get columns
df.columns

# Get columns + column types
df.dtypes

# Get schema
df.schema

# Get row count
df.count()

# Get column count
len(df.columns)

# Write output to disk
df.write.csv('/path/to/your/output/file')

# Get results (WARNING: in-memory) as list of PySpark Rows
# (bound to a new name so `df` stays a Spark DataFrame for the lines below)
rows = df.collect()

# Get results (WARNING: in-memory) as list of Python dicts
dicts = [row.asDict(recursive=True) for row in df.collect()]

# Convert (WARNING: in-memory) to Pandas DataFrame
pandas_df = df.toPandas()
# Easily reference these as F.my_function() and T.my_type() below
from pyspark.sql import functions as F, types as T
# Filter on equals condition
df = df.filter(df.is_adult == 'Y')

# Filter on >, <, >=, <= condition
df = df.filter(df.age > 25)

# Multiple conditions require parentheses around each condition
df = df.filter((df.age > 25) & (df.is_adult == 'Y'))

# Compare against a list of allowed values
df = df.filter(F.col('first_name').isin([3, 4, 7]))

# Sort results
df = df.orderBy(df.age.asc())
df = df.orderBy(df.age.desc())
# Left join in another dataset
df = df.join(person_lookup_table, 'person_id', 'left')

# Match on different columns in left & right datasets
df = df.join(other_table, df.id == other_table.person_id, 'left')

# Match on multiple columns
df = df.join(other_table, ['first_name', 'last_name'], 'left')
# Add a new static column
df = df.withColumn('status', F.lit('PASS'))

# Construct a new dynamic column
df = df.withColumn(
    'full_name',
    F.when(
        (df.fname.isNotNull() & df.lname.isNotNull()),
        F.concat(df.fname, df.lname),
    ).otherwise(F.lit('N/A')),
)

# Pick which columns to keep, optionally rename some
df = df.select(
    'name',
    'age',
    F.col('dob').alias('date_of_birth'),
)

# Remove columns
df = df.drop('mod_dt', 'mod_username')

# Rename a column
df = df.withColumnRenamed('dob', 'date_of_birth')

# Keep all the columns which also occur in another dataset
df = df.select(*(F.col(c) for c in df2.columns))

# Batch Rename/Clean Columns
# (loop variable is not called `col`, to avoid confusion with pyspark's `col` function)
for column in df.columns:
    df = df.withColumnRenamed(column, column.lower().replace(' ', '_').replace('-', '_'))
# Cast a column to a different type
df = df.withColumn('price', df.price.cast(T.DoubleType()))

# Replace all nulls with a specific value
df = df.fillna({
    'first_name': 'Tom',
    'age': 0,
})

# Take the first value that is not null
df = df.withColumn('last_name', F.coalesce(df.last_name, df.surname, F.lit('N/A')))

# Drop duplicate rows in a dataset (distinct)
df = df.dropDuplicates()  # or
df = df.distinct()

# Drop duplicate rows, but consider only specific columns
df = df.dropDuplicates(['name', 'height'])

# Replace empty strings with null (leave out subset keyword arg to replace in all columns)
df = df.replace({"": None}, subset=["name"])

# Convert Python/PySpark/NumPy NaN operator to null
df = df.replace(float("nan"), None)
# Contains - col.contains(string)
df = df.filter(df.name.contains('o'))

# Starts With - col.startswith(string)
df = df.filter(df.name.startswith('Al'))

# Ends With - col.endswith(string)
df = df.filter(df.name.endswith('ice'))

# Is Null - col.isNull()
df = df.filter(df.is_adult.isNull())

# Is Not Null - col.isNotNull()
df = df.filter(df.first_name.isNotNull())

# Like - col.like(string_with_sql_wildcards)
df = df.filter(df.name.like('Al%'))

# Regex Like - col.rlike(regex)
df = df.filter(df.name.rlike('[A-Z]*ice$'))

# Is In List - col.isin(*cols)
df = df.filter(df.name.isin('Bob', 'Mike'))
# Substring - col.substr(startPos, length)
# (Spark string positions are 1-based, so 1 is the first character)
df = df.withColumn('short_id', df.id.substr(1, 10))

# Trim - F.trim(col)
df = df.withColumn('name', F.trim(df.name))

# Left Pad - F.lpad(col, len, pad)
# Right Pad - F.rpad(col, len, pad)
df = df.withColumn('id', F.lpad('id', 4, '0'))

# Left Trim - F.ltrim(col)
# Right Trim - F.rtrim(col)
df = df.withColumn('id', F.ltrim('id'))

# Concatenate - F.concat(*cols)
df = df.withColumn('full_name', F.concat('fname', F.lit(' '), 'lname'))

# Concatenate with Separator/Delimiter - F.concat_ws(delimiter, *cols)
df = df.withColumn('full_name', F.concat_ws('-', 'fname', 'lname'))

# Regex Replace - F.regexp_replace(str, pattern, replacement)
# (the column is named by the string 'id'; a bare `id` would be Python's builtin function)
df = df.withColumn('id', F.regexp_replace('id', '0F1(.*)', '1F1-$1'))

# Regex Extract - F.regexp_extract(str, pattern, idx)
df = df.withColumn('id', F.regexp_extract('id', '[0-9]*', 0))
# Round - F.round(col, scale=0)
df = df.withColumn('price', F.round('price', 0))

# Floor - F.floor(col)
df = df.withColumn('price', F.floor('price'))

# Ceiling - F.ceil(col)
df = df.withColumn('price', F.ceil('price'))

# Absolute Value - F.abs(col)
df = df.withColumn('price', F.abs('price'))

# X raised to power Y – F.pow(x, y)
df = df.withColumn('exponential_growth', F.pow('x', 'y'))

# Select smallest value out of multiple columns – F.least(*cols)
df = df.withColumn('least', F.least('subtotal', 'total'))

# Select largest value out of multiple columns – F.greatest(*cols)
df = df.withColumn('greatest', F.greatest('subtotal', 'total'))
# Add a column with the current date
df = df.withColumn('current_date', F.current_date())

# Convert a string of known format to a date (excludes time information)
df = df.withColumn('date_of_birth', F.to_date('date_of_birth', 'yyyy-MM-dd'))

# Convert a string of known format to a timestamp (includes time information)
df = df.withColumn('time_of_birth', F.to_timestamp('time_of_birth', 'yyyy-MM-dd HH:mm:ss'))

# Get year from date:   F.year(col)
# Get month from date:  F.month(col)
# Get day from date:    F.dayofmonth(col)
# Get hour from date:   F.hour(col)
# Get minute from date: F.minute(col)
# Get second from date: F.second(col)
df = df.filter(F.year('date_of_birth') == F.lit('2017'))

# Add & subtract days
df = df.withColumn('three_days_after', F.date_add('date_of_birth', 3))
df = df.withColumn('three_days_before', F.date_sub('date_of_birth', 3))

# Add & subtract months
df = df.withColumn('next_month', F.add_months('date_of_birth', 1))

# Get number of days between two dates - F.datediff(end, start)
# (the end date goes first; the result is end - start)
df = df.withColumn('days_between', F.datediff('end', 'start'))

# Get number of months between two dates - F.months_between(date1, date2)
# (positive when date1 is later than date2, so the end date goes first)
df = df.withColumn('months_between', F.months_between('end', 'start'))

# Keep only rows where date_of_birth is between 2017-05-10 and 2018-07-21
df = df.filter(
    (F.col('date_of_birth') >= F.lit('2017-05-10'))
    & (F.col('date_of_birth') <= F.lit('2018-07-21'))
)
# Column Array - F.array(*cols)
df = df.withColumn('full_name', F.array('fname', 'lname'))

# Empty Array - F.array(*cols)
df = df.withColumn('empty_array_column', F.array([]))

# Get element at index – col.getItem(n)
df = df.withColumn('first_element', F.col("my_array").getItem(0))

# Array Size/Length – F.size(col)
df = df.withColumn('array_length', F.size('my_array'))

# Flatten Array – F.flatten(col)
df = df.withColumn('flattened', F.flatten('my_array'))

# Unique/Distinct Elements – F.array_distinct(col)
df = df.withColumn('unique_elements', F.array_distinct('my_array'))

# Map over & transform array elements – F.transform(col, func: col -> col)
df = df.withColumn('elem_ids', F.transform(F.col('my_array'), lambda x: x.getField('id')))

# Return a row per array element – F.explode(col)
df = df.select(F.explode('my_array'))
# Make a new Struct column (similar to Python's `dict()`) – F.struct(*cols)
df = df.withColumn('my_struct', F.struct(F.col('col_a'), F.col('col_b')))

# Get item from struct by key – col.getField(str)
df = df.withColumn('col_a', F.col('my_struct').getField('col_a'))
from pyspark.sql import Window as W

# Row Count:             F.count(col)
# Sum of Rows in Group:  F.sum(col)
# Mean of Rows in Group: F.mean(col)
# Max of Rows in Group:  F.max(col)
# Min of Rows in Group:  F.min(col)
# First Row in Group:    F.first(col)
df = df.groupBy('gender').agg(F.max('age').alias('max_age_by_gender'))

# Collect a Set of all Rows in Group:  F.collect_set(col)
# Collect a List of all Rows in Group: F.collect_list(col)
df = df.groupBy('age').agg(F.collect_set('name').alias('person_names'))

# Just take the latest row for each combination (Window Functions)
# Rank rows within each name pair by date descending, keep rank 1, then drop the helper column
window = W.partitionBy("first_name", "last_name").orderBy(F.desc("date"))
df = df.withColumn("row_number", F.row_number().over(window))
df = df.filter(F.col("row_number") == 1)
df = df.drop("row_number")
# Repartition – df.repartition(num_output_partitions)
df = df.repartition(1)
import random

# Multiply each row's age column by two
# F.udf returns strings unless a return type is given, so declare 'int' to keep age numeric;
# the lambda passes nulls through instead of raising on `None * 2`
times_two_udf = F.udf(lambda x: None if x is None else x * 2, 'int')
df = df.withColumn('age', times_two_udf(df.age))

# Randomly choose a value to use as a row's name
# Marked non-deterministic so Spark does not assume repeated calls give the same value
random_name_udf = F.udf(
    lambda: random.choice(['Bob', 'Tom', 'Amy', 'Jenna'])
).asNondeterministic()
df = df.withColumn('name', random_name_udf())
def flatten(df: "DataFrame", delimiter="_") -> "DataFrame":
    """Flatten nested struct columns in `df` by one level.

    Each field of a struct column becomes a top-level column named
    `<struct><delimiter><field>`; non-struct columns are kept unchanged and
    come first in the result.

        df = [ {'a': {'b': 1, 'c': 2} } ]
        df = flatten(df, '_')
        -> [ {'a_b': 1, 'a_c': 2} ]

    Args:
        df: DataFrame whose struct columns should be expanded.
        delimiter: Separator placed between the struct name and field name.

    Returns:
        A new DataFrame with struct columns expanded by one level.
    """
    # Annotations are quoted so defining this function does not require
    # `DataFrame` to be imported at module level.
    # df.dtypes yields (column_name, type_string) pairs; struct columns have a
    # type string starting with "struct".
    flat_cols = [name for name, dtype in df.dtypes if not dtype.startswith("struct")]
    nested_cols = [name for name, dtype in df.dtypes if dtype.startswith("struct")]

    # Selecting "<struct>.*" exposes the struct's field names as columns.
    expanded_cols = [
        F.col(nc + "." + c).alias(nc + delimiter + c)
        for nc in nested_cols
        for c in df.select(nc + ".*").columns
    ]
    return df.select(flat_cols + expanded_cols)


def lookup_and_replace(df1, df2, df1_key, df2_key, df2_value):
    """Replace values in `df1`'s `df1_key` column using a lookup in `df2`.

    Every value in `df1[df1_key]` that matches a `df2[df2_key]` is replaced by
    the corresponding `df2[df2_value]`; rows without a match (or whose lookup
    value is null) keep their original value.

        df = lookup_and_replace(people, pay_codes, 'id', 'pay_code_id', 'pay_code_desc')

    Args:
        df1: DataFrame whose column is rewritten.
        df2: Lookup DataFrame.
        df1_key: Name of the column in `df1` to match on and overwrite.
        df2_key: Name of the matching key column in `df2`.
        df2_value: Name of the replacement value column in `df2`.

    Returns:
        `df1` with `df1_key` replaced and the lookup columns dropped.

    NOTE(review): columns are referenced by name after the join, so this
    presumably requires `df2_key` / `df2_value` not to collide with column
    names in `df1` — confirm against callers.
    """
    return (
        df1
        # Only the key and value columns of the lookup table are joined in
        .join(df2[[df2_key, df2_value]], df1[df1_key] == df2[df2_key], 'left')
        # Prefer the looked-up value, fall back to the original on no match
        .withColumn(df1_key, F.coalesce(F.col(df2_value), F.col(df1_key)))
        .drop(df2_key)
        .drop(df2_value)
    )
About
🐍 Quick reference guide to common patterns & functions in PySpark.
Topics
Resources
License
Uh oh!
There was an error while loading. Please reload this page.
Stars
Watchers
Forks
Releases
No releases published
Packages0
No packages published
Uh oh!
There was an error while loading. Please reload this page.