Commit 83216e1

Merge pull request jleetutorial#7 from jleetutorial/pedro-changes-path

Added sys path to guarantee imports | Added SparkConf to all files

2 parents 3ec564f + ac8e586, commit 83216e1

18 files changed (+145, -123 lines changed)
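Every file below gets the same two changes: the project root is pushed onto sys.path so that imports such as commons.Utils resolve when the scripts are launched from the repository root, and the positional SparkContext("local", "<appName>") constructor (plus, where present, the setLogLevel("ERROR") call) is replaced by an explicit SparkConf. A minimal sketch of the new boilerplate, with a placeholder app name (the app name and master string vary per file):

import sys
sys.path.insert(0, '.')  # make commons/, pairRdd/, etc. importable from the repo root

from pyspark import SparkContext, SparkConf

if __name__ == "__main__":
    # replaces sc = SparkContext("local", "<appName>") and sc.setLogLevel("ERROR")
    conf = SparkConf().setAppName("exampleApp").setMaster("local[*]")
    sc = SparkContext(conf=conf)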
Lines changed: 14 additions & 15 deletions
@@ -1,25 +1,24 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
-def filterResponseFromCanada(response, total, missingSalaryMidPoint):
-    splits = Utils.COMMA_DELIMITER.split(response)
-    total.add(1)
-    if not splits[14]:
-        missingSalaryMidPoint.add(1)
-    return splits[2] == "Canada"
-
 if __name__ == "__main__":
-    sc = SparkContext("local", "StackOverFlowSurvey")
-    sc.setLogLevel("ERROR")
-
+    conf = SparkConf().setAppName('StackOverFlowSurvey').setMaster("local[*]")
+    sc = SparkContext(conf=conf)
     total = sc.accumulator(0)
     missingSalaryMidPoint = sc.accumulator(0)
-
     responseRDD = sc.textFile("in/2016-stack-overflow-survey-responses.csv")
 
-    responseFromCanada = responseRDD.filter(lambda response: \
-        filterResponseFromCanada(response, total, missingSalaryMidPoint))
+    def filterResponseFromCanada(response):
+        splits = Utils.COMMA_DELIMITER.split(response)
+        total.add(1)
+        if not splits[14]:
+            missingSalaryMidPoint.add(1)
+        return splits[2] == "Canada"
 
+    responseFromCanada = responseRDD.filter(filterResponseFromCanada)
     print("Count of responses from Canada: {}".format(responseFromCanada.count()))
     print("Total count of responses: {}".format(total.value))
-    print("Count of responses missing salary middle point: {}".format(missingSalaryMidPoint.value))
+    print("Count of responses missing salary middle point: {}" \
+        .format(missingSalaryMidPoint.value))
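The filter helper is now defined inside the __main__ block and takes only the response: the total and missingSalaryMidPoint accumulators are captured by closure, so the wrapping lambda that used to forward them is no longer needed. A small self-contained sketch of the same idea (the app name and input data here are illustrative only):

from pyspark import SparkContext, SparkConf

if __name__ == "__main__":
    conf = SparkConf().setAppName("accumulatorClosureDemo").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    total = sc.accumulator(0)

    def keepPositive(value):
        total.add(1)          # accumulator reached through the enclosing scope
        return value > 0

    kept = sc.parallelize([1, -2, 3]).filter(keepPositive)
    print(kept.count())       # 2
    print(total.value)        # 3, once the count() action has evaluated the filter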

‎advanced/accumulator/StackOverFlowSurveyFollowUp.py

Lines changed: 13 additions & 14 deletions
@@ -1,26 +1,25 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
-def filterResponseFromCanada(response, total, missingSalaryMidPoint, processedBytes):
-    processedBytes.add(len(response.encode('utf-8')))
-    splits = Utils.COMMA_DELIMITER.split(response)
-    total.add(1)
-    if not splits[14]:
-        missingSalaryMidPoint.add(1)
-    return splits[2] == "Canada"
-
 if __name__ == "__main__":
-    sc = SparkContext("local", "StackOverFlowSurvey")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName('StackOverFlowSurvey').setMaster("local[*]")
+    sc = SparkContext(conf=conf)
 
     total = sc.accumulator(0)
     missingSalaryMidPoint = sc.accumulator(0)
     processedBytes = sc.accumulator(0)
-
     responseRDD = sc.textFile("in/2016-stack-overflow-survey-responses.csv")
 
-    responseFromCanada = responseRDD.filter(lambda response: \
-        filterResponseFromCanada(response, total, missingSalaryMidPoint, processedBytes))
+    def filterResponseFromCanada(response):
+        processedBytes.add(len(response.encode('utf-8')))
+        splits = Utils.COMMA_DELIMITER.split(response)
+        total.add(1)
+        if not splits[14]:
+            missingSalaryMidPoint.add(1)
+        return splits[2] == "Canada"
+    responseFromCanada = responseRDD.filter(filterResponseFromCanada)
 
     print("Count of responses from Canada: {}".format(responseFromCanada.count()))
     print("Number of bytes processed: {}".format(processedBytes.value))

‎advanced/broadcast/UkMakerSpaces.py

Lines changed: 10 additions & 8 deletions
@@ -1,19 +1,21 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
-def getPostPrefix(line: str):
-    splits = Utils.COMMA_DELIMITER.split(line)
-    postcode = splits[4]
-    return None if not postcode else postcode.split(" ")[0]
-
 def loadPostCodeMap():
     lines = open("in/uk-postcode.csv", "r").read().split("\n")
     splitsForLines = [Utils.COMMA_DELIMITER.split(line) for line in lines if line != ""]
     return {splits[0]: splits[7] for splits in splitsForLines}
 
+def getPostPrefix(line: str):
+    splits = Utils.COMMA_DELIMITER.split(line)
+    postcode = splits[4]
+    return None if not postcode else postcode.split(" ")[0]
+
 if __name__ == "__main__":
-    sc = SparkContext("local", "UkMakerSpaces")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName('UkMakerSpaces').setMaster("local[*]")
+    sc = SparkContext(conf=conf)
 
     postCodeMap = sc.broadcast(loadPostCodeMap())
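Only the creation of the broadcast variable is visible in this hunk; the rest of UkMakerSpaces.py presumably reads it on the executors through its .value attribute. A hedged, self-contained sketch of that pattern (the lookup data and app name are made up for illustration):

from pyspark import SparkContext, SparkConf

if __name__ == "__main__":
    conf = SparkConf().setAppName("broadcastDemo").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # a small lookup table shipped once to every executor instead of with every task
    postCodeMap = sc.broadcast({"AB1": "Aberdeen", "CB4": "Cambridge"})

    prefixes = sc.parallelize(["AB1", "ZZ9", "CB4"])
    regions = prefixes.map(lambda p: postCodeMap.value.get(p, "Unknown"))
    print(regions.collect())  # ['Aberdeen', 'Unknown', 'Cambridge']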

Lines changed: 12 additions & 10 deletions
@@ -1,26 +1,28 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
-def getPostPrefixes(line: str):
-    postcode = Utils.COMMA_DELIMITER.split(line)[4]
-    cleanedPostCode = postcode.replace("\\s+", "")
-    return [cleanedPostCode[0:i] for i in range(0, len(cleanedPostCode) + 1)]
-
 def loadPostCodeMap():
     lines = open("in/uk-postcode.csv", "r").read().split("\n")
     splitsForLines = [Utils.COMMA_DELIMITER.split(line) for line in lines if line != ""]
     return {splits[0]: splits[7] for splits in splitsForLines}
 
+def getPostPrefix(line: str):
+    splits = Utils.COMMA_DELIMITER.split(line)
+    postcode = splits[4]
+    return None if not postcode else postcode.split(" ")[0]
+
 if __name__ == "__main__":
-    sc = SparkContext("local", "UkMakerSpaces")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName('UkMakerSpaces').setMaster("local[*]")
+    sc = SparkContext(conf=conf)
     postCodeMap = loadPostCodeMap()
     makerSpaceRdd = sc.textFile("in/uk-makerspaces-identifiable-data.csv")
 
     regions = makerSpaceRdd \
         .filter(lambda line: Utils.COMMA_DELIMITER.split(line)[0] != "Timestamp") \
-        .map(lambda line: next((postCodeMap[prefix] for prefix in getPostPrefixes(line) \
-            if prefix in postCodeMap), "Unknow"))
+        .map(lambda line: postCodeMap[getPostPrefix(line)] \
+            if getPostPrefix(line) in postCodeMap else "Unknow")
 
     for region, count in regions.countByValue().items():
         print("{} : {}".format(region, count))

‎pairRdd/aggregation/combinebykey/AverageHousePriceSolution.py

Lines changed: 3 additions & 4 deletions
@@ -1,9 +1,8 @@
-from pyspark import SparkContext
+from pyspark import SparkContext, SparkConf
 
 if __name__ == "__main__":
-
-    sc = SparkContext("local", "AverageHousePrice")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("AverageHousePrice").setMaster("local")
+    sc = SparkContext(conf=conf)
 
     lines = sc.textFile("in/RealEstate.csv")
     cleanedLines = lines.filter(lambda line: "Bedrooms" not in line)
Lines changed: 11 additions & 9 deletions
@@ -1,24 +1,26 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
+from pairRdd.aggregation.reducebykey.housePrice.AvgCount import AvgCount
 
 if __name__ == "__main__":
-
-    sc = SparkContext("local", "avgHousePrice")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("avgHousePrice").setMaster("local[3]")
+    sc = SparkContext(conf=conf)
 
     lines = sc.textFile("in/RealEstate.csv")
     cleanedLines = lines.filter(lambda line: "Bedrooms" not in line)
 
     housePricePairRdd = cleanedLines.map(lambda line: \
-        (line.split(",")[3], (1, float(line.split(",")[2]))))
+        (line.split(",")[3], AvgCount(1, float(line.split(",")[2]))))
 
     housePriceTotal = housePricePairRdd \
-        .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
+        .reduceByKey(lambda x, y: AvgCount(x.count + y.count, x.total + y.total))
 
     print("housePriceTotal: ")
-    for bedroom, total in housePriceTotal.collect():
-        print("{} :{}".format(bedroom, total))
+    for bedroom, avgCount in housePriceTotal.collect():
+        print("{} :({}, {})".format(bedroom, avgCount.count, avgCount.total))
 
-    housePriceAvg = housePriceTotal.mapValues(lambda avgCount: avgCount[1] / avgCount[0])
+    housePriceAvg = housePriceTotal.mapValues(lambda avgCount: avgCount.total / avgCount.count)
     print("\nhousePriceAvg: ")
     for bedroom, avg in housePriceAvg.collect():
         print("{} : {}".format(bedroom, avg))
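The AvgCount class imported at the top of this file is not shown in the diff; judging from its use with reduceByKey and mapValues it is a small value holder with count and total attributes, roughly like the sketch below (an assumption, not the repository's actual definition). reduceByKey then merges the per-key pairs and mapValues divides total by count to produce the average price.

class AvgCount():
    def __init__(self, count: int, total: float):
        self.count = count
        self.total = total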

‎pairRdd/filter/AirportsNotInUsaSolution.py

Lines changed: 5 additions & 3 deletions
@@ -1,10 +1,12 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
 if __name__ == "__main__":
 
-    sc = SparkContext("local", "airports")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("airports").setMaster("local[*]")
+    sc = SparkContext(conf=conf)
 
     airportsRDD = sc.textFile("in/airports.text")

Lines changed: 6 additions & 4 deletions
@@ -1,10 +1,12 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
 if __name__ == "__main__":
 
-    sc = SparkContext("local", "airports")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("airports").setMaster("local[*]")
+    sc = SparkContext(conf=conf)
 
     lines = sc.textFile("in/airports.text")
 
@@ -15,4 +17,4 @@
     airportsByCountry = countryAndAirportNameAndPair.groupByKey()
 
     for country, airportName in airportsByCountry.collectAsMap().items():
-        print("{}: {}".format(country, list(airportName)))
+        print("{}: {}".format(country, list(airportName)))

‎pairRdd/mapValues/AirportsUppercaseSolution.py

Lines changed: 5 additions & 4 deletions
@@ -1,10 +1,11 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
 if __name__ == "__main__":
-
-    sc = SparkContext("local", "airports")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("airports").setMaster("local[*]")
+    sc = SparkContext(conf=conf)
 
     airportsRDD = sc.textFile("in/airports.text")

‎pairRdd/sort/AverageHousePriceSolution.py

Lines changed: 5 additions & 5 deletions
@@ -1,11 +1,11 @@
+import sys
+sys.path.insert(0, '.')
 from pairRdd.aggregation.reducebykey.housePrice.AvgCount import AvgCount
-from pyspark import SparkContext
-
+from pyspark import SparkContext, SparkConf
 
 if __name__ == "__main__":
-
-    sc = SparkContext("local", "averageHousePriceSolution")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("averageHousePriceSolution").setMaster("local[*]")
+    sc = SparkContext(conf=conf)
 
     lines = sc.textFile("in/RealEstate.csv")
     cleanedLines = lines.filter(lambda line: "Bedrooms" not in line)

‎rdd/airports/AirportsByLatitudeSolution.py

Lines changed: 5 additions & 2 deletions
@@ -1,12 +1,15 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
 def splitComma(line: str):
     splits = Utils.COMMA_DELIMITER.split(line)
     return "{}, {}".format(splits[1], splits[6])
 
 if __name__ == "__main__":
-    sc = SparkContext("local", "airports")
+    conf = SparkConf().setAppName("airports").setMaster("local[*]")
+    sc = SparkContext(conf=conf)
 
     airports = sc.textFile("in/airports.text")

‎rdd/airports/AirportsInUsaSolution.py

Lines changed: 5 additions & 2 deletions
@@ -1,12 +1,15 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
 def splitComma(line: str):
     splits = Utils.COMMA_DELIMITER.split(line)
     return "{}, {}".format(splits[1], splits[2])
 
 if __name__ == "__main__":
-    sc = SparkContext("local", "airports")
+    conf = SparkConf().setAppName("airports").setMaster("local[*]")
+    sc = SparkContext(conf=conf)
 
     airports = sc.textFile("in/airports.text")
     airportsInUSA = airports.filter(lambda line: Utils.COMMA_DELIMITER.split(line)[3] == "\"United States\"")

‎rdd/count/CountExample.py

Lines changed: 3 additions & 0 deletions
@@ -3,9 +3,12 @@
 if __name__ == "__main__":
     conf = SparkConf().setAppName("count").setMaster("local[*]")
     sc = SparkContext(conf=conf)
+
     inputWords = ["spark", "hadoop", "spark", "hive", "pig", "cassandra", "hadoop"]
+
     wordRdd = sc.parallelize(inputWords)
     print("Count: {}".format(wordRdd.count()))
+
     worldCountByValue = wordRdd.countByValue()
     print("CountByValue: ")
     for word, count in worldCountByValue.items():

‎sparkSql/HousePriceProblem.py

Lines changed: 7 additions & 6 deletions
@@ -4,20 +4,21 @@
     Create a Spark program to read the house data from in/RealEstate.csv,
     group by location, aggregate the average price per SQ Ft and sort by average price per SQ Ft.
 
-    The houses dataset contains a collection of recent real estate listings in San Luis Obispo county and
-    around it.
+    The houses dataset contains a collection of recent real estate listings in
+    San Luis Obispo county and around it.
 
     The dataset contains the following fields:
     1. MLS: Multiple listing service number for the house (unique ID).
-    2. Location: city/town where the house is located. Most locations are in San Luis Obispo county and
-    northern Santa Barbara county (Santa Maria-Orcutt, Lompoc, Guadelupe, Los Alamos), but there
-    some out of area locations as well.
+    2. Location: city/town where the house is located. Most locations are in
+    San Luis Obispo county and northern Santa Barbara county (Santa Maria-Orcutt, Lompoc,
+    Guadelupe, Los Alamos), but there some out of area locations as well.
    3. Price: the most recent listing price of the house (in dollars).
     4. Bedrooms: number of bedrooms.
     5. Bathrooms: number of bathrooms.
     6. Size: size of the house in square feet.
     7. Price/SQ.ft: price of the house per square foot.
-    8. Status: type of sale. Thee types are represented in the dataset: Short Sale, Foreclosure and Regular.
+    8. Status: type of sale. Thee types are represented in the dataset: Short Sale,
+    Foreclosure and Regular.
 
     Each field is comma separated.
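The reflowed docstring above only restates the exercise. As a hedged sketch, one way to express it with the DataFrame API would be to group by Location, average the price-per-square-foot column, and sort on that average (the exact column header is assumed here; the docstring calls the field "Price/SQ.ft"):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

if __name__ == "__main__":
    session = SparkSession.builder.appName("HousePriceProblem").master("local[*]").getOrCreate()

    realEstate = session.read \
        .option("header", "true") \
        .option("inferSchema", value=True) \
        .csv("in/RealEstate.csv")

    # column name assumed; adjust to match the actual CSV header
    avgByLocation = realEstate.groupBy("Location") \
        .agg(F.avg("Price SQ Ft").alias("avgPricePerSqFt")) \
        .orderBy("avgPricePerSqFt")

    avgByLocation.show()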

‎sparkSql/HousePriceSolution.py

Lines changed: 2 additions & 2 deletions
@@ -4,8 +4,8 @@
 
 if __name__ == "__main__":
 
-    session = SparkSession.builder.appName("HousePriceSolution").master("local").getOrCreate()
-    session.sparkContext.setLogLevel("ERROR")
+    session = SparkSession.builder.appName("HousePriceSolution").master("local[*]").getOrCreate()
+
     realEstate = session.read \
         .option("header", "true") \
         .option("inferSchema", value=True) \
