Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit3be72e8

Browse files
author
James Lee
committed
split TypedDataset
1 parentcaf78df commit3be72e8

File tree

3 files changed

+94
-50
lines changed

3 files changed

+94
-50
lines changed
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
packagecom.sparkTutorial.sparkSql;
2+
3+
importorg.apache.log4j.Level;
4+
importorg.apache.log4j.Logger;
5+
importorg.apache.spark.SparkConf;
6+
importorg.apache.spark.api.java.JavaRDD;
7+
importorg.apache.spark.api.java.JavaSparkContext;
8+
importorg.apache.spark.sql.Dataset;
9+
importorg.apache.spark.sql.Encoders;
10+
importorg.apache.spark.sql.SparkSession;
11+
12+
publicclassRddToDataset {
13+
14+
privatestaticfinalStringCOMMA_DELIMITER =",(?=([^\"]*\"[^\"]*\")*[^\"]*$)";
15+
16+
publicstaticvoidmain(String[]args)throwsException {
17+
18+
Logger.getLogger("org").setLevel(Level.ERROR);
19+
SparkConfconf =newSparkConf().setAppName("StackOverFlowSurvey").setMaster("local[1]");
20+
21+
JavaSparkContextsc =newJavaSparkContext(conf);
22+
23+
SparkSessionsession =SparkSession.builder().appName("StackOverFlowSurvey").master("local[1]").getOrCreate();
24+
25+
JavaRDD<String>lines =sc.textFile("in/2016-stack-overflow-survey-responses.csv");
26+
27+
JavaRDD<Response>responseRDD =lines
28+
.filter(line -> !line.split(COMMA_DELIMITER, -1)[2].equals("country"))
29+
.map(line -> {
30+
String[]splits =line.split(COMMA_DELIMITER, -1);
31+
returnnewResponse(splits[2],convertStringToFloat(splits[6]),splits[9],convertStringToFloat(splits[14]));
32+
});
33+
Dataset<Response>responseDataset =session.createDataset(responseRDD.rdd(),Encoders.bean(Response.class));
34+
35+
System.out.println("=== Print out schema ===");
36+
responseDataset.printSchema();
37+
38+
System.out.println("=== Print 20 records of responses table ===");
39+
responseDataset.show(20);
40+
41+
JavaRDD<Response>responseJavaRDD =responseDataset.toJavaRDD();
42+
43+
for (Responseresponse :responseJavaRDD.collect()) {
44+
System.out.println(response);
45+
}
46+
47+
}
48+
49+
privatestaticIntegerconvertStringToFloat(Stringsplit) {
50+
returnsplit.isEmpty() ?null :Math.round(Float.valueOf(split));
51+
}
52+
53+
}

‎src/main/java/com/sparkTutorial/sparkSql/Response.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@
44

