|
1 | | -packagecom.sparkTutorial.pairRdd.sort.sortbykey; |
| 1 | +packagecom.sparkTutorial.pairRdd.sort; |
2 | 2 |
|
3 | 3 |
|
4 | 4 | importcom.sparkTutorial.pairRdd.aggregation.reducebykey.housePrice.AvgCount; |
|
8 | 8 | importorg.apache.spark.api.java.JavaPairRDD; |
9 | 9 | importorg.apache.spark.api.java.JavaRDD; |
10 | 10 | importorg.apache.spark.api.java.JavaSparkContext; |
11 | | -importorg.apache.spark.api.java.function.Function2; |
12 | | -importorg.apache.spark.api.java.function.PairFunction; |
13 | 11 | importscala.Tuple2; |
14 | 12 |
|
15 | | -importjava.util.Map; |
16 | | - |
17 | 13 | publicclassAverageHousePriceSolution { |
18 | 14 |
|
19 | 15 | publicstaticvoidmain(String[]args)throwsException { |
20 | | - |
21 | 16 | Logger.getLogger("org").setLevel(Level.ERROR); |
22 | 17 | SparkConfconf =newSparkConf().setAppName("wordCounts").setMaster("local[3]"); |
23 | 18 | JavaSparkContextsc =newJavaSparkContext(conf); |
24 | 19 |
|
25 | 20 | JavaRDD<String>lines =sc.textFile("in/RealEstate.csv"); |
26 | | - |
27 | 21 | JavaRDD<String>cleanedLines =lines.filter(line -> !line.contains("Bedrooms")); |
28 | 22 |
|
29 | | -JavaPairRDD<String,AvgCount>housePricePairRdd =cleanedLines.mapToPair( |
30 | | - (PairFunction<String,String,AvgCount>)line -> |
31 | | -newTuple2<>(line.split(",")[3], |
32 | | -newAvgCount(1,Double.parseDouble(line.split(",")[2])))); |
33 | | - |
34 | | -JavaPairRDD<String,AvgCount>housePriceTotal =housePricePairRdd.reduceByKey( |
35 | | - (Function2<AvgCount,AvgCount,AvgCount>) (x,y) -> |
36 | | -newAvgCount(x.getCount() +y.getCount(),x.getTotal() +y.getTotal())); |
37 | | - |
| 23 | +JavaPairRDD<Integer,AvgCount>housePricePairRdd =cleanedLines.mapToPair( |
| 24 | +line ->newTuple2<>(Integer.valueOf(line.split(",")[3]), |
| 25 | +newAvgCount(1,Double.parseDouble(line.split(",")[2])))); |
38 | 26 |
|
39 | | -JavaPairRDD<Integer,Double>housePriceAvg =housePriceTotal.mapToPair( |
40 | | - (PairFunction<Tuple2<String,AvgCount>,Integer,Double>)total -> |
41 | | -newTuple2<>(Integer.valueOf(total._1()),total._2().getTotal()/total._2().getCount())); |
| 27 | +JavaPairRDD<Integer,AvgCount>housePriceTotal =housePricePairRdd.reduceByKey( |
| 28 | + (x,y) ->newAvgCount(x.getCount() +y.getCount(),x.getTotal() +y.getTotal())); |
42 | 29 |
|
| 30 | +JavaPairRDD<Integer,Double>housePriceAvg =housePriceTotal.mapValues(avgCount ->avgCount.getTotal()/avgCount.getCount()); |
43 | 31 |
|
44 | 32 | JavaPairRDD<Integer,Double>sortedHousePriceAvg =housePriceAvg.sortByKey(); |
45 | 33 |
|
|