Processing data with Spark

The world before Spark

Parallelism

Parallelism is about speeding up computations by utilising clusters of machines.

  • Task parallelism: Distribute computations across different processors
  • Data parallelism: Apply the same computation on different data sets

Distributed data parallelism involves splitting the data over several distributed nodes, where nodes work in parallel, and combine the individual results to come up with a final one.
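
The split / compute / combine pattern can be sketched on a single machine in plain Python. This is only an illustration: the helper names (split, parallel_sum) are hypothetical, and a thread pool stands in for the distributed nodes.

```python
from concurrent.futures import ThreadPoolExecutor

def split(data, n):
    """Split data into at most n contiguous chunks of roughly equal size."""
    size = max(1, (len(data) + n - 1) // n)
    return [data[i:i + size] for i in range(0, len(data), size)]

def parallel_sum(data, n_workers=4):
    chunks = split(data, n_workers)
    # Each worker stands in for a node that processes its own chunk
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        partials = list(pool.map(sum, chunks))
    # Combine the individual results into the final one
    return sum(partials)

total = parallel_sum(list(range(1, 101)))
```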

Issues with data parallelism

  • Latency: Operations are 1,000x (disk) or 1,000,000x (network) slower than accessing data in memory
  • (Partial) failures: Computations on 100s of machines may fail at any time

This means that our programming model and execution should hide (but not forget!) those.

Hadoop: Pros and Cons

What is good about Hadoop: Fault tolerance

Hadoop was the first framework to enable computations to run on 1000s of commodity computers.

What is wrong: Performance

  • Before each Map or Reduce step, Hadoop writes to HDFS
  • Hard to express iterative problems in M/R
    • Most machine learning algorithms are iterative

The Hadoop Workflow

DryadLINQ

Microsoft’s DryadLINQ combined the Dryad distributed execution engine with the LINQ language for defining queries.

DryadLINQ architecture

FlumeJava

Google’s FlumeJava attempted to provide a few simple abstractions for programming data-parallel computations. These abstractions are higher-level than those provided by MapReduce, and provide better support for pipelines.

PTable<String,Integer> wordsWithOnes =
  words.parallelDo(
      new DoFn<String, Pair<String,Integer>>() {
    void process(String word,
                 EmitFn<Pair<String,Integer>> emitFn) {
      emitFn.emit(Pair.of(word, 1));
    }
  }, tableOf(strings(), ints()));

PTable<String,Collection<Integer>>
  groupedWordsWithOnes = wordsWithOnes.groupByKey();

PTable<String,Integer> wordCounts =
  groupedWordsWithOnes.combineValues(SUM_INTS);

Spark and RDDs

What is Spark?

Spark is an open source cluster computing framework.

  • automates distribution of data and computations on a cluster of computers
  • provides a fault-tolerant abstraction to distributed datasets
  • is based on functional programming primitives
  • provides two abstractions to data, list-like (RDDs) and table-like (Datasets)

Resilient Distributed Datasets (RDDs)

RDDs are the core abstraction that Spark uses.

RDDs make datasets distributed over a cluster of machines look like a Scala collection. RDDs:

  • are immutable
  • reside (mostly) in memory
  • are transparently distributed
  • feature the usual functional programming primitives
    • plus additional operations designed to minimize shuffling

In practice, RDD[A] works like Scala’s List[A]

Counting words with Spark

In Scala

// http://classics.mit.edu/Homer/odyssey.mb.txt
val rdd = sc.textFile("./datasets/odyssey.mb.txt")
rdd.
  flatMap(l => l.split(" ")).       // Split file in words
  map(x => (x, 1)).                 // Create key,1 pairs
  reduceByKey((acc, x) => acc + x). // Count occurrences of the same key
  sortBy(x => x._2, false).         // Sort by number of occurrences
  take(50).                         // Take the first 50 results
  foreach(println)

In Python

text_file = sc.textFile("odyssey.mb.txt")
counts = text_file.flatMap(lambda line: line.split(" ")). \
             map(lambda word: (word, 1)). \
             reduceByKey(lambda a, b: a + b)

Wait, what? Python?

Yes. The RDD API is available in Scala, Java, Python and R.

Spark itself is implemented in Scala and uses Netty to handle networking; versions before 2.0 also used the Akka actor framework internally to handle distributed state.

For Python, Spark uses Py4J, which allows Python programs to access Java objects in a remote JVM. The PySpark API is designed to do most computations in the remote JVM; if processing needs to happen in Python, data must be copied; this incurs a performance penalty.

How to create an RDD?

RDDs can only be created in the following 3 ways:

  1. Reading data from external sources
val rdd1 = sc.textFile("hdfs://...")
val rdd2 = sc.textFile("file://odyssey.txt")
val rdd3 = sc.textFile("s3://...")
  2. Converting a local in-memory dataset to a distributed one
val xs: Range = Range(1, 10000) // Range is not a generic type
val rdd: RDD[Int] = sc.parallelize(xs)
  3. Transforming an existing RDD
rdd.map(x => x.toString) // returns an RDD[String]

RDDs are lazy!

There are two types of operations we can do on an RDD:

  • Transformation: Applying a function that returns a new RDD. They are lazy.

  • Action: Request the computation of a result. They are eager.

// This just sets up the pipeline
val result = rdd.
  flatMap(l => l.split(" ")).
  map(x => (x, 1))

// Side-effect, triggers computation
result.foreach(println)
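
The same lazy/eager split can be observed in plain Python, where map builds a lazy iterator and list forces it. This is an analogy only, not Spark code; the traced helper and the log list are hypothetical names used to record when the function actually runs.

```python
log = []

def traced(x):
    log.append(x)        # record that the function actually ran
    return x * 2

numbers = [1, 2, 3]
pipeline = map(traced, numbers)   # "transformation": only sets up the pipeline
ran_before = len(log)             # nothing has been evaluated yet

result = list(pipeline)           # "action": forces the computation
ran_after = len(log)
```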

Common transformations on RDDs

map: Apply \(f\) on all items in the RDD and return an RDD of the result.

\(RDD[A].map(f: A \rightarrow B) : RDD[B]\)

flatMap: Apply \(f\) on all RDD contents, return an RDD with the contents of all intermediate iterators.

\(RDD[A].flatMap(f: A \rightarrow Iterable[B]): RDD[B]\)

filter: Apply predicate \(p\), return items that satisfy it.

\(RDD[A].filter(p: A \rightarrow Boolean): RDD[A]\)
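
For intuition, the three signatures above behave like the following operations on a local Python list. This sketch uses only the standard library and says nothing about distribution; the sample lines are made up.

```python
from itertools import chain

lines = ["tell me o muse", "of that ingenious hero"]

# map: one output item per input item
lengths = list(map(len, lines))

# flatMap: each item yields an iterable, and the results are concatenated
words = list(chain.from_iterable(line.split(" ") for line in lines))

# filter: keep only the items that satisfy the predicate
short = list(filter(lambda w: len(w) <= 2, words))
```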

Examples of RDD transformations

All uses of articles in the Odyssey

val odyssey = sc.textFile("datasets/odyssey.mb.txt").
                 flatMap(_.split(" "))

odyssey.map(_.toLowerCase).
        filter(Seq("a", "the").contains(_))

Q: How can we find uses of all regular verbs in past tense?

odyssey.filter(x => x.endsWith("ed"))

Q: How can we remove all punctuation marks?

odyssey.map(x => x.replaceAll("\\p{Punct}|\\d", ""))

Common actions on RDDs

collect: Return all elements of an RDD

\(RDD[A].collect() : Array[A]\)

take: Return the first n elements of the RDD

\(RDD[A].take(n): Array[A]\)

reduce, fold: Combine all elements to a single result of the same type.

\(RDD[A].reduce(f: (A, A) \rightarrow A): A\)

aggregate: Aggregate the elements of each partition, and then the results for all the partitions

\(RDD[A].aggr(init: B)(seqOp: (B, A) \rightarrow B, combOp: (B, B) \rightarrow B): B\)
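
A minimal model of the aggregate semantics, assuming partitions are represented as a list of Python lists. The aggregate function below is a hypothetical stand-in, not the Spark API; it computes a (sum, count) pair so that the mean can be derived in one pass.

```python
from functools import reduce

def aggregate(partitions, zero, seq_op, comb_op):
    # Fold each partition independently, starting from the zero value
    partials = [reduce(seq_op, part, zero) for part in partitions]
    # Then merge the per-partition results
    return reduce(comb_op, partials, zero)

partitions = [[1, 2, 3], [4, 5], [6]]
total, count = aggregate(
    partitions,
    (0, 0),
    lambda acc, x: (acc[0] + x, acc[1] + 1),   # seqOp: (B, A) -> B
    lambda a, b: (a[0] + b[0], a[1] + b[1]),   # combOp: (B, B) -> B
)
mean = total / count
```

Note that the result type (a pair) differs from the element type (an integer), which is what distinguishes aggregate from reduce.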

Examples of RDD actions

How many words are there?

val odyssey = sc.textFile("datasets/odyssey.mb.txt").flatMap(_.split(" "))

odyssey.map(x => 1).reduce((a,b) => a + b)

How can we sort the RDD?

odyssey.sortBy(x => x)

How can we sample data from the RDD?

// randomSplit returns an Array[RDD[String]], so we match on Array
val Array(train, test) = odyssey.randomSplit(Array(0.8, 0.2))

Pair RDDs

RDDs can represent any complex data type, if it can be iterated. Spark treats RDDs of the type RDD[(K,V)] as special, named PairRDDs, as they can be both iterated and indexed.

Operations such as join are only defined on Pair RDDs, meaning that we can only combine RDDs if their contents can be indexed.

We can create Pair RDDs by applying an indexing function or by grouping records:

val rdd = sc.parallelize(List("foo", "bar", "baz")) // RDD[String]
val pairRDD = rdd.map(x => (x.charAt(0), x))  // RDD[(Char, String)]
pairRDD.collect
// Array((f,foo), (b,bar), (b,baz))
val pairRDD2 = rdd.groupBy(x => x.charAt(0))  // RDD[(Char, Iterable[String])]
pairRDD2.collect
// Array((b,CompactBuffer(bar, baz)), (f,CompactBuffer(foo)))

Transformations on Pair RDDs

The following functions are only available on RDD[(K,V)]

reduceByKey: Merge the values for each key using an associative and commutative reduce function

\(reduceByKey(f: (V, V) \rightarrow V): RDD[(K, V)]\)

aggregateByKey: Aggregate the values of each key, using given combine functions and a neutral “zero value”

\(aggrByKey(zero: U)(f: (U, V) \rightarrow U, g: (U, U) \rightarrow U): RDD[(K, U)]\)

join: Return an RDD containing all pairs of elements with matching keys

\(join(b: RDD[(K, W)]): RDD[(K, (V, W))]\)
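
What reduceByKey and join compute can be modelled locally on lists of (key, value) tuples. The functions reduce_by_key and join below are hypothetical helpers written for this sketch; they ignore partitioning entirely.

```python
from collections import defaultdict

def reduce_by_key(pairs, f):
    out = {}
    for k, v in pairs:
        # Merge the new value into the running result for this key
        out[k] = f(out[k], v) if k in out else v
    return out

def join(left, right):
    index = defaultdict(list)
    for k, w in right:
        index[k].append(w)
    # Emit one output pair per matching (left, right) combination
    return [(k, (v, w)) for k, v in left for w in index.get(k, [])]

counts = reduce_by_key([("a", 1), ("b", 1), ("a", 1)], lambda x, y: x + y)
joined = join([(1, "x"), (2, "y")], [(1, "p"), (1, "q"), (3, "r")])
```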

Pair RDD example: aggregateByKey

How can we count the number of occurrences of each part of speech?

object PartOfSpeech {
  sealed trait EnumVal
  case object Verb extends EnumVal
  case object Noun extends EnumVal
  case object Article extends EnumVal
  case object Other extends EnumVal
  val partsOfSpeech = Seq(Verb, Noun, Article, Other)
}

def partOfSpeech(w: String): PartOfSpeech.EnumVal = ...

odyssey.groupBy(partOfSpeech).                       // RDD[(EnumVal, Iterable[String])]
        aggregateByKey(0)((acc, x) => acc + x.size,  // count the words grouped under each key
                          (x, y) => x + y)           // merge per-partition counts

D: What type conversions take place here?

Pair RDD example: join

case class Person(id: Int, name: String)
case class Addr(id: Int, person_id: Int,
                address: String, number: Int)

val pers = sc.textFile("pers.csv") // id, name
val addr = sc.textFile("addr.csv") // id, person_id, street, num

val ps = pers.map(_.split(",")).map(x => Person(x(0).toInt, x(1)))
val as = addr.map(_.split(",")).map(x => Addr(x(0).toInt, x(1).toInt,
                                             x(2), x(3).toInt))

Q: What are the types of ps and as? How can we join them?

val pairPs = ps.keyBy(_.id)
val pairAs = as.keyBy(_.person_id)

val addrForPers = pairAs.join(pairPs) // RDD[(Int, (Addr, Person))]

Join types

Given a “left” RDD[(K,A)] and a “right” RDD[(K,B)]

  • Inner Join (join): The result contains only records that have the keys in both RDDs.
  • Outer joins (left/rightOuterJoin): The result contains records that have keys in either the “left” or the “right” RDD in addition to the inner join results.

\(left.loj(right) : RDD[(K,(A, Option[B]))]\)

\(left.roj(right) : RDD[(K,(Option[A], B))]\)

  • Full outer join: The result contains records that have keys in any of the “left” or the “right” RDD in addition to the inner join results.

\(left.foj(right) : RDD[(K,(Option[A], Option[B]))]\)
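
The outer join variants can be sketched in the same local style, with Python's None playing the role of Scala's empty Option. The function names are hypothetical, and the model works on plain lists of tuples rather than RDDs.

```python
def left_outer_join(left, right):
    index = {}
    for k, w in right:
        index.setdefault(k, []).append(w)
    out = []
    for k, v in left:
        # Keys missing on the right still produce a record, paired with None
        for w in index.get(k, [None]):
            out.append((k, (v, w)))
    return out

def full_outer_join(left, right):
    out = left_outer_join(left, right)
    left_keys = {k for k, _ in left}
    # Add the right-hand records whose key never appeared on the left
    out += [(k, (None, w)) for k, w in right if k not in left_keys]
    return out

left = [(1, "a"), (2, "b")]
right = [(2, "x"), (3, "y")]
loj = left_outer_join(left, right)
foj = full_outer_join(left, right)
```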

RDDs under the hood

Internals

From the RDD.scala, line 61.

Internally, each RDD is characterized by five main properties:

  • A list of partitions
  • A function for computing each split
  • A list of dependencies on other RDDs
  • Optionally, a Partitioner for key-value RDDs
  • Optionally, a list of preferred locations to compute each split on

D: Why does an RDD need all of those?

Partitions

Even though RDDs might give the impression of continuous memory blocks spread across a cluster, data in RDDs is split into partitions.

Partitions define a unit of computation and persistence: any Spark computation transforms a partition to a new partition.

If a machine fails during a computation, Spark will redistribute its partitions to other machines and restart the computation on those partitions only.

The partitioning scheme of an application is configurable; better configurations lead to better performance.

How does partitioning work?

Spark supports 3 types of partitioning schemes:

  • Default partitioning: Splits in equally sized partitions, without knowing the underlying data properties.

Extended partitioning is only configurable on Pair RDDs.

  • Range partitioning: Takes into account the natural order of keys to split the dataset in the required number of partitions. Requires keys to be naturally ordered and keys to be equally distributed across the value range.

  • Hash partitioning: Calculates a hash over each item's key and then takes the modulo of this hash to determine the new partition. This is equivalent to:

key.hashCode() % numPartitions
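
A small sketch of how the hash and range schemes assign keys to partitions. Integer keys are used so that Python's hash is deterministic; the function names and the upper_bounds parameter are assumptions made for this illustration.

```python
from bisect import bisect_left

def hash_partition(key, num_partitions):
    # Python's % is non-negative for a positive modulus; on the JVM,
    # hashCode() can be negative, so the sign needs handling there
    return hash(key) % num_partitions

def range_partition(key, upper_bounds):
    # upper_bounds: sorted, inclusive upper bounds of all but the last partition
    return bisect_left(upper_bounds, key)

keys = [3, 14, 15, 92, 65]
by_hash = [hash_partition(k, 4) for k in keys]
by_range = [range_partition(k, [10, 50]) for k in keys]
```

With range partitioning, neighbouring keys (14 and 15) land in the same partition, whereas hash partitioning scatters them.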

Partition dependencies

Narrow dependencies: Each partition of the source RDD is used by at most one partition of the target RDD

Wide dependencies: Multiple partitions in the target RDD depend on a single partition in the source RDD.

RDD lineage

RDDs contain information on how to compute themselves, including dependencies to other RDDs. Effectively, RDDs create a directed acyclic graph of computations.

To demonstrate this, consider the following example:

val rdd1 = sc.parallelize(0 to 10)
val rdd2 = sc.parallelize(10 to 100)
val rdd3 = rdd1.cartesian(rdd2)
val rdd4 = rdd3.map(x => (x._1 + 1, x._2 + 1))

Q: What are the computation dependencies here?

scala> rdd4.toDebugString
res3: String =
(16) MapPartitionsRDD[3] at map at <console>:30 []
 |   CartesianRDD[2] at cartesian at <console>:28 []
 |   ParallelCollectionRDD[0] at parallelize at <console>:24 []
 |   ParallelCollectionRDD[1] at parallelize at <console>:24 []
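
The idea that a dataset only stores how to compute itself from its parents can be modelled with a tiny graph of nodes. The Node class below is a hypothetical toy, not Spark's implementation; it mirrors the example above with smaller inputs.

```python
class Node:
    def __init__(self, name, compute, parents=()):
        self.name, self.compute, self.parents = name, compute, tuple(parents)

    def collect(self):
        # Recompute from the parents on every call; nothing is stored
        return self.compute(*[p.collect() for p in self.parents])

    def debug_string(self, depth=0):
        lines = [" " * depth + self.name]
        for p in self.parents:
            lines.append(p.debug_string(depth + 1))
        return "\n".join(lines)

a = Node("parallelize[0]", lambda: [0, 1])
b = Node("parallelize[1]", lambda: [10, 20])
c = Node("cartesian[2]", lambda xs, ys: [(x, y) for x in xs for y in ys], [a, b])
d = Node("map[3]", lambda ps: [(x + 1, y + 1) for x, y in ps], [c])

result = d.collect()
lineage = d.debug_string()
```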

Shuffling

When operations need to calculate results using a common characteristic (e.g. a common key), this data needs to reside on the same physical node. This is the case with all “wide dependency” operations. The process of re-arranging data so that similar records end up in the same partitions is called shuffling.

Shuffling is very expensive as it involves moving data across the network and possibly spilling it to disk (e.g. if too much data ends up on a single node). Avoid it at all costs!
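
To see why the amount of shuffled data matters, the following sketch counts how many records cross the shuffle with and without pre-aggregating inside each partition first. It is a plain-Python model under simplifying assumptions (integer keys, hash routing); shuffle and combine_locally are hypothetical names.

```python
from collections import Counter

def shuffle(partitions, num_targets):
    """Route each (key, value) record to the partition chosen by its key."""
    targets = [[] for _ in range(num_targets)]
    for part in partitions:
        for k, v in part:
            targets[hash(k) % num_targets].append((k, v))
    return targets

def combine_locally(part):
    """Pre-aggregate counts inside one partition before shuffling."""
    c = Counter()
    for k, v in part:
        c[k] += v
    return list(c.items())

parts = [[(1, 1), (2, 1), (1, 1)], [(1, 1), (2, 1), (2, 1)]]
targets = shuffle(parts, 2)
moved_raw = sum(len(p) for p in targets)
moved_combined = sum(len(p) for p in shuffle([combine_locally(p) for p in parts], 2))
```

After the shuffle all records with the same key sit in the same target partition, and combining locally first reduces the number of records that have to move.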

See this link for a great write up on Spark shuffling

Shuffling example

Shuffling explained

Persistence

Data in RDDs is stored in three ways:

  • As Java objects: Each item in an RDD is an allocated object
  • As serialized data: Special (usually memory-efficient) formats. Serialization is more CPU intensive, but faster to send across the network or write to disk.
  • On the filesystem: In case the RDD is too big, it can be mapped on a file system, usually HDFS.

Persistence in Spark is configurable and can be used to store frequently used computations in memory, e.g.:

import org.apache.spark.storage.StorageLevel

val rdd = sc.textFile("hdfs://host/foo.txt")
val persisted = rdd.map(x => x + 1).persist(StorageLevel.MEMORY_ONLY_SER)

persisted is cached the first time an action computes it. Further accesses will avoid reloading the file and re-applying the map function.
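
The effect of persistence can be modelled by counting how often the mapped function runs. The Dataset class below is a hypothetical toy, not Spark's API: persist only marks the dataset, and the result is stored when it is first computed.

```python
calls = {"n": 0}

def expensive(x):
    calls["n"] += 1          # count how often the function is applied
    return x + 1

class Dataset:
    def __init__(self, source, fn):
        self.source, self.fn = source, fn
        self._cache = None
        self._persist = False

    def persist(self):
        self._persist = True  # only marks the dataset; nothing is computed yet
        return self

    def collect(self):
        if self._cache is not None:
            return self._cache
        result = [self.fn(x) for x in self.source]
        if self._persist:
            self._cache = result
        return result

ds = Dataset([1, 2, 3], expensive).persist()
first = ds.collect()
second = ds.collect()        # served from the cache, expensive() is not re-run
```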

Spark applications

Spark cluster architecture

  • Executors do the actual processing; worker nodes can contain multiple executors.
  • The driver accepts user programs and returns processing results
  • The cluster manager does resource allocation

A Spark application

A Spark application:

  • begins by specifying the required cluster resources, which the cluster manager seeks to fulfil. If resources are available, executors are started on worker nodes.
  • creates a Spark context, which acts as the driver.
  • issues a number of jobs, which load data partitions on the executor heap and run in threads on the executor’s CPUs
  • when it finishes, the cluster manager frees the resources.

What is a Spark job?

Jobs are created when an action is requested. Spark walks the RDD dependency graph backwards and builds a graph of stages.

Stages are the parts of a job delimited by wide dependencies. When such an operation is requested (e.g. groupByKey or sort), the Spark scheduler will need to reshuffle/repartition the data. Stages (per RDD) are always executed serially. Each stage consists of one or more tasks.

A task is the minimum unit of execution; a task applies a narrow dependency function on a data partition. The scheduler starts as many tasks as there are data partitions.
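
A rough sketch of cutting a pipeline into stages at wide dependencies. It simplifies heavily: operations are just names, the WIDE set is an assumed and incomplete list, and each wide operation is placed at the start of a new stage, whereas a real scheduler works on the dependency graph.

```python
WIDE = {"groupByKey", "reduceByKey", "sortBy", "join"}

def split_into_stages(ops):
    stages, current = [], []
    for op in ops:
        if op in WIDE and current:
            stages.append(current)   # a shuffle boundary closes the current stage
            current = []
        current.append(op)
    if current:
        stages.append(current)
    return stages

# The word count pipeline from earlier in these slides
stages = split_into_stages(
    ["textFile", "flatMap", "map", "reduceByKey", "sortBy", "take"]
)
```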

A job graph

We can see how our job executes if we connect to our driver’s WebUI (port 4040 on the driver machine).

Here is the graph for the word counting job we saw before.

Word Count job graph

Requesting resources

Here is an example of how to start an application with a custom resource configuration.

spark-shell   \
    --master spark://spark.master.ip:7077 \
    --deploy-mode cluster  \
    --driver-cores 12 \
    --driver-memory 5g \
    --num-executors 52 \
    --executor-cores 6 \
    --executor-memory 30g

Fault tolerance

Spark uses RDD lineage information to know which partition(s) to recompute in case of a node failure.

Recomputing happens at the stage level.

To minimize recompute time, we can use checkpointing. With checkpointing we can save job stages to reliable storage. Checkpointing effectively truncates the RDD lineage graph.

Spark clusters are resilient to worker node failures, but not necessarily to master failures. Running Spark on a cluster manager such as YARN or Mesos is a common way to run multi-master setups.

Effective Spark

RDDs and formatted data

RDDs by default do not impose any format on the data they store. However, if the data is formatted (e.g. log lines with known format), we can create a schema and have the Scala compiler type-check our computations.

Consider the following data (common log format):

127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] \
 "GET /apache_pb.gif HTTP/1.0" 200 2326

We can map this data to a Scala case class

case class LogLine(ip: String, id: String, user: String,
                   dateTime: Date, req: String, resp: Int,
                   bytes: Int)

and use a regular expression to parse the data:

([^\s]+) ([^\s]+) ([^\s]+) \[([^\]]+)\] "(.+)" (\d+) (\d+)

Then, we can use flatMap in combination with Scala’s pattern matching to filter out bad lines:

import java.text.SimpleDateFormat
import java.util.{Date, Locale}

// The timestamp contains a space, so it is matched between the brackets
val dateFormat = "dd/MMM/yyyy:HH:mm:ss Z"
val regex = """([^\s]+) ([^\s]+) ([^\s]+) \[([^\]]+)\] "(.+)" (\d+) (\d+)""".r
val rdd = sc.
    textFile("access-log.txt").
    flatMap {
      case regex(ip, id, user, dateTime, req, resp, bytes) =>
        // SimpleDateFormat is not thread-safe, so create one per record
        val df = new SimpleDateFormat(dateFormat, Locale.ENGLISH)
        Some(LogLine(ip, id, user, df.parse(dateTime),
                     req, resp.toInt, bytes.toInt))
      case _ => None // drop lines that do not match the expected format
    }

Then, we can compute the total traffic per month

val bytesPerMonth =
  rdd.
    groupBy(k => k.dateTime.getMonth).
    aggregateByKey(0)({(acc, x) => acc + x.map(_.bytes).sum},
                      {(x,y) => x + y})

Notice that all code on this slide is type checked!
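
For comparison, here is a sketch of the same parsing step in plain Python, without Spark. The names LogLine, PATTERN and parse are chosen for this example, and the timestamp format string assumes English month abbreviations as in the sample line above.

```python
import re
from datetime import datetime
from typing import NamedTuple, Optional

class LogLine(NamedTuple):
    ip: str
    ident: str
    user: str
    date_time: datetime
    req: str
    resp: int
    nbytes: int

# The timestamp is matched between the square brackets, as it contains a space
PATTERN = re.compile(r'(\S+) (\S+) (\S+) \[([^\]]+)\] "(.+)" (\d+) (\d+)')

def parse(line: str) -> Optional[LogLine]:
    m = PATTERN.fullmatch(line)
    if m is None:
        return None                      # bad line: dropped by the caller
    ip, ident, user, ts, req, resp, size = m.groups()
    when = datetime.strptime(ts, "%d/%b/%Y:%H:%M:%S %z")
    return LogLine(ip, ident, user, when, req, int(resp), int(size))

lines = [
    '127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] '
    '"GET /apache_pb.gif HTTP/1.0" 200 2326',
    'not a log line',
]
parsed = [p for p in map(parse, lines) if p is not None]
```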

Connecting to databases

The data sources that Spark can use go beyond textFiles. Spark can connect to databases such as

  • MongoDB
val readConfig = ReadConfig(Map("uri" -> "mongodb://127.0.0.1/github.events"))
val events = MongoSpark.load(sc, readConfig)
// or, alternatively: sc.loadFromMongoDB(readConfig)

events.count
  • MySQL or Postgres over JDBC
val users = spark.read.format("jdbc").options(
  Map("url" ->  "jdbc:mysql://localhost:3306/ghtorrent?user=root&password=",
  "dbtable" -> "ghtorrent.users",
  "fetchSize" -> "10000"
  )).load()

users.count

or to distributed file systems like HDFS, Amazon S3, Azure Data Lake etc

Optimizing Partitioning

Partitioning becomes an important consideration when we need to run iterative algorithms. Some cases benefit a lot from defining custom partitioning schemes:

  • Joins between a large, almost static dataset with a much smaller, continuously updated one.
  • reduceByKey or aggregateByKey on RDDs with arithmetic keys benefit from range partitioning as the shuffling stage is minimal (or none) because reduction happens locally!

Broadcasts

From the docs: Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.

Broadcasts are often used to ship precomputed items, e.g. lookup tables or machine learning models, to workers so that they do not have to retransfer them on every shuffle.

With broadcasts, we can implement efficient in-memory joins between a processed dataset and a lookup table.

val curseWords = List("foo", "bar") // Use your imagination here!
val bcw = sc.broadcast(curseWords)

// Read the broadcast copy through .value instead of capturing the local list
odyssey.filter(x => !bcw.value.contains(x))

Broadcasted data should be relatively big and immutable.
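
The in-memory join against a lookup table can be sketched without Spark as follows. The lookup dictionary stands in for the broadcast value that every worker reads, and the enrich function and sample data are made up for this illustration.

```python
# Small read-only table that every "worker" can consult locally
lookup = {"nl": "Netherlands", "gr": "Greece"}

def enrich(partition, table):
    # Join each record with the lookup table without moving the records
    return [(user, table.get(cc, "unknown")) for user, cc in partition]

partitions = [[("ann", "nl")], [("bob", "gr"), ("eve", "xx")]]
enriched = [enrich(p, lookup) for p in partitions]
```

Because the table is looked up locally, the large dataset never needs to be shuffled by key.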

Accumulators

Sometimes we need to keep track of variables like performance counters, debug values or line counts while computations are running.

// Bad code
var taskTime = 0L
odyssey.map{x =>
  val ts = System.currentTimeMillis()
  val r = foo(x)
  taskTime += (System.currentTimeMillis() - ts)
  r
}

To make it work, we need to define an accumulator

// Spark 2.x+ API; sc.accumulator(0L) is its deprecated predecessor
val taskTime = sc.longAccumulator("taskTime")
odyssey.map{x =>
  val ts = System.currentTimeMillis()
  val r = foo(x)
  taskTime.add(System.currentTimeMillis() - ts)
  r
}

Using accumulators is a side-effecting operation and should be avoided as it complicates design. It is important to understand that accumulators are aggregated at the driver, so frequent writes will cause large amounts of network traffic.
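
The aggregation pattern behind accumulators can be modelled as task-local counters that are merged once the tasks report back. This is a conceptual sketch in plain Python with hypothetical names, not a description of Spark's implementation.

```python
def run_task(partition):
    local = 0                    # task-local counter, no shared state
    out = []
    for x in partition:
        out.append(x.upper())
        local += 1
    return out, local            # the counter travels back with the task result

partitions = [["a", "b"], ["c"]]
results = [run_task(p) for p in partitions]

# The "driver" merges the per-task counters into the final value
lines_seen = sum(local for _, local in results)
```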

Bibliography

[1]
M. Zaharia, M. Chowdhury, M. J. Franklin, S. Shenker, and I. Stoica, “Spark: Cluster computing with working sets.” HotCloud, vol. 10, no. 10–10, p. 95, 2010.
[2]
Y. Yu et al., “DryadLINQ: A system for general-purpose distributed data-parallel computing using a high-level language,” in Proceedings of the 8th USENIX conference on operating systems design and implementation, 2008, pp. 1–14.
[3]
C. Chambers et al., “FlumeJava: Easy, efficient data-parallel pipelines,” in ACM SIGPLAN conference on programming language design and implementation (PLDI), 2010, pp. 363–375.