Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit4473c72

Browse files
author
James Lee
committed
improve accumulator and broadcast examples
1 parent7d67d40 commit4473c72

File tree

4 files changed

+23
-20
lines changed

4 files changed

+23
-20
lines changed

‎src/main/java/com/sparkTutorial/advanced/accumulator/StackOverFlowSurvey.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
packagecom.sparkTutorial.advanced.accumulator;
22

3+
importcom.sparkTutorial.rdd.commons.Utils;
4+
importorg.apache.log4j.Level;
5+
importorg.apache.log4j.Logger;
36
importorg.apache.spark.SparkConf;
47
importorg.apache.spark.SparkContext;
58
importorg.apache.spark.api.java.JavaRDD;
@@ -10,11 +13,10 @@
1013
publicclassStackOverFlowSurvey {
1114

1215
publicstaticvoidmain(String[]args)throwsException {
13-
16+
Logger.getLogger("org").setLevel(Level.ERROR);
1417
SparkConfconf =newSparkConf().setAppName("StackOverFlowSurvey").setMaster("local[1]");
1518

1619
SparkContextsparkContext =newSparkContext(conf);
17-
1820
JavaSparkContextjavaSparkContext =newJavaSparkContext(sparkContext);
1921

2022
finalLongAccumulatortotal =newLongAccumulator();
@@ -26,11 +28,11 @@ public static void main(String[] args) throws Exception {
2628
JavaRDD<String>responseRDD =javaSparkContext.textFile("in/2016-stack-overflow-survey-responses.csv");
2729

2830
JavaRDD<String>responseFromCanada =responseRDD.filter(response -> {
29-
String[]splits =response.split(",", -1);
31+
String[]splits =response.split(Utils.COMMA_DELIMITER, -1);
3032

3133
total.add(1);
3234

33-
if (splits[14].equals("")) {
35+
if (splits[14].isEmpty()) {
3436
missingSalaryMidPoint.add(1);
3537
}
3638

‎src/main/java/com/sparkTutorial/advanced/accumulator/StackOverFlowSurveyFollowUp.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
packagecom.sparkTutorial.advanced.accumulator;
22

3+
importcom.sparkTutorial.rdd.commons.Utils;
4+
importorg.apache.log4j.Level;
5+
importorg.apache.log4j.Logger;
36
importorg.apache.spark.SparkConf;
47
importorg.apache.spark.SparkContext;
58
importorg.apache.spark.api.java.JavaRDD;
@@ -10,11 +13,9 @@
1013
publicclassStackOverFlowSurveyFollowUp {
1114

1215
publicstaticvoidmain(String[]args)throwsException {
13-
16+
Logger.getLogger("org").setLevel(Level.ERROR);
1417
SparkConfconf =newSparkConf().setAppName("StackOverFlowSurvey").setMaster("local[1]");
15-
1618
SparkContextsparkContext =newSparkContext(conf);
17-
1819
JavaSparkContextjavaSparkContext =newJavaSparkContext(sparkContext);
1920

2021
finalLongAccumulatortotal =newLongAccumulator();
@@ -31,11 +32,11 @@ public static void main(String[] args) throws Exception {
3132

3233
processedBytes.add(response.getBytes().length);
3334

34-
String[]splits =response.split(",", -1);
35+
String[]splits =response.split(Utils.COMMA_DELIMITER, -1);
3536

3637
total.add(1);
3738

38-
if (splits[14].equals("")) {
39+
if (splits[14].isEmpty()) {
3940
missingSalaryMidPoint.add(1);
4041
}
4142

‎src/main/java/com/sparkTutorial/advanced/broadcast/UkMarketSpaces.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
packagecom.sparkTutorial.advanced.broadcast;
22

3+
importcom.sparkTutorial.rdd.commons.Utils;
34
importorg.apache.log4j.Level;
45
importorg.apache.log4j.Logger;
56
importorg.apache.spark.SparkConf;
@@ -14,19 +15,16 @@
1415
publicclassUkMarketSpaces {
1516

1617
publicstaticvoidmain(String[]args)throwsException {
17-
1818
Logger.getLogger("org").setLevel(Level.ERROR);
19-
2019
SparkConfconf =newSparkConf().setAppName("UkMarketSpaces").setMaster("local[1]");
21-
2220
JavaSparkContextjavaSparkContext =newJavaSparkContext(conf);
2321

2422
finalBroadcast<Map<String,String>>postCodeMap =javaSparkContext.broadcast(loadPostCodeMap());
2523

2624
JavaRDD<String>marketsRdd =javaSparkContext.textFile("in/uk-market-spaces-identifiable-data.csv");
2725

2826
JavaRDD<String>regions =marketsRdd
29-
.filter(line -> !line.split(",", -1)[0].equals("Timestamp"))
27+
.filter(line -> !line.split(Utils.COMMA_DELIMITER, -1)[0].equals("Timestamp"))
3028
.map(line -> {
3129
Optional<String>postPrefix =getPostPrefix(line);
3230
if (postPrefix.isPresent() &&postCodeMap.value().containsKey(postPrefix.get())) {
@@ -40,7 +38,7 @@ public static void main(String[] args) throws Exception {
4038
}
4139

4240
privatestaticOptional<String>getPostPrefix(Stringline) {
43-
String[]splits =line.split(",", -1);
41+
String[]splits =line.split(Utils.COMMA_DELIMITER, -1);
4442
Stringpostcode =splits[4];
4543
if (postcode.isEmpty()) {
4644
returnOptional.empty();
@@ -53,7 +51,7 @@ private static Map<String, String> loadPostCodeMap() throws FileNotFoundExceptio
5351
Map<String,String>postCodeMap =newHashMap<>();
5452
while (postCode.hasNextLine()) {
5553
Stringline =postCode.nextLine();
56-
String[]splits =line.split(",", -1);
54+
String[]splits =line.split(Utils.COMMA_DELIMITER, -1);
5755
postCodeMap.put(splits[0],splits[7]);
5856
}
5957
returnpostCodeMap;

‎src/main/java/com/sparkTutorial/advanced/broadcast/UkMarketSpacesWithoutBroadcaset.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
packagecom.sparkTutorial.advanced.broadcast;
22

3+
importcom.sparkTutorial.rdd.commons.Utils;
4+
importorg.apache.log4j.Level;
5+
importorg.apache.log4j.Logger;
36
importorg.apache.spark.SparkConf;
47
importorg.apache.spark.api.java.JavaRDD;
58
importorg.apache.spark.api.java.JavaSparkContext;
@@ -11,17 +14,16 @@
1114
publicclassUkMarketSpacesWithoutBroadcaset {
1215

1316
publicstaticvoidmain(String[]args)throwsException {
14-
17+
Logger.getLogger("org").setLevel(Level.ERROR);
1518
SparkConfconf =newSparkConf().setAppName("UkMarketSpaces").setMaster("local[1]");
16-
1719
JavaSparkContextjavaSparkContext =newJavaSparkContext(conf);
1820

1921
finalMap<String,String>postCodeMap =loadPostCodeMap();
2022

2123
JavaRDD<String>marketsRdd =javaSparkContext.textFile("in/uk-market-spaces-identifiable-data.csv");
2224

2325
JavaRDD<String>regions =marketsRdd
24-
.filter(line -> !line.split(",", -1)[0].equals("Timestamp"))
26+
.filter(line -> !line.split(Utils.COMMA_DELIMITER, -1)[0].equals("Timestamp"))
2527
.map(line -> {
2628
List<String>postCodePrefixes =getPostPrefixes(line);
2729
for (StringpostCodePrefix:postCodePrefixes) {
@@ -37,7 +39,7 @@ public static void main(String[] args) throws Exception {
3739
}
3840

3941
privatestaticList<String>getPostPrefixes(Stringline) {
40-
String[]splits =line.split(",", -1);
42+
String[]splits =line.split(Utils.COMMA_DELIMITER, -1);
4143
Stringpostcode =splits[4];
4244
StringcleanedPostCode =postcode.replaceAll("\\s+","");
4345
ArrayList<String>prefixes =newArrayList<>();
@@ -52,7 +54,7 @@ private static Map<String, String> loadPostCodeMap() throws FileNotFoundExceptio
5254
Map<String,String>postCodeMap =newHashMap<>();
5355
while (postCode.hasNextLine()) {
5456
Stringline =postCode.nextLine();
55-
String[]splits =line.split(",", -1);
57+
String[]splits =line.split(Utils.COMMA_DELIMITER, -1);
5658
postCodeMap.put(splits[0],splits[7]);
5759
}
5860
returnpostCodeMap;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp