In this assignment, we will use Spark to explore the MovieLens dataset, which contains user-generated ratings for movies. The dataset comes in 3 files:
ratings.dat
contains the ratings in the following format: UserID::MovieID::Rating::Timestamp
users.dat
contains demographic information about the users: UserID::Gender::Age::Occupation::Zip-code
movies.dat
contains meta information about the movies: MovieID::Title::Genres
Refer to the README for the detailed description of the data.
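Each record is a single line whose fields are separated by ::. As a quick illustration, such a line can be split on that delimiter (the sample line below is made up for illustration, not taken from the dataset):
val sample = "1::1193::5::978300760"  // hypothetical ratings line
sample.split("::")                    // Array(1, 1193, 5, 978300760)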
Note: when using the files, use the filepath data/[file].dat, otherwise automatic grading will fail.
Grade: This assignment is worth 105 points. You need to collect them all to get a 10! All graded cells include the expected answer; your task is to write the code that produces that result. The automated grading process will be run on a different dataset.
Q1 (5 points): Download the ratings file, parse it, and load it into an RDD named ratings.
import org.apache.spark.rdd.RDD

// Case classes describing one record of each input file
case class Rating(user_ID: Int, movie_ID: Int, rating: Int, timestamp: String)
case class Movie(movie_ID: Int, title: String, genre: String)
case class User(user_ID: Int, gender: String, age: Int, occupation: String, zip_code: String)

// Parse one "::"-delimited line of ratings.dat into a Rating
def parseRatings(row: String): Rating = {
  val fields = row.split("::").map(_.trim)
  Rating(fields(0).toInt, fields(1).toInt, fields(2).toInt, fields(3))
}
val ratings = sc.textFile("data/ratings.dat").map(parseRatings)
ratings.cache
ratings
Q2 (5 points): How many lines does the ratings RDD contain?
ratings.count
Q3 (5 points): Count how many times the rating '1' has been given.
ratings.filter(x => x.rating == 1).count()
Q4 (5 points): Count how many unique movies have been rated.
ratings.map(x => x.movie_ID).distinct.count()
Q5 (5 points): Which user gave the most ratings? Return the userID and the number of ratings.
ratings.keyBy(x => x.user_ID).mapValues(x => 1).reduceByKey((x,y) => x + y).sortBy(_._2, false).take(1)
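An equivalent formulation maps each rating to a (user, 1) pair before summing, sketched below:
// Count ratings per user, then take the user with the highest count
ratings.map(r => (r.user_ID, 1)).
  reduceByKey(_ + _).
  sortBy(_._2, false).
  take(1)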
Q6 (5 points): Which user gave the most '5' ratings? Return the userID and the number of ratings.
ratings.
filter(x => x.rating == 5).
keyBy(x => x.user_ID).
aggregateByKey(0)((acc, x) => acc + 1, (x, y) => x + y).
sortBy(_._2, false).
take(1)
Q7 (5 points): Which movie was rated the most times? Return the movieID and the number of ratings.
ratings.
keyBy(x => x.movie_ID).
aggregateByKey(0)((acc, x) => acc + 1, (x, y) => x + y).
sortBy(_._2, false).
take(1)
Now we will look at two additional files from the MovieLens dataset.
Q8 (5 points): Read the movies and users files into RDDs. How many records are there in each RDD?
// Parse one "::"-delimited line of movies.dat into a Movie
def parseMovies(row: String): Movie = {
  val fields = row.split("::").map(_.trim)
  Movie(fields(0).toInt, fields(1), fields(2))
}
val movies = sc.textFile("data/movies.dat").map(parseMovies).cache
movies.count
// Parse one "::"-delimited line of users.dat into a User
def parseUsers(row: String): User = {
  val fields = row.split("::").map(_.trim)
  User(fields(0).toInt, fields(1), fields(2).toInt, fields(3), fields(4))
}
val users = sc.textFile("data/users.dat").map(parseUsers).cache
users.count
As you have probably noticed, there are more movies in the movies dataset than movies that were actually rated.
Q9 (5 points): How many of the movies are a comedy?
movies.filter(x => x.genre.contains("Comedy")).count
Q10 (10 points): Which comedy has the most ratings? Return the title and the number of ratings. Answer this question by joining two datasets.
movies.
filter(x => x.genre.contains("Comedy")).
keyBy(x => x.movie_ID).
join(ratings.keyBy(x => x.movie_ID)).
map(x => x._2._1.title).
groupBy(x => x).
map{case (k, v) => (k, v.size)}.
sortBy(x => x._2, false).
take(1)
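Joining the raw ratings and then grouping by title shuffles every matching rating across the cluster. A sketch of an equivalent approach that aggregates the per-movie counts before the join, which usually moves less data:
// Count ratings per movie_ID first, then join only the small per-movie
// counts with the comedies; same result, typically less shuffle
ratings.
  map(r => (r.movie_ID, 1)).
  reduceByKey(_ + _).
  join(movies.filter(_.genre.contains("Comedy")).keyBy(_.movie_ID)).
  map { case (_, (count, movie)) => (movie.title, count) }.
  sortBy(_._2, false).
  take(1)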
Q11 (10 points): For users under 18 years old (category 1), what is the frequency of each star rating? Return a list/array with the rating and the number of times it appears, e.g. Array((4,16), (1,3), (3,9), (5,62), (2,2))
users.filter(u => u.age == 1).
keyBy(u => u.user_ID).
join(ratings.keyBy(r => r.user_ID)).
map(x => x._2._2).
map(r => (r.rating, 1)).
reduceByKey(_ + _).
collect
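Because the set of under-18 users is small relative to the ratings, a broadcast variable can replace the join. A sketch assuming the user-ID set fits in driver memory (youngIds is our own name):
// Collect the under-18 user IDs on the driver and broadcast them, so each
// rating can be filtered locally without a shuffle-heavy join
val youngIds = sc.broadcast(users.filter(_.age == 1).map(_.user_ID).collect.toSet)
ratings.
  filter(r => youngIds.value.contains(r.user_ID)).
  map(r => (r.rating, 1)).
  reduceByKey(_ + _).
  collect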
As you have noticed, typical operations on RDDs require grouping on a specific part of each record and then calculating counts for each group. While this can be achieved with the groupBy family of functions, it is often useful to create a structure called an inverted index. An inverted index creates a 1..n mapping from the record part to all occurrences of the record in the dataset. For example, if the dataset looks like the following:
col1,col2,col3
A,1,foo
B,1,bar
C,2,foo
D,3,baz
E,1,foobar
an inverted index on col2 would look like
1 -> [(A,1,foo), (B,1,bar), (E,1,foobar)]
2 -> [(C,2,foo)]
3 -> [(D,3,baz)]
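To make this concrete, a minimal sketch that builds exactly this index from the toy rows with groupBy:
val rows = sc.parallelize(Seq(
  ("A", 1, "foo"), ("B", 1, "bar"), ("C", 2, "foo"), ("D", 3, "baz"), ("E", 1, "foobar")))
// Group on the second column (col2); each key maps to all records containing it
rows.groupBy(_._2).collect.foreach { case (k, vs) => println(s"$k -> ${vs.mkString("[", ", ", "]")}") }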
Inverted indexes enable us to quickly access precomputed partitions of the dataset. Let's compute an inverted index on the movie_ID field of the ratings RDD.
Q12 (5 points): Compute the number of unique users that rated the movies with movie_IDs 2858, 356 and 2329.
ratings.filter(x => List(2858, 356, 2329).contains(x.movie_ID)).map(_.user_ID).distinct.count
Measure the time (in seconds) it takes to run this computation.
val start_time: Long = System.currentTimeMillis
ratings.filter(x => List(2858, 356, 2329).contains(x.movie_ID)).map(_.user_ID).distinct.count
println((System.currentTimeMillis - start_time) / 1000.0)
Q13 (5 points): Create an inverted index on ratings, field movie_ID. Print the first item.
// Group all ratings by movie_ID; cache keeps the index in memory for reuse
val idx = ratings.groupBy(r => r.movie_ID).cache
idx.lookup(1).take(1)
Q14 (5 points): Compute the number of unique users that rated the movies with movie_IDs 2858, 356 and 2329, using the index.
val items = sc.parallelize(List(2858, 356, 2329)).keyBy(x => x)
idx.join(items).flatMap { case (_, (rs, _)) => rs }.map(_.user_ID).distinct.count
Measure the time (in seconds) it takes to compute the same result using the index.
val start_time: Long = System.currentTimeMillis
val items = sc.parallelize(List(2858, 356, 2329)).keyBy(x => x)
idx.join(items).flatMap { case (_, (rs, _)) => rs }.map(_.user_ID).distinct.count
println((System.currentTimeMillis - start_time) / 1000.0)
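The same measurement pattern now appears twice; a small helper can factor it out (a sketch; the name time is our own, not part of Spark):
// Run a block, print the elapsed wall-clock time in seconds, and return its result
def time[A](block: => A): A = {
  val start = System.currentTimeMillis
  val result = block
  println((System.currentTimeMillis - start) / 1000.0)
  result
}
// e.g. time { ratings.count }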
You should have noticed a difference in performance. Is the indexed version faster? If so, why? If not, why not? Discuss this with your partner.
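One thing to keep in mind when comparing: cache is lazy, so if no action has touched idx yet, the first timed run also pays the cost of building the index. Forcing materialization beforehand gives a fairer comparison:
// Force the cached index to be fully computed before timing queries against it
idx.count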
Q15 (5 points): Create a DataFrame from the ratings RDD and count the number of lines in it. Also register the DataFrame as an SQL table.
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame

// Schema matching the fields of the Rating case class
val logSchema = StructType(Array(
  StructField("user_ID", IntegerType, nullable = true),
  StructField("movie_ID", IntegerType, nullable = true),
  StructField("rating", IntegerType, nullable = true),
  StructField("timestamp", StringType, nullable = true)
))
val rdd = ratings.map(x => Row(x.user_ID, x.movie_ID, x.rating, x.timestamp))
val df = spark.createDataFrame(rdd, logSchema)
df.createOrReplaceTempView("rating")
df.count
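Alternatively, an RDD of case classes can be converted directly, letting Spark infer the column names and types from the Rating fields (a sketch; ratingsDF is our own name):
import spark.implicits._
val ratingsDF = ratings.toDF  // columns: user_ID, movie_ID, rating, timestamp
ratingsDF.count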
Q16 (5 points): Provide the statistical summary of the column containing the ratings (use the Spark function that returns a table with count, mean, stddev, min and max).
Hint: to select the correct column, you might first want to print the datatypes and names of the columns.
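For instance, the schema can be inspected like this before choosing the column:
// Print each column's name and datatype
df.printSchema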
df.describe("rating").show()
Q17 (5 points): Count how many times the rating '1' has been given, by filtering the ratings DataFrame. Measure the execution time and compare it with the execution time of the same query using the RDD. Think for yourself about when it would be useful to use DataFrames and when not.
df.filter(df("rating") === 1).count()
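A sketch of the timing comparison the question asks for, reusing the measurement pattern from the RDD questions:
// Time the DataFrame version...
var start_time: Long = System.currentTimeMillis
df.filter(df("rating") === 1).count()
println((System.currentTimeMillis - start_time) / 1000.0)
// ...and the equivalent RDD version from Q3
start_time = System.currentTimeMillis
ratings.filter(x => x.rating == 1).count()
println((System.currentTimeMillis - start_time) / 1000.0)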
Q18 (5 points): Count how many times the rating '1' has been given, using an SQL query.
spark.sql("SELECT count(*) FROM rating WHERE rating = 1").show
Q19 (5 points): Which user gave the most '5' ratings? Return the userID and the number of ratings, using an SQL query.
spark.sql("SELECT user_ID, COUNT(*) AS num_ratings FROM rating WHERE rating = 5 GROUP BY user_ID ORDER BY num_ratings DESC").show(1)