General description

In this assignment, you will be processing a stream of events on GitHub pull requests for the Ruby on Rails project. To make processing repeatable, we will be reading events from a file. The input file (12 MB uncompressed) contains records in the following format:

pull_request_id,author_login,event,timestamp_in_iso8601
4,datanoise,opened,2010-09-02 03:34:17
5,marsuboss,opened,2010-09-02 07:14:12
6,m3talsmith,opened,2010-09-06 16:07:08
7,sferik,opened,2010-09-08 19:09:56
8,sferik,opened,2010-09-09 04:33:23
8,dtrasbo,discussed,2010-09-09 04:44:25
[...]

The event field can have various values, of which we only care about the following ones:

Each line maps naturally to a case class:

case class PREvent(prId: Int, author: String, event: String, timestamp: Date)

We use the following code to parse the file, generate 1 event per line, and set the event time per event.

private def fileLoader(path: String): DataStream[PREvent] =
    env.readTextFile(path).flatMap { l =>
      l match {
        case lineFormat(pr_id, user, action, timestamp) =>
          Some(PREvent(pr_id.toInt, user, action, new SimpleDateFormat(dateInputFormat).parse(timestamp)))
        case _ => {
          println(l)
          None
        }
      }
    }.assignTimestampsAndWatermarks(new PREventBoundedOutOfOrdernessTimestampExtractor(Time.hours(1)))

Assignment setup

To help you setup your environment, we provide an IntelliJ project. To use it, download the latest IntelliJ community edition, install the Scala plug-in and import the directory as a new SBT project.

The expected answers for all questions are also provided.

⚠Note: You only need to submit the FlinkAssignment.scala file at CPM.

Grade: This assignment consists of 100 points. You need to collect them all to get a 10!

Questions

T (10 points) Count all events per day. Expected output:

2010-09-25,10
2010-09-27,7
2010-09-28,2
2010-09-29,4
2010-09-30,7
2010-10-01,3
  def question_one(events: DataStream[PREvent]): DataStream[(String, Int)] = {
    val format = new SimpleDateFormat(dateFormatDay)

    events.
      map(pr => (format.format(pr.timestamp), 1)).
      windowAll(TumblingEventTimeWindows.of(Time.days(1))).
      sum(1)
  }

T (20 points) Per week, report the hottest (most active) PR. A week is defined as a period of 7 days. You do not need to align to typical week boundaries (e.g. Monday to Sunday).

Hint: Consider using a Process[All]WindowFunction to perform a custom aggregation of the results.

Output format: date,pull_request_id,number_of_events Note that the date is the end date of the window.

Example output:

[...]
2014-04-10,14609,19
2014-04-17,14729,24
2014-04-24,14796,22
2014-05-01,14893,21
2014-05-08,14964,32
2014-05-15,15050,30
2014-05-22,15134,28
2014-05-29,15284,90
[...]
type Week = String
type PRID = Int
type NumEvents = Int

def question_two(events: DataStream[PREvent]):
      DataStream[(Week, PRID, NumEvents)] = {

    class HottestPR extends ProcessAllWindowFunction[(PRID, NumEvents), (Week, PRID, NumEvents), TimeWindow] {
      override def process(context: Context,
                           input: Iterable[(PRID, NumEvents)],
                           out: Collector[(Week, PRID, NumEvents)]): Unit = {

        val wEnd = new Date(context.window.getEnd)
        val max = input.groupBy(_._1).map(m => (m._1, m._2.size)).maxBy(_._2)

        out.collect(((new SimpleDateFormat(dateFormatDay)).format(wEnd), max._1, max._2))
      }
    }

    events.
      map(pr => (pr.prId, 1)).
      windowAll(TumblingEventTimeWindows.of(Time.days(7))).
      process(new HottestPR())
}

T (10 points) Output the most active commenter per month. Comment events are identified by the event type “discussed”. Assume a month lasts 30 days (again, no need to align to month boundaries).

Hint: Consider using a custom AggregateFunction

Output format: author,number_of_comments

Example output:

