|
| 1 | +importre |
| 2 | +frompyspark.sqlimport* |
| 3 | +frompyspark.sql.functionsimport* |
| 4 | +frompyspark.sql.typesimport* |
| 5 | + |
| 6 | +fromlib.loggerimportLog4j |
| 7 | + |
| 8 | + |
| 9 | +defparse_gender(gender): |
| 10 | +female_pattern=r"^f$|f.m|w.m" |
| 11 | +male_pattern=r"^m$|ma|m.l" |
| 12 | +ifre.search(female_pattern,gender.lower()): |
| 13 | +return"Female" |
| 14 | +elifre.search(male_pattern,gender.lower()): |
| 15 | +return"Male" |
| 16 | +else: |
| 17 | +return"Unknown" |
| 18 | + |
| 19 | + |
| 20 | +if__name__=="__main__": |
| 21 | +spark=SparkSession \ |
| 22 | + .builder \ |
| 23 | + .appName("UDF Demo") \ |
| 24 | + .master("local[2]") \ |
| 25 | + .getOrCreate() |
| 26 | + |
| 27 | +logger=Log4j(spark) |
| 28 | + |
| 29 | +survey_df=spark.read \ |
| 30 | + .option("header","true") \ |
| 31 | + .option("inferSchema","true") \ |
| 32 | + .csv("data/survey.csv") |
| 33 | + |
| 34 | +survey_df.show() |
| 35 | + |
| 36 | +parse_gender_udf=udf(parse_gender,returnType=StringType()) |
| 37 | + [logger.info(r)forrinspark.catalog.listFunctions()if"parse_gender"inr.name] |
| 38 | +survey_df2=survey_df.withColumn("Gender",parse_gender_udf("Gender")) |
| 39 | +survey_df2.show() |
| 40 | + |
| 41 | +spark.udf.register("parse_gender_udf",parse_gender,StringType()) |
| 42 | + [logger.info(r)forrinspark.catalog.listFunctions()if"parse_gender"inr.name] |
| 43 | +survey_df3=survey_df.withColumn("Gender",expr("parse_gender_udf(Gender)")) |
| 44 | +survey_df3.show() |