|
1 |
| -packagecom.sparkTutorial.pairRdd.sort.sortbykey; |
| 1 | +packagecom.sparkTutorial.pairRdd.sort; |
2 | 2 |
|
3 | 3 |
|
4 | 4 | importcom.sparkTutorial.pairRdd.aggregation.reducebykey.housePrice.AvgCount;
|
|
8 | 8 | importorg.apache.spark.api.java.JavaPairRDD;
|
9 | 9 | importorg.apache.spark.api.java.JavaRDD;
|
10 | 10 | importorg.apache.spark.api.java.JavaSparkContext;
|
11 |
| -importorg.apache.spark.api.java.function.Function2; |
12 |
| -importorg.apache.spark.api.java.function.PairFunction; |
13 | 11 | importscala.Tuple2;
|
14 | 12 |
|
15 |
| -importjava.util.Map; |
16 |
| - |
17 | 13 | publicclassAverageHousePriceSolution {
|
18 | 14 |
|
19 | 15 | publicstaticvoidmain(String[]args)throwsException {
|
20 |
| - |
21 | 16 | Logger.getLogger("org").setLevel(Level.ERROR);
|
22 | 17 | SparkConfconf =newSparkConf().setAppName("wordCounts").setMaster("local[3]");
|
23 | 18 | JavaSparkContextsc =newJavaSparkContext(conf);
|
24 | 19 |
|
25 | 20 | JavaRDD<String>lines =sc.textFile("in/RealEstate.csv");
|
26 |
| - |
27 | 21 | JavaRDD<String>cleanedLines =lines.filter(line -> !line.contains("Bedrooms"));
|
28 | 22 |
|
29 |
| -JavaPairRDD<String,AvgCount>housePricePairRdd =cleanedLines.mapToPair( |
30 |
| - (PairFunction<String,String,AvgCount>)line -> |
31 |
| -newTuple2<>(line.split(",")[3], |
32 |
| -newAvgCount(1,Double.parseDouble(line.split(",")[2])))); |
33 |
| - |
34 |
| -JavaPairRDD<String,AvgCount>housePriceTotal =housePricePairRdd.reduceByKey( |
35 |
| - (Function2<AvgCount,AvgCount,AvgCount>) (x,y) -> |
36 |
| -newAvgCount(x.getCount() +y.getCount(),x.getTotal() +y.getTotal())); |
37 |
| - |
| 23 | +JavaPairRDD<Integer,AvgCount>housePricePairRdd =cleanedLines.mapToPair( |
| 24 | +line ->newTuple2<>(Integer.valueOf(line.split(",")[3]), |
| 25 | +newAvgCount(1,Double.parseDouble(line.split(",")[2])))); |
38 | 26 |
|
39 |
| -JavaPairRDD<Integer,Double>housePriceAvg =housePriceTotal.mapToPair( |
40 |
| - (PairFunction<Tuple2<String,AvgCount>,Integer,Double>)total -> |
41 |
| -newTuple2<>(Integer.valueOf(total._1()),total._2().getTotal()/total._2().getCount())); |
| 27 | +JavaPairRDD<Integer,AvgCount>housePriceTotal =housePricePairRdd.reduceByKey( |
| 28 | + (x,y) ->newAvgCount(x.getCount() +y.getCount(),x.getTotal() +y.getTotal())); |
42 | 29 |
|
| 30 | +JavaPairRDD<Integer,Double>housePriceAvg =housePriceTotal.mapValues(avgCount ->avgCount.getTotal()/avgCount.getCount()); |
43 | 31 |
|
44 | 32 | JavaPairRDD<Integer,Double>sortedHousePriceAvg =housePriceAvg.sortByKey();
|
45 | 33 |
|
|