[...]
josevalim,11
josevalim,22
josevalim,9
josevalim,12
jeremy,21
spastorino,23
josevalim,10
dhh,34
[...]
def question_three(events: DataStream[PREvent]): DataStream[(String, Int)] = {

    class MaxCount extends AggregateFunction[(String, Int), Map[String, Int], (String, Int)] {
      override def createAccumulator(): Map[String, Int] = Map()

      override def add(value: (String, Int), acc: Map[String, Int]): Map[String, Int] =
        acc ++ Map((value._1, acc.getOrElse(value._1, 0) + 1))

      override def getResult(accumulator: Map[String, Int]): (String, Int) = accumulator.maxBy(_._2)

      override def merge(a: Map[String, Int], b: Map[String, Int]): Map[String, Int] =
        (a.toList ++ b.toList).groupBy(_._1).mapValues(_.maxBy(_._2)).map { case (k, v) => (k, v._2) }
    }

    events.
      filter(_.event == "discussed").
      map(pr => (pr.author, 1)).
      windowAll(TumblingEventTimeWindows.of(Time.days(30))).
      aggregate(new MaxCount)
}

T (20 points) Core team members: Per month, report all (distinct) authors that merged PRs within the last 3 months.

Hint: Consider using a custom AggregateFunction to collect both dates and author names

Output format: date_of_month,List(member1, member2, member3)

Example output:

[...]
2018-02-18,List(dhh, javan, schneems, tenderlove, ...)
2018-06-15,List(dhh, sikachu, javan, schneems, tenderlove, ...)
2018-09-01,List(dhh, sikachu, javan, schneems, tenderlove, ...)
2012-05-20,List(tenderlove, jeremy, fxn, iHiD, vijaydev, ...)
2012-09-17,List(dhh, tenderlove, jeremy, fxn, vijaydev, ...)
2013-01-14,List(dhh, VadimPushtaev, tenderlove, danchoi, ...)
[...]
def question_four(events: DataStream[PREvent]): DataStream[(Date, Seq[String])] = {

    case class DateAuthorAcc(date: Date, authors: Set[String])

    class DistinctAuthors extends AggregateFunction[(String, Date), DateAuthorAcc, (Date, Seq[String])] {

      override def createAccumulator() = DateAuthorAcc(new Date(1), Set[String]())

      override def add(value: (String, Date), acc: DateAuthorAcc) =
        acc.copy(
          date = new Date(math.max(value._2.getTime, acc.date.getTime)),
          authors = acc.authors + value._1
        )

      override def merge(a: DateAuthorAcc, b: DateAuthorAcc) = {
        a.copy(
          date = new Date(math.max(a.date.getTime, b.date.getTime)),
          authors = b.authors ++ a.authors
        )
      }

      override def getResult(acc: DateAuthorAcc) = (acc.date, acc.authors.toSeq)
    }

    events.
      filter(_.event == "merged").
      map(pr => (pr.author, pr.timestamp)).
      windowAll(SlidingEventTimeWindows.of(Time.days(90), Time.days(30))).
      aggregate(new DistinctAuthors)
}

T (20 points) Longest event streak per year (including the user that made it). An event streak is defined as an ordered list of consecutive events a, b where timestamp(b) - timestamp(a) < 1 hour. A year is 365 consequtive days.

Hint: Use a session window

Output format: author,longest_event_streak_for_this_year

Example output

[...]
rafaelfranca,49
sgrif,109
maclover7,69
[...]
def question_five(events: DataStream[PREvent]) : DataStream[(String, Int)] =
  events.
      map(e => (e.author, 1)).
      keyBy(0).
      window(EventTimeSessionWindows.withGap(Time.hours(1))).
      sum(1).
      windowAll(TumblingEventTimeWindows.of(Time.days(365))).
      maxBy(1)

T (20 points) Find all PR ids that are merged within 1 day, after more than 10 discussion comments. We only need to count discussion comments, so if other events exist between an open event and the first discussion comment, they should be discarded.

Hint: Use the Complex Event Processing library.

Output format: a list of pull request ids, one per line

Example output:

[...]
2052
3943
4114
7436
[...]
def question_six(events: DataStream[PREvent]): DataStream[Int] = {

    val pattern = Pattern.
      begin[PREvent]("opened").
      where(_.event == "opened").
      followedBy("prdiscussed").
      where(_.event == "discussed").
      timesOrMore(11).
      followedBy("prmerge").
      where(_.event == "merged").
      within(Time.days(1))

    class Distinct extends ProcessFunction[Int, Int] {
      val set = mutable.HashSet[Int]()

      override def processElement(value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]): Unit =
        if (set.add(value)) out.collect(value)
    }

    CEP.
      pattern(events.keyBy(_.prId), pattern).
      select(_("prmerge").head.prId).
      process(new Distinct)

}