In this assignment, we will use Spark to explore the MovieLens dataset, which contains user-generated ratings for movies. The dataset comes in three files:
ratings.dat
contains the ratings in the following format: UserID::MovieID::Rating::Timestamp
users.dat
contains demographic information about the users: UserID::Gender::Age::Occupation::Zip-code
movies.dat
contains meta information about the movies: MovieID::Title::Genres
Refer to the README for the detailed description of the data.
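To make the `::`-delimited layout concrete, here is a minimal sketch in plain Python (no Spark needed) that splits one ratings line into typed fields. The `Rating` record type, the `parse_rating` helper and the sample line are illustrative assumptions, not part of the assignment. Unlike the cells below, which keep every field as a string, this sketch converts the fields to integers.

```python
from collections import namedtuple

# Hypothetical record type; field names follow the format string above.
Rating = namedtuple("Rating", ["user_id", "movie_id", "rating", "timestamp"])

def parse_rating(line):
    """Split a 'UserID::MovieID::Rating::Timestamp' line into typed fields."""
    user_id, movie_id, rating, timestamp = line.strip().split("::")
    return Rating(int(user_id), int(movie_id), int(rating), int(timestamp))

# Illustrative line in the documented format (assumed values).
sample = "1::1193::5::978300760"
parsed = parse_rating(sample)
print(parsed)
```

If you convert fields like this, remember that later filters would need to compare against integers (for example `5`) rather than strings (for example `'5'`).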
Note: when using the files, use the filepath data/[file].dat; otherwise automatic grading will fail.
Grade: This assignment consists of 105 points. You need to collect them all to get a 10! All cells that are graded include the expected answer. Your task is to write the code that comes up with the expected result. The automated grading process will be run on a different dataset.
Q1 (5 points): Download the ratings file, parse it and load it into an RDD named ratings.
def parse_file(element):
    # Split a '::'-delimited line into its fields (at most 4 splits, i.e. up to 5 fields).
    return element.split('::', 4)
# Load the data into an RDD and use the parse_file function to parse it.
ratings = sc.textFile("data/ratings.dat")
ratings = ratings.map(parse_file).cache()
ratings
Q2 (5 points): How many lines does the ratings RDD contain?
ratings.count()
Q3 (5 points): Count how many times the rating '1' has been given.
ratings.filter(lambda x: x[2] == '1').count()
Q4 (5 points): Count how many unique movies have been rated.
ratings.groupBy(lambda x: x[1]).count()
Q5 (5 points): Which user gave the most ratings? Return the userID and the number of ratings.
ratings.groupBy(lambda x: x[0]).map(lambda x: (x[0], len(x[1]))).max(key=lambda x: x[1])
Q6 (5 points): Which user gave the most '5' ratings? Return the userID and the number of ratings.
ratings.filter(lambda x: x[2] == '5')\
    .groupBy(lambda x: x[0])\
    .map(lambda x: (x[0], len(x[1])))\
    .max(key=lambda x: x[1])
Q7 (5 points): Which movie was rated the most times? Return the movieID and the number of ratings.
ratings.groupBy(lambda x: x[1]).map(lambda x: (x[0], len(x[1]))).max(key=lambda x: x[1])
Now we will look at two additional files from the MovieLens dataset.
Q8 (5 points): Read the movies and users files into RDDs. How many records are there in each RDD?
# Load the movies dataset into an RDD, parse and cache it.
movies = sc.textFile("data/movies.dat")
movies = movies.map(parse_file).cache()
# How many records are in the movies RDD?
movies.count()
# Load the users dataset into an RDD, parse and cache it.
users = sc.textFile("data/users.dat")
users = users.map(parse_file).cache()
# How many records are in the users RDD?
users.count()
As you have probably noticed, there are more movies in the movies dataset than rated movies.
Q9 (5 points): How many of the movies are comedies?
movies.filter(lambda x: 'Comedy' in x[2]).count()
Q10 (10 points): Which comedy has the most ratings? Return the title and the number of ratings. Answer this question by joining two datasets.
# Group ratings by movie, join with the filtered comedies, then count ratings per comedy.
# After the join, each element is (movieID, ((movieID, ratings), movie_record)).
# Index into the tuple explicitly: tuple parameters in lambdas are not valid in Python 3.
ratings.groupBy(lambda x: x[1]).keyBy(lambda x: x[0]) \
    .join(movies.filter(lambda x: 'Comedy' in x[2]).keyBy(lambda x: x[0])) \
    .map(lambda kv: (kv[0], kv[1][1][1], len(kv[1][0][1]))) \
    .max(key=lambda x: x[2])
Q11 (10 points): For users under 18 years old (category 1), what is the frequency of each star rating? Return a list/array with the rating and the number of times it appears, e.g. Array((4,16), (1,3), (3,9), (5,62), (2,2))
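# After the join, each element is (userID, (user_record, rating_record)).
# Index into the tuple explicitly: tuple parameters in lambdas are not valid in Python 3.
users.filter(lambda x: x[2] == '1')\
    .keyBy(lambda x: x[0])\
    .join(ratings.keyBy(lambda x: x[0]))\
    .map(lambda kv: (kv[1][1][2], 1))\
    .reduceByKey(lambda a, b: a + b)\
    .collect()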
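The filter, join and count-by-key steps used for Q11 can be sketched in plain Python on a handful of made-up rows, which makes the shape of the result easier to see. The names `toy_users`, `toy_ratings`, `young_ids`, `young_ratings` and `frequency` are hypothetical, and the rows are invented for illustration only.

```python
from collections import Counter

# Made-up rows for illustration only; fields are strings, as in the RDDs.
toy_users = [["1", "F", "1"], ["2", "M", "25"], ["3", "M", "1"]]  # UserID, Gender, Age
toy_ratings = [                                                    # UserID, MovieID, Rating
    ["1", "10", "5"],
    ["1", "11", "4"],
    ["2", "10", "3"],
    ["3", "12", "5"],
]

# Step 1: keep the IDs of users in age category '1' (the filter).
young_ids = {u[0] for u in toy_users if u[2] == "1"}

# Step 2: keep the ratings made by those users (the join on UserID).
young_ratings = [r for r in toy_ratings if r[0] in young_ids]

# Step 3: count each star rating (map to (rating, 1), then reduce by key).
frequency = Counter(r[2] for r in young_ratings)
print(sorted(frequency.items()))
```

On these toy rows the result contains one entry per star rating that actually occurs, so ratings nobody gave are simply absent rather than reported with a count of zero.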
As you have noticed, typical operations on RDDs require grouping on a specific part of each record and then calculating specific counts given the groups. While this operation can be achieved with the groupBy family of functions, it is often useful to create a structure called an inverted index. An inverted index creates a 1..n mapping from the record part to all occurrences of the record in the dataset. For example, if the dataset looks like the following:
col1,col2,col3
A,1,foo
B,1,bar
C,2,foo
D,3,baz
E,1,foobar
an inverted index on col2 would look like
1 -> [(A,1,foo), (B,1,bar), (E,1,foobar)]
2 -> [(C,2,foo)]
3 -> [(D,3,baz)]
Inverted indexes enable us to quickly access precalculated partitions of the dataset. Let's compute an inverted index on the rating field of ratings-student.dat.
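As a minimal sketch of the idea, the toy table above can be indexed on col2 in plain Python with a dictionary of lists. The helper name `build_inverted_index` and its `key_position` parameter are illustrative assumptions; in Spark the same structure is typically obtained with a grouping transformation.

```python
from collections import defaultdict

# The toy dataset from the text, one tuple per row: (col1, col2, col3).
rows = [
    ("A", 1, "foo"),
    ("B", 1, "bar"),
    ("C", 2, "foo"),
    ("D", 3, "baz"),
    ("E", 1, "foobar"),
]

def build_inverted_index(records, key_position):
    """Map each value found at key_position to the list of full records."""
    index = defaultdict(list)
    for record in records:
        index[record[key_position]].append(record)
    return dict(index)

index_on_col2 = build_inverted_index(rows, key_position=1)
for key, matches in sorted(index_on_col2.items()):
    print(key, "->", matches)
```

Once the index is built, fetching all records for a given key is a single dictionary lookup instead of a scan over every row.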
Q12 (5 points): Compute the number of unique users that rated the movies with movie_IDs 2858, 356 and 2329.
ratings.filter(lambda x: x[1] in ["2858", "356", "2329"])\
.groupBy(lambda x: x[0]).count()
Measure the time (in seconds) it takes to make this computation.
import time
start_time_1 = time.time()
ratings.filter(lambda x: x[1] in ["2858", "356", "2329"]).groupBy(lambda x: x[0]).count()
print(time.time() - start_time_1)
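Since the same measurement is repeated later, a small reusable helper can keep the timing code in one place. This is only a sketch: the name `timed` is hypothetical, and the workload below is a stand-in for a Spark job. With Spark, the callable should end in an action such as count(), because transformations alone are lazy and would return almost immediately.

```python
import time

def timed(action):
    """Run a zero-argument callable and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = action()
    return result, time.perf_counter() - start

# Stand-in workload for illustration; replace the lambda body with a
# Spark expression that ends in an action to time a real job.
result, seconds = timed(lambda: sum(range(1000000)))
print(result, round(seconds, 4))
```

A single run can be noisy, for instance because of caching, so repeating the measurement a few times gives a fairer comparison.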
Q13 (5 points): Create an inverted index on ratings, field movie_ID. Print the first item.
idx = ratings.groupBy(lambda x: x[1])
list(list(idx.lookup('1'))[0])
Q14 (5 points): Compute the number of unique users that rated the movies with movie_IDs 2858, 356 and 2329 using the index.
idx.filter(lambda x: x[0] in ["2858", "356", "2329"])\
.map(lambda x: x[1])\
.flatMap(lambda x: list(x))\
.groupBy(lambda x: x[0]).count()
Measure the time (in seconds) it takes to compute the same result using the index.
# Measure the time of this computation.
start_time_1 = time.time()
idx.filter(lambda x: x[0] in ["2858", "356", "2329"])\
.map(lambda x: x[1])\
.flatMap(lambda x: list(x))\
.groupBy(lambda x: x[0]).count()
print(time.time() - start_time_1)
You should have noticed a difference in performance. Is the indexed version faster? If yes, why? If not, why not? Discuss this with your partner.
Q15 (5 points): Create a data frame from the ratings RDD and count the number of lines in it. Also register the data frame as an SQL table.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = ratings.toDF()
sqlContext.registerDataFrameAsTable(df, "table")
df.count()
Q16 (5 points): Provide the statistical summary of the column containing ratings (use the Spark function that returns a table with count, mean, stddev, min and max).
Hint: To select the correct column you might first want to print the datatypes and names of each of the columns.
df.describe("_3").show()
Q17 (5 points): Count how many times the rating '1' has been given, by filtering it from the ratings DataFrame. Measure the execution time and compare it with the execution time of the same query using the RDD. Think for yourself about when it would be useful to use DataFrames and when not.
# Count number of ratings "1"
df.filter(df._3=='1').count()
Q18 (5 points): Count how many times the rating '1' has been given, using an SQL query.
sqlContext.sql("SELECT count(*) FROM table WHERE _3 ==1").show()
Q19 (5 points): Which user gave the most '5' ratings? Return the userID and the number of ratings, using an SQL query.
sqlContext.sql("select _1, count(*) as num_ratings from table where _3 = 5 group by _1 order by num_ratings desc").show(1)
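To check the logic of a GROUP BY / ORDER BY query like the one above without a Spark cluster, it can be tried on a few made-up rows. The sketch below uses SQLite from the Python standard library as a stand-in for Spark SQL, so dialect details may differ. The table name `toy_ratings` and the rows are assumptions for illustration, and the rating is compared as a string because the toy columns are declared as text.

```python
import sqlite3

# In-memory database; SQLite is only a stand-in for Spark SQL here.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE toy_ratings (_1 TEXT, _2 TEXT, _3 TEXT)")

# Made-up rows: (UserID, MovieID, Rating).
rows = [("1", "10", "5"), ("1", "11", "5"), ("2", "10", "5"), ("2", "11", "3")]
conn.executemany("INSERT INTO toy_ratings VALUES (?, ?, ?)", rows)

# Count '5' ratings per user and keep the user with the highest count.
top = conn.execute(
    "SELECT _1, COUNT(*) AS num_ratings FROM toy_ratings "
    "WHERE _3 = '5' GROUP BY _1 ORDER BY num_ratings DESC LIMIT 1"
).fetchone()
print(top)
conn.close()
```

Here LIMIT 1 plays the role that show(1) plays in the Spark version: both keep only the first row of the sorted result.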