55
publicclassResponseimplementsSerializable {
66
privateStringcountry;
7-
privatefloatageMidPoint;
7+
privateIntegerageMidPoint;
88
privateStringoccupation;
9-
privatefloatsalaryMidPoint;
9+
privateIntegersalaryMidPoint;
1010

11-
publicResponse(Stringcountry,floatageMidPoint,Stringoccupation,floatsalaryMidPoint) {
11+
publicResponse(Stringcountry,IntegerageMidPoint,Stringoccupation,IntegersalaryMidPoint) {
1212
this.country =country;
1313
this.ageMidPoint =ageMidPoint;
1414
this.occupation =occupation;
@@ -26,11 +26,11 @@ public void setCountry(String country) {
2626
this.country =country;
2727
}
2828

29-
publicfloatgetAgeMidPoint() {
29+
publicIntegergetAgeMidPoint() {
3030
returnageMidPoint;
3131
}
3232

33-
publicvoidsetAgeMidPoint(floatageMidPoint) {
33+
publicvoidsetAgeMidPoint(IntegerageMidPoint) {
3434
this.ageMidPoint =ageMidPoint;
3535
}
3636

@@ -42,11 +42,22 @@ public void setOccupation(String occupation) {
4242
this.occupation =occupation;
4343
}
4444

45-
publicfloatgetSalaryMidPoint() {
45+
publicIntegergetSalaryMidPoint() {
4646
returnsalaryMidPoint;
4747
}
4848

49-
publicvoidsetSalaryMidPoint(floatsalaryMidPoint) {
49+
publicvoidsetSalaryMidPoint(IntegersalaryMidPoint) {
5050
this.salaryMidPoint =salaryMidPoint;
5151
}
52+
53+
54+
@Override
55+
publicStringtoString() {
56+
return"Response{" +
57+
"country='" +country +'\'' +
58+
", ageMidPoint=" +ageMidPoint +
59+
", occupation='" +occupation +'\'' +
60+
", salaryMidPoint=" +salaryMidPoint +
61+
'}';
62+
}
5263
}

‎src/main/java/com/sparkTutorial/sparkSql/TypedDataset.java

Lines changed: 23 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -2,81 +2,61 @@
22

33
importorg.apache.log4j.Level;
44
importorg.apache.log4j.Logger;
5-
importorg.apache.spark.SparkConf;
6-
importorg.apache.spark.api.java.JavaRDD;
7-
importorg.apache.spark.api.java.JavaSparkContext;
8-
importorg.apache.spark.sql.Dataset;
9-
importorg.apache.spark.sql.Encoders;
10-
importorg.apache.spark.sql.SparkSession;
5+
importorg.apache.spark.sql.*;
116

127
importstaticorg.apache.spark.sql.functions.avg;
8+
importstaticorg.apache.spark.sql.functions.col;
139
importstaticorg.apache.spark.sql.functions.max;
1410

1511

1612
publicclassTypedDataset {
1713
privatestaticfinalStringAGE_MIDPOINT ="ageMidpoint";
1814
privatestaticfinalStringSALARY_MIDPOINT ="salaryMidPoint";
1915
privatestaticfinalStringSALARY_MIDPOINT_BUCKET ="salaryMidpointBucket";
20-
privatestaticfinalfloatNULL_VALUE = -1.0f;
21-
privatestaticfinalStringCOMMA_DELIMITER =",(?=([^\"]*\"[^\"]*\")*[^\"]*$)";
2216

2317
publicstaticvoidmain(String[]args)throwsException {
2418

2519
Logger.getLogger("org").setLevel(Level.ERROR);
26-
SparkConfconf =newSparkConf().setAppName("StackOverFlowSurvey").setMaster("local[1]");
20+
SparkSessionsession =SparkSession.builder().appName("StackOverFlowSurvey").master("local[1]").getOrCreate();
2721

28-
JavaSparkContextsc =newJavaSparkContext(conf);
22+
DataFrameReaderdataFrameReader =session.read();
2923

30-
SparkSessionsession =SparkSession.builder().appName("StackOverFlowSurvey").master("local[1]").getOrCreate();
24+
Dataset<Row>responses =dataFrameReader.option("header","true").csv("in/2016-stack-overflow-survey-responses.csv");
3125

32-
JavaRDD<String>lines =sc.textFile("in/2016-stack-overflow-survey-responses.csv");
26+
Dataset<Row>responseWithSelectedColumns =responses.select(col("country"),col("age_midpoint").as("ageMidPoint").cast("integer"),col("occupation"),col("salary_midpoint").as("salaryMidPoint").cast("integer"));
3327

34-
JavaRDD<Response>responseRDD =lines
35-
.filter(line -> !line.split(COMMA_DELIMITER, -1)[2].equals("country"))
36-
.map(line -> {
37-
String[]splits =line.split(COMMA_DELIMITER, -1);
38-
returnnewResponse(splits[2],convertStringToFloat(splits[6]),splits[9],convertStringToFloat(splits[14]));
39-
});
40-
Dataset<Response>responseDataset =session.createDataset(responseRDD.rdd(),Encoders.bean(Response.class));
28+
Dataset<Response>typedDataset =responseWithSelectedColumns.as(Encoders.bean(Response.class));
4129

4230
System.out.println("=== Print out schema ===");
43-
responseDataset.printSchema();
31+
typedDataset.printSchema();
4432

4533
System.out.println("=== Print 20 records of responses table ===");
46-
responseDataset.show(20);
34+
typedDataset.show(20);
4735

4836
System.out.println("=== Print records where the response is from Afghanistan ===");
49-
responseDataset.filter(response ->response.getCountry().equals("Afghanistan")).show();
37+
typedDataset.filter(response ->response.getCountry().equals("Afghanistan")).show();
5038

5139
System.out.println("=== Print the count of occupations ===");
52-
responseDataset.groupBy(responseDataset.col("occupation")).count().show();
53-
40+
typedDataset.groupBy(typedDataset.col("occupation")).count().show();
5441

5542
System.out.println("=== Print records with average mid age less than 20 ===");
56-
responseDataset.filter(response ->response.getAgeMidPoint() !=NULL_VALUE &&response.getAgeMidPoint() <20).show();
43+
typedDataset.filter(response ->response.getAgeMidPoint() !=null &&response.getAgeMidPoint() <20).show();
5744

5845
System.out.println("=== Print the result with salary middle point in descending order ===");
59-
responseDataset.orderBy(responseDataset.col(SALARY_MIDPOINT ).desc()).show();
46+
typedDataset.orderBy(typedDataset.col(SALARY_MIDPOINT ).desc()).show();
6047

6148
System.out.println("=== Group by country and aggregate by average salary middle point and max age middle point ===");
62-
responseDataset
63-
.filter(response ->response.getSalaryMidPoint() !=NULL_VALUE)
64-
.groupBy("country")
65-
.agg(avg(SALARY_MIDPOINT),max(AGE_MIDPOINT))
66-
.show();
49+
typedDataset.filter(response ->response.getSalaryMidPoint() !=null)
50+
.groupBy("country")
51+
.agg(avg(SALARY_MIDPOINT),max(AGE_MIDPOINT))
52+
.show();
6753

6854
System.out.println("=== Group by salary bucket ===");
69-
70-
responseDataset
71-
.map(response ->Math.round(response.getSalaryMidPoint()/20000) *20000,Encoders.INT())
72-
.withColumnRenamed("value",SALARY_MIDPOINT_BUCKET)
73-
.groupBy(SALARY_MIDPOINT_BUCKET)
74-
.count()
75-
.orderBy(SALARY_MIDPOINT_BUCKET).show();
55+
typedDataset.filter(response ->response.getSalaryMidPoint() !=null)
56+
.map(response ->Math.round(response.getSalaryMidPoint()/20000) *20000,Encoders.INT())
57+
.withColumnRenamed("value",SALARY_MIDPOINT_BUCKET)
58+
.groupBy(SALARY_MIDPOINT_BUCKET)
59+
.count()
60+
.orderBy(SALARY_MIDPOINT_BUCKET).show();
7661
}
77-
78-
privatestaticfloatconvertStringToFloat(Stringsplit) {
79-
returnsplit.isEmpty() ?NULL_VALUE :Float.valueOf(split);
80-
}
81-
8262
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp