99import org .apache .spark .api .java .JavaSparkContext ;
1010import org .apache .spark .api .java .function .Function ;
1111import org .apache .spark .api .java .function .Function2 ;
12- import org .apache .spark .api .java .function .PairFunction ;
1312import scala .Tuple2 ;
1413
1514import java .util .Map ;
@@ -23,33 +22,26 @@ public static void main(String[] args) throws Exception {
2322JavaSparkContext sc =new JavaSparkContext (conf );
2423
2524JavaRDD <String >lines =sc .textFile ("in/RealEstate.csv" );
26-
2725JavaRDD <String >cleanedLines =lines .filter (line -> !line .contains ("Bedrooms" ));
2826
2927JavaPairRDD <String ,Double >housePricePairRdd =cleanedLines .mapToPair (
30- (PairFunction <String ,String ,Double >)line ->
31- new Tuple2 <>(line .split ("," )[3 ],
32- Double .parseDouble (line .split ("," )[2 ])));
28+ line ->new Tuple2 <>(line .split ("," )[3 ],
29+ Double .parseDouble (line .split ("," )[2 ])));
3330
3431JavaPairRDD <String ,AvgCount >housePriceTotal =housePricePairRdd .combineByKey (createCombiner ,mergeValue ,mergeCombiners );
3532
36- JavaPairRDD <String ,Double >housePriceAvg =housePriceTotal .mapToPair (
37- (PairFunction <Tuple2 <String ,AvgCount >,String ,Double >)total ->
38- new Tuple2 <>(total ._1 (),total ._2 ().getTotal ()/total ._2 ().getCount ()));
33+ JavaPairRDD <String ,Double >housePriceAvg =housePriceTotal .mapValues (avgCount ->avgCount .getTotal ()/avgCount .getCount ());
3934
4035for (Map .Entry <String ,Double >housePriceAvgPair :housePriceAvg .collectAsMap ().entrySet ()) {
4136System .out .println (housePriceAvgPair .getKey () +" : " +housePriceAvgPair .getValue ());
42-
4337 }
4438 }
4539
46- static Function <Double ,AvgCount >createCombiner = (Function <Double ,AvgCount >)x ->new AvgCount (1 ,x );
47-
48- static Function2 <AvgCount ,Double ,AvgCount >mergeValue
49- = (Function2 <AvgCount ,Double ,AvgCount >) (avgCount ,x ) ->new AvgCount (avgCount .getCount () +1 ,
50- avgCount .getTotal () +x );
51- static Function2 <AvgCount ,AvgCount ,AvgCount >mergeCombiners
52- = (Function2 <AvgCount ,AvgCount ,AvgCount >) (avgCountA ,avgCountB ) ->new AvgCount (avgCountA .getCount () +avgCountB .getCount (),
53- avgCountA .getTotal () +avgCountB .getTotal ());
40+ static Function <Double ,AvgCount >createCombiner =x ->new AvgCount (1 ,x );
5441
42+ static Function2 <AvgCount ,Double ,AvgCount >mergeValue = (avgCount ,x ) ->new AvgCount (avgCount .getCount () +1 ,
43+ avgCount .getTotal () +x );
44+ static Function2 <AvgCount ,AvgCount ,AvgCount >mergeCombiners =
45+ (avgCountA ,avgCountB ) ->new AvgCount (avgCountA .getCount () +avgCountB .getCount (),
46+ avgCountA .getTotal () +avgCountB .getTotal ());
5547}