from pyspark.sql import SparkSession
from commons.Utils import Utils

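# Note: commons/Utils.py is not included in this change. A minimal sketch of
# what it presumably provides, assuming the usual CSV-aware comma splitter
# (the exact regex, module path, and class name are assumptions):
#
#     import re
#
#     class Utils():
#         # Split on commas that sit outside double-quoted fields.
#         COMMA_DELIMITER = re.compile(''',(?=(?:[^"]*"[^"]*")*[^"]*$)''')
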
# Return the four columns this job keeps (fields 2, 6, 9, and 14) from one CSV line.
def getColNames(line: str):
    splits = Utils.COMMA_DELIMITER.split(line)
    return [splits[2], splits[6], splits[9], splits[14]]

# Parse one response line into a 4-tuple; fields 6 and 14 are numeric, so
# convert them to floats, mapping empty strings to None.
def mapResponseRdd(line: str):
    splits = Utils.COMMA_DELIMITER.split(line)
    double1 = None if not splits[6] else float(splits[6])
    double2 = None if not splits[14] else float(splits[14])
    return splits[2], double1, splits[9], double2

if __name__ == "__main__":

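    # Build a local, single-node SparkSession and quiet the logs down to ERROR.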
    session = SparkSession.builder.appName("StackOverFlowSurvey").master("local").getOrCreate()
    sc = session.sparkContext
    sc.setLogLevel("ERROR")

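    # Read the raw survey CSV as an RDD of text lines.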
    lines = sc.textFile("in/2016-stack-overflow-survey-responses.csv")

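    # The header line is the one whose third field is the literal "country";
    # map it to the four column names.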
    colNames = lines \
        .filter(lambda line: Utils.COMMA_DELIMITER.split(line)[2] == "country") \
        .map(getColNames)

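    # Every other line is a survey response; parse each into a 4-tuple.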
    responseRDD = lines \
        .filter(lambda line: Utils.COMMA_DELIMITER.split(line)[2] != "country") \
        .map(mapResponseRdd)

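    # colNames.collect() yields a single four-name list; use it to label the DataFrame columns.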
    responseDataFrame = responseRDD.toDF(colNames.collect()[0])

| 32 | +print("=== Print out schema ===") |
| 33 | +responseDataFrame.printSchema() |
| 34 | + |
| 35 | +print("=== Print 20 records of responses table ===") |
| 36 | +responseDataFrame.show(20) |
| 37 | + |
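    # Collect all rows back to the driver for printing (only sensible for small data).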
    for response in responseDataFrame.rdd.collect():
        print(response)
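
# A minimal way to run this locally, assuming a standard PySpark install and
# that commons/Utils.py sits on the import path (the script file name below
# is an assumption, not part of this change):
#
#     spark-submit StackOverFlowSurvey.py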