Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9fe453e

Browse files
authored
Merge pull requestjleetutorial#2 from jleetutorial/pedromb-scala_to_python
Scala to Python - sparkSql folder
2 parents9d9066c +3aeb5d8 commit9fe453e

14 files changed

+175
-263
lines changed

‎commons/Utils.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
importre
22

33
classUtils():
4-
5-
COMMA_DELIMITER=re.compile(''',(?=(?:[^'"]|'[^']*'|"[^"]*")*$)''')
4+
5+
COMMA_DELIMITER=re.compile(''',(?=(?:[^"]*"[^"]*")*[^"]*$)''')

‎commons/Utils.scala

Lines changed: 0 additions & 6 deletions
This file was deleted.

‎sparkSql/HousePriceProblem.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
if__name__=="__main__":
2+
3+
'''
4+
Create a Spark program to read the house data from in/RealEstate.csv,
5+
group by location, aggregate the average price per SQ Ft and sort by average price per SQ Ft.
6+
7+
The houses dataset contains a collection of recent real estate listings in San Luis Obispo county and
8+
around it. 
9+
10+
The dataset contains the following fields:
11+
1. MLS: Multiple listing service number for the house (unique ID).
12+
2. Location: city/town where the house is located. Most locations are in San Luis Obispo county and
13+
northern Santa Barbara county (Santa Maria­Orcutt, Lompoc, Guadelupe, Los Alamos), but there
14+
some out of area locations as well.
15+
3. Price: the most recent listing price of the house (in dollars).
16+
4. Bedrooms: number of bedrooms.
17+
5. Bathrooms: number of bathrooms.
18+
6. Size: size of the house in square feet.
19+
7. Price/SQ.ft: price of the house per square foot.
20+
8. Status: type of sale. Thee types are represented in the dataset: Short Sale, Foreclosure and Regular.
21+
22+
Each field is comma separated.
23+
24+
Sample output:
25+
26+
+----------------+-----------------+
27+
| Location| avg(Price SQ Ft)|
28+
+----------------+-----------------+
29+
| Oceano| 95.0|
30+
| Bradley| 206.0|
31+
| San Luis Obispo| 359.0|
32+
| Santa Ynez| 491.4|
33+
| Cayucos| 887.0|
34+
|................|.................|
35+
|................|.................|
36+
|................|.................|
37+
'''
38+

‎sparkSql/HousePriceProblem.scala

Lines changed: 0 additions & 40 deletions
This file was deleted.

‎sparkSql/HousePriceSolution.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
frompyspark.sqlimportSparkSession
2+
3+
PRICE_SQ_FT="Price SQ Ft"
4+
5+
if__name__=="__main__":
6+
7+
session=SparkSession.builder.appName("HousePriceSolution").master("local").getOrCreate()
8+
session.sparkContext.setLogLevel("ERROR")
9+
realEstate=session.read \
10+
.option("header","true") \
11+
.option("inferSchema",value=True) \
12+
.csv("in/RealEstate.csv")
13+
14+
realEstate.groupBy("Location") \
15+
.avg(PRICE_SQ_FT) \
16+
.orderBy("avg(Price SQ FT)") \
17+
.show()

‎sparkSql/HousePriceSolution.scala

Lines changed: 0 additions & 25 deletions
This file was deleted.

