{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Assignment on Spark\n", "One of the most common uses of Spark is analyzing and processing log files.\n", "In this assignment, we will put Spark to good use for an OSS project that\n", "retrieves and downloads data from GitHub, called [GHTorrent](http://ghtorrent.org).\n", "GHTorrent works by following the Github [event timeline](https://api.github.com/events) \n", "and then retrieving all items linked from each event recursively and exhaustively.\n", "To make monitoring and debugging easier, the GHTorrent maintainers use extensive\n", "runtime logging for the downloader scripts.\n", "\n", "Here is an extract of what the GHTorrent log looks like:\n", "\n", "```\n", "DEBUG, 2017-03-23T10:02:27+00:00, ghtorrent-40 -- ghtorrent.rb: Repo EFForg/https-everywhere exists\n", "DEBUG, 2017-03-24T12:06:23+00:00, ghtorrent-49 -- ghtorrent.rb: Repo Shikanime/print exists\n", "INFO, 2017-03-23T13:00:55+00:00, ghtorrent-42 -- api_client.rb: Successful request. URL: https://api.github.com/repos/CanonicalLtd/maas-docs/issues/365/events?per_page=100, Remaining: 4943, Total: 88 ms\n", "WARN, 2017-03-23T20:04:28+00:00, ghtorrent-13 -- api_client.rb: Failed request. URL: https://api.github.com/repos/greatfakeman/Tabchi/commits?sha=Tabchi&per_page=100, Status code: 404, Status: Not Found, Access: ac6168f8776, IP: 0.0.0.0, Remaining: 3031\n", "DEBUG, 2017-03-23T09:06:09+00:00, ghtorrent-2 -- ghtorrent.rb: Transaction committed (11 ms)\n", "```\n", "\n", "Each log line comprises of a standard part (up to `.rb:`) and an \n", "operation-specific part. The standard part fields are like so:\n", "\n", "1. Logging level, one of `DEBUG`, `INFO`, `WARN`, `ERROR` (separated by `,`)\n", "2. A timestamp (separated by `,`)\n", "3. The downloader id, denoting the downloader instance (separated by ` -- `)\n", "4. The retrieval stage, denoted by the Ruby class name, one of:\n", " * `event_processing`\n", " * `ght_data_retrieval`\n", " * `api_client`\n", " * `retriever`\n", " * `ghtorrent`\n", "\n", "**Grade:** This assignment consists of 130 points. You need to collect 100 to\n", "get a 10!" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Loading and parsing the file\n", "\n", "For the remaining of the assignment, you need to use\n", "[this file](https://drive.google.com/file/d/0B9Rx0uhucsroYWJxdEpPd2JYcjg/view?usp=sharing) (~300MB compressed). Make sure you use the **correct kernel** in your notebook (either the PySpark kernel or the Apache Toree Scala kernel).\n", "\n", "**T (5 points)**: Download the log file and write a function to load it in an RDD. If\n", "you are doing this in Scala, make sure you use a case class to map the file\n", "fields." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**T (5 points)**: How many lines does the RDD contain?" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Basic counting and filtering\n", "\n", "**T (5 points)**: Count the number of WARNing messages" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**T (10 points)**: How many repositories where processed in total? Use the `api_client`\n", "lines only." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Analytics\n", "\n", "**T (5 points)**: Which client did most HTTP requests?" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**T (5 points)**: Which client did most FAILED HTTP requests?" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**T (5 points)**: What is the most active hour of day?" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**T (5 points)**: What is the most active repository?\n", "\n", "_Hint:_ use messages from the `ghtorrent.rb` layer only" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**T (5 points)**: Which access keys are failing most often? \n", "\n", "_Hint:_ extract the `Access: ...` part from failing requests" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Indexing\n", "\n", "Typical operations on RDDs require grouping on a specific part of each\n", "record and then calculating specific counts given the groups. While this\n", "operation can be achieved with the `group_by` family of funcions, it is often\n", "useful to create a structure called an _inverted index_. An inverted index\n", "creates an `1..n` mapping from the record part to all occurencies of the\n", "record in the dataset. For example, if the dataset looks like the following:\n", "\n", "```\n", "col1,col2,col3\n", "A,1,foo\n", "B,1,bar\n", "C,2,foo\n", "D,3,baz\n", "E,1,foobar\n", "```\n", "\n", "an inverted index on `col2` would look like\n", "\n", "```\n", "1 -> [(A,1,foo), (B,1,bar), (E,1,foobar)]\n", "2 -> [(C,2,foo)]\n", "3 -> [(D,3,baz)]\n", "```\n", "\n", "Inverted indexes enable us to quickly access precalculated partitions of the \n", "dataset. To see their effect on large datasets, lets compute an inverted\n", "index on the `downloader id` part.\n", "\n", "**T (10 points)**: Create a function that given an `RDD[Seq[T]]` and an index \n", "position (denotes which field to index on), it computes an inverted index on \n", "the RDD.\n", "```python\n", "def inverted_index(rdd, idx_id):\n", " pass\n", "```" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**T (5 points)**: Compute the number of different _repositories_ accessed by \n", "the client `ghtorrent-22` (without using the inverted index)." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**T (5 points)**: Compute the number of different _repositories_ accessed by \n", "the client `ghtorrent-22` (using the inverted index you calculated above). \n", "Remember that Spark computations are lazy, so you need to run the inverted \n", "index generation before you actually use the index.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": { "collapsed": true }, "source": [ "**T (5 points)**: You should have noticed some difference in performance. Why is the\n", "indexed version faster?" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**T (5 points)**: Read up about `groupByKey`. Explain in 3 lines why it the worst function \n", "in the Spark API and what you can use instead." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Joining\n", "\n", "We now need to monitor the behaviour of interesting repositories. Use \n", "[this link](https://drive.google.com/open?id=0B9Rx0uhucsroRHNVTFpzMV9OUGs)\n", "to download a list of repos into which we are interested to. This list was\n", "generated on Oct 10, 2017, more than 7 months after the log file was created.\n", "The format of the file is CSV, and the meaning of the fields can be found on\n", "the GHTorrent project web site [documentation](http://ghtorrent.org/relational.html).\n", "\n", "**T (5 points)**: Read in the CSV file to an *RDD* (let's call it _interesting_). \n", "How many records are there?" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**T (10 points)**: How many records in the log file refer to entries in the \n", "_interesting_ file? \n", "\n", "_Hint:_ Yes, you need to join :) First, you need to key both RDDs by the\n", "repository name to do such a join." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**T (5 points)**: Which of the _interesting_ repositories has the most failed \n", "API calls?" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Dataframes\n", "\n", "**T (10 points)** Read in the _interesting_ repos file using Spark's CSV parser. \n", "Convert the log RDD to a Dataframe.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**T (15 points)** Repeat all 3 queries in the \"Joining\" section above using \n", "either SQL or the Dataframe API. Measure the time it takes to execute them." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "**T (5 points)** Select one of the queries and compare the execution plans \n", "between the RDD version and your version. (you can see them by going to \n", "localhost:4040 in your VM). What differences do you see?" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.3" } }, "nbformat": 4, "nbformat_minor": 2 }