Scala to Python - rdd folder (#1)
Changes from all commits: 8838805, 4d7b7a4, 7b40998, 652d9ff, 80c20ed, 08b146a, afc939b, c7168ce, 131e3cf, f637b18
commons/Utils.py (new file, @@ -0,0 +1,5 @@):

import re

class Utils():
    COMMA_DELIMITER = re.compile(''',(?=(?:[^'"]|'[^']*'|"[^"]*")*$)''')
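As a quick check of what this delimiter does (the sample row below is illustrative, shaped like the airports data, and not taken from the PR), it splits on commas only when they fall outside quoted fields:

import re

COMMA_DELIMITER = re.compile(''',(?=(?:[^'"]|'[^']*'|"[^"]*")*$)''')

row = '1,"Goroka Airport","Goroka, EHP","Papua New Guinea",-6.081689'
# The comma inside "Goroka, EHP" is kept; all other commas split.
print(COMMA_DELIMITER.split(row))
# ['1', '"Goroka Airport"', '"Goroka, EHP"', '"Papua New Guinea"', '-6.081689']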
New file (@@ -0,0 +1,17 @@):

from pyspark import SparkContext

if __name__ == "__main__":
    '''
    Create a Spark program to read the airport data from in/airports.text,
    find all the airports whose latitude is bigger than 40, then output the
    airport's name and the airport's latitude to out/airports_by_latitude.text.

    Each row of the input file contains the following columns:
    Airport ID, Name of airport, Main city served by airport, Country where
    airport is located, IATA/FAA code, ICAO Code, Latitude, Longitude,
    Altitude, Timezone, DST, Timezone in Olson format

    Sample output:
    "St Anthony", 51.391944
    "Tofino", 49.082222
    ...
    '''
This file was deleted.
New file (@@ -0,0 +1,17 @@):

from pyspark import SparkContext
from commons.Utils import Utils

def splitComma(line: str):
    splits = Utils.COMMA_DELIMITER.split(line)
    return "{}, {}".format(splits[1], splits[6])

if __name__ == "__main__":
    sc = SparkContext("local", "airports")
    airports = sc.textFile("in/airports.text")
    airportsInUSA = airports.filter(lambda line: float(Utils.COMMA_DELIMITER.split(line)[6]) > 40)
    airportsNameAndCityNames = airportsInUSA.map(splitComma)
    airportsNameAndCityNames.saveAsTextFile("out/airports_by_latitude.text")

Review thread on the splitComma definition:
- Reviewer: Have you tried to run this program? It doesn't compile.
- Author: Hi, yes, I did run all the programs. Which version of Python are you running? This should work in the latest Python 3 version.
- Reviewer: Sorry for the confusion. It works for Python 3; I was running Python 2.7. Feel free to ignore this comment.
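For context on the thread above: function annotations such as "line: str" are Python 3 syntax and raise a SyntaxError on Python 2.7, which explains the reviewer's report. A version-agnostic sketch, usable as a drop-in replacement inside the same file, simply drops the annotation:

def splitComma(line):
    # Identical behaviour, but also parses under Python 2.7.
    splits = Utils.COMMA_DELIMITER.split(line)
    return "{}, {}".format(splits[1], splits[6])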
This file was deleted.
New file (@@ -0,0 +1,17 @@):

from pyspark import SparkContext

if __name__ == "__main__":
    '''
    Create a Spark program to read the airport data from in/airports.text,
    find all the airports which are located in the United States, and output
    the airport's name and the city's name to out/airports_in_usa.text.

    Each row of the input file contains the following columns:
    Airport ID, Name of airport, Main city served by airport, Country where
    airport is located, IATA/FAA code, ICAO Code, Latitude, Longitude,
    Altitude, Timezone, DST, Timezone in Olson format

    Sample output:
    "Putnam County Airport", "Greencastle"
    "Dowagiac Municipal Airport", "Dowagiac"
    ...
    '''
This file was deleted.
New file (@@ -0,0 +1,15 @@):

from pyspark import SparkContext
from commons.Utils import Utils

def splitComma(line: str):
    splits = Utils.COMMA_DELIMITER.split(line)
    return "{}, {}".format(splits[1], splits[2])

if __name__ == "__main__":
    sc = SparkContext("local", "airports")
    airports = sc.textFile("in/airports.text")
    airportsInUSA = airports.filter(lambda line: Utils.COMMA_DELIMITER.split(line)[3] == "\"United States\"")
    airportsNameAndCityNames = airportsInUSA.map(splitComma)
    airportsNameAndCityNames.saveAsTextFile("out/airports_in_usa.text")

Review comment on the splitComma definition:
- Reviewer: Again, it didn't compile; I think you don't need the type annotation.
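The comparison against the literal "\"United States\"" works because the regex split keeps each field's surrounding quotes. A hypothetical alternative (not part of the PR) strips the quotes before comparing, which reads a little more plainly:

def isInUnitedStates(line):
    # Hypothetical helper, not in the PR.
    # Field 3 arrives as '"United States"', quotes included.
    country = Utils.COMMA_DELIMITER.split(line)[3]
    return country.strip('"') == "United States"

airportsInUSA = airports.filter(isInUnitedStates)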
This file was deleted.
New file (@@ -0,0 +1,10 @@):

from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext("local", "collect")
    sc.setLogLevel("ERROR")
    inputWords = ["spark", "hadoop", "spark", "hive", "pig", "cassandra", "hadoop"]
    wordRdd = sc.parallelize(inputWords)
    words = wordRdd.collect()
    for word in words:
        print(word)

Review thread on the SparkContext line:
- Reviewer: Please set the logging level to ERROR, similar to what the Scala program does, to reduce the noise of the output.
- Author: Hi James, some considerations about the logging level when using pyspark:
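One caveat about the action shown here: collect() pulls the entire RDD into the driver's memory, which is fine for this seven-element list but risky on real datasets. A bounded alternative, sketched here for illustration, is take(n):

# Fetch a fixed number of elements instead of the whole RDD.
firstThree = wordRdd.take(3)
print(firstThree)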
This file was deleted.
New file (@@ -0,0 +1,12 @@):

from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext("local", "count")
    sc.setLogLevel("ERROR")
    inputWords = ["spark", "hadoop", "spark", "hive", "pig", "cassandra", "hadoop"]
    wordRdd = sc.parallelize(inputWords)
    print("Count: {}".format(wordRdd.count()))
    wordCountByValue = wordRdd.countByValue()
    print("CountByValue:")
    for word, count in wordCountByValue.items():
        print("{} : {}".format(word, count))

Review comment:
- Reviewer: Again, set the logging level to ERROR.
This file was deleted.
New file (@@ -0,0 +1,20 @@):

from pyspark import SparkContext

if __name__ == "__main__":
    '''
    "in/nasa_19950701.tsv" contains 10000 log lines from one of NASA's Apache
    servers for July 1st, 1995. "in/nasa_19950801.tsv" contains 10000 log
    lines for August 1st, 1995.

    Create a Spark program to generate a new RDD which contains the hosts
    which are accessed on BOTH days. Save the resulting RDD to the
    "out/nasa_logs_same_hosts.csv" file.

    Example output:
    vagrant.vf.mmc.com
    www-a1.proxy.aol.com
    .....

    Keep in mind that the original log files contain the following header line:
    host logname time method url response bytes
    Make sure the header lines are removed in the resulting RDD.
    '''
This file was deleted.
New file (@@ -0,0 +1,15 @@):

from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext("local", "sameHosts")
    julyFirstLogs = sc.textFile("in/nasa_19950701.tsv")
    augustFirstLogs = sc.textFile("in/nasa_19950801.tsv")
    julyFirstHosts = julyFirstLogs.map(lambda line: line.split("\t")[0])
    augustFirstHosts = augustFirstLogs.map(lambda line: line.split("\t")[0])
    intersection = julyFirstHosts.intersection(augustFirstHosts)
    cleanedHostIntersection = intersection.filter(lambda host: host != "host")
    cleanedHostIntersection.saveAsTextFile("out/nasa_logs_same_hosts.csv")
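One property of intersection() worth noting: it de-duplicates, so each host appears at most once in the output even if it shows up on many log lines. A minimal illustration with toy data (not the NASA logs):

hostsA = sc.parallelize(["h1", "h2", "h2", "h3"])
hostsB = sc.parallelize(["h2", "h3", "h3", "h4"])
print(sorted(hostsA.intersection(hostsB).collect()))
# ['h2', 'h3'] (duplicates removed)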
This file was deleted.
New file (@@ -0,0 +1,15 @@):

from pyspark import SparkContext

if __name__ == "__main__":
    '''
    "in/nasa_19950701.tsv" contains 10000 log lines from one of NASA's Apache
    servers for July 1st, 1995. "in/nasa_19950801.tsv" contains 10000 log
    lines for August 1st, 1995.

    Create a Spark program to generate a new RDD which contains the log lines
    from both July 1st and August 1st, take a 0.1 sample of those log lines,
    and save it to the "out/sample_nasa_logs.tsv" file.

    Keep in mind that the original log files contain the following header line:
    host logname time method url response bytes
    Make sure the header lines are removed in the resulting RDD.
    '''
This file was deleted.
New file (@@ -0,0 +1,17 @@):

from pyspark import SparkContext

def isNotHeader(line: str):
    return not (line.startswith("host") and "bytes" in line)

if __name__ == "__main__":
    sc = SparkContext("local", "unionLogs")
    julyFirstLogs = sc.textFile("in/nasa_19950701.tsv")
    augustFirstLogs = sc.textFile("in/nasa_19950801.tsv")
    aggregatedLogLines = julyFirstLogs.union(augustFirstLogs)
    cleanLogLines = aggregatedLogLines.filter(isNotHeader)
    sample = cleanLogLines.sample(withReplacement=True, fraction=0.1)
    sample.saveAsTextFile("out/sample_nasa_logs.csv")
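A note on the sampling call: with withReplacement=True the same log line can be drawn more than once. If the intent is a plain ~10% subset of distinct lines, a without-replacement sketch would be:

# Each line appears at most once; fraction is a probability per element,
# not an exact output size.
sample = cleanLogLines.sample(withReplacement=False, fraction=0.1)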