Parallelism is about speeding up computations by utilising clusters of machines.
Distributed data parallelism involves splitting the data over several distributed nodes, where nodes work in parallel, and combine the individual results to come up with a final one.
This means that our programming model and execution should hide (but not forget!) those.
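The split / work-in-parallel / combine idea can be sketched on a single machine. This is only an illustration: threads stand in for cluster nodes, and the function names and the sum-of-squares workload are assumptions made for the example.

```python
from concurrent.futures import ThreadPoolExecutor

def split(data, n_nodes):
    """Split data into at most n_nodes contiguous chunks of similar size."""
    size = max(1, (len(data) + n_nodes - 1) // n_nodes)
    return [data[i:i + size] for i in range(0, len(data), size)]

def node_work(chunk):
    """Work done independently on one node: a partial sum of squares."""
    return sum(x * x for x in chunk)

def distributed_sum_of_squares(data, n_nodes=4):
    chunks = split(data, n_nodes)
    # Threads stand in for cluster nodes here.
    with ThreadPoolExecutor(max_workers=n_nodes) as pool:
        partials = list(pool.map(node_work, chunks))
    # Combine the individual results into the final one.
    return sum(partials)

total = distributed_sum_of_squares(list(range(1, 101)))
```

Each chunk is processed without looking at any other chunk, which is what lets the work spread over several nodes.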
What is good about Hadoop: Fault tolerance
Hadoop was the first framework to enable computations to run on 1000s of commodity computers.
What is wrong: Performance
Microsoft’s DryadLINQ combined the Dryad distributed execution engine with the LINQ language for defining queries.
Google’s FlumeJava attempted to provide a few simple abstractions for programming data-parallel computations. These abstractions are higher-level than those provided by MapReduce, and provide better support for pipelines.
PTable<String,Integer> wordsWithOnes =
    words.parallelDo(
        new DoFn<String, Pair<String,Integer>>() {
          void process(String word,
                       EmitFn<Pair<String,Integer>> emitFn) {
            emitFn.emit(Pair.of(word, 1));
          }
        }, tableOf(strings(), ints()));
PTable<String,Collection<Integer>>
    groupedWordsWithOnes = wordsWithOnes.groupByKey();
PTable<String,Integer> wordCounts =
    groupedWordsWithOnes.combineValues(SUM_INTS);
Spark is an open source cluster computing framework.
RDDs are the core abstraction that Spark uses.
RDDs make datasets distributed over a cluster of machines look like a Scala collection. RDDs:
In practice, RDD[A] works like Scala’s List[A]
In Scala
// http://classics.mit.edu/Homer/odyssey.mb.txt
val rdd = sc.textFile("./datasets/odyssey.mb.txt")
rdd.flatMap(l => l.split(" ")). // Split file in words
map(x => (x, 1)). // Create key,1 pairs
reduceByKey((acc, x) => acc + x). // Count occurrences of same pairs
sortBy(x => x._2, false). // Sort by number of occurrences
take(50). // Take the first 50 results
foreach(println)
In Python
text_file = sc.textFile("odyssey.mb.txt")
counts = text_file.flatMap(lambda line: line.split(" ")). \
    map(lambda word: (word, 1)). \
    reduceByKey(lambda a, b: a + b)
Yes. RDDs are implemented in Scala, Java, Python and R.
Spark itself is implemented in Scala, internally using the Akka actor framework to handle distributed state and Netty to handle networking.
For Python, Spark uses Py4J, which allows Python programs to access Java objects in a remote JVM. The PySpark API is designed to do most computations in the remote JVM; if processing needs to happen in Python, data must be copied; this incurs a performance penalty.
RDDs can only be created in the following 3 ways
val rdd1 = sc.textFile("hdfs://...")
val rdd2 = sc.textFile("file://odyssey.txt")
val rdd3 = sc.textFile("s3://...")
val xs: Range = Range(1, 10000)
val rdd: RDD[Int] = sc.parallelize(xs)
rdd.map(x => x.toString) // returns an RDD[String]
There are two types of operations we can do on an RDD:
Transformation: Applying a function that returns a new RDD. They are lazy.
Action: Request the computation of a result. They are eager.
// This just sets up the pipeline
val result = rdd.
flatMap(l => l.split(" ")).
map(x => (x, 1))
// Side-effect, triggers computation
result.foreach(println)
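The lazy/eager split can be imitated in a few lines of plain Python. The class below is a toy stand-in, not how Spark implements RDDs; `LazyDataset` and `noisy_double` are names invented for this sketch.

```python
class LazyDataset:
    """Toy stand-in for an RDD: transformations are recorded, actions run them."""

    def __init__(self, source, ops=()):
        self._source = source      # any iterable
        self._ops = tuple(ops)     # recorded transformations, in order

    def map(self, f):              # transformation: returns a new dataset
        return LazyDataset(self._source, self._ops + (("map", f),))

    def filter(self, p):           # transformation: returns a new dataset
        return LazyDataset(self._source, self._ops + (("filter", p),))

    def collect(self):             # action: triggers the computation
        items = iter(self._source)
        for kind, f in self._ops:
            items = map(f, items) if kind == "map" else filter(f, items)
        return list(items)

calls = []

def noisy_double(x):
    calls.append(x)                # records every invocation
    return 2 * x

pipeline = LazyDataset([1, 2, 3, 4]).map(noisy_double).filter(lambda x: x > 4)
calls_before_action = len(calls)   # nothing has run yet
result = pipeline.collect()        # the action runs the whole pipeline
```

Building `pipeline` never calls `noisy_double`; only `collect` does.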
map: Apply \(f\) on all items in the RDD and return an RDD of the result. \(RDD[A].map(f: A \rightarrow B) : RDD[B]\)
flatMap: Apply \(f\) on all RDD contents, return an RDD with the contents of all intermediate iterators. \(RDD[A].flatMap(f: A \rightarrow Iterable[B]): RDD[B]\)
filter: Apply predicate \(p\), return items that satisfy it. \(RDD[A].filter(p: A \rightarrow Boolean): RDD[A]\)
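For readers more at home in Python, the three transformations behave like the following operations on ordinary lists. The two input lines are made up for the illustration.

```python
from itertools import chain

lines = ["Tell me O Muse", "of that ingenious hero"]

# map: exactly one output item per input item
lengths = list(map(len, lines))

# flatMap: each input item yields zero or more output items, flattened
words = list(chain.from_iterable(line.split(" ") for line in lines))

# filter: keep only the items that satisfy the predicate
short_words = [w for w in words if len(w) <= 2]
```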
All uses of articles in the Odyssey
val odyssey = sc.textFile("datasets/odyssey.mb.txt").
  flatMap(_.split(" ")).
  map(_.toLowerCase)
odyssey.filter(Seq("a", "the").contains(_))
Q: How can we find uses of all regular verbs in past tense?
odyssey.filter(x => x.endsWith("ed"))
Q: How can we remove all punctuation marks?
odyssey.map(x => x.replaceAll("\\p{Punct}|\\d", ""))
collect: Return all elements of an RDD. \(RDD[A].collect() : Array[A]\)
take: Return the first n elements of the RDD. \(RDD[A].take(n): Array[A]\)
reduce, fold: Combine all elements to a single result of the same type. \(RDD[A].reduce(f: (A, A) \rightarrow A): A\)
aggregate: Aggregate the elements of each partition, and then the results for all the partitions. \(RDD[A].aggregate(init: B)(seqOp: (B, A) \rightarrow B, combOp: (B, B) \rightarrow B): B\)
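The two-function shape of aggregate is easiest to see on explicit partitions. The sketch below is a plain-Python imitation of the signature above, with hypothetical data already split into three partitions. It computes a word count and a character count in one pass.

```python
from functools import reduce

def aggregate(partitions, init, seq_op, comb_op):
    """Fold each partition with seq_op, then merge the partial results with comb_op."""
    partials = [reduce(seq_op, part, init) for part in partitions]
    return reduce(comb_op, partials, init)

# Hypothetical data, already split into three partitions.
partitions = [["tell", "me"], ["o", "muse"], ["of", "that", "hero"]]

# Here A = str and B = (word_count, char_count).
init = (0, 0)
seq_op = lambda acc, w: (acc[0] + 1, acc[1] + len(w))
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])

word_count, char_count = aggregate(partitions, init, seq_op, comb_op)
```

seq_op turns an A into a B inside a partition, while comb_op only ever sees values of type B. This is why the result type can differ from the element type.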
How many words are there?
val odyssey = sc.textFile("datasets/odyssey.mb.txt").flatMap(_.split(" "))
odyssey.map(x => 1).reduce((a,b) => a + b)
How can we sort the RDD?
odyssey.sortBy(x => x)
How can we sample data from the RDD?
val Array(train, test) = odyssey.randomSplit(Array(0.8, 0.2)) // randomSplit returns an Array of RDDs
RDDs can represent any complex data type, if it can be iterated. Spark treats RDDs of the type RDD[(K,V)] as special, named PairRDDs, as they can be both iterated and indexed.
Operations such as join are only defined on Pair RDDs, meaning that we can only combine RDDs if their contents can be indexed.
We can create Pair RDDs by applying an indexing function or by grouping records:
val rdd = sc.parallelize(List("foo", "bar", "baz")) // RDD[String]
val pairRDD = rdd.map(x => (x.charAt(0), x)) // RDD[(Char, String)]
pairRDD.collect
// Array((f,foo), (b,bar), (b,baz))
val pairRDD2 = rdd.groupBy(x => x.charAt(0)) // RDD[(Char, Iterable[String])]
pairRDD2.collect
// Array((b,CompactBuffer(bar, baz)), (f,CompactBuffer(foo)))
The following functions are only available on RDD[(K,V)]
reduceByKey: Merge the values for each key using an associative and commutative reduce function. \(reduceByKey(f: (V, V) \rightarrow V): RDD[(K, V)]\)
aggregateByKey: Aggregate the values of each key, using given combine functions and a neutral “zero value”. \(aggregateByKey(zero: U)(f: (U, V) \rightarrow U, g: (U, U) \rightarrow U): RDD[(K, U)]\)
join: Return an RDD containing all pairs of elements with matching keys. \(join(b: RDD[(K, W)]): RDD[(K, (V, W))]\)
aggregateByKey
How can we count the number of occurrences of part of speech elements?
object PartOfSpeech {
  sealed trait EnumVal
  case object Verb extends EnumVal
  case object Noun extends EnumVal
  case object Article extends EnumVal
  case object Other extends EnumVal
  val partsOfSpeech = Seq(Verb, Noun, Article, Other)
}
// Classifies a single word; the implementation is elided
def partOfSpeech(w: String): PartOfSpeech.EnumVal = ...
// groupBy yields one Iterable[String] per key, so we add its size
odyssey.groupBy(partOfSpeech).
  aggregateByKey(0)((acc, x) => acc + x.size,
                    (x, y) => x + y)
D: What type conversions take place here?
join
case class Person(id: Int, name: String)
case class Addr(id: Int, person_id: Int,
address: String, number: Int)
val pers = sc.textFile("pers.csv") // id, name
val addr = sc.textFile("addr.csv") // id, person_id, street, num
val ps = pers.map(_.split(",")).map(x => Person(x(0).toInt, x(1)))
val as = addr.map(_.split(",")).map(x => Addr(x(0).toInt, x(1).toInt,
x(2), x(3).toInt))
Q: What are the types of ps and as? How can we join them?
val pairPs = ps.keyBy(_.id)
val pairAs = as.keyBy(_.person_id)
val addrForPers = pairAs.join(pairPs) // RDD[(Int, (Addr, Person))]
Given a “left” RDD[(K,A)] and a “right” RDD[(K,B)]:
Inner join (join): The result contains only records that have the keys in both RDDs. \(left.join(right) : RDD[(K,(A, B))]\)
Outer joins (left/rightOuterJoin): The result contains records that have keys in either the “left” or the “right” RDD in addition to the inner join results. \(left.loj(right) : RDD[(K,(A, Option[B]))]\) \(left.roj(right) : RDD[(K,(Option[A], B))]\)
Full outer join (fullOuterJoin): The result contains records that have keys in either of the two RDDs. \(left.foj(right) : RDD[(K,(Option[A], Option[B]))]\)
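The difference between the join flavours shows up as soon as a key is missing on one side. The following plain-Python sketch works on lists of (key, value) pairs. `None` plays the role of an empty `Option`, and the helper names and sample data are invented for the example.

```python
from collections import defaultdict

def _index(pairs):
    """Group values by key, preserving first-seen key order."""
    idx = defaultdict(list)
    for k, v in pairs:
        idx[k].append(v)
    return idx

def join(left, right):
    l, r = _index(left), _index(right)
    return [(k, (a, b)) for k in l if k in r for a in l[k] for b in r[k]]

def left_outer_join(left, right):
    l, r = _index(left), _index(right)
    return [(k, (a, b)) for k in l for a in l[k] for b in r.get(k, [None])]

def right_outer_join(left, right):
    l, r = _index(left), _index(right)
    return [(k, (a, b)) for k in r for b in r[k] for a in l.get(k, [None])]

def full_outer_join(left, right):
    l, r = _index(left), _index(right)
    keys = list(l) + [k for k in r if k not in l]
    return [(k, (a, b)) for k in keys
            for a in l.get(k, [None]) for b in r.get(k, [None])]

people = [(1, "Ann"), (2, "Bob")]             # key 2 has no address
addresses = [(1, "Main St"), (3, "Elm St")]   # key 3 has no person

inner = join(people, addresses)
left = left_outer_join(people, addresses)
right = right_outer_join(people, addresses)
full = full_outer_join(people, addresses)
```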
From the RDD.scala, line 61.
Internally, each RDD is characterized by five main properties:
D: Why does an RDD need all those?
Even though RDDs might give the impression of continuous memory blocks spread across a cluster, data in RDDs is split into partitions.
Partitions define a unit of computation and persistence: any Spark computation transforms a partition to a new partition.
If a machine fails during a computation, Spark will redistribute its partitions to other machines and restart the computation on those partitions only.
The partitioning scheme of an application is configurable; better configurations lead to better performance.
Spark supports 3 types of partitioning schemes:
Extended partitioning is only configurable on Pair RDDs.
Range partitioning: Takes into account the natural order of keys to split the dataset in the required number of partitions. Requires keys to be naturally ordered and keys to be equally distributed across the value range.
Hash partitioning: Calculates a hash over each item's key and then takes this hash modulo the number of partitions to determine the new partition. This is equivalent to:
key.hashCode() % numPartitions
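Both schemes reduce to a function from a key to a partition index. The sketch below imitates them in plain Python. The function names, the keys and the range bounds are assumptions for the example, and integer keys are used so that the hash is deterministic.

```python
import bisect

def hash_partition(key, num_partitions):
    """Partition index from the key's hash, as in key.hashCode() % numPartitions."""
    # Python's % is never negative for a positive modulus. On the JVM the
    # remainder of a negative hashCode is negative, so a non-negative
    # modulo has to be used there.
    return hash(key) % num_partitions

def range_partition(key, upper_bounds):
    """Partition index from sorted upper bounds; larger keys go to the last partition."""
    return bisect.bisect_left(upper_bounds, key)

keys = [1, 7, 12, 25, 31, 48]
by_hash = [hash_partition(k, 3) for k in keys]
by_range = [range_partition(k, [10, 30]) for k in keys]
```

With range partitioning, neighbouring keys land in the same partition. With hash partitioning they are scattered, which balances load better when keys are unevenly spread over the value range.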
Narrow dependencies: Each partition of the source RDD is used by at most one partition of the target RDD
Wide dependencies: Multiple partitions in the target RDD depend on a single partition in the source RDD.
RDDs contain information on how to compute themselves, including dependencies to other RDDs. Effectively, RDDs create a directed acyclic graph of computations.
To demonstrate this, consider the following example:
val rdd1 = sc.parallelize(0 to 10)
val rdd2 = sc.parallelize(10 to 100)
val rdd3 = rdd1.cartesian(rdd2)
val rdd4 = rdd3.map(x => (x._1 + 1, x._2 + 1))
Q: What are the computation dependencies here?
scala> rdd4.toDebugString
res3: String =
(16) MapPartitionsRDD[3] at map at <console>:30 []
| CartesianRDD[2] at cartesian at <console>:28 []
| ParallelCollectionRDD[0] at parallelize at <console>:24 []
| ParallelCollectionRDD[1] at parallelize at <console>:24 []
When operations need to calculate results using a common characteristic (e.g. a common key), this data needs to reside on the same physical node. This is the case with all “wide dependency” operations. The process of re-arranging data so that similar records end up in the same partitions is called shuffling.
Shuffling is very expensive as it involves moving data across the network and possibly spilling them to disk (e.g. if too much data is computed to be hosted on a single node). Avoid it at all costs!
See this link for a great write up on Spark shuffling
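One common way to shrink a shuffle is to combine records locally before they leave a partition, which is what makes a reduceByKey-style word count cheaper than grouping first. The sketch below only counts the records that would cross the network. The partitions and helper names are hypothetical.

```python
from collections import Counter

# Hypothetical input: two partitions living on two different nodes.
partitions = [
    ["the", "sea", "the", "ship"],
    ["the", "ship", "the", "sea"],
]

# Without local combining, every (word, 1) record is shuffled.
shuffled_naive = [(w, 1) for part in partitions for w in part]

# With local combining, each partition ships one record per distinct word.
shuffled_combined = [pair for part in partitions
                     for pair in Counter(part).items()]

def merge(records):
    """Reduce side: sum the counts that arrive for each key."""
    total = Counter()
    for word, n in records:
        total[word] += n
    return dict(total)

counts_naive = merge(shuffled_naive)
counts_combined = merge(shuffled_combined)
```

Both variants produce the same counts, but the second one moves 6 records instead of 8. On real text, where words repeat often, the gap is far larger.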
Data in RDDs is stored in three ways:
Persistence in Spark is configurable and can be used to store frequently used computations in memory, e.g.:
val rdd = sc.textFile("hdfs://host/foo.txt")
val persisted = rdd.map(x => x + 1).persist(StorageLevel.MEMORY_ONLY_SER)
persisted is now marked for caching: the first action that computes it stores the result, and further accesses will avoid reloading the file and applying the map function.
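The effect of persistence can be made visible by counting how often a dataset is actually computed. The class below is a toy model written for this purpose. `Dataset` and its `computations` counter are invented names, not part of any Spark API.

```python
class Dataset:
    """Toy dataset that recomputes on every action unless persisted."""

    def __init__(self, compute):
        self._compute = compute    # function that produces the data
        self._persist = False
        self._cache = None
        self.computations = 0      # how many times the data was computed

    def persist(self):
        self._persist = True       # only a marker; nothing is computed here
        return self

    def collect(self):
        if self._cache is not None:
            return self._cache     # served from the cache
        self.computations += 1
        data = self._compute()
        if self._persist:
            self._cache = data
        return data

plain = Dataset(lambda: [x + 1 for x in range(3)])
plain.collect()
plain.collect()

cached = Dataset(lambda: [x + 1 for x in range(3)]).persist()
cached.collect()
cached.collect()
```

After two actions each, `plain` has been computed twice and `cached` only once.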
A Spark application:
Jobs are created when an action is requested. Spark walks the RDD dependency graph backwards and builds a graph of stages.
Stages are the parts of a job delimited by wide dependencies. When such an operation is requested (e.g. groupByKey or sort) the Spark scheduler will need to reshuffle/repartition the data. Stages (per RDD) are always executed serially. Each stage consists of one or more tasks.
A task is the minimum unit of execution; a task applies a narrow dependency function on a data partition. The cluster manager starts as many tasks as there are data partitions.
We can see how our job executes if we connect to our driver’s WebUI (port 4040 on the driver machine).
Here is the graph for the word counting job we saw before.
Here is an example of how to start an application with a custom resource configuration.
spark-shell \
--master spark://spark.master.ip:7077 \
--deploy-mode cluster \
--driver-cores 12 \
--driver-memory 5g \
--num-executors 52 \
--executor-cores 6 \
--executor-memory 30g
Spark uses RDD lineage information to know which partition(s) to recompute in case of a node failure.
Recomputing happens at the stage level.
To minimize recompute time, we can use checkpointing. With checkpointing we can save job stages to reliable storage. Checkpointing effectively truncates the RDD lineage graph.
Spark clusters are resilient to node failures, but not to master failures. Running Spark on middleware such as YARN or Mesos is the only way to run multi-master setups.
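Lineage-based recovery and the effect of a checkpoint can be sketched with a list of functions standing in for the lineage of one partition. This is a deliberately simplified model. The lineage steps, the data and the `recompute` helper are assumptions for the example.

```python
def recompute(source_partition, lineage):
    """Rebuild a lost partition by replaying its lineage on the source data."""
    data = source_partition
    for step in lineage:
        data = [step(x) for x in data]
    return data

# Hypothetical lineage: three narrow transformations applied in order.
lineage = [lambda x: x + 1, lambda x: x * 2, lambda x: x - 3]
source = [1, 2, 3]

# Without a checkpoint, all three steps are replayed after a failure.
full = recompute(source, lineage)

# With a checkpoint after the second step (saved to reliable storage),
# only the remaining step has to be replayed.
checkpoint = recompute(source, lineage[:2])
from_checkpoint = recompute(checkpoint, lineage[2:])
```

The result is identical either way; the checkpoint only shortens the part of the lineage that must be replayed.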
RDDs by default do not impose any format on the data they store. However, if the data is formatted (e.g. log lines with known format), we can create a schema and have the Scala compiler type-check our computations.
Consider the following data (common log format):
127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] \
"GET /apache_pb.gif HTTP/1.0" 200 2326
We can map this data to a Scala case class
case class LogLine(ip: String, id: String, user: String,
dateTime: Date, req: String, resp: Int, bytes: Int)
and use a regular expression to parse the data (the timestamp is matched between its square brackets, since it contains a space):
([^\s]+) ([^\s]+) ([^\s]+) \[([^\]]+)\] "(.+)" (\d+) (\d+)
Then, we can use flatMap in combination with Scala’s pattern matching to filter out bad lines:
import java.text.SimpleDateFormat
import java.util.{Date, Locale}
// Matches timestamps such as 10/Oct/2000:13:55:36 -0700
val dateFormat = "dd/MMM/yyyy:HH:mm:ss Z"
val regex = """([^\s]+) ([^\s]+) ([^\s]+) \[([^\]]+)\] "(.+)" (\d+) (\d+)""".r
val rdd = sc.
  textFile("access-log.txt").
  flatMap ( x => x match {
    case regex(ip, id, user, dateTime, req, resp, bytes) =>
      // SimpleDateFormat is not thread-safe, so create one per record
      val df = new SimpleDateFormat(dateFormat, Locale.ENGLISH)
      Some(LogLine(ip, id, user, df.parse(dateTime),
                   req, resp.toInt, bytes.toInt))
    case _ => None // lines that do not match are dropped
  })
Then, we can compute the total traffic per month
val bytesPerMonth =
rdd.groupBy(k => k.dateTime.getMonth).
aggregateByKey(0)({(acc, x) => acc + x.map(_.bytes).sum},
{(x,y) => x + y})
Notice that all code on this slide is type checked!
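The same parse-then-aggregate pipeline can be tried without a cluster. The sketch below is a plain-Python approximation. The pattern matches the bracketed timestamp explicitly, the second and third input lines are made up, and month names are assumed to be parsed in an English locale.

```python
import re
from collections import defaultdict
from datetime import datetime

LOG_RE = re.compile(r'(\S+) (\S+) (\S+) \[([^\]]+)\] "(.+)" (\d+) (\d+)')

def parse(line):
    """Return a tuple of typed fields, or None for lines that do not match."""
    m = LOG_RE.fullmatch(line)
    if m is None:
        return None
    ip, ident, user, ts, req, resp, nbytes = m.groups()
    when = datetime.strptime(ts, "%d/%b/%Y:%H:%M:%S %z")
    return (ip, ident, user, when, req, int(resp), int(nbytes))

lines = [
    '127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] '
    '"GET /apache_pb.gif HTTP/1.0" 200 2326',
    '127.0.0.1 - frank [11/Oct/2000:08:01:02 -0700] '
    '"GET /index.html HTTP/1.0" 200 1024',
    'not a log line',
]

# Equivalent of flatMap with Option: bad lines simply disappear.
records = [r for r in map(parse, lines) if r is not None]

# Total traffic per month (1 = January, unlike java.util.Date.getMonth).
bytes_per_month = defaultdict(int)
for rec in records:
    bytes_per_month[rec[3].month] += rec[6]
bytes_per_month = dict(bytes_per_month)
```

Unlike the Scala version, nothing here is checked at compile time; a mistyped field index only fails when the code runs.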
The data sources that Spark can use go beyond textFiles. Spark can connect to databases such as MongoDB
val readConfig = ReadConfig(Map("uri" -> "mongodb://127.0.0.1/github.events"))
sc.loadFromMongoDB(readConfig)
val events = MongoSpark.load(sc, readConfig)
events.count
or MySQL, over JDBC
val users = spark.read.format("jdbc").options(
  Map("url" -> "jdbc:mysql://localhost:3306/ghtorrent?user=root&password=",
      "dbtable" -> "ghtorrent.users",
      "fetchSize" -> "10000"
  )).load()
users.count
or to distributed file systems like HDFS, Amazon S3, Azure Data Lake etc.
Partitioning becomes an important consideration when we need to run iterative algorithms. Some cases benefit a lot from defining custom partitioning schemes:
reduceByKey or aggregateByKey on RDDs with arithmetic keys benefit from range partitioning as the shuffling stage is minimal (or none) because reduction happens locally!
From the docs: Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.
Broadcasts are often used to ship precomputed items, e.g. lookup tables or machine learning models, to workers so that they do not have to retransfer them on every shuffle.
With broadcasts, we can implement efficient in-memory joins between a processed dataset and a lookup table.
val curseWords = List("foo", "bar") // Use your imagination here!
val bcw = sc.broadcast(curseWords)
odyssey.filter(x => !bcw.value.contains(x)) // read the broadcast copy on the workers
Broadcasted data should be relatively big and immutable.
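An in-memory join against a lookup table needs no shuffle: every partition consults its own copy of the table. The sketch below imitates this in plain Python. The lookup table, the records and `join_partition` are hypothetical.

```python
# Small lookup table; in Spark this is the value that would be broadcast.
lookup = {"nl": "Netherlands", "gr": "Greece"}

# Hypothetical dataset of (name, country_code) records in two partitions.
partitions = [
    [("alice", "nl"), ("bob", "gr")],
    [("carol", "nl"), ("dave", "xx")],
]

def join_partition(records, table):
    """Join one partition against the local copy of the table (inner join)."""
    return [(name, table[code]) for name, code in records if code in table]

# Each partition is processed on its own; no record changes partition.
joined = [row for part in partitions for row in join_partition(part, lookup)]
```

Records whose code is missing from the table, such as `("dave", "xx")`, are dropped, as in an inner join.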
Sometimes we need to keep track of variables like performance counters, debug values or line counts while computations are running.
// Bad code: each task updates its own copy of taskTime,
// the variable on the driver is never changed
var taskTime = 0L
odyssey.map{x =>
  val ts = System.currentTimeMillis()
  val r = foo(x)
  taskTime += (System.currentTimeMillis() - ts)
  r
}
To make it work, we need to define an accumulator
val taskTime = sc.accumulator(0L)
odyssey.map{x =>
  val ts = System.currentTimeMillis()
  val r = foo(x)
  taskTime += (System.currentTimeMillis() - ts)
  r
}
Using accumulators is a side-effecting operation and should be avoided as it complicates design. It is important to understand that accumulators are aggregated at the driver, so frequent writes will cause large amounts of network traffic.
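The idea behind an accumulator, local updates inside each task that are merged at the driver, can be sketched without Spark. In this toy version every task returns its local count along with its results. `run_task` and the sample partitions are invented for the example.

```python
def run_task(partition):
    """Process one partition and count the lines seen, locally to the task."""
    local_count = 0
    out = []
    for line in partition:
        local_count += 1          # no shared state is touched here
        out.append(line.upper())
    return out, local_count

partitions = [["a", "b"], ["c"], ["d", "e", "f"]]
results = [run_task(p) for p in partitions]

# "Driver" side: merge the per-task counts into one total.
line_count = sum(count for _, count in results)
output = [line for out, _ in results for line in out]
```

Only one number per task travels back to the driver, rather than one update per record.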
This work is (c) 2017, 2018 - onwards by TU Delft and Georgios Gousios and licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International license.