One of the most common uses of Spark is analyzing and processing log files. In this assignment, we will put Spark to good use for GHTorrent, an OSS project that retrieves and downloads data from GitHub. GHTorrent works by following the GitHub event timeline and then retrieving all items linked from each event recursively and exhaustively. To make monitoring and debugging easier, the GHTorrent maintainers use extensive runtime logging in the downloader scripts.
Here is an extract of what the GHTorrent log looks like:
DEBUG, 2017-03-23T10:02:27+00:00, ghtorrent-40 -- ghtorrent.rb: Repo EFForg/https-everywhere exists
DEBUG, 2017-03-24T12:06:23+00:00, ghtorrent-49 -- ghtorrent.rb: Repo Shikanime/print exists
INFO, 2017-03-23T13:00:55+00:00, ghtorrent-42 -- api_client.rb: Successful request. URL: https://api.github.com/repos/CanonicalLtd/maas-docs/issues/365/events?per_page=100, Remaining: 4943, Total: 88 ms
WARN, 2017-03-23T20:04:28+00:00, ghtorrent-13 -- api_client.rb: Failed request. URL: https://api.github.com/repos/greatfakeman/Tabchi/commits?sha=Tabchi&per_page=100, Status code: 404, Status: Not Found, Access: ac6168f8776, IP: 0.0.0.0, Remaining: 3031
DEBUG, 2017-03-23T09:06:09+00:00, ghtorrent-2 -- ghtorrent.rb: Transaction committed (11 ms)
Each log line comprises a standard part (up to .rb:) and an operation-specific part. The standard part contains the following fields:
1. The logging level, one of DEBUG, INFO, WARN, ERROR (separated by ,)
2. A timestamp (separated by ,)
3. The downloader id, e.g. ghtorrent-40 (separated by --)
4. The retrieval stage, given by the Ruby class name, one of: event_processing, ght_data_retrieval, api_client, retriever, ghtorrent
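As a concrete illustration, here is a minimal plain-Python sketch (no Spark involved) of how the first sample line above decomposes into the standard fields; the parsing code further below does the equivalent for every record:

# Hand-worked decomposition of the first sample line (illustration only)
sample = "DEBUG, 2017-03-23T10:02:27+00:00, ghtorrent-40 -- ghtorrent.rb: Repo EFForg/https-everywhere exists"

standard, operation = sample.split('.rb: ', 1)
level, timestamp, rest = standard.split(', ', 2)
downloader_id, stage = rest.split(' -- ')

print(level)          # DEBUG
print(timestamp)      # 2017-03-23T10:02:27+00:00
print(downloader_id)  # ghtorrent-40
print(stage)          # ghtorrent
print(operation)      # Repo EFForg/https-everywhere exists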
Grade: This assignment consists of 130 points. You need to collect 100 to get a 10!
# Parse a raw log line into its fields.
# Columns:
# 0: logging level, 1: timestamp, 2: downloader id,
# 3: retrieval stage, 4: operation-specific part
def myParse(line):
    line = line.replace(' -- ', ', ')
    line = line.replace('.rb: ', ', ')
    line = line.replace(', ghtorrent-', ', ')
    return line.split(', ', 4)

def getRDD(filename):
    textFile = sc.textFile("hdfs://bdp1:8020/" + filename)
    parsedRDD = textFile.map(myParse)
    return parsedRDD
rowrdd = getRDD("ghtorrent-logs.txt").cache()
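A quick sanity check on the parsed RDD; a row parsed from the first sample line above would come out as shown in the comment:

# Inspect a few parsed rows; for the first sample line the result is
# ['DEBUG', '2017-03-23T10:02:27+00:00', '40', 'ghtorrent', 'Repo EFForg/https-everywhere exists']
rowrdd.take(3)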
T (5 points): How many lines does the RDD contain?
print(rowrdd.count())
T (5 points): Count the number of WARNing messages
numWarns = rowrdd.filter(lambda x: x[0] == "WARN")
print(numWarns.count())
T (10 points): How many repositories were processed in total? Use the api_client lines only.
# Add the repository (owner/name) as column 5, extracted from the URL
# in the operation-specific part of api_client lines.
def parseRepos(x):
    try:
        # The URL has the form:
        # Successful request. URL: https://api.github.com/repos/CanonicalLtd/maas-docs
        # /issues/365/events?per_page=100, Remaining: 4943, Total: 88 ms
        # and should yield "CanonicalLtd/maas-docs"
        split = x[4].split('/')[4:6]
        joinedSplit = '/'.join(split)
        result = joinedSplit.split('?')[0]
    except Exception:
        result = ''
    x.append(result)
    return x
# Filters out rows without enough elements (about 50 rows)
filteredRdd = rowrdd.filter(lambda x: len(x) == 5)
# Only look at api_client calls
apiRdd = filteredRdd.filter(lambda x: x[3] == "api_client")
# Add another column with the repo if can find one, otherwise ''
reposRdd = apiRdd.map(parseRepos)
# Filter out rows without repo
removedEmpty = reposRdd.filter(lambda x: x[5] != '')
# Group by repo and count
uniqueRepos = removedEmpty.groupBy(lambda x: x[5])
print(uniqueRepos.count())
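An equivalent way to obtain the same number without materializing the per-repo groups is to project the repo column and count distinct values; this is just an alternative sketch, not the graded answer above:

# Count distinct repositories directly, skipping the groupBy step
print(removedEmpty.map(lambda x: x[5]).distinct().count())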
T (5 points): Which client did the most HTTP requests?
# Group by, count and find max
usersHttp = apiRdd.groupBy(lambda x: x[2])
usersHttpSum = usersHttp.map(lambda x: (x[0], len(x[1])))
print(usersHttpSum.max(key=lambda x: x[1]))
T (5 points): Which client did the most FAILED HTTP requests? Use group_by to provide an answer.
# filter failed http requests
onlyFailed = apiRdd.filter(lambda x: x[4].split(' ', 1)[0] == "Failed")
# Group by, count, find max
usersFailedHttp = onlyFailed.groupBy(lambda x: x[2])
usersFailedHttpSum = usersFailedHttp.map(lambda x: (x[0], len(x[1])))
print(usersFailedHttpSum.max(key=lambda x: x[1]))
T (5 points): What is the most active hour of day?
# Helper: append a value to a row and return the row (for use inside map)
def appendAndReturn(x, toAdd):
    x.append(toAdd)
    return x
# Split date to hour only
onlyHours = filteredRdd.map(lambda x: appendAndReturn(x, x[1].split('T', 1)[1].split(':', 1)[0]))
# Group by, count, find max
groupOnlyHours = onlyHours.groupBy(lambda x: x[5])
hoursCount = groupOnlyHours.map(lambda x: (x[0], len(x[1])))
print(hoursCount.max(key=lambda x: x[1]))
T (5 points): What is the most active repository (hint: use messages from the ghtorrent.rb layer only)?
# Group by, count, find max
activityRepos = removedEmpty.groupBy(lambda x: x[5])
countActivityRepos = activityRepos.map(lambda x: (x[0], len(x[1])))
print(countActivityRepos.max(key=lambda x: x[1]))
#(u'greatfakeman/Tabchi', 79523)
T (5 points): Which access keys are failing most often? (hint: extract the Access: ... part from failing requests)
# Add access code
addedCodes = onlyFailed.map(lambda x: appendAndReturn(x, x[4].split('Access: ', 1)[1].split(',', 1)[0]))
# most failed access
accessCodes = addedCodes.groupBy(lambda x: x[5])
countAccessCodes = accessCodes.map(lambda x: (x[0], len(x[1])))
print(countAccessCodes.max(key=lambda x: x[1]))
Typical operations on RDDs require grouping on a specific part of each record and then calculating specific counts given the groups. While this operation can be achieved with the group_by family of functions, it is often useful to create a structure called an inverted index. An inverted index creates a 1..n mapping from the record part to all occurrences of the record in the dataset. For example, if the dataset looks like the following:
col1,col2,col3
A,1,foo
B,1,bar
C,2,foo
D,3,baz
E,1,foobar
an inverted index on col2
would look like
1 -> [(A,1,foo), (B,1,bar), (E,1,foobar)]
2 -> [(C,2,foo)]
3 -> [(D,3,baz)]
Inverted indexes enable us to quickly access precalculated partitions of the dataset. To see their effect on large datasets, let's compute an inverted index on the downloader id part.
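As a small self-contained illustration (assuming the toy dataset above, entered by hand), groupBy on the second field produces exactly the mapping shown:

# Build the toy dataset as an RDD of tuples and invert it on col2
toy = sc.parallelize([
    ('A', 1, 'foo'), ('B', 1, 'bar'), ('C', 2, 'foo'),
    ('D', 3, 'baz'), ('E', 1, 'foobar'),
])
toy_index = toy.groupBy(lambda row: row[1])
# toy_index.lookup(1) yields the group [('A', 1, 'foo'), ('B', 1, 'bar'), ('E', 1, 'foobar')]
for key, rows in sorted(toy_index.collect()):
    print(key, list(rows))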
T (10 points): Create a function that, given an RDD[Seq[T]] and an index position (denoting which field to index on), computes an inverted index on the RDD.
# Create an inverted index for rdd on column idx_id
def inverted_index(rdd, idx_id):
    inv_id = rdd.groupBy(lambda x: x[idx_id])
    return inv_id
T (5 points): Compute the number of different repositories accessed by the client ghtorrent-22 (without using the inverted index).
import time
# get unique repos count for user 22 without inverted index
start_time = time.time()
user22 = removedEmpty.filter(lambda row: row[2] == '22')
user22repos = user22.groupBy(lambda x: x[5])
print(user22repos.count())
print(time.time() - start_time)
T (5 points): Compute the number of different repositories accessed by the client ghtorrent-22 (using the inverted index you calculated above). Remember that Spark computations are lazy, so you need to run the inverted index generation before you actually use the index.
# Create inverted index on the downloader id column
invertedIndex = inverted_index(removedEmpty, 2)
# Dummy lookup to force the (lazy) index to be computed before timing
invertedIndex.lookup('21')
# Look up client 22 and count its unique repos
start_time = time.time()
lookedUp22 = invertedIndex.lookup('22')  # list with one group of rows for key '22'
entries22 = lookedUp22[0]
uniqueRepos = set()
for x in entries22:
    uniqueRepos.add(x[5])
print(len(uniqueRepos))
print(time.time() - start_time)
T (5 points): You should have noticed some difference in performance. Why is the indexed version faster?
79.28 seconds vs 1.31 seconds. The indexed version is faster because the data has already been grouped by downloader id, so a lookup only has to fetch the pre-grouped entries for that single key instead of scanning and filtering the whole dataset again.
T (5 points): Read up about groupByKey. Explain in 3 lines why it is the worst function in the Spark API and what you can use instead.
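A minimal sketch of the contrast at issue, assuming a small hand-made pair RDD: groupByKey ships every value for a key across the network and buffers whole groups in memory, whereas reduceByKey combines partial results on each partition before the shuffle and is usually preferable for aggregations:

pairs = sc.parallelize([('a', 1), ('b', 1), ('a', 1), ('a', 1)])

# groupByKey: all values per key are shuffled and held as a group
counts_grouped = pairs.groupByKey().map(lambda kv: (kv[0], len(kv[1])))

# reduceByKey: per-partition partial sums are combined before the shuffle
counts_reduced = pairs.reduceByKey(lambda a, b: a + b)

print(sorted(counts_grouped.collect()))  # [('a', 3), ('b', 1)]
print(sorted(counts_reduced.collect()))  # [('a', 3), ('b', 1)]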
We now need to monitor the behaviour of interesting repositories. Use this link to download a list of the repositories we are interested in. This list was generated on Oct 10, 2017, more than 7 months after the log file was created. The format of the file is CSV, and the meaning of the fields can be found in the documentation on the GHTorrent project web site.
T (5 points): Read in the CSV file to an RDD (let's call it interesting). How many records are there?
textfile = sc.textFile("hdfs://bdp1:8020/important-repos.csv")
interesting = textfile.map(lambda line: line.split(","))
interesting.count()
T (10 points): How many records in the log file refer to entries in the interesting file?
# Keep only the repo name (drop the owner part) so it matches the
# name column of the interesting-repos file
def changeRepo(x):
    try:
        x[5] = x[5].split("/")[1]
    except Exception:
        x[5] = ''
    return x

# Key the interesting repos by their name column and the log lines by the
# repo name extracted above, then join the two RDDs
interestingRepo = interesting.keyBy(lambda x: x[3])
logLineRepo = reposRdd.map(changeRepo).filter(lambda x: x[5] != '').keyBy(lambda x: x[5])
joinedRepo = interestingRepo.join(logLineRepo)
joinedRepo.count()
T (5 points): Which of the interesting repositories has the most failed API calls?
# joinedRepo elements look like (repo_name, (interesting_row, log_row))
joinedRepo.filter(lambda kv: kv[1][1][4].startswith("Failed")) \
    .map(lambda kv: (kv[0], 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda kv: kv[1], False) \
    .take(3)
T (10 points) Read in the interesting repos file using Spark's CSV parser. Convert the log RDD to a Dataframe.
interesting_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("hdfs://bdp1:8020/important-repos.csv")
log_df = reposRdd.map(changeRepo).filter(lambda x: x[5] != '').toDF()
T (15 points) Repeat all 3 queries in the "Joining" section above using either SQL or the Dataframe API. Measure the time it takes to execute them.
interesting_df.count()
joined_df = interesting_df.join(log_df, interesting_df.name == log_df._6)
joined_df.count()
from pyspark.sql.functions import desc

joined_df.filter(joined_df._5.startswith("Failed")) \
    .groupBy(joined_df.name) \
    .count() \
    .orderBy(desc("count")) \
    .take(3)
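The task also asks for timings; below is a minimal sketch of how they could be measured with time.time() around each action (assuming the DataFrames defined above; no measured values are reported here):

import time

# Force an action for each of the three queries and time it
queries = [
    ("count interesting", lambda: interesting_df.count()),
    ("count joined", lambda: joined_df.count()),
    ("top failed", lambda: joined_df.filter(joined_df._5.startswith("Failed"))
                                    .groupBy(joined_df.name).count()
                                    .orderBy(desc("count")).take(3)),
]
for label, run in queries:
    start = time.time()
    run()
    print(label, time.time() - start)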
T (5 points) Select one of the queries and compare the execution plans between the RDD version and your version (you can see them by going to localhost:4040 in your VM). What differences do you see?