A library that provides useful extensions to Apache Spark and PySpark.
This project provides extensions to the Apache Spark project in Scala and Python:
Diff: A `diff` transformation and application for `Dataset`s that computes the differences between two datasets, i.e. which rows to add, delete or change to get from one dataset to the other.
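As an illustration, here is a minimal sketch of diffing two DataFrames on an `id` key column; it assumes the diff extension methods come from `import uk.co.gresearch.spark.diff._` and that a SparkSession `spark` is in scope:

```scala
import uk.co.gresearch.spark.diff._
import spark.implicits._

val left  = Seq((1, "one"), (2, "two"), (3, "three")).toDF("id", "value")
val right = Seq((1, "one"), (2, "Two"), (4, "four")).toDF("id", "value")

// returns a DataFrame with a diff column per row, marking it as unchanged,
// changed, inserted or deleted with respect to the "id" key column
left.diff(right, "id").show()
```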
SortedGroups: A `groupByKey` transformation that groups rows by a key while providing a sorted iterator for each group. Similar to `Dataset.groupByKey.flatMapGroups`, but with order guarantees for the iterator.
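A sketch of how grouping with a sorted iterator might look; the method names (`groupByKeySorted`, `flatMapSortedGroups`) and signatures below are assumptions based on the description above, so consult the library's API documentation for the exact calls:

```scala
import spark.implicits._

// (id, time, value) events; group by id, iterate each group ordered by time
val events = Seq((1, 3, "c"), (1, 1, "a"), (2, 2, "b")).toDS

// hypothetical API: key extractor, then ordering extractor, then a function
// that receives a per-group iterator guaranteed to be sorted by time
events
  .groupByKeySorted(e => e._1)(e => e._2)
  .flatMapSortedGroups((id, it) => it.zipWithIndex.map { case (e, i) => (id, i, e._3) })
```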
Histogram [*]: A `histogram` transformation that computes the histogram DataFrame for a value column.
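A hypothetical sketch of computing a histogram over a `value` column for the thresholds 0, 10 and 100; the exact `histogram` signature may differ from what is shown here:

```scala
import uk.co.gresearch.spark._   // assumed import location
import spark.implicits._

// counts how many values per id fall into the bins defined by the thresholds
// (signature assumed from the description above)
val hist = ds.histogram(Seq(0, 10, 100), $"value", $"id")
hist.show()
```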
Global Row Number [*]: A `withRowNumbers` transformation that provides the global row number w.r.t. the current order of the Dataset, or any given order. In contrast to the existing SQL function `row_number`, which requires a window spec, this transformation provides the row number across the entire Dataset without scaling problems.
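A brief sketch of adding global row numbers, assuming `withRowNumbers` becomes available via `import uk.co.gresearch.spark._`; the ordered overload shown here is an assumption:

```scala
import uk.co.gresearch.spark._
import spark.implicits._

// row numbers w.r.t. the current order of the Dataset
ds.withRowNumbers().show()

// row numbers w.r.t. an explicit order (assumed overload)
ds.withRowNumbers($"value".desc).show()
```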
Partitioned Writing: The `writePartitionedBy` action writes your `Dataset` partitioned and efficiently laid out with a single operation.
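A minimal sketch of partitioned writing, assuming `writePartitionedBy` is provided via `import uk.co.gresearch.spark._` and returns a pre-configured DataFrameWriter:

```scala
import uk.co.gresearch.spark._
import spark.implicits._

// writes an efficiently laid-out file tree partitioned by the "id" column
ds.writePartitionedBy(Seq($"id"))
  .parquet("ds-partitioned.parquet")
```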
Inspect Parquet files [*]: The structure of Parquet files (the metadata, not the data stored in Parquet) can be inspected similar to `parquet-tools` or `parquet-cli` by reading from a simple Spark data source. This simplifies identifying why some Parquet files cannot be split by Spark into scalable partitions.
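For example, the Parquet metadata could be read through such a data source as sketched below; the format name `parquet-metadata` is an assumption, so check the documentation for the actual source identifiers:

```scala
// shows per-file metadata such as schema, blocks and compressed sizes
// (format name assumed, not confirmed by this README)
spark.read.format("parquet-metadata").load("data.parquet").show()
```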
Install Python packages into PySpark job [*]: Install Python dependencies via PIP or Poetry programmatically into your running PySpark job (PySpark ≥ 3.1.0):
```python
# noinspection PyUnresolvedReferences
from gresearch.spark import *

# using PIP
spark.install_pip_package("pandas==1.4.3", "pyarrow")
spark.install_pip_package("-r", "requirements.txt")

# using Poetry
spark.install_poetry_project("../my-poetry-project/", poetry_python="../venv-poetry/bin/python")
```
Fluent method call: `T.call(transformation: T => R): R`: Turns a transformation `T => R` that is not part of `T` into a fluent method call on `T`. This allows writing fluent code like:
```scala
import uk.co.gresearch._

i.doThis()
 .doThat()
 .call(transformation)
 .doMore()
```
Fluent conditional method call: `T.when(condition: Boolean).call(transformation: T => T): T`: Perform a transformation fluently only if the given condition is true. This allows writing fluent code like:
```scala
import uk.co.gresearch._

i.doThis()
 .doThat()
 .when(condition).call(transformation)
 .doMore()
```
Shortcut for groupBy.as: Calling `Dataset.groupBy(Column*).as[K, T]` should be preferred over calling `Dataset.groupByKey(V => K)` whenever possible. The former allows Catalyst to exploit existing partitioning and ordering of the Dataset, while the latter hides from Catalyst which columns are used to create the keys. This can have a significant performance penalty.
Details:
The new column-expression-based `groupByKey[K](Column*)` method makes it easier to group by a column expression key. Instead of
```scala
ds.groupBy($"id").as[Int, V]
```

use:
```scala
ds.groupByKey[Int]($"id")
```

Backticks: `backticks(string: String, strings: String*): String`: Encloses the given column name with backticks (`` ` ``) when needed. This is a handy way to ensure column names with special characters like dots (`.`) work with `col()` or `select()`.
Count null values: `count_null(e: Column)`: an aggregation function like `count` that counts null values in column `e`. This is equivalent to calling `count(when(e.isNull, lit(1)))`.
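A short sketch combining the two helpers above; the import location `uk.co.gresearch.spark._` is an assumption:

```scala
import org.apache.spark.sql.functions.col
import uk.co.gresearch.spark._   // assumed location of backticks and count_null

// "a.column" contains a dot, so it needs backticks to be addressed as one column
val name = backticks("a.column")           // "`a.column`"
df.select(count_null(col(name))).show()    // number of null values in that column
```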
.Net DateTime.Ticks [*]: Convert .Net (C#, F#, Visual Basic) `DateTime.Ticks` into Spark timestamps, seconds and nanoseconds.
Available methods:
```scala
// Scala
dotNetTicksToTimestamp(Column): Column       // returns timestamp as TimestampType
dotNetTicksToUnixEpoch(Column): Column       // returns Unix epoch seconds as DecimalType
dotNetTicksToUnixEpochNanos(Column): Column  // returns Unix epoch nanoseconds as LongType
```
The reverse is provided by (all return `LongType` .Net ticks):
```scala
// Scala
timestampToDotNetTicks(Column): Column
unixEpochToDotNetTicks(Column): Column
unixEpochNanosToDotNetTicks(Column): Column
```
These methods are also available in Python:
```python
# Python
dotnet_ticks_to_timestamp(column_or_name)         # returns timestamp as TimestampType
dotnet_ticks_to_unix_epoch(column_or_name)        # returns Unix epoch seconds as DecimalType
dotnet_ticks_to_unix_epoch_nanos(column_or_name)  # returns Unix epoch nanoseconds as LongType
timestamp_to_dotnet_ticks(column_or_name)
unix_epoch_to_dotnet_ticks(column_or_name)
unix_epoch_nanos_to_dotnet_ticks(column_or_name)
```
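A minimal usage sketch of the Scala conversions listed above, assuming a DataFrame `df` with a LongType column `ticks` holding .Net `DateTime.Ticks`, and that the functions are provided via `import uk.co.gresearch.spark._`:

```scala
import uk.co.gresearch.spark._
import spark.implicits._

df.select(
  $"ticks",
  dotNetTicksToTimestamp($"ticks").as("timestamp"),  // TimestampType
  dotNetTicksToUnixEpochNanos($"ticks").as("nanos")  // LongType
).show()
```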
Spark temporary directory [*]: Create a temporary directory that will be removed on Spark application shutdown.
Examples:
Scala:
```scala
import uk.co.gresearch.spark.createTemporaryDir

val dir = createTemporaryDir("prefix")
```
Python:
```python
# noinspection PyUnresolvedReferences
from gresearch.spark import *

dir = spark.create_temporary_dir("prefix")
```
Spark job description [*]: Set the Spark job description for all Spark jobs within a context.
Examples:
```scala
import uk.co.gresearch.spark._

implicit val session: SparkSession = spark

withJobDescription("parquet file") {
  val df = spark.read.parquet("data.parquet")
  val count = appendJobDescription("count") {
    df.count
  }
  appendJobDescription("write") {
    df.write.csv("data.csv")
  }
}
```
(Spark UI screenshots: job list without job descriptions vs. with job descriptions.)
Note that setting a description in one thread while calling the action (e.g. `.count`) in a different thread does not work, unless the different thread is spawned from the current thread after the description has been set.
Working example with parallel collections:
```scala
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.CollectionConverters.seqIsParallelizable
import scala.collection.parallel.ForkJoinTaskSupport

val files = Seq("data1.csv", "data2.csv").par

val counts = withJobDescription("Counting rows") {
  // new thread pool required to spawn new threads from this thread
  // so that the job description is actually used
  files.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool())
  files.map(filename => spark.read.csv(filename).count).sum
}(spark)
```
The `spark-extension` package is available for all Spark 3.2, 3.3, 3.4 and 3.5 versions. Some earlier Spark versions may also be supported. The package version has the following semantics: `spark-extension_{SCALA_COMPAT_VERSION}-{VERSION}-{SPARK_COMPAT_VERSION}`:
- `SCALA_COMPAT_VERSION`: Scala binary compatibility (minor) version. Available are `2.12` and `2.13`.
- `SPARK_COMPAT_VERSION`: Apache Spark binary compatibility (minor) version. Available are `3.2`, `3.3`, `3.4` and `3.5`.
- `VERSION`: The package version, e.g. `2.10.0`.
Add this line to your `build.sbt` file:
```scala
libraryDependencies += "uk.co.gresearch.spark" %% "spark-extension" % "2.13.0-3.5"
```
Add this dependency to your `pom.xml` file:
```xml
<dependency>
  <groupId>uk.co.gresearch.spark</groupId>
  <artifactId>spark-extension_2.12</artifactId>
  <version>2.13.0-3.5</version>
</dependency>
```
Add this dependency to your `build.gradle` file:
```groovy
dependencies {
    implementation "uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5"
}
```

Submit your Spark app with the Spark Extension dependency (version ≥ 1.1.0) as follows:
```shell
spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 [jar]
```
Note: Pick the right Scala version (here 2.12) and Spark version (here 3.5) depending on your Spark version.
Launch a Spark Shell with the Spark Extension dependency (version ≥1.1.0) as follows:
```shell
spark-shell --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
```
Note: Pick the right Scala version (here 2.12) and Spark version (here 3.5) depending on your Spark Shell version.
Start a PySpark session with the Spark Extension dependency (version ≥1.1.0) as follows:
```python
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5") \
    .getOrCreate()
```
Note: Pick the right Scala version (here 2.12) and Spark version (here 3.5) depending on your PySpark version.
Launch the Python Spark REPL with the Spark Extension dependency (version ≥1.1.0) as follows:
```shell
pyspark --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
```
Note: Pick the right Scala version (here 2.12) and Spark version (here 3.5) depending on your PySpark version.
Run your Python scripts that use PySpark via `spark-submit`:
```shell
spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 [script.py]
```
Note: Pick the right Scala version (here 2.12) and Spark version (here 3.5) depending on your Spark version.
You may want to install the `pyspark-extension` Python package from PyPI into your development environment. This provides code completion, typing and test capabilities during your development phase.
Running your Python application on a Spark cluster will still require one of the above ways to add the Scala package to the Spark environment.
```shell
pip install pyspark-extension==2.13.0.3.5
```
Note: Pick the right Spark version (here 3.5) depending on your PySpark version.
There are plenty of Data Science notebooks around. To use this library, add a jar dependency to your notebook using these Maven coordinates:
```
uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
```

Or download the jar and place it on a filesystem where it is accessible by the notebook, and reference that jar file directly.
Check the documentation of your favorite notebook to learn how to add jars to your Spark environment.
Most features (marked with [*] above) are not supported in Python in conjunction with a Spark Connect server. This also holds for Databricks Runtime environments 13.x and above. Details can be found in this blog.
Calling any of those features when connected to a Spark Connect server will raise this error:
```
This feature is not supported for Spark Connect.
Use a classic connection to a Spark cluster instead.
```
You can build this project against different versions of Spark and Scala.
If you want to build for a Spark or Scala version different to what is defined in the `pom.xml` file, then run
```shell
sh set-version.sh [SPARK-VERSION] [SCALA-VERSION]
```
For example, switch to Spark 3.5.0 and Scala 2.13.8 by running `sh set-version.sh 3.5.0 2.13.8`.
Then execute `mvn package` to create a jar from the sources. It can be found in `target/`.
Run the Scala tests via `mvn test`.
In order to run the Python tests, set up a Python environment as follows:
```shell
virtualenv -p python3 venv
source venv/bin/activate
pip install python/[test]
```

Run the Python tests via `env PYTHONPATH=python/test python -m pytest python/test`.
Run the following commands in the project root directory to create a whl from the sources:
```shell
pip install build
python -m build python/
```
It can be found in `python/dist/`.
- Guaranteeing in-partition order for partitioned-writing in Apache Spark, Enrico Minack, 20/01/2023: https://www.gresearch.com/blog/article/guaranteeing-in-partition-order-for-partitioned-writing-in-apache-spark/
- Un-pivot, sorted groups and many bug fixes: Celebrating the first Spark 3.4 release, Enrico Minack, 21/03/2023: https://www.gresearch.com/blog/article/un-pivot-sorted-groups-and-many-bug-fixes-celebrating-the-first-spark-3-4-release/
- A PySpark bug makes co-grouping with window function partition-key-order-sensitive, Enrico Minack, 29/03/2023: https://www.gresearch.com/blog/article/a-pyspark-bug-makes-co-grouping-with-window-function-partition-key-order-sensitive/
- Spark’s groupByKey should be avoided – and here’s why, Enrico Minack, 13/06/2023: https://www.gresearch.com/blog/article/sparks-groupbykey-should-be-avoided-and-heres-why/
- Inspecting Parquet files with Spark, Enrico Minack, 28/07/2023: https://www.gresearch.com/blog/article/parquet-files-know-your-scaling-limits/
- Enhancing Spark’s UI with Job Descriptions, Enrico Minack, 12/12/2023: https://www.gresearch.com/blog/article/enhancing-sparks-ui-with-job-descriptions/
- PySpark apps with dependencies: Managing Python dependencies in code, Enrico Minack, 24/01/2024: https://www.gresearch.com/news/pyspark-apps-with-dependencies-managing-python-dependencies-in-code/
- Observing Spark Aggregates: Cheap Metrics from Datasets, Enrico Minack, 06/02/2024: https://www.gresearch.com/news/observing-spark-aggregates-cheap-metrics-from-datasets-2/
Please see our security policy for details on reporting security vulnerabilities.