|
7 | 7 | import org.apache.spark.api.java.JavaPairRDD;
|
8 | 8 | import org.apache.spark.api.java.JavaRDD;
|
9 | 9 | import org.apache.spark.api.java.JavaSparkContext;
|
10 |
| -import org.apache.spark.api.java.function.Function2; |
11 |
| -import org.apache.spark.api.java.function.PairFunction; |
12 | 10 | import scala.Tuple2;
|
13 | 11 |
|
14 | 12 | import java.util.Map;
|
15 | 13 |
|
16 | 14 | public class AverageHousePriceSolution {
|
17 | 15 |
|
18 | 16 | public static void main(String[] args) throws Exception {
|
19 |
| - |
20 | 17 | Logger.getLogger("org").setLevel(Level.ERROR);
|
21 | 18 | SparkConf conf = new SparkConf().setAppName("wordCounts").setMaster("local[3]");
|
22 | 19 | JavaSparkContext sc = new JavaSparkContext(conf);
|
23 | 20 |
|
24 | 21 | JavaRDD<String> lines = sc.textFile("in/RealEstate.csv");
|
25 |
| - |
26 | 22 | JavaRDD<String> cleanedLines = lines.filter(line -> !line.contains("Bedrooms"));
|
27 | 23 |
|
28 | 24 | JavaPairRDD<String, AvgCount> housePricePairRdd = cleanedLines.mapToPair(
|
29 |
| - (PairFunction<String, String, AvgCount>) line -> |
30 |
| -new Tuple2<>(line.split(",")[3], |
31 |
| -new AvgCount(1, Double.parseDouble(line.split(",")[2])))); |
| 25 | +line -> new Tuple2<>(line.split(",")[3], |
| 26 | +new AvgCount(1, Double.parseDouble(line.split(",")[2])))); |
32 | 27 |
|
33 | 28 | JavaPairRDD<String, AvgCount> housePriceTotal = housePricePairRdd.reduceByKey(
|
34 |
| - (Function2<AvgCount, AvgCount, AvgCount>) (x, y) -> |
35 |
| -new AvgCount(x.getCount() + y.getCount(), x.getTotal() + y.getTotal())); |
| 29 | + (x, y) -> new AvgCount(x.getCount() + y.getCount(), x.getTotal() + y.getTotal())); |
36 | 30 |
|
37 | 31 | System.out.println("housePriceTotal: ");
|
38 | 32 | for (Map.Entry<String, AvgCount> housePriceTotalPair : housePriceTotal.collectAsMap().entrySet()) {
|
39 | 33 | System.out.println(housePriceTotalPair.getKey() + " : " + housePriceTotalPair.getValue());
|
40 |
| - |
41 | 34 | }
|
42 | 35 |
|
43 |
| -JavaPairRDD<String, Double> housePriceAvg = housePriceTotal.mapToPair( |
44 |
| - (PairFunction<Tuple2<String, AvgCount>, String, Double>) total -> |
45 |
| -new Tuple2<>(total._1(), total._2().getTotal()/total._2().getCount())); |
46 |
| - |
| 36 | +JavaPairRDD<String, Double> housePriceAvg = housePriceTotal.mapValues(avgCount -> avgCount.getTotal()/avgCount.getCount()); |
47 | 37 | System.out.println("housePriceAvg: ");
|
48 | 38 | for (Map.Entry<String, Double> housePriceAvgPair : housePriceAvg.collectAsMap().entrySet()) {
|
49 | 39 | System.out.println(housePriceAvgPair.getKey() + " : " + housePriceAvgPair.getValue());
|
50 | 40 |
|
51 | 41 | }
|
52 | 42 | }
|
53 |
| - |
54 | 43 | }
|