Scala to Python - sparkSql folder #2
Changes from all commits: 25927ae, e6e21af, 4b038de, 807e4d2, 62d6afd, 3aeb5d8
commons/Utils.py (modified):

import re

class Utils():
    COMMA_DELIMITER = re.compile(''',(?=(?:[^"]*"[^"]*")*[^"]*$)''')
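This regex splits on a comma only when an even number of double quotes lies ahead of it, i.e. only outside quoted fields. A quick illustrative check (the sample line is ours, not from the dataset):

import re

COMMA_DELIMITER = re.compile(''',(?=(?:[^"]*"[^"]*")*[^"]*$)''')

# The comma inside the quoted field is not treated as a delimiter.
line = '123,"San Luis Obispo, CA",795000'
print(COMMA_DELIMITER.split(line))
# ['123', '"San Luis Obispo, CA"', '795000']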
This file was deleted.
New file (38 lines):
if __name__ == "__main__":
    '''
    Create a Spark program to read the house data from in/RealEstate.csv,
    group by location, aggregate the average price per SQ Ft and sort by average price per SQ Ft.

    The houses dataset contains a collection of recent real estate listings in San Luis Obispo
    county and around it. The dataset contains the following fields:

    1. MLS: Multiple listing service number for the house (unique ID).
    2. Location: city/town where the house is located. Most locations are in San Luis Obispo
       county and northern Santa Barbara county (Santa Maria-Orcutt, Lompoc, Guadalupe,
       Los Alamos), but there are some out-of-area locations as well.
    3. Price: the most recent listing price of the house (in dollars).
    4. Bedrooms: number of bedrooms.
    5. Bathrooms: number of bathrooms.
    6. Size: size of the house in square feet.
    7. Price/SQ.ft: price of the house per square foot.
    8. Status: type of sale. Three types are represented in the dataset: Short Sale,
       Foreclosure and Regular.

    Each field is comma separated.

    Sample output:

    +----------------+-----------------+
    |        Location| avg(Price SQ Ft)|
    +----------------+-----------------+
    |          Oceano|             95.0|
    |         Bradley|            206.0|
    | San Luis Obispo|            359.0|
    |      Santa Ynez|            491.4|
    |         Cayucos|            887.0|
    |................|.................|
    |................|.................|
    |................|.................|
    '''
This file was deleted.
New file (17 lines):
from pyspark.sql import SparkSession

PRICE_SQ_FT = "Price SQ Ft"

if __name__ == "__main__":
    session = SparkSession.builder.appName("HousePriceSolution").master("local").getOrCreate()
    session.sparkContext.setLogLevel("ERROR")

    realEstate = session.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .csv("in/RealEstate.csv")

    realEstate.groupBy("Location") \
        .avg(PRICE_SQ_FT) \
        .orderBy("avg(Price SQ Ft)") \
        .show()
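Because groupBy(...).avg(...) generates the column name "avg(Price SQ Ft)", the orderBy string has to reproduce it exactly (the original draft sorted on "avg(Price SQ FT)", which only works because Spark's analyzer is case-insensitive by default). A variant that sidesteps the generated name by aliasing the aggregate explicitly; the alias avg_price_sq_ft is our choice, not part of the PR:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

session = SparkSession.builder.appName("HousePriceSolution").master("local").getOrCreate()

realEstate = session.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("in/RealEstate.csv")

# Alias the aggregate so the sort key no longer depends on Spark's
# generated "avg(Price SQ Ft)" column name.
realEstate.groupBy("Location") \
    .agg(F.avg("Price SQ Ft").alias("avg_price_sq_ft")) \
    .orderBy("avg_price_sq_ft") \
    .show()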
This file was deleted.
New file (39 lines):
from pyspark.sql import SparkSession
from commons.Utils import Utils

def getColNames(line: str):
    splits = Utils.COMMA_DELIMITER.split(line)
    return [splits[2], splits[6], splits[9], splits[14]]

def mapResponseRdd(line: str):
    splits = Utils.COMMA_DELIMITER.split(line)
    double1 = None if not splits[6] else float(splits[6])
    double2 = None if not splits[14] else float(splits[14])
    return splits[2], double1, splits[9], double2

if __name__ == "__main__":
    session = SparkSession.builder.appName("StackOverFlowSurvey").master("local").getOrCreate()
    sc = session.sparkContext
    sc.setLogLevel("ERROR")

    lines = sc.textFile("in/2016-stack-overflow-survey-responses.csv")

    # The header row has "country" in column 2; use it to pull the column names.
    colNames = lines \
        .filter(lambda line: Utils.COMMA_DELIMITER.split(line)[2] == "country") \
        .map(getColNames)

    responseRDD = lines \
        .filter(lambda line: Utils.COMMA_DELIMITER.split(line)[2] != "country") \
        .map(mapResponseRdd)

    responseDataFrame = responseRDD.toDF(colNames.collect()[0])

    print("=== Print out schema ===")
    responseDataFrame.printSchema()

    print("=== Print 20 records of responses table ===")
    responseDataFrame.show(20)

    for response in responseDataFrame.rdd.collect():
        print(response)
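Collecting the header row just to name the columns costs an extra Spark job. If the column types are known up front, an explicit schema is an alternative. A sketch under the assumption that indexes 6, 9 and 14 hold age_midpoint, occupation and salary_midpoint, as the DataFrame version below suggests; the schema and field names here are our assumption, not stated in the PR:

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Assumed mapping of the four extracted fields.
schema = StructType([
    StructField("country", StringType(), True),
    StructField("age_midpoint", DoubleType(), True),
    StructField("occupation", StringType(), True),
    StructField("salary_midpoint", DoubleType(), True),
])

responseDataFrame = session.createDataFrame(responseRDD, schema)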
This file was deleted.
This file was deleted.

New file (52 lines):
from pyspark.sql import SparkSession

AGE_MIDPOINT = "age_midpoint"
SALARY_MIDPOINT = "salary_midpoint"
SALARY_MIDPOINT_BUCKET = "salary_midpoint_bucket"

if __name__ == "__main__":
    session = SparkSession.builder.appName("StackOverFlowSurvey").master("local").getOrCreate()
    session.sparkContext.setLogLevel("ERROR")
    dataFrameReader = session.read

    responses = dataFrameReader \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .csv("in/2016-stack-overflow-survey-responses.csv")

    print("=== Print out schema ===")
    responses.printSchema()

    responseWithSelectedColumns = responses.select("country", "occupation", AGE_MIDPOINT, SALARY_MIDPOINT)

    print("=== Print the selected columns of the table ===")
    responseWithSelectedColumns.show()

    print("=== Print records where the response is from Afghanistan ===")
    responseWithSelectedColumns.filter(responseWithSelectedColumns["country"] == "Afghanistan").show()

    print("=== Print the count of occupations ===")
    groupedDataset = responseWithSelectedColumns.groupBy("occupation")
    groupedDataset.count().show()

    print("=== Print records with average mid age less than 20 ===")
    responseWithSelectedColumns.filter(responseWithSelectedColumns[AGE_MIDPOINT] < 20).show()

    print("=== Print the result by salary middle point in descending order ===")
    responseWithSelectedColumns.orderBy(responseWithSelectedColumns[SALARY_MIDPOINT], ascending=False).show()

    print("=== Group by country and aggregate by average salary middle point ===")
    datasetGroupByCountry = responseWithSelectedColumns.groupBy("country")
    datasetGroupByCountry.avg(SALARY_MIDPOINT).show()

    responseWithSalaryBucket = responses.withColumn(SALARY_MIDPOINT_BUCKET,
        ((responses[SALARY_MIDPOINT] / 20000).cast("integer") * 20000))

    print("=== With salary bucket column ===")
    responseWithSalaryBucket.select(SALARY_MIDPOINT, SALARY_MIDPOINT_BUCKET).show()

    print("=== Group by salary bucket ===")
    responseWithSalaryBucket.groupBy(SALARY_MIDPOINT_BUCKET).count().orderBy(SALARY_MIDPOINT_BUCKET).show()

    session.stop()
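The bucket expression truncates salary_midpoint toward zero in steps of 20000, so for example a midpoint of 57500 lands in bucket 40000. The same arithmetic in plain Python:

# cast("integer") truncates; for non-negative salaries int() does the same.
for salary in (5000, 57500, 100000):
    print(salary, "->", int(salary / 20000) * 20000)
# 5000 -> 0, 57500 -> 40000, 100000 -> 100000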
This file was deleted.
This file was deleted.