Scala to Python - rdd folder #1


Merged

jleetutorial merged 10 commits into master from pedromb-scala_to_python on Sep 29, 2017.
commons/Utils.py (5 additions, 0 deletions)

@@ -0,0 +1,5 @@
import re

class Utils():

    COMMA_DELIMITER = re.compile(''',(?=(?:[^'"]|'[^']*'|"[^"]*")*$)''')
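
The COMMA_DELIMITER pattern splits a CSV row on commas only when the comma sits outside quoted fields: the lookahead succeeds only if the rest of the line is made up of unquoted characters and balanced single- or double-quoted runs. A minimal sketch of its behaviour, using a made-up row (not taken from in/airports.text) whose third field contains a comma inside quotes:

import re

COMMA_DELIMITER = re.compile(''',(?=(?:[^'"]|'[^']*'|"[^"]*")*$)''')

row = '42,"Example Airport","Springfield, IL","United States"'
print(COMMA_DELIMITER.split(row))
# ['42', '"Example Airport"', '"Springfield, IL"', '"United States"']

Note that the split fields keep their surrounding quotes, which is why AirportsInUsaSolution.py below compares against "\"United States\"".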
rdd/WordCount.py (7 additions, 6 deletions)

@@ -2,9 +2,10 @@
 from pyspark import SparkContext

 if __name__ == "__main__":
-    sc = SparkContext("local", "word count")
-    lines = sc.textFile("in/word_count.text")
-    words = lines.flatMap(lambda line: line.split(" "))
-    wordCounts = words.countByValue()
-    for word, count in wordCounts.items():
-        print(word, count)
+    sc = SparkContext("local", "word count")
+    sc.setLogLevel("ERROR")
+    lines = sc.textFile("in/word_count.text")
+    words = lines.flatMap(lambda line: line.split(" "))
+    wordCounts = words.countByValue()
+    for word, count in wordCounts.items():
+        print(word, count)
rdd/airports/AirportsByLatitudeProblem.py (17 additions, 0 deletions)

@@ -0,0 +1,17 @@
from pyspark import SparkContext

if __name__ == "__main__":

    '''
    Create a Spark program to read the airport data from in/airports.text and find all the airports whose latitude is greater than 40.
    Then output the airport's name and the airport's latitude to out/airports_by_latitude.text.

    Each row of the input file contains the following columns:
    Airport ID, Name of airport, Main city served by airport, Country where airport is located, IATA/FAA code,
    ICAO Code, Latitude, Longitude, Altitude, Timezone, DST, Timezone in Olson format

    Sample output:
    "St Anthony", 51.391944
    "Tofino", 49.082222
    ...
    '''
rdd/airports/AirportsByLatitudeProblem.scala (0 additions, 20 deletions)

This file was deleted.

rdd/airports/AirportsByLatitudeSolution.py (17 additions, 0 deletions)

@@ -0,0 +1,17 @@
from pyspark import SparkContext
from commons.Utils import Utils

def splitComma(line: str):

Reviewer:

Have you tried to run this program? It doesn't compile.

  File "/Users/cwei/code/python-spark-tutorial-new/rdd/airports/AirportsByLatitudeSolution.py", line 4
    def splitComma(line: str):
                       ^
SyntaxError: invalid syntax

Author (Collaborator):

Hi, yes, I did run all the programs. Which version of Python are you running? This should work in the latest Python 3 version.

Reviewer:

Sorry for the confusion. It works for Python 3. I was running Python 2.7. Feel free to ignore this comment.

    splits = Utils.COMMA_DELIMITER.split(line)
    return "{}, {}".format(splits[1], splits[6])

if __name__ == "__main__":
    sc = SparkContext("local", "airports")

    airports = sc.textFile("in/airports.text")

    airportsInUSA = airports.filter(lambda line: float(Utils.COMMA_DELIMITER.split(line)[6]) > 40)

    airportsNameAndCityNames = airportsInUSA.map(splitComma)

    airportsNameAndCityNames.saveAsTextFile("out/airports_by_latitude.text")
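
As the thread above concludes, the SyntaxError is a Python 2 vs Python 3 issue: function annotations such as line: str are only valid syntax in Python 3. For reference, a minimal annotation-free sketch of the same helper that would also parse on Python 2.7 (not part of the merged code):

def splitComma(line):
    # Same behaviour as above, just without the Python-3-only annotation
    splits = Utils.COMMA_DELIMITER.split(line)
    return "{}, {}".format(splits[1], splits[6])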
rdd/airports/AirportsByLatitudeSolution.scala (0 additions, 23 deletions)

This file was deleted.

rdd/airports/AirportsInUsaProblem.py (17 additions, 0 deletions)

@@ -0,0 +1,17 @@
from pyspark import SparkContext

if __name__ == "__main__":

    '''
    Create a Spark program to read the airport data from in/airports.text, find all the airports which are located in the United States,
    and output the airport's name and the city's name to out/airports_in_usa.text.

    Each row of the input file contains the following columns:
    Airport ID, Name of airport, Main city served by airport, Country where airport is located, IATA/FAA code,
    ICAO Code, Latitude, Longitude, Altitude, Timezone, DST, Timezone in Olson format

    Sample output:
    "Putnam County Airport", "Greencastle"
    "Dowagiac Municipal Airport", "Dowagiac"
    ...
    '''
rdd/airports/AirportsInUsaProblem.scala (0 additions, 19 deletions)

This file was deleted.

rdd/airports/AirportsInUsaSolution.py (15 additions, 0 deletions)

@@ -0,0 +1,15 @@
from pyspark import SparkContext
from commons.Utils import Utils

def splitComma(line: str):

Reviewer:

Again, it didn't compile. I think you don't need the type annotation.

  File "/Users/cwei/code/python-spark-tutorial-new/rdd/airports/AirportsInUsaSolution.py", line 4
    def splitComma(line: str):
                       ^
SyntaxError: invalid syntax

    splits = Utils.COMMA_DELIMITER.split(line)
    return "{}, {}".format(splits[1], splits[2])

if __name__ == "__main__":
    sc = SparkContext("local", "airports")

    airports = sc.textFile("in/airports.text")
    airportsInUSA = airports.filter(lambda line : Utils.COMMA_DELIMITER.split(line)[3] == "\"United States\"")

    airportsNameAndCityNames = airportsInUSA.map(splitComma)
    airportsNameAndCityNames.saveAsTextFile("out/airports_in_usa.text")
rdd/airports/AirportsInUsaSolution.scala (0 additions, 22 deletions)

This file was deleted.

rdd/collect/CollectExample.py (10 additions, 0 deletions)

@@ -0,0 +1,10 @@
from pyspark import SparkContext

if __name__ == "__main__":
sc = SparkContext("local", "collect")

Reviewer:

Please set the logging level to ERROR, similar to what the Scala program does, to reduce the noise of the output.

Author (Collaborator):

Hi James, some considerations about the logging level when using PySpark:

  • From the script itself, when using PySpark, we can only set the log level after starting the SparkContext; this means that logs printed while the SparkContext is starting will be printed anyway.
  • The best way to reduce the noise of the output is to configure the log4j.properties file inside the spark/conf folder (see the sketch after this file).

That being said, I will set the log level to ERROR after the SparkContext starts.

    sc.setLogLevel("ERROR")
    inputWords = ["spark", "hadoop", "spark", "hive", "pig", "cassandra", "hadoop"]
    wordRdd = sc.parallelize(inputWords)
    words = wordRdd.collect()
    for word in words:
        print(word)
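
As a rough illustration of the author's second point above: Spark distributions of this era (Log4j 1.x, e.g. Spark 2.x) ship a conf/log4j.properties.template; copying it to conf/log4j.properties and setting the root logger to ERROR silences most of the startup output. A sketch, with the appender values taken from that template (adjust for your installation):

# conf/log4j.properties (copied from conf/log4j.properties.template)
# Only errors are logged to the console, including during SparkContext startup
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n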
rdd/collect/CollectExample.scala (0 additions, 20 deletions)

This file was deleted.

rdd/count/CountExample.py (12 additions, 0 deletions)

@@ -0,0 +1,12 @@
from pyspark import SparkContext

if __name__ == "__main__":

Reviewer:

Again, set the logging level to ERROR.

sc = SparkContext("local", "count")
sc.setLogLevel("ERROR")
inputWords = ["spark", "hadoop", "spark", "hive", "pig", "cassandra", "hadoop"]
wordRdd = sc.parallelize(inputWords)
print("Count: {}".format(wordRdd.count()))
worldCountByValue = wordRdd.countByValue()
print("CountByValue: ")
for word, count in worldCountByValue.items():
print("{} : {}".format(word, count))
rdd/count/CountExample.scala (0 additions, 23 deletions)

This file was deleted.

rdd/nasaApacheWebLogs/SameHostsProblem.py (20 additions, 0 deletions)

@@ -0,0 +1,20 @@
from pyspark import SparkContext

if __name__ == "__main__":

    '''
    "in/nasa_19950701.tsv" contains 10000 log lines from one of NASA's Apache servers for July 1st, 1995.
    "in/nasa_19950801.tsv" contains 10000 log lines for August 1st, 1995.
    Create a Spark program to generate a new RDD which contains the hosts which were accessed on BOTH days.
    Save the resulting RDD to the "out/nasa_logs_same_hosts.csv" file.

    Example output:
    vagrant.vf.mmc.com
    www-a1.proxy.aol.com
    .....

    Keep in mind that the original log files contain the following header line:
    host    logname    time    method    url    response    bytes

    Make sure the header lines are removed in the resulting RDD.
    '''
rdd/nasaApacheWebLogs/SameHostsProblem.scala (0 additions, 23 deletions)

This file was deleted.

rdd/nasaApacheWebLogs/SameHostsSolution.py (15 additions, 0 deletions)

@@ -0,0 +1,15 @@
from pyspark import SparkContext

if __name__ == "__main__":
sc = SparkContext("local", "sameHosts")

julyFirstLogs = sc.textFile("in/nasa_19950701.tsv")
augustFirstLogs = sc.textFile("in/nasa_19950801.tsv")

julyFirstHosts = julyFirstLogs.map(lambda line: line.split("\t")[0])
augustFirstHosts = augustFirstLogs.map(lambda line: line.split("\t")[0])

intersection = julyFirstHosts.intersection(augustFirstHosts)

cleanedHostIntersection = intersection.filter(lambda host: host != "host")
cleanedHostIntersection.saveAsTextFile("out/nasa_logs_same_hosts.csv")
rdd/nasaApacheWebLogs/SameHostsSolution.scala (0 additions, 23 deletions)

This file was deleted.

rdd/nasaApacheWebLogs/UnionLogProblem.py (15 additions, 0 deletions)

@@ -0,0 +1,15 @@
from pyspark import SparkContext

if __name__ == "__main__":

    '''
    "in/nasa_19950701.tsv" contains 10000 log lines from one of NASA's Apache servers for July 1st, 1995.
    "in/nasa_19950801.tsv" contains 10000 log lines for August 1st, 1995.
    Create a Spark program to generate a new RDD which contains the log lines from both July 1st and August 1st,
    take a 0.1 sample of those log lines, and save it to the "out/sample_nasa_logs.tsv" file.

    Keep in mind that the original log files contain the following header line:
    host    logname    time    method    url    response    bytes

    Make sure the header lines are removed in the resulting RDD.
    '''
rdd/nasaApacheWebLogs/UnionLogProblem.scala (0 additions, 18 deletions)

This file was deleted.

rdd/nasaApacheWebLogs/UnionLogSolutions.py (17 additions, 0 deletions)

@@ -0,0 +1,17 @@
from pyspark import SparkContext

def isNotHeader(line: str):
    return not (line.startswith("host") and "bytes" in line)

if __name__ == "__main__":
    sc = SparkContext("local", "unionLogs")

    julyFirstLogs = sc.textFile("in/nasa_19950701.tsv")
    augustFirstLogs = sc.textFile("in/nasa_19950801.tsv")

    aggregatedLogLines = julyFirstLogs.union(augustFirstLogs)

    cleanLogLines = aggregatedLogLines.filter(isNotHeader)
    sample = cleanLogLines.sample(withReplacement = True, fraction = 0.1)

    sample.saveAsTextFile("out/sample_nasa_logs.csv")