9
9
import org .apache .spark .api .java .JavaSparkContext ;
10
10
import org .apache .spark .api .java .function .Function ;
11
11
import org .apache .spark .api .java .function .Function2 ;
12
- import org .apache .spark .api .java .function .PairFunction ;
13
12
import scala .Tuple2 ;
14
13
15
14
import java .util .Map ;
@@ -23,33 +22,26 @@ public static void main(String[] args) throws Exception {
23
22
JavaSparkContext sc =new JavaSparkContext (conf );
24
23
25
24
JavaRDD <String >lines =sc .textFile ("in/RealEstate.csv" );
26
-
27
25
JavaRDD <String >cleanedLines =lines .filter (line -> !line .contains ("Bedrooms" ));
28
26
29
27
JavaPairRDD <String ,Double >housePricePairRdd =cleanedLines .mapToPair (
30
- (PairFunction <String ,String ,Double >)line ->
31
- new Tuple2 <>(line .split ("," )[3 ],
32
- Double .parseDouble (line .split ("," )[2 ])));
28
+ line ->new Tuple2 <>(line .split ("," )[3 ],
29
+ Double .parseDouble (line .split ("," )[2 ])));
33
30
34
31
JavaPairRDD <String ,AvgCount >housePriceTotal =housePricePairRdd .combineByKey (createCombiner ,mergeValue ,mergeCombiners );
35
32
36
- JavaPairRDD <String ,Double >housePriceAvg =housePriceTotal .mapToPair (
37
- (PairFunction <Tuple2 <String ,AvgCount >,String ,Double >)total ->
38
- new Tuple2 <>(total ._1 (),total ._2 ().getTotal ()/total ._2 ().getCount ()));
33
+ JavaPairRDD <String ,Double >housePriceAvg =housePriceTotal .mapValues (avgCount ->avgCount .getTotal ()/avgCount .getCount ());
39
34
40
35
for (Map .Entry <String ,Double >housePriceAvgPair :housePriceAvg .collectAsMap ().entrySet ()) {
41
36
System .out .println (housePriceAvgPair .getKey () +" : " +housePriceAvgPair .getValue ());
42
-
43
37
}
44
38
}
45
39
46
- static Function <Double ,AvgCount >createCombiner = (Function <Double ,AvgCount >)x ->new AvgCount (1 ,x );
47
-
48
- static Function2 <AvgCount ,Double ,AvgCount >mergeValue
49
- = (Function2 <AvgCount ,Double ,AvgCount >) (avgCount ,x ) ->new AvgCount (avgCount .getCount () +1 ,
50
- avgCount .getTotal () +x );
51
- static Function2 <AvgCount ,AvgCount ,AvgCount >mergeCombiners
52
- = (Function2 <AvgCount ,AvgCount ,AvgCount >) (avgCountA ,avgCountB ) ->new AvgCount (avgCountA .getCount () +avgCountB .getCount (),
53
- avgCountA .getTotal () +avgCountB .getTotal ());
40
+ static Function <Double ,AvgCount >createCombiner =x ->new AvgCount (1 ,x );
54
41
42
+ static Function2 <AvgCount ,Double ,AvgCount >mergeValue = (avgCount ,x ) ->new AvgCount (avgCount .getCount () +1 ,
43
+ avgCount .getTotal () +x );
44
+ static Function2 <AvgCount ,AvgCount ,AvgCount >mergeCombiners =
45
+ (avgCountA ,avgCountB ) ->new AvgCount (avgCountA .getCount () +avgCountB .getCount (),
46
+ avgCountA .getTotal () +avgCountB .getTotal ());
55
47
}