A library that provides useful extensions to Apache Spark and PySpark.


This project provides extensions to the Apache Spark project in Scala and Python:

Diff: A diff transformation and application for Datasets that computes the differences between two datasets, i.e. which rows to add, delete or change to get from one dataset to the other.
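
For illustration, a minimal Scala sketch of a diff (assumes a SparkSession named spark is in scope and that the diff transformation accepts id column names as shown; check the project documentation for the exact signatures):

// assumes spark: SparkSession is available, e.g. in spark-shell
import spark.implicits._
import uk.co.gresearch.spark.diff._

val left = Seq((1, "one"), (2, "two"), (3, "three")).toDF("id", "value")
val right = Seq((1, "one"), (2, "Two"), (4, "four")).toDF("id", "value")

// one row per id with a "diff" column marking the action (no change, change, delete, insert)
left.diff(right, "id").show()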

SortedGroups: A groupByKey transformation that groups rows by a key while providing a sorted iterator for each group. Similar to Dataset.groupByKey.flatMapGroups, but with order guarantees for the iterator.
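
A hedged Scala sketch of how sorted groups might look; the method names groupByKeySorted and flatMapSortedGroups are assumptions based on the description above, so consult the project documentation for the actual API:

// assumes spark.implicits._ is in scope and events: Dataset[Event] is defined
import uk.co.gresearch.spark._

case class Event(id: Int, time: Long, value: String)

// group by id and receive each group's iterator sorted by time (assumed method names)
val firstPerId = events
  .groupByKeySorted(e => e.id)(e => e.time)
  .flatMapSortedGroups((id, it) => it.take(1).map(e => (id, e.value)))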

Histogram [*]: A histogram transformation that computes the histogram DataFrame for a value column.
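
A hedged Scala sketch; the (thresholds, value column) argument order of histogram is an assumption:

// assumes ds: Dataset[_] with a numeric "score" column is defined
import org.apache.spark.sql.functions.col
import uk.co.gresearch.spark._

// bucket the "score" column at the given thresholds and return bucket counts as a DataFrame
val hist = ds.histogram(Seq(0, 10, 100), col("score"))
hist.show()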

Global Row Number [*]: A withRowNumbers transformation that provides the global row number w.r.t. the current order of the Dataset, or any given order. In contrast to the existing SQL function row_number, which requires a window spec, this transformation provides the row number across the entire Dataset without scaling problems.
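
A minimal Scala sketch (the no-argument and ordered variants shown here are assumptions):

// assumes ds: Dataset[_] is defined
import org.apache.spark.sql.functions.col
import uk.co.gresearch.spark._

// add a global "row_number" column in the Dataset's current order
val numbered = ds.withRowNumbers()

// or with respect to an explicit order (assumed variant)
val byTime = ds.withRowNumbers(col("timestamp"))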

Partitioned Writing: The writePartitionedBy action writes your Dataset partitioned and efficiently laid out with a single operation.
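
A hedged Scala sketch; the Seq[Column] argument and the writer-style result of writePartitionedBy are assumptions:

// assumes ds: Dataset[_] with a "date" column is defined
import org.apache.spark.sql.functions.col
import uk.co.gresearch.spark._

// write ds partitioned by "date" with an efficient file layout in a single operation
ds.writePartitionedBy(Seq(col("date")))
  .mode("overwrite")
  .parquet("/tmp/output")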

Inspect Parquet files [*]: The structure of Parquet files (the metadata, not the data stored in Parquet) can be inspected similar to parquet-tools or parquet-cli by reading from a simple Spark data source. This simplifies identifying why some Parquet files cannot be split by Spark into scalable partitions.
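
A hedged Scala sketch of inspecting Parquet metadata; the import path and the reader method name parquetMetadata are assumptions based on the description above:

import uk.co.gresearch.spark.parquet._

// read the metadata of Parquet files (row groups, sizes, ...) as a DataFrame
val meta = spark.read.parquetMetadata("/path/to/data.parquet")
meta.show()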

Install Python packages into PySpark job [*]: Install Python dependencies via PIP or Poetry programmatically into your running PySpark job (PySpark ≥ 3.1.0):

# noinspection PyUnresolvedReferences
from gresearch.spark import *

# using PIP
spark.install_pip_package("pandas==1.4.3", "pyarrow")
spark.install_pip_package("-r", "requirements.txt")

# using Poetry
spark.install_poetry_project("../my-poetry-project/", poetry_python="../venv-poetry/bin/python")

Fluent method call: T.call(transformation: T => R): R: Turns a transformation T => R that is not part of T into a fluent method call on T. This allows writing fluent code like:

import uk.co.gresearch._

i.doThis()
 .doThat()
 .call(transformation)
 .doMore()

Fluent conditional method call: T.when(condition: Boolean).call(transformation: T => T): T: Perform a transformation fluently only if the given condition is true. This allows writing fluent code like:

import uk.co.gresearch._

i.doThis()
 .doThat()
 .when(condition).call(transformation)
 .doMore()

Shortcut for groupBy.as: Calling Dataset.groupBy(Column*).as[K, T] should be preferred over calling Dataset.groupByKey(V => K) whenever possible. The former allows Catalyst to exploit existing partitioning and ordering of the Dataset, while the latter hides from Catalyst which columns are used to create the keys, which can incur a significant performance penalty.

Details:

The new column-expression-based groupByKey[K](Column*) method makes it easier to group by a column expression key. Instead of

ds.groupBy($"id").as[Int, V]

use:

ds.groupByKey[Int]($"id")

Backticks: backticks(string: String, strings: String*): String: Encloses the given column name with backticks (`) when needed. This is a handy way to ensure column names with special characters like dots (.) work with col() or select().
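
For example (a small Scala sketch; assumes backticks is available from the uk.co.gresearch.spark package object):

import org.apache.spark.sql.functions.col
import uk.co.gresearch.spark._

// "a.column" would otherwise be parsed as access to field "column" of struct "a";
// backticks("a.column") returns the quoted name `a.column`
val c = col(backticks("a.column"))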

Count null values: count_null(e: Column): an aggregation function like count that counts null values in column e. This is equivalent to calling count(when(e.isNull, lit(1))).
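
For example (Scala sketch; assumes count_null is available from the uk.co.gresearch.spark package object and df has an "email" column):

import org.apache.spark.sql.functions.{col, count}
import uk.co.gresearch.spark._

// total row count next to the number of rows where "email" is null
df.select(count(col("*")), count_null(col("email"))).show()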

.Net DateTime.Ticks [*]: Convert .Net (C#, F#, Visual Basic) DateTime.Ticks into Spark timestamps, seconds and nanoseconds.

Available methods:
// Scala
dotNetTicksToTimestamp(Column): Column       // returns timestamp as TimestampType
dotNetTicksToUnixEpoch(Column): Column       // returns Unix epoch seconds as DecimalType
dotNetTicksToUnixEpochNanos(Column): Column  // returns Unix epoch nanoseconds as LongType

The reverse is provided by (all return LongType .Net ticks):

// Scala
timestampToDotNetTicks(Column): Column
unixEpochToDotNetTicks(Column): Column
unixEpochNanosToDotNetTicks(Column): Column

These methods are also available in Python:

# Python
dotnet_ticks_to_timestamp(column_or_name)         # returns timestamp as TimestampType
dotnet_ticks_to_unix_epoch(column_or_name)        # returns Unix epoch seconds as DecimalType
dotnet_ticks_to_unix_epoch_nanos(column_or_name)  # returns Unix epoch nanoseconds as LongType
timestamp_to_dotnet_ticks(column_or_name)
unix_epoch_to_dotnet_ticks(column_or_name)
unix_epoch_nanos_to_dotnet_ticks(column_or_name)
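
A short Scala usage sketch combining the conversions listed above (the DataFrame df and its "ticks" column are hypothetical; the functions are assumed to come from the uk.co.gresearch.spark package object):

import org.apache.spark.sql.functions.col
import uk.co.gresearch.spark._

// convert .Net ticks to a Spark timestamp and back again
df.select(
  col("ticks"),
  dotNetTicksToTimestamp(col("ticks")).as("timestamp"),
  timestampToDotNetTicks(dotNetTicksToTimestamp(col("ticks"))).as("round_trip")
).show()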

Spark temporary directory [*]: Create a temporary directory that will be removed on Spark application shutdown.

Examples:

Scala:

import uk.co.gresearch.spark.createTemporaryDir

val dir = createTemporaryDir("prefix")

Python:

# noinspection PyUnresolvedReferences
from gresearch.spark import *

dir = spark.create_temporary_dir("prefix")

Spark job description [*]: Set the Spark job description for all Spark jobs within a context.

Examples:
import uk.co.gresearch.spark._

implicit val session: SparkSession = spark

withJobDescription("parquet file") {
  val df = spark.read.parquet("data.parquet")
  val count = appendJobDescription("count") {
    df.count
  }
  appendJobDescription("write") {
    df.write.csv("data.csv")
  }
}
(Spark UI screenshots: job list without job descriptions vs. with job descriptions)

Note that setting a description in one thread while calling the action (e.g. .count) in a different thread does not work, unless the different thread is spawned from the current thread after the description has been set.

Working example with parallel collections:

import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.CollectionConverters.seqIsParallelizable
import scala.collection.parallel.ForkJoinTaskSupport

val files = Seq("data1.csv", "data2.csv").par

val counts = withJobDescription("Counting rows") {
  // new thread pool required to spawn new threads from this thread
  // so that the job description is actually used
  files.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool())
  files.map(filename => spark.read.csv(filename).count).sum
}(spark)

Using Spark Extension

The spark-extension package is available for all Spark 3.2, 3.3, 3.4 and 3.5 versions. Some earlier Spark versions may also be supported. The package version has the following semantics: spark-extension_{SCALA_COMPAT_VERSION}-{VERSION}-{SPARK_COMPAT_VERSION}:

  • SCALA_COMPAT_VERSION: Scala binary compatibility (minor) version. Available are 2.12 and 2.13.
  • SPARK_COMPAT_VERSION: Apache Spark binary compatibility (minor) version. Available are 3.2, 3.3, 3.4 and 3.5.
  • VERSION: The package version, e.g. 2.10.0.
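
For example, the Maven artifact spark-extension_2.12 with version 2.13.0-3.5, as used in the dependency snippets below, is package version 2.13.0 built for Scala 2.12 and Spark 3.5.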

SBT

Add this line to your build.sbt file:

libraryDependencies += "uk.co.gresearch.spark" %% "spark-extension" % "2.13.0-3.5"

Maven

Add this dependency to your pom.xml file:

<dependency>
  <groupId>uk.co.gresearch.spark</groupId>
  <artifactId>spark-extension_2.12</artifactId>
  <version>2.13.0-3.5</version>
</dependency>

Gradle

Add this dependency to your build.gradle file:

dependencies {
    implementation "uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5"
}

Spark Submit

Submit your Spark app with the Spark Extension dependency (version ≥1.1.0) as follows:

spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 [jar]

Note: Pick the right Scala version (here 2.12) and Spark version (here 3.5) depending on your Spark version.

Spark Shell

Launch a Spark Shell with the Spark Extension dependency (version ≥1.1.0) as follows:

spark-shell --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5

Note: Pick the right Scala version (here 2.12) and Spark version (here 3.5) depending on your Spark Shell version.

Python

PySpark API

Start a PySpark session with the Spark Extension dependency (version ≥1.1.0) as follows:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5") \
    .getOrCreate()

Note: Pick the right Scala version (here 2.12) and Spark version (here 3.5) depending on your PySpark version.

PySpark REPL

Launch the Python Spark REPL with the Spark Extension dependency (version ≥1.1.0) as follows:

pyspark --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5

Note: Pick the right Scala version (here 2.12) and Spark version (here 3.5) depending on your PySpark version.

PySpark spark-submit

Run your Python scripts that use PySpark via spark-submit:

spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 [script.py]

Note: Pick the right Scala version (here 2.12) and Spark version (here 3.5) depending on your Spark version.

PyPI package (local Spark cluster only)

You may want to install the pyspark-extension Python package from PyPI into your development environment. This provides code completion, typing and test capabilities during development.

Running your Python application on a Spark cluster will still require one of the above waysto add the Scala package to the Spark environment.

pip install pyspark-extension==2.13.0.3.5

Note: Pick the right Spark version (here 3.5) depending on your PySpark version.

Your favorite Data Science notebook

There are plenty of Data Science notebooks around. To use this library, add a jar dependency to your notebook using these Maven coordinates:

uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5

Or download the jar and place it on a filesystem where it is accessible by the notebook, and reference that jar file directly.

Check the documentation of your favorite notebook to learn how to add jars to your Spark environment.

Known issues

Spark Connect Server

Most features are not supported in Python in conjunction with a Spark Connect server. This also holds for Databricks Runtime environment 13.x and above. Details can be found in this blog.

Calling any of those features when connected to a Spark Connect server will raise this error:

This feature is not supported for Spark Connect.

Use a classic connection to a Spark cluster instead.

Build

You can build this project against different versions of Spark and Scala.

Switch Spark and Scala version

If you want to build for a Spark or Scala version different from what is defined in the pom.xml file, then run

sh set-version.sh [SPARK-VERSION] [SCALA-VERSION]

For example, switch to Spark 3.5.0 and Scala 2.13.8 by running sh set-version.sh 3.5.0 2.13.8.

Build the Scala project

Then execute mvn package to create a jar from the sources. It can be found in target/.

Testing

Run the Scala tests via mvn test.

Setup Python environment

In order to run the Python tests, set up a Python environment as follows:

virtualenv -p python3 venv
source venv/bin/activate
pip install python/[test]

Run Python tests

Run the Python tests via env PYTHONPATH=python/test python -m pytest python/test.

Build Python package

Run the following commands in the project root directory to create a whl from the sources:

pip install build
python -m build python/

It can be found in python/dist/.

Publications

Security

Please see our security policy for details on reporting security vulnerabilities.
