Commit 83216e1

Merge pull request jleetutorial#7 from jleetutorial/pedro-changes-path

Added sys path to guarantee imports | Added SparkConf to all files

2 parents 3ec564f + ac8e586, commit 83216e1

18 files changed: +145 -123 lines changed
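
The same two changes repeat across the touched files: the repository root is prepended to sys.path so that repo-local packages such as commons resolve when a script is launched from the project directory, and the SparkContext is built from an explicit SparkConf rather than the positional SparkContext(master, appName) constructor. A minimal sketch of the pattern as applied throughout this commit (the app name below is a placeholder):

    import sys
    sys.path.insert(0, '.')  # make repo-root packages (e.g. commons) importable

    from pyspark import SparkContext, SparkConf

    if __name__ == "__main__":
        # Replaces SparkContext("local", "SomeApp"); "local[*]" runs Spark
        # locally with one worker thread per available core.
        conf = SparkConf().setAppName("SomeApp").setMaster("local[*]")
        sc = SparkContext(conf=conf)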
advanced/accumulator/StackOverFlowSurvey.py

Lines changed: 14 additions & 15 deletions

@@ -1,25 +1,24 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
-def filterResponseFromCanada(response, total, missingSalaryMidPoint):
-    splits = Utils.COMMA_DELIMITER.split(response)
-    total.add(1)
-    if not splits[14]:
-        missingSalaryMidPoint.add(1)
-    return splits[2] == "Canada"
-
 if __name__ == "__main__":
-    sc = SparkContext("local", "StackOverFlowSurvey")
-    sc.setLogLevel("ERROR")
-
+    conf = SparkConf().setAppName('StackOverFlowSurvey').setMaster("local[*]")
+    sc = SparkContext(conf=conf)
     total = sc.accumulator(0)
     missingSalaryMidPoint = sc.accumulator(0)
-
     responseRDD = sc.textFile("in/2016-stack-overflow-survey-responses.csv")
 
-    responseFromCanada = responseRDD.filter(lambda response: \
-        filterResponseFromCanada(response, total, missingSalaryMidPoint))
+    def filterResponseFromCanada(response):
+        splits = Utils.COMMA_DELIMITER.split(response)
+        total.add(1)
+        if not splits[14]:
+            missingSalaryMidPoint.add(1)
+        return splits[2] == "Canada"
 
+    responseFromCanada = responseRDD.filter(filterResponseFromCanada)
     print("Count of responses from Canada: {}".format(responseFromCanada.count()))
     print("Total count of responses: {}".format(total.value))
-    print("Count of responses missing salary middle point: {}".format(missingSalaryMidPoint.value))
+    print("Count of responses missing salary middle point: {}" \
+        .format(missingSalaryMidPoint.value))
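
Moving filterResponseFromCanada inside the main block is what allows its signature to shrink: the function now captures the accumulators by closure, so RDD.filter receives a plain one-argument callable instead of a lambda forwarding extra parameters. A self-contained sketch of the same pattern, with illustrative names:

    from pyspark import SparkContext, SparkConf

    if __name__ == "__main__":
        conf = SparkConf().setAppName("ClosureAccumulatorDemo").setMaster("local[*]")
        sc = SparkContext(conf=conf)
        inspected = sc.accumulator(0)  # driver-visible counter, writable from tasks

        def keepEvens(n):
            inspected.add(1)  # reached via closure, not passed as a parameter
            return n % 2 == 0

        evens = sc.parallelize(range(10)).filter(keepEvens)
        print(evens.count())    # 5
        print(inspected.value)  # 10: every element was inspected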

advanced/accumulator/StackOverFlowSurveyFollowUp.py

Lines changed: 13 additions & 14 deletions

@@ -1,26 +1,25 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
-def filterResponseFromCanada(response, total, missingSalaryMidPoint, processedBytes):
-    processedBytes.add(len(response.encode('utf-8')))
-    splits = Utils.COMMA_DELIMITER.split(response)
-    total.add(1)
-    if not splits[14]:
-        missingSalaryMidPoint.add(1)
-    return splits[2] == "Canada"
-
 if __name__ == "__main__":
-    sc = SparkContext("local", "StackOverFlowSurvey")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName('StackOverFlowSurvey').setMaster("local[*]")
+    sc = SparkContext(conf=conf)
 
     total = sc.accumulator(0)
     missingSalaryMidPoint = sc.accumulator(0)
     processedBytes = sc.accumulator(0)
-
     responseRDD = sc.textFile("in/2016-stack-overflow-survey-responses.csv")
 
-    responseFromCanada = responseRDD.filter(lambda response: \
-        filterResponseFromCanada(response, total, missingSalaryMidPoint, processedBytes))
+    def filterResponseFromCanada(response):
+        processedBytes.add(len(response.encode('utf-8')))
+        splits = Utils.COMMA_DELIMITER.split(response)
+        total.add(1)
+        if not splits[14]:
+            missingSalaryMidPoint.add(1)
+        return splits[2] == "Canada"
+    responseFromCanada = responseRDD.filter(filterResponseFromCanada)
 
     print("Count of responses from Canada: {}".format(responseFromCanada.count()))
     print("Number of bytes processed: {}".format(processedBytes.value))

advanced/broadcast/UkMakerSpaces.py

Lines changed: 10 additions & 8 deletions

@@ -1,19 +1,21 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
-def getPostPrefix(line: str):
-    splits = Utils.COMMA_DELIMITER.split(line)
-    postcode = splits[4]
-    return None if not postcode else postcode.split(" ")[0]
-
 def loadPostCodeMap():
     lines = open("in/uk-postcode.csv", "r").read().split("\n")
     splitsForLines = [Utils.COMMA_DELIMITER.split(line) for line in lines if line != ""]
     return {splits[0]: splits[7] for splits in splitsForLines}
 
+def getPostPrefix(line: str):
+    splits = Utils.COMMA_DELIMITER.split(line)
+    postcode = splits[4]
+    return None if not postcode else postcode.split(" ")[0]
+
 if __name__ == "__main__":
-    sc = SparkContext("local", "UkMakerSpaces")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName('UkMakerSpaces').setMaster("local[*]")
+    sc = SparkContext(conf=conf)
 
     postCodeMap = sc.broadcast(loadPostCodeMap())
 
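Here sc.broadcast ships the postcode dictionary to each executor once, as a read-only value, instead of serializing it into every task closure; worker code reads it through the .value attribute. A minimal self-contained sketch with inlined data (the real file builds the map from in/uk-postcode.csv):

    from pyspark import SparkContext, SparkConf

    if __name__ == "__main__":
        conf = SparkConf().setAppName("BroadcastDemo").setMaster("local[*]")
        sc = SparkContext(conf=conf)

        # One read-only copy per executor rather than one copy per task.
        postCodeMap = sc.broadcast({"EC1": "London", "M1": "Manchester"})

        prefixes = sc.parallelize(["EC1", "M1", "ZZ9"])
        regions = prefixes.map(lambda p: postCodeMap.value.get(p, "Unknown"))
        print(regions.collect())  # ['London', 'Manchester', 'Unknown']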
advanced/broadcast/UkMakerSpacesWithoutBroadcast.py

Lines changed: 12 additions & 10 deletions

@@ -1,26 +1,28 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
-def getPostPrefixes(line: str):
-    postcode = Utils.COMMA_DELIMITER.split(line)[4]
-    cleanedPostCode = postcode.replace("\\s+", "")
-    return [cleanedPostCode[0:i] for i in range(0, len(cleanedPostCode) + 1)]
-
 def loadPostCodeMap():
     lines = open("in/uk-postcode.csv", "r").read().split("\n")
     splitsForLines = [Utils.COMMA_DELIMITER.split(line) for line in lines if line != ""]
     return {splits[0]: splits[7] for splits in splitsForLines}
 
+def getPostPrefix(line: str):
+    splits = Utils.COMMA_DELIMITER.split(line)
+    postcode = splits[4]
+    return None if not postcode else postcode.split(" ")[0]
+
 if __name__ == "__main__":
-    sc = SparkContext("local", "UkMakerSpaces")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName('UkMakerSpaces').setMaster("local[*]")
+    sc = SparkContext(conf=conf)
     postCodeMap = loadPostCodeMap()
     makerSpaceRdd = sc.textFile("in/uk-makerspaces-identifiable-data.csv")
 
     regions = makerSpaceRdd \
         .filter(lambda line: Utils.COMMA_DELIMITER.split(line)[0] != "Timestamp") \
-        .map(lambda line: next((postCodeMap[prefix] for prefix in getPostPrefixes(line) \
-            if prefix in postCodeMap), "Unknow"))
+        .map(lambda line: postCodeMap[getPostPrefix(line)] \
+            if getPostPrefix(line) in postCodeMap else "Unknow")
 
     for region, count in regions.countByValue().items():
         print("{} : {}".format(region, count))

pairRdd/aggregation/combinebykey/AverageHousePriceSolution.py

Lines changed: 3 additions & 4 deletions

@@ -1,9 +1,8 @@
-from pyspark import SparkContext
+from pyspark import SparkContext, SparkConf
 
 if __name__ == "__main__":
-
-    sc = SparkContext("local", "AverageHousePrice")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("AverageHousePrice").setMaster("local")
+    sc = SparkContext(conf=conf)
 
     lines = sc.textFile("in/RealEstate.csv")
     cleanedLines = lines.filter(lambda line: "Bedrooms" not in line)
pairRdd/aggregation/reducebykey/housePrice/AverageHousePriceSolution.py

Lines changed: 11 additions & 9 deletions

@@ -1,24 +1,26 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
+from pairRdd.aggregation.reducebykey.housePrice.AvgCount import AvgCount
 
 if __name__ == "__main__":
-
-    sc = SparkContext("local", "avgHousePrice")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("avgHousePrice").setMaster("local[3]")
+    sc = SparkContext(conf=conf)
 
     lines = sc.textFile("in/RealEstate.csv")
     cleanedLines = lines.filter(lambda line: "Bedrooms" not in line)
 
     housePricePairRdd = cleanedLines.map(lambda line: \
-        (line.split(",")[3], (1, float(line.split(",")[2]))))
+        (line.split(",")[3], AvgCount(1, float(line.split(",")[2]))))
 
     housePriceTotal = housePricePairRdd \
-        .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
+        .reduceByKey(lambda x, y: AvgCount(x.count + y.count, x.total + y.total))
 
     print("housePriceTotal: ")
-    for bedroom, total in housePriceTotal.collect():
-        print("{} :{}".format(bedroom, total))
+    for bedroom, avgCount in housePriceTotal.collect():
+        print("{} :({}, {})".format(bedroom, avgCount.count, avgCount.total))
 
-    housePriceAvg = housePriceTotal.mapValues(lambda avgCount: avgCount[1] / avgCount[0])
+    housePriceAvg = housePriceTotal.mapValues(lambda avgCount: avgCount.total / avgCount.count)
     print("\nhousePriceAvg: ")
     for bedroom, avg in housePriceAvg.collect():
         print("{} : {}".format(bedroom, avg))

pairRdd/filter/AirportsNotInUsaSolution.py

Lines changed: 5 additions & 3 deletions

@@ -1,10 +1,12 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
 if __name__ == "__main__":
 
-    sc = SparkContext("local", "airports")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("airports").setMaster("local[*]")
+    sc = SparkContext(conf=conf)
 
     airportsRDD = sc.textFile("in/airports.text")
 
pairRdd/groupbykey/AirportsByCountrySolution.py

Lines changed: 6 additions & 4 deletions

@@ -1,10 +1,12 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
 if __name__ == "__main__":
 
-    sc = SparkContext("local", "airports")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("airports").setMaster("local[*]")
+    sc = SparkContext(conf=conf)
 
     lines = sc.textFile("in/airports.text")
 
@@ -15,4 +17,4 @@
     airportsByCountry = countryAndAirportNameAndPair.groupByKey()
 
     for country, airportName in airportsByCountry.collectAsMap().items():
-        print("{}: {}".format(country, list(airportName)))
+        print("{}: {}".format(country, list(airportName)))

pairRdd/mapValues/AirportsUppercaseSolution.py

Lines changed: 5 additions & 4 deletions

@@ -1,10 +1,11 @@
-from pyspark import SparkContext
+import sys
+sys.path.insert(0, '.')
+from pyspark import SparkContext, SparkConf
 from commons.Utils import Utils
 
 if __name__ == "__main__":
-
-    sc = SparkContext("local", "airports")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("airports").setMaster("local[*]")
+    sc = SparkContext(conf=conf)
 
     airportsRDD = sc.textFile("in/airports.text")
 
pairRdd/sort/AverageHousePriceSolution.py

Lines changed: 5 additions & 5 deletions

@@ -1,11 +1,11 @@
+import sys
+sys.path.insert(0, '.')
 from pairRdd.aggregation.reducebykey.housePrice.AvgCount import AvgCount
-from pyspark import SparkContext
-
+from pyspark import SparkContext, SparkConf
 
 if __name__ == "__main__":
-
-    sc = SparkContext("local", "averageHousePriceSolution")
-    sc.setLogLevel("ERROR")
+    conf = SparkConf().setAppName("averageHousePriceSolution").setMaster("local[*]")
+    sc = SparkContext(conf=conf)
 
     lines = sc.textFile("in/RealEstate.csv")
     cleanedLines = lines.filter(lambda line: "Bedrooms" not in line)