‎sparkSql/RddDataframeConversion.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
frompyspark.sqlimportSparkSession
2+
fromcommons.UtilsimportUtils
3+
4+
defgetColNames(line:str):
5+
splits=Utils.COMMA_DELIMITER.split(line)
6+
return [splits[2],splits[6],splits[9],splits[14]]
7+
8+
defmapResponseRdd(line:str):
9+
splits=Utils.COMMA_DELIMITER.split(line)
10+
double1=Noneifnotsplits[6]elsefloat(splits[6])
11+
double2=Noneifnotsplits[14]elsefloat(splits[14])
12+
returnsplits[2],double1,splits[9],double2
13+
14+
if__name__=="__main__":
15+
16+
session=SparkSession.builder.appName("StackOverFlowSurvey").master("local").getOrCreate()
17+
sc=session.sparkContext
18+
sc.setLogLevel("ERROR")
19+
20+
lines=sc.textFile("in/2016-stack-overflow-survey-responses.csv")
21+
22+
colNames=lines \
23+
.filter(lambdaline:Utils.COMMA_DELIMITER.split(line)[2]=="country") \
24+
.map(getColNames)
25+
26+
responseRDD=lines \
27+
.filter(lambdaline:notUtils.COMMA_DELIMITER.split(line)[2]=="country") \
28+
.map(mapResponseRdd)
29+
30+
responseDataFrame=responseRDD.toDF(colNames.collect()[0])
31+
32+
print("=== Print out schema ===")
33+
responseDataFrame.printSchema()
34+
35+
print("=== Print 20 records of responses table ===")
36+
responseDataFrame.show(20)
37+
38+
forresponseinresponseDataFrame.rdd.collect():
39+
print(response)

‎sparkSql/RddDatasetConversion.scala

Lines changed: 0 additions & 42 deletions
This file was deleted.

‎sparkSql/Response.scala

Lines changed: 0 additions & 3 deletions
This file was deleted.

‎sparkSql/StackOverFlowSurvey.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
frompyspark.sqlimportSparkSession
2+
3+
AGE_MIDPOINT="age_midpoint"
4+
SALARY_MIDPOINT="salary_midpoint"
5+
SALARY_MIDPOINT_BUCKET="salary_midpoint_bucket"
6+
7+
if__name__=="__main__":
8+
9+
session=SparkSession.builder.appName("StackOverFlowSurvey").master("local").getOrCreate()
10+
session.sparkContext.setLogLevel("ERROR")
11+
dataFrameReader=session.read
12+
13+
responses=dataFrameReader \
14+
.option("header","true") \
15+
.option("inferSchema",value=True) \
16+
.csv("in/2016-stack-overflow-survey-responses.csv")
17+
18+
print("=== Print out schema ===")
19+
responses.printSchema()
20+
21+
responseWithSelectedColumns=responses.select("country","occupation",AGE_MIDPOINT,SALARY_MIDPOINT)
22+
23+
print("=== Print the selected columns of the table ===")
24+
responseWithSelectedColumns.show()
25+
26+
print("=== Print records where the response is from Afghanistan ===")
27+
responseWithSelectedColumns.filter(responseWithSelectedColumns["country"]=="Afghanistan").show()
28+
29+
print("=== Print the count of occupations ===")
30+
groupedDataset=responseWithSelectedColumns.groupBy("occupation")
31+
groupedDataset.count().show()
32+
33+
print("=== Print records with average mid age less than 20 ===")
34+
responseWithSelectedColumns.filter(responseWithSelectedColumns[AGE_MIDPOINT]<20).show()
35+
36+
print("=== Print the result by salary middle point in descending order ===")
37+
responseWithSelectedColumns.orderBy(responseWithSelectedColumns[SALARY_MIDPOINT],ascending=False).show()
38+
39+
print("=== Group by country and aggregate by average salary middle point ===")
40+
datasetGroupByCountry=responseWithSelectedColumns.groupBy("country")
41+
datasetGroupByCountry.avg(SALARY_MIDPOINT).show()
42+
43+
responseWithSalaryBucket=responses.withColumn(SALARY_MIDPOINT_BUCKET,
44+
((responses[SALARY_MIDPOINT]/20000).cast("integer")*20000))
45+
46+
print("=== With salary bucket column ===")
47+
responseWithSalaryBucket.select(SALARY_MIDPOINT,SALARY_MIDPOINT_BUCKET).show()
48+
49+
print("=== Group by salary bucket ===")
50+
responseWithSalaryBucket.groupBy(SALARY_MIDPOINT_BUCKET).count().orderBy(SALARY_MIDPOINT_BUCKET).show()
51+
52+
session.stop()

‎sparkSql/StackOverFlowSurvey.scala

Lines changed: 0 additions & 60 deletions
This file was deleted.

‎sparkSql/TypedDataset.scala

Lines changed: 0 additions & 55 deletions
This file was deleted.

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp