Name:
Student Number:
BSc CS/TI or minor student?:
The total number of points is 75. You have 180 minutes: Good Luck!
Please provide brief answers to the following questions. Be precise!
Relation: a set of tuples that all share the same type(s), i.e. the same schema. Keys are always unique. Key/value pair: a 2-tuple in which the key identifies the value; unlike a relation, a collection of key/value pairs can contain duplicate keys.
Differences: in a relation the types are predefined by the schema, while with key/value pairs they are not. This makes key/value pairs more flexible, while relations provide more (type) safety.
Provide the function signatures for the following functions of the type List[A].
foldL: foldL(list: List[A], f: (acc: B, x: A) => B, init: B) : B
reduceR: reduceR(list: List[A], f: (A, A) => A) : A
flatMap: flatMap(list: List[A], f: A => List[B]) : List[B]
scan: scan(list: List[A], f: (B, A) => B, init: B) : List[B]
groupBy: groupBy(list: List[A], f: A => K) : Map[K, List[A]]
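A minimal sketch of two of the functions listed above, to make the signatures concrete. It assumes reduceR takes no initial value (like Scala's built-in reduceRight) and that scan returns every intermediate accumulator starting with init (like scanLeft); the object name `ListOps` is hypothetical.

```scala
object ListOps {
  // reduceR: combines from the right without an initial value, i.e.
  // f(x1, f(x2, ... f(x(n-1), xn))); undefined for an empty list
  def reduceR[A](list: List[A], f: (A, A) => A): A = list match {
    case Nil          => throw new UnsupportedOperationException("reduceR of empty list")
    case last :: Nil  => last
    case head :: tail => f(head, reduceR(tail, f))
  }

  // scan: like foldL, but keeps every intermediate accumulator (starting with init)
  def scan[A, B](list: List[A], f: (B, A) => B, init: B): List[B] =
    list.foldLeft(List(init))((accs, x) => f(accs.head, x) :: accs).reverse
}
```

With subtraction the right association becomes visible: reduceR(List(1, 2, 3), _ - _) computes 1 - (2 - 3) = 2, whereas a left reduce would give (1 - 2) - 3 = -4.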
(2 points) Implement groupBy for List[A] using foldL.
def groupBy[A, K](list: List[A], classifier: A => K): Map[K, List[A]] = {
  // Add item to the list of its key, creating that list on first use
  def helper(map: Map[K, List[A]], item: A): Map[K, List[A]] = {
    val key = classifier(item)
    map.get(key) match {
      case Some(values) => map + (key -> (values :+ item))
      case None         => map + (key -> List(item))
    }
  }
  // Argument order follows the foldL signature: list, f, init
  foldL(list, helper, Map[K, List[A]]())
}
Implement foldL for List[A] using foldR.
def foldL[A, B](list: List[A], f: (B, A) => B, init: B): B = {
  // foldR consumes the list from the right, so reverse it and swap f's arguments
  val functionSwap = (x: A, acc: B) => f(acc, x)
  foldR(list.reverse, functionSwap, init)
}
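The answer above assumes a foldR with the signature foldR(list, f: (A, B) => B, init). A self-contained sketch that defines such a foldR recursively, so the reverse-and-swap trick can be checked; `FoldExample` is a hypothetical name.

```scala
object FoldExample {
  // foldR: f(x1, f(x2, ... f(xn, init))); not tail-recursive, so very long lists may overflow the stack
  def foldR[A, B](list: List[A], f: (A, B) => B, init: B): B = list match {
    case Nil          => init
    case head :: tail => f(head, foldR(tail, f, init))
  }

  // foldL expressed with foldR: reverse the list and swap the arguments of f
  def foldL[A, B](list: List[A], f: (B, A) => B, init: B): B =
    foldR(list.reverse, (x: A, acc: B) => f(acc, x), init)
}
```

For example, foldL(List(1, 2, 3), (acc, x) => acc * 10 + x, 0) visits 1, then 2, then 3 and yields 123, exactly as a left fold should.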
Implement leftJoin(xs: [(K, A)], ys: [(K, B)]) for KV pairs xs and ys, where the type K signifies the key. What is the return type?
Return type: List[(K, (A, Option[B]))]
def leftJoin[K, A, B](xs: List[(K, A)], ys: List[(K, B)]): List[(K, (A, Option[B]))] = {
  xs.flatMap { x =>
    // all pairs in ys with the same key as x
    val inBoth = ys.filter(y => y._1 == x._1)
    if (inBoth.nonEmpty) {
      inBoth.map(i => (x._1, (x._2, Some(i._2))))
    } else {
      // no match: keep x and use None for the missing right-hand value
      List((x._1, (x._2, None)))
    }
  }
}
val xs = List((1,2), (2,3), (1,4))
val ys = List((1,2))
leftJoin(xs, ys) // List((1,(2,Some(2))), (2,(3,None)), (1,(4,Some(2))))
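The leftJoin above scans all of ys once per element of xs, i.e. O(|xs| · |ys|) comparisons. A sketch of an alternative (hash-join style) that indexes ys by key once with Scala's built-in groupBy; it should return the same pairs in the same order. `LeftJoinExample` is a hypothetical name.

```scala
object LeftJoinExample {
  def leftJoin[K, A, B](xs: List[(K, A)], ys: List[(K, B)]): List[(K, (A, Option[B]))] = {
    // Index the right-hand side once: key -> all values with that key (in input order)
    val index: Map[K, List[B]] = ys.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    xs.flatMap { case (k, a) =>
      index.get(k) match {
        case Some(bs) => bs.map(b => (k, (a, Some(b): Option[B])))
        case None     => List((k, (a, None: Option[B])))
      }
    }
  }
}
```

The trade-off is memory: the whole right-hand side is held in a map, which is fine for small ys but is exactly what distributed joins try to avoid for large inputs.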
Example 1, in Scala
var sum = 0
for (i <- 1 to 10) {
sum = sum + i
}
val toSum = (1 to 10).toList
val summed = toSum.reduce(_ + _)
Example 2, in Python
x = {}
for i in [1,2,3,4,5,6]:
    key = i % 2
    if key in x.keys():
        x[key].append(i)
    else:
        x[key] = [i]
val toGroup = List(1, 2, 3, 4, 5, 6)
toGroup.groupBy(x => x % 2)
Monads are a design pattern for generic types (M[T]) that wrap values and define how functions on those values can be chained, typically via flatMap. A wrapped value can only be transformed into another monad. They are useful to deal with side effects.
Example: Option[T], which deals with null values, or Future[T], which masks network latency.
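A small sketch of the Option monad in use, assuming two hypothetical lookups (`userEmail`, `emailDomain`) that may find nothing. flatMap chains them without explicit null checks, and the first None short-circuits the rest of the chain.

```scala
object OptionExample {
  // Hypothetical data and lookups that may not find a value
  val emails: Map[String, String] = Map("alice" -> "alice@example.org")
  def userEmail(user: String): Option[String] = emails.get(user)
  def emailDomain(email: String): Option[String] =
    email.split("@") match {
      case Array(_, domain) => Some(domain)
      case _                => None
    }

  // flatMap chains the two lookups; a None at any step makes the result None
  def domainOf(user: String): Option[String] =
    userEmail(user).flatMap(emailDomain)

  // The same chain as a for-comprehension, which desugars to flatMap/map
  def domainOfFor(user: String): Option[String] =
    for {
      email  <- userEmail(user)
      domain <- emailDomain(email)
    } yield domain
}
```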
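Amdahl’s law is a formula to calculate the maximum speedup obtainable by parallelizing part of a system: with a serial fraction s and n processors, the speedup is 1 / (s + (1 - s) / n). It gives us an upper bound on the speedup that we can obtain through parallelization (1 / s, however many processors we add). One implication is that performance gains through parallelization drop sharply even if a very small fraction of our computation is serial: with 5% serial work, the speedup can never exceed 20×.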
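A minimal sketch of Amdahl's law as code, with `serial` the serial fraction s and `n` the number of processors; the object and method names are hypothetical.

```scala
object Amdahl {
  // Speedup with n processors when a fraction `serial` of the work cannot be parallelized
  def speedup(serial: Double, n: Int): Double =
    1.0 / (serial + (1.0 - serial) / n)

  // Upper bound on the speedup as n grows without limit
  def maxSpeedup(serial: Double): Double = 1.0 / serial
}
```

With 5% serial work, 16 processors give a speedup of about 9.1 rather than 16, and no number of processors gets past 20.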
The CAP conjecture consists of the following guarantees:
- Consistency
- Availability
- Partition tolerance
It is conjectured that a distributed system can provide at most 2 of these 3 guarantees at the same time. In practice, a system can provide all 3 while the network is working; the choice between consistency and availability only has to be made when a partition occurs.
master-slave:
benefit: Reads can scale
drawback: Bad availability for writes if the master crashes (until a new master takes over)
master-master:
benefit: If a master crashes, there are still other masters available. Writes can scale.
drawback: Might run into conflicts with concurrent writes on different masters
leaderless replication:
benefit: Node failures can be tolerated more easily
drawback: Client has to handle synchronization conflicts
A: atomic: a transaction is either completely executed or not.
C: consistent: a transaction will bring the database from one valid state to another.
I: isolated: a transaction cannot read data written by another transaction that has not yet completed (the exact guarantees depend on the isolation level).
D: durable: once a transaction has been committed, it will remain so, even after a crash or error.
data * replication factor = 128GB * 3 = 384GB total data
384GB / 5 DataNodes = 76.8GB per DataNode
(76.8 * 1000MB) / 64MB = 1200 blocks per DataNode
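The same arithmetic as a small sketch, assuming replicas are spread evenly over the DataNodes, every block is full, and 1 GB = 1000 MB as above; `blocksPerNode` is a hypothetical helper. Integer arithmetic keeps the result exact.

```scala
object HdfsBlocks {
  // Blocks per DataNode under the assumptions stated above
  def blocksPerNode(dataGB: Long, replication: Long, dataNodes: Long, blockMB: Long): Long = {
    val totalMB = dataGB * replication * 1000 // 128 * 3 = 384 GB = 384,000 MB
    val perNodeMB = totalMB / dataNodes       // 384,000 / 5 = 76,800 MB
    perNodeMB / blockMB                       // 76,800 / 64 = 1,200 blocks
  }
}
```

Using 1 GB = 1024 MB instead gives about 1229 blocks per DataNode.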
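The client asks the NameNode to write a file. The NameNode allocates blocks and returns, for each block, the addresses of the DataNodes that should store its replicas. The client then writes the data directly to the first DataNode, which forwards it to the second, which forwards it to the third (a replication pipeline, giving the default 3 replicas). Once the data is replicated, acknowledgments flow back along the pipeline, and the first DataNode acknowledges the write to the client.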
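A toy, single-process simulation of the replication pipeline, to show the order in which data and acknowledgments travel. `DataNode` here is a hypothetical in-memory stand-in; real HDFS streams packets through the pipeline concurrently rather than one whole block at a time.

```scala
import scala.collection.mutable

object HdfsPipeline {
  // Hypothetical in-memory stand-in for a DataNode
  final class DataNode(val name: String) {
    val blocks: mutable.Map[String, Array[Byte]] = mutable.Map.empty
  }

  // Each node stores the block, forwards it downstream, and adds its own ack
  // only after all downstream nodes have acknowledged.
  // Returns the acks in the order they travel back towards the client.
  def writeBlock(blockId: String, data: Array[Byte], pipeline: List[DataNode]): List[String] =
    pipeline match {
      case Nil => Nil
      case node :: downstream =>
        node.blocks(blockId) = data                      // store locally
        val acks = writeBlock(blockId, data, downstream) // forward to the next DataNode
        acks :+ node.name                                // then acknowledge upstream
    }
}
```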
(10 points) You are tasked to design a system that reliably delivers messages from one client to another (think WhatsApp). The process to deliver a message from client A to client B is the following:
The system needs to work 24/7 and be resilient to database and queue failures. You will be using ready-made databases and queues that support replication, sharding and transactions. How would you design it? Discuss the advantages and disadvantages of your design decisions.
This is an open question with no single correct solution. Some elements that would lead to a 10 point grade in this question are the following:
Ordering of messages only depends on the source (client A); therefore, a timestamp-based ordering is enough to guarantee that client B sees A’s messages in the correct order. If two messages arrive out of order on B, B can just sort them based on the timestamp.
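A sketch of the client-side reordering, assuming a hypothetical `Message` type whose timestamp is assigned by client A's own (monotonic) clock, so all timestamps in one conversation direction come from a single source.

```scala
object ClientOrdering {
  // Hypothetical message format: timestamp assigned by the sender (client A)
  final case class Message(timestamp: Long, text: String)

  // Client B restores A's sending order by sorting on A's timestamps
  def inSendOrder(received: List[Message]): List[Message] =
    received.sortBy(_.timestamp)
}
```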
Both the queue and the database need to be highly available and high-performance. This means that they need to employ replication. Multi-master replication can work nicely for the queue, as it is write-intensive and the ordering of messages does not need to be maintained within the system. The database requires transactional semantics both for writing messages and acknowledgements, so a simpler single-master replication scheme might be more suitable.
The queue and the database need to coordinate writes; to do this in a way that no messages are lost between 2 independent systems, we need to employ two-phase commit (2PC).
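A minimal two-phase commit sketch with the queue and the database as participants. The `Participant` trait and `InMemoryParticipant` are hypothetical; real participants would stage the write durably in `prepare`.

```scala
object TwoPhaseCommit {
  // Hypothetical participant interface (here: the queue and the database)
  trait Participant {
    def prepare(txId: String): Boolean // phase 1: stage the write durably and vote
    def commit(txId: String): Unit     // phase 2 if everyone voted yes
    def abort(txId: String): Unit      // phase 2 otherwise
  }

  // Toy in-memory participant that always votes `vote`
  final class InMemoryParticipant(vote: Boolean) extends Participant {
    var state: String = "init"
    def prepare(txId: String): Boolean = { state = "prepared"; vote }
    def commit(txId: String): Unit = { state = "committed" }
    def abort(txId: String): Unit = { state = "aborted" }
  }

  // Coordinator: commit everywhere only if every participant prepared successfully
  def run(txId: String, participants: List[Participant]): Boolean = {
    val allPrepared = participants.forall(_.prepare(txId)) // stops at the first "no" vote
    if (allPrepared) participants.foreach(_.commit(txId))
    else participants.foreach(_.abort(txId))
    allPrepared
  }
}
```

This sketch leaves out what makes 2PC costly in practice: the coordinator must log its decision durably, and prepared participants stay blocked if the coordinator crashes before sending phase 2.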
It is difficult to scale a single database to millions of clients (B) that continuously ask it whether there are new messages for them. What we need instead is a push-based (Observer) read pattern, where each client B connects to an intermediate system that sits between the database and the client and just maintains an open connection to B. When a new message transaction is committed in the database, the database notifies the intermediate system about the new message for client B. The notification can happen through, for example, a stored procedure. The intermediate system then pushes the new message to B.
The intermediate system can also acknowledge messages to the database after it pushes them to B and B acknowledges receipt. There are no transaction requirements here, as these writes only affect a single record.
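A toy sketch of the intermediate system, assuming open client connections can be modelled as callbacks; `ConnectionHub` and `onCommitted` are hypothetical names, and the database notification is reduced to a plain method call.

```scala
import scala.collection.mutable

object PushExample {
  // Hypothetical intermediate system between the database and the clients:
  // it holds one open connection (modelled as a callback) per connected client
  final class ConnectionHub {
    private val connections = mutable.Map.empty[String, String => Unit]

    def connect(clientId: String, push: String => Unit): Unit = connections(clientId) = push
    def disconnect(clientId: String): Unit = connections.remove(clientId)

    // Called when the database reports a committed message (e.g. via a stored procedure).
    // Returns true if the message was handed to the recipient's connection.
    def onCommitted(recipient: String, message: String): Boolean =
      connections.get(recipient) match {
        case Some(push) => push(message); true
        case None       => false // recipient offline: the message stays unacknowledged in the database
      }
  }
}
```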
You are given the following dataset; it contains citation information in the IEEE CS format (lines that end with \ continue to the next line):
S. Ryza, U. Laserson, S. Owen, and J. Wills, Advanced analytics with spark: \
Patterns for learning from data at scale. O’Reilly Media, Inc., 2015.
H. Karau, A. Konwinski, P. Wendell, and M. Zaharia, Learning spark: \
Lightning-fast big data analysis. O’Reilly Media, Inc., 2015.
B. Chambers and M. Zaharia, Spark: The definitive guide. O’Reilly Media, Inc., 2017.
M. Kleppmann, Designing data-intensive applications. O’Reilly Media, Inc., 2017.
H. Karau and R. Warren, High performance spark. O’Reilly Media, Inc., 2017.
T. H. Cormen, C. E. Leiserson, Ronald L. Rivest, and C. Stein, Introduction \
to algorithms (3rd ed.). MIT press, 2009.
P. Louridas, Real world algorithms. MIT press, 2017.
The format looks like this:
author1, author2, ... , authorN, title. publisher, year.
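// Parses one logical citation line: "author1, ..., authorN, title. publisher, year."
def parser(input: String): (Array[String], String, String, Int) = {
  val yearSep = input.lastIndexOf(",")          // the year follows the last comma
  val rest = input.substring(0, yearSep)        // "authors, title. publisher"
  val pubSep = rest.lastIndexOf(". ")           // the publisher follows the last ". "
  val titleSep = rest.lastIndexOf(", ", pubSep) // the title follows the last ", " before that
  val authors = rest.substring(0, titleSep).split(",?\\s+and\\s+|,\\s+")
  val year = input.substring(yearSep + 1).trim.stripSuffix(".").toInt
  (authors, rest.substring(titleSep + 2, pubSep), rest.substring(pubSep + 2), year)
}
import spark.implicits._ // needed for toDF()
// textFile would split a citation at every newline, so join the "\"-continued lines first
val lines = spark.sparkContext.wholeTextFiles(data)
  .flatMap { case (_, content) => content.replace("\\\n", "").split("\n") }
  .filter(_.trim.nonEmpty)
val rdd = lines.map(x => parser(x))
//convert to dataframe
rdd.toDF()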
O’Reilly Media, Inc., 5
MIT press, 2
rdd.map(x => (x._3, 1)).reduceByKey((a, b) => a + b).sortBy(x => x._2, false).collect()
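M. Zaharia and H. Karau (tied, with 2 books each)
val authorCounts = rdd.flatMap(x => x._1).map(x => (x, 1)).reduceByKey((a, b) => a + b)
val maxCount = authorCounts.values.max()
authorCounts.filter(x => x._2 == maxCount).collect()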
reduceByKey and sortBy
Spark uses RDD lineage information to know which partition(s) to recompute in case of a node failure.
A job is initiated when an action is called on an RDD. The RDD dependency graph is then traversed backwards from that RDD and split into a graph of stages at shuffle (wide) dependencies. Each stage consists of multiple tasks, one per partition. Tasks are scheduled in parallel on the cluster nodes.
Only answer the following questions if you are a BSc TI student!
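Streams are unbounded, so aggregations (e.g. counts or averages) can only be computed over a finite, relevant subset of the data; streaming windows define that subset (e.g. all events of the last 5 minutes).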
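A minimal sketch of one window type, a tumbling (fixed, non-overlapping) window, over a finite batch of events. The `Event` type and `tumblingSums` are hypothetical; a real stream processor would also handle late events and emit results as windows close.

```scala
object Windows {
  // Hypothetical event: timestamp (seconds) and a value to aggregate
  final case class Event(timestamp: Long, value: Int)

  // Tumbling windows: an event at time t belongs to the window starting at (t / size) * size;
  // the aggregate here is a sum per window (assumes non-negative timestamps)
  def tumblingSums(events: Seq[Event], windowSize: Long): Map[Long, Int] =
    events
      .groupBy(e => (e.timestamp / windowSize) * windowSize)
      .map { case (start, inWindow) => start -> inWindow.map(_.value).sum }
}
```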
direct messaging: events are sent directly from the sources to the clients; the drawback is that both the sources and the clients have to be online at the same time.
message brokers: add a ‘queue’ between sources and clients, which also functions as a load balancer; the drawback is that the broker needs to ‘bookkeep’ the events in the queue, and a single broker is hard to scale.
partitioned logs: ‘distributed’ message brokers that split the log into partitions, so they scale easily; the drawback is that using partitioned logs requires a more complex setup.
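A single-process sketch of the partitioned-log idea: events with the same key always land in the same append-only partition (preserving per-key order), and consumers read by offset, so they can replay. `Log`, `append` and `read` are hypothetical names, not any broker's API.

```scala
import scala.collection.mutable

object PartitionedLog {
  // Hypothetical single-process stand-in for one partitioned topic
  final class Log(numPartitions: Int) {
    private val partitions = Vector.fill(numPartitions)(mutable.ArrayBuffer.empty[String])

    // Same key -> same partition, so per-key order is preserved
    def partitionFor(key: String): Int = Math.floorMod(key.hashCode, numPartitions)

    // Append-only write; returns (partition, offset) of the new record
    def append(key: String, event: String): (Int, Int) = {
      val p = partitionFor(key)
      partitions(p) += event
      (p, partitions(p).size - 1)
    }

    // Consumers track their own offset and can re-read (replay) from any offset
    def read(partition: Int, fromOffset: Int): List[String] =
      partitions(partition).drop(fromOffset).toList
  }
}
```

Because each partition is independent, partitions can be placed on different brokers; the price is that ordering only holds within a partition, not across the whole topic.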
Only answer the following questions if you are a minor student!
find . -type f |
xargs cat |
cut -f 1 -d ' ' |
sort |
uniq -c |
sort -n |
tail -n 10
For the next 2 questions, you are given a CSV file whose contents look like the following:
$ cat fileA
user_id, account_id, balance
10, 12, 1000
10, 1, 32
12, 122, 5
...
$ cat fileB
account_id, transaction_id, amount
12, 332, 21
122, 21, 20
...
fileA
sort fileA | uniq -c | sort -n | awk '$1 > 1'
join -1 2 -2 1 -t ',' <(sed '1d' fileA | tr -d ' ' | sort -t ',' -k 2,2) <(sed '1d' fileB | tr -d ' ' | sort -t ',' -k 1,1)
Please select and circle only one answer per question.
(1 point) What is the correct function signature for reduce on Spark RDDs?
RDD[A].reduce(f: (A,B) -> B)
RDD[A].reduce(f: (A,A) -> A) <—
RDD[A].reduce(init: B, seqOp: (A, B) -> A, combOp: (B, B) -> B)
RDD[A].reduce(init:A, f: (A,A) -> A)
(1 point) Distributed systems:
(1 point) Lamport timestamps
(1 point) The serializable transaction isolation level protects transactions against:
(1 point) Which of the following methods is part of the Observer interface, for dealing with push-based data consumption?
def subscribe(obs: Observer[A]): Unit
def onNext(a: A): Unit <—
def map(f: (A) -> B): [B]
def onExit(): Unit
(1 point) A transformation in Spark:
(1 point) Which of the following is a likely computation order of applying reduceR to a list of 10 integers with the ‘+’ operator?
(1 point) Collaborative filtering:
(1 point) What is precision in the context of Machine Learning? (\(TP\) = True Positive, \(FP\) = False Positive, \(TN\) = True Negative, \(FN\) = False Negative)
(1 point) What is Byzantine fault tolerance?
(1 point) What is eventual consistency?
(1 point) Which of the following RDD API calls is a performance killer in Spark?
reduceByKey
keyBy
groupByKey <—
aggregateByKey
(1 point) Copy-on-write is a technique to:
(1 point) What is the correct signature for rightOuterJoin on Spark RDDs?
RDD[(K,V)].rightOuterJoin(other: RDD[(K, W)]): RDD[(K, (Option[V], W))] <—
RDD[(K,V)].rightOuterJoin(other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
RDD[(K,V)].rightOuterJoin(other: RDD[(K, W)]): RDD[(K, (V, W))]
RDD[(K,V)].rightOuterJoin(other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
(1 point) A GFS chunkserver/HDFS datanode is responsible to: