In this section, we review the basic data types we use when processing data.
D: Which types of data are more convenient to process?
Sequences or Lists or Arrays represent consecutive items in memory
In Python:
a = [1, 2, 3, 4]
In Scala
val l = List(1,2,3,4)
Basic properties: items can be accessed by their index, e.g. a[1] in Python or l(3) in Scala.
Sets store values, without any particular order, and no repeated values.
val s = Set(1,2,3,4,4)
s: scala.collection.immutable.Set[Int] = Set(1, 2, 3, 4)
Basic properties: fast membership tests and the usual set operations (union, intersection, difference).
Maps or Dictionaries or Associative Arrays are collections of (k,v) pairs, in which each k appears only once.
Some languages have built-in support for Dictionaries:
a = {'a' : 1, 'b' : 2}
Basic properties: fast lookup, insertion and removal of values by key.
A graph data structure consists of a finite set of vertices or nodes, together with a set of unordered pairs of these vertices for an undirected graph or a set of ordered pairs for a directed graph.
Graphs are usually represented as Map[Node, List[Edge]], where:
case class Node(id: Int, attributes: Map[A, B])
case class Edge(a: Node, b: Node, directed: Option[Boolean],
weight: Option[Double] )
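For illustration, here is a minimal sketch of this representation; it specializes the attributes map to Map[String, String], which is an assumption for the example, not part of the definition above.

case class Node(id: Int, attributes: Map[String, String])
case class Edge(a: Node, b: Node, directed: Option[Boolean], weight: Option[Double])

val n1 = Node(1, Map("name" -> "EWI"))
val n2 = Node(2, Map("name" -> "TPM"))

// Adjacency-list representation: each node maps to its outgoing edges
val graph: Map[Node, List[Edge]] = Map(
  n1 -> List(Edge(n1, n2, directed = Some(true), weight = Some(1.0))),
  n2 -> List()
)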
Trees are ordered graphs without loops. A typical example of tree-structured data is a JSON document:
= {"id": "5542101946", "type": "PushEvent",
a "actor": {
"id": 801183,
"login": "tvansteenburgh"
},"repo": {
"id": 42362423,
"name": "juju-solutions/review-queue"
}}
If we parse the above JSON in almost any language, we get a series of nested maps:

Map("id" -> 5542101946L,
    "type" -> "PushEvent",
    "actor" -> Map("id" -> 801183.0, "login" -> "tvansteenburgh"),
    "repo" -> Map("id" -> 4.2362423E7, "name" -> "juju-solutions/review-queue")
)
An \(n\)-tuple is a sequence of \(n\) elements, whose types are known.
val record = Tuple4[Int, String, String, Int](1, "Georgios", "Mekelweg", 4)
Scala makes it easy to declare and use tuples by automatically inferring the types of the tuple contents.
val a = (1, ("Foo", 2)) // type: Tuple2[Int, Tuple2[String, Int]]
println(a._1) // prints 1
println(a._2._1) // prints Foo
A relation is a Set of \(n\)-tuples \((d_1, d_2, \ldots, d_n)\) of the same type; one of the tuple elements denotes a key. Keys cannot be repeated.
Relations are very important for data processing, as they form the theoretical framework (Relational Algebra) for relational (SQL) databases.
Typical operations on relations are insert, remove and join. Join allows us to compute new relations by joining existing ones on common fields.
val addr1 = (1, "Gebouw 35", "Mekelweg", 5)
val addr2 = (2, "Gebouw 36", "Drebbelweg", 4)
val addr = Set(addr1, addr2)
val georgios = (1, "Georgios", 1)
val wouter = (2, "Wouter", 2)
val joris = (3, "Joris", 4)
val people = Set(georgios, wouter, joris)
Q: How can we get a list of buildings and the people that work there?
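One possible answer (a sketch, not the only way): join the two sets on the building key, assuming the last element of each person tuple refers to an address key.

val buildingsAndPeople =
  addr.flatMap(a =>
    people.filter(p => p._3 == a._1).
      map(p => (a._2, p._2)))
// e.g. Set((Gebouw 35,Georgios), (Gebouw 36,Wouter))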
A key/value pair (or K/V) is a more general type of a relation, where each key can appear more than once.
// We assume that the first Tuple element represents the key
val a = (1, ("Mekelweg", 4))
val b = (1, ("VMB", 6))
val kv = List(a, b)
Another way to represent K/V pairs is with a Map
val xs : Map[Int, List[(String, Int)]] =
  Map(1 -> List(("Mekelweg", 4), ("VMB", 6)))
K and V are flexible: that’s why the Key/Value abstraction is key to NoSQL databases, including MongoDB, DynamoDB, Redis etc.
In this section, we discuss the basics of functional programming, as those apply to data processing with tools like Hadoop, Spark and Flink.
Functional programming is a programming paradigm that treats computation as the evaluation of mathematical functions and avoids changing-state and mutable data (Wikipedia).
Functional programming is characterized by immutable data, side-effect-free (pure) functions, higher-order functions, and lazy evaluation — all of which we discuss below.

We write function signatures as follows:
\(foo(x: [A], y: B) \rightarrow C\)
We read this as: function foo takes as arguments an array/list of type A and an argument of type B, and returns a value of type C.
Q: What does the following function signature mean? \(f(x:[A], y:(z: A) \rightarrow B) \rightarrow [B]\)
A function has a side effect if it modifies some state outside its scope or has an observable interaction with its calling functions or the outside world besides returning a value.
max = -1
def ge(a, b):
global max
if a >= b:
max = a ## <- Side effect!
return True
else:
max = b
return False
As a general rule, any function that returns nothing (void or Unit) performs a side effect!
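For example, a function with a Unit return type is only ever called for its side effect:

// The Unit return type signals that calling this function is only useful for its effect
def log(msg: String): Unit =
  println(msg) // writing to standard output is a side effect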
How can we write code that does something useful given those restrictions?
class Cafe {
  def buyCoffee(cc: CreditCard): Coffee = {
    val cup = new Coffee()
    cc.charge(cup.price)
    cup
  }
}
Can you spot any problems with this code?

- charge() performs a side effect: we need to contact the credit card company!
- buyCoffee() is not testable

class Cafe {
  def buyCoffee(cc: CreditCard, p: Payments): Coffee = {
    val cup = new Coffee()
    p.charge(cc, cup.price)
    cup
  }
}
Slightly better option, but:

- Payments has to be an interface
- to test, we need to implement a Payments mock

Q: How can we buy 10 coffees?
class Cafe {
def buyCoffee(cc: CreditCard, p: Payments): Coffee = { ... }
def buyCoffees(cc: CreditCard, p: Payments, num: Int) = {
for (i <- 1 to num) {
buyCoffee(cc, p)
}
} }
Seems to be working, but: the credit card is contacted (and charged) num times, paying the processing fees each time, and we have no way to combine the individual charges.
Idea: how about instead of charging in place, we decouple the action of buying coffee from that of charging for it?
class Charge(cc: CreditCard, amount: Double) {
def combine(other: Charge) = new Charge(cc, amount + other.amount)
def pay = cc.charge(amount)
}
class Cafe {
  def buyCoffee(cc: CreditCard): (Coffee, Charge) = {
    val cup = new Coffee()
    (cup, new Charge(cc, cup.price))
  }
}
Nice! We can now reuse buyCoffee to buy more coffees and combine multiple Charges in one.

class Charge(cc: CreditCard, amount: Double) {
  def combine(other: Charge) = new Charge(cc, amount + other.amount)
  def pay = cc.charge(amount)
}
class Cafe {
  def buyCoffee(cc: CreditCard): (Coffee, Charge) = { ... }

  def buyCoffees(cc: CreditCard, num: Int): Seq[(Coffee, Charge)] =
    (1 to num).map(_ => buyCoffee(cc))

  def checkout(cc: CreditCard, charges: Seq[Charge]): Seq[Charge] = {
    charges.filter(charge => charge.cc == cc).
      foldLeft(new Charge(cc, 0.0)){(acc, x) => acc.combine(x)}.
      pay // <- side effect, but just once, in one place

    charges.filter(charge => charge.cc != cc)
  }
}
This example was adapted from the (awesome) FP in Scala book, by Chiusano and Bjarnason
A higher order function is a function that can take a function as an argument or return a function.
class Array[A] {
// Return elements that satisfy f
def filter(f: A => Boolean) : Array[A]
}
In the context of BDP, higher-order functions capture common idioms of processing data as enumerated elements, e.g. going over all elements, selectively removing elements and aggregating them.
map(xs: List[A], f: A => B) : List[B]
Applies f to all elements and returns a new list.

flatMap(xs: List[A], f: A => List[B]) : List[B]
Like map, but flattens the result to a single list.

foldL(xs: List[A], f: (B, A) => B, init: B) : B
Takes f of 2 arguments and an init value and combines the elements by applying f on the result of each previous application. AKA reduce.

groupBy(xs: List[A], f: A => K): Map[K, List[A]]
Partitions xs into a map of traversable collections according to a discriminator function.

filter(xs: List[A], f: A => Boolean) : List[A]
Takes a predicate and returns all elements that satisfy it.

scanL(xs: List[A], f: (B, A) => B, init: B) : List[B]
Like foldL, but returns a list of all intermediate results.

zip(xs: List[A], ys: List[B]): List[(A,B)]
Returns an iterable collection formed by iterating over the corresponding items of xs and ys.
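To make these signatures concrete, here is a short sketch using the corresponding methods on Scala Lists (foldL and scanL are called foldLeft and scanLeft there):

val xs = List(1, 2, 3, 4)

xs.map(x => x * 2)                   // List(2, 4, 6, 8)
xs.flatMap(x => List(x, -x))         // List(1, -1, 2, -2, 3, -3, 4, -4)
xs.foldLeft(0)((acc, x) => acc + x)  // 10
xs.groupBy(x => x % 2 == 0)          // Map(false -> List(1, 3), true -> List(2, 4))
xs.filter(x => x % 2 == 0)           // List(2, 4)
xs.scanLeft(0)((acc, x) => acc + x)  // List(0, 1, 3, 6, 10)
xs.zip(List("a", "b", "c"))          // List((1,a), (2,b), (3,c))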
Laziness is an evaluation strategy which delays the evaluation of an expression until its value is needed.
def primes(limit):
    sieve = [False] * limit
    for j in range(2, limit):
        if sieve[j]: continue
        yield j
        if j * j > limit: continue
        for i in range(j, limit, j):
            sieve[i] = True

print([x for x in primes(100)][1:10])
## [3, 5, 7, 11, 13, 17, 19, 23, 29]
In tools like Spark and Flink, we always express computations in a lazy manner. This allows for optimizations before the actual computation is executed.
# Word count in PySpark
text_file = sc.textFile("words.txt")
counts = text_file \
    .flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("results.txt")
Q: In the code above, when will Spark compute the result?
A: When saveAsTextFile is called!
Monads are a design pattern that defines how functions can be used together to build generic types. Practically, a monad is a value-wrapping type that:

- has an identity (unit) function
- has a flatMap function, that allows data to be transferred between monad types

trait Monad[M[_]] {
  def unit[S](a: S) : M[S]
  def flatMap[S, T] (m: M[S])(f: S => M[T]) : M[T]
}
Monads are the tool FP uses to deal with (side-)effects:

- Option[T]: a value that may be missing
- Try[T]: a computation that may fail
- Future[T]: a value that will eventually become available

Sequencing effects with flatMap: flatMap enables us to join sequences of arbitrary types.
Example: Futures wrap values that will eventually become available.

def callWebService(): Future[R]
def heavyComputation(r: R): Future[V]

val r = callWebService().flatMap(r => heavyComputation(r))
Example: Options wrap values that may be null.

def getArgument(k: String): Option[Arg]
def process(a: Arg): Option[Result]

getArgument("foo").
  flatMap(process).
  getOrElse(new Result("default"))
object Converter extends App {
def toInt(a: String): Try[Int] = Try{Integer.parseInt(a)}
def toString(b: Int): Try[String] = Try{b.toString}
val a = toInt("4").flatMap(x => toString(x))
println(a)
val b = toInt("foo").flatMap(x => toString(x))
println(b)
}
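Running this prints Success(4) for the first conversion and a Failure wrapping a NumberFormatException for the second.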
Try is a type that can have 2 instances:

- Success[T], where T represents the type of the result
- Failure[E], where E represents the type of error, usually an exception

Compare with the equivalent Java, where error handling is interleaved with the program logic:

public class Converter {
  public static Integer toInt(String a) throws NumberFormatException {
    return Integer.parseInt(a);
  }

  public static String toString(Integer a) throws NullPointerException {
    return a.toString();
  }

  public static void main(String[] args) {
    try {
      Integer five = toInt("5");
      try {
        System.out.println(toString(five));
      } catch (NullPointerException e) {
        // baaah
      }
    } catch (NumberFormatException r) {
      // ooofff
    }
  }
}
class Amazon {
def login(login: String, passwd: String): Future[Amazon]
def search(s: String): Future[Seq[String]]
}
class Main extends App {
val amazon = new Amazon()
  val result =
    amazon.login("uname", "passwd"). // Might fail
      flatMap(a => a.search("foo"))
}
We now need to encode error handling, so that the compiler checks that we are doing it properly.
D: Which of the following should our new search return?

- Seq[Future[Try]]
- Future[Seq[Try]]
- Future[Try[Seq]]
Before starting to process datasets, we need to be able to go over their contents in a systematic way. The process of visiting all items in a dataset is called traversal.
In a big data system:
There are two fundamental techniques for the client to process all available data in the data source:
Iteration: The client asks the data source whether there are items left and then pulls the next item.
Observation: The data source pushes the next available item to a client end point.
D: What are the relative merits of each technique?
In the context of BDP, iteration allows us to process finite-sized data sets without loading them in memory at once.
trait Iterator[A] {
def hasNext: Boolean
def next(): A
}
Typical usage
val it = Array(1,2,3,4).iterator
while(it.hasNext) {
val i = it.next + 1
println(i)
}
The Iterator pattern is supported in all programming languages.
Reading from a file, first in Scala
val data = scala.io.Source.fromFile("/big/data").getLines
while (data.hasNext) {
  println(data.next)
}

// Equivalently...
for (line <- data) {
  println(line)
}
and then in Python
with open("/big/data","r") as data:
# readlines() returns an object that implements __iter__()
for line in data.readlines():
        print(line)
Observation allows us to process (almost) unbounded-size data sets, where the data source controls the processing rate.
// Consumer
trait Observer[A] {
def onNext(a: A): Unit
def onError(t: Throwable): Unit
def onComplete(): Unit
}
// Producer
trait Observable[A] {
def subscribe(obs: Observer[A]): Unit
}
Typical usage
Observable.from(1,2,3,4,5).
  map(x => x + 1).
  subscribe(x => println(x))
Iterator-based map. "Pulls" data out of the array.

Array(1,2,3,4).map(x => x + 1) // Scala
map([1,2,3,4], lambda x: x + 1) # Python

Observation-based (reactive) map. Data is "pushed" to it asynchronously, when new data is available.

Observable.from(1,2,3,4,5).map(x => x + 1) // Scala
Observable.from_([1,2,3,4]).map(lambda x: x + 1) # Python
D: (How) Can we convert between the two types of enumeration?
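One direction is easy to sketch with the traits above: we can wrap an Iterator and push its elements to any subscribed Observer (the reverse direction needs buffering or blocking). This is only an illustration, not a real reactive-streams implementation:

def toObservable[A](it: Iterator[A]): Observable[A] = new Observable[A] {
  def subscribe(obs: Observer[A]): Unit =
    try {
      // Pull from the iterator and push each element to the observer
      while (it.hasNext) obs.onNext(it.next())
      obs.onComplete()
    } catch {
      case t: Throwable => obs.onError(t)
    }
}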
We apply a strategy to visit all individual items in a collection.
for i in [1,2,3]:
    print(i)

for k, v in {"x": 1, "y": 2}.items():
    print(k, v)
In case of nested data types (e.g. trees/graphs), we need to decide how to traverse. Common strategies include depth-first and breadth-first traversal.
In most programming environments, traversal is implemented by iterators.
class Tree(object):
def __init__(self, title, children=None):
self.title = title
self.children = children or []
def __iter__(self): ## Depth first
yield self
for child in self.children:
for node in child:
yield node
Then, we can iterate using standard language constructs
= Tree("a", [Tree("b", [Tree("c"), Tree("d")])])
t for node in iter(t):
print node
Operations are transformations, aggregations or cross-referencing of data stored in data types. All container data types can be iterated.
We generally have two types of operations:
Element-wise ops apply a function to each individual item. This is the equivalent of map or flatMap in batch systems.
Aggregations group multiple items together and apply a reduction (e.g. fold or max) on them.
Suppose we have a list of people; each person is identified by a unique identifier, their age, their height in cm and their gender.
p = {'id': 10, 'age': 50, 'height': 180, 'weight': 75, 'gender': 'Male'}
Now, let’s create 1000 random people!
from random import randint, seed
seed(42)

people = []
genders = ['Male', 'Female', 'Other']
for i in range(1000):
    p = {'id': i,
         'age': randint(10, 80),
         'height': randint(60, 200),
         'weight': randint(40, 120),
         'gender': genders[randint(0, 2)]}
    people.append(p)

## people[0]: {'id': 0, 'age': 24, 'height': 66, 'weight': 75, 'gender': 'Male'}
To convert values, we traverse a collection and apply a conversion function to each individual element. This is generalized to the \(map\) function:
\(map(xs: [A], f: (A) \rightarrow B) \rightarrow [B]\)
Let’s convert the persons’ heights to meters
def to_m(person):
    person['height'] = person['height'] * 1.0 / 100
    return person

people = list(map(lambda x: to_m(x), people))

## people[0]: {'id': 0, 'age': 24, 'height': 0.66, 'weight': 75, 'gender': 'Male'}
Projection allows us to select parts of a Tuple, Relation or other nested data type for further processing. To implement this, we need to iterate over all items of a collection and apply a conversion function.
To filter values from a list, we traverse a collection and apply a predicate function to each individual element.
\(filter(xs: [A], f: (A) \rightarrow Boolean): [A]\)
Q: How can we implement filter?
def filter(xs, pred):
    result = []
    for i in xs:
        if pred(i):
            result.append(i)
    return result
Aggregations apply a combining operator on a traversable sequence to aggregate the individual items into a single result.
Aggregation is implemented using reduction (or folding). Two variants exist: left reduction and right reduction.
Our task is to calculate the total weight for our people list. To do so, we need to iterate over the list and add the individual weights. First, we do it with an imperative approach:
total_weight = 0
for p in people:
    total_weight = total_weight + p['weight']

print(total_weight)
We notice that iteration and reduction are independent. What if we abstract iteration away?
\(reduceL(xs: [A], init: B, f: (acc: B, x: A) \rightarrow B) \rightarrow B\)
Left reduce takes a function f with 2 arguments and an initial value, and applies f on all items, starting from the leftmost. f combines the result of its previous application with the current element.
def total_weight(acc, p): # acc is for accumulator
return acc + p['weight']
Then we can calculate the total weight of all people in our collection as follows
total = reduce(total_weight, people, 0)
# or equivalently, using an anonymous function
total = reduce(lambda x, y: x + y['weight'], people, 0)

Note that in Python 3, reduce must first be imported: from functools import reduce.
\(reduceR(xs: [A], init: B, f: (x: A, acc: B) \rightarrow B) \rightarrow B\)
Right reduce takes a function f with 2 arguments and an initial value, and applies f on all items, starting from the rightmost. f combines the result of its previous application with the current element.
def reduceR(func, xs, init):
return reduce(lambda x,y: func(y,x), reversed(xs), init)
reduceR and reduceL: differences

To see how reduceR and reduceL are evaluated, we define a reduction function that prints the intermediate steps.
def reduce_pp(acc, x):
return "(%s + %s)" % (acc, x)
How does reduceL work?
print(reduce(reduce_pp, range(1,10), 0))
## (((((((((0 + 1) + 2) + 3) + 4) + 5) + 6) + 7) + 8) + 9)
How does reduceR work?
print(reduceR(reduce_pp, range(1,10), 0))
## (1 + (2 + (3 + (4 + (5 + (6 + (7 + (8 + (9 + 0)))))))))
Q: Can we always apply reduceL instead of reduceR?

reduceR != reduceL

The answer to the previous question is: it depends on whether the reduction operation is commutative.
An op \(\circ\) is commutative iff \(x \circ y = y \circ x\).
We can see that if we choose a non-commutative operator, reduceL and reduceR produce different results.
print(reduce(lambda x, y: x * 1.0 / y, range(1,10), 1))
## 2.7557319223985893e-06
print(reduceR(lambda x, y: x * 1.0 / y, range(1,10), 1))
## 2.4609375
The simplest possible aggregation is counting the number of elements, possibly matching a condition.
\(count(xs: [A], pred): Integer\)
def count(xs, pred):
    return len(filter(xs, pred))
Q: How can we implement this with a reduction?
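One possible answer, sketched in Scala in terms of a left fold (foldLeft), matching the foldL signature shown earlier:

// Count the elements that satisfy pred by folding over the list
def count[A](xs: List[A], pred: A => Boolean): Int =
  xs.foldLeft(0)((acc, x) => if (pred(x)) acc + 1 else acc)

count(List(1, 2, 3, 4, 5), (x: Int) => x % 2 == 0) // 2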
Given a sequence of items, produce a new sequence with no duplicates.
\(distinct(xs: [A]): [A]\)
Distinct operations are usually implemented by copying the initial sequence elements to a set
data structure.
a = [1,2,2,3]
set(a)
## {1, 2, 3}
Distinct assumes that items have well-defined identities; in Python, an item's identity for this purpose comes from its __hash__() and __eq__() methods (similar to hashCode() and equals() in Java).
Aggregation functions have the following generic signature:
\(f: [A] \rightarrow Number\)
Their job is to reduce sequences of elements to a single measurement. Some examples are:

- min, max, count
- mean, median, stdev
Grouping splits a sequence of items to groups given a classification function.
\(groupBy(xs:[A], f: A \rightarrow K): Map[K, [A]]\)
def group_by(classifier, xs):
    result = dict()
    for x in xs:
        k = classifier(x)
        if k in result.keys():
            result[k].append(x)
        else:
            result[k] = [x]
    return result
def number_classifier(x):
if x % 2 == 0:
return "even"
else:
return "odd"
a = [1,2,3,4,5,6,7]
print(group_by(number_classifier, a))
## {'odd': [1, 3, 5, 7], 'even': [2, 4, 6]}
How can we get the average age per gender (adults only) in our list of people?
from numpy import mean

adults = list(filter(lambda x: x['age'] > 18, people))
adults_per_gender = group_by(lambda x: x['gender'], adults)
avg_age_per_gender = \
    dict(map(lambda kv: (kv[0], mean(list(map(lambda y: y['age'], kv[1])))),
             adults_per_gender.items()))

print(avg_age_per_gender)
## {'Male': ..., 'Female': ..., 'Other': ...}
The above is equivalent to the following SQL expression
select gender, mean(age)
from people
where age > 18
group by gender
D: What are the relative strengths and weaknesses of each representation?
KV stores are the most common format for distributed databases.
What KV systems enable us to do effectively is processing data locally (e.g. by key) before re-distributing them for further processing. Keys are naturally used to aggregate data before distribution. They also enable (distributed) data joins.
Typical examples of distributed KV stores are Dynamo, MongoDB and Cassandra.
The most common data structure in big data processing is key-value pairs.
# Python
[
  ["EWI", ["Mekelweg", 4]],
  ["TPM", ["Jafaalaan", 5]],
  ["AE", ["Kluyverweg", 1]]
]

// Scala
List(
  ("EWI", Tuple2("Mekelweg", 4)),
  ("TPM", Tuple2("Jafaalaan", 5)),
  ("AE", Tuple2("Kluyverweg", 1))
)
mapValues: transform the values part.

\(mapValues(kv: [(K,V)], f: V \rightarrow U): [(K,U)]\)

groupByKey: group the values for each key into a single sequence.

\(groupByKey(kv: [(K,V)]) : [(K, [V])]\)

reduceByKey: combine all elements mapped by the same key into one.

\(reduceByKey(kv: [(K,V)], f: (V,V) \rightarrow V) : [(K, V)]\)

join: return a sequence containing all pairs of elements with matching keys.

\(join(kv1: [(K,V)], kv2: [(K,W)]) : [(K, (V,W))]\)
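To see what these signatures mean, here is a sketch of the four operations on plain Scala lists of pairs; it shows the semantics only, not how a distributed system would implement them:

def mapValues[K, V, U](kv: List[(K, V)], f: V => U): List[(K, U)] =
  kv.map { case (k, v) => (k, f(v)) }

def groupByKey[K, V](kv: List[(K, V)]): List[(K, List[V])] =
  kv.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2)) }.toList

def reduceByKey[K, V](kv: List[(K, V)], f: (V, V) => V): List[(K, V)] =
  groupByKey(kv).map { case (k, vs) => (k, vs.reduce(f)) }

def join[K, V, W](kv1: List[(K, V)], kv2: List[(K, W)]): List[(K, (V, W))] =
  for ((k1, v) <- kv1; (k2, w) <- kv2 if k1 == k2) yield (k1, (v, w))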
mapValues: with mapValues we can apply a transformation and keep data local.

def mapValues[U](f: (V) => U): RDD[(K, U)]

reduceByKey: reducing values by key allows us to avoid moving data among nodes. This is because we can reduce locally and only distribute the results of the reduction for further aggregation. This is also why f does not allow us to change the reduction type.

def reduceByKey(f: (V, V) => V): RDD[(K, V)]
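A minimal usage sketch, assuming a SparkContext named sc is in scope (e.g. in the Spark shell); the data is made up for illustration:

val visits = sc.parallelize(Seq(("EWI", 1), ("TPM", 1), ("EWI", 1)))

val counts = visits.reduceByKey((a, b) => a + b)  // (EWI, 2), (TPM, 1); values keep their type
val labels = counts.mapValues(n => s"$n visits")  // values change type, keys stay put

labels.collect().foreach(println)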
Suppose we have a dataset of addresses
case class Addr(k: String, street: String, num: Int)
val addr = List(
Addr("EWI", "Mekelweg", 4),
Addr("EWI", "Van Mourik Broekmanweg", 6),
Addr("TPM", "Jafaalaan", 5),
Addr("AE", "Kluyverweg", 1)
)
and a dataset of deans
case class Dean(k: String, name: String, surname: String)
val deans = List(
Dean("EWI", "John", "Schmitz"),
Dean("TPM", "Hans", "Wamelink")
)
Q: Define a method deanAddresses to retrieve the list of addresses for each dean.

def deanAddresses: List[(Dean, List[Addr])] =
  deans.map(d => (d, addr.filter(a => a.k == d.k)))
In practice, we get the following results
// EEEWWWW
List(
(Dean(EWI,John,Schmitz), List(
Addr(EWI,Mekelweg,4),
Addr(EWI,Van Mourik Broekmanweg,6))
),
(Dean(TPM,Hans,Wamelink),List(
Addr(TPM,Jafaalaan,5))
)
)
This is OK, but a more practical result would be a List[(Dean, Addr)]. For this, we need to flatten the internal sequence.
flatMap: map and flatten in one step.

\(flatMap(xs: [A], f: A \rightarrow [B]): [B]\)

flatMap enables us to combine two data collections and return a new collection with flattened values.
def deanAddresses2: Seq[(Dean, Addr)] =
  deans.flatMap(d =>
    addr.filter(a => a.k == d.k).
      map(v => (d, v)))
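With the given data, deanAddresses2 now yields a flat sequence of (Dean, Addr) pairs:

// List((Dean(EWI,John,Schmitz), Addr(EWI,Mekelweg,4)),
//      (Dean(EWI,John,Schmitz), Addr(EWI,Van Mourik Broekmanweg,6)),
//      (Dean(TPM,Hans,Wamelink), Addr(TPM,Jafaalaan,5)))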
In Scala, flatMap is special: for-comprehensions are translated into flatMap/map calls.

def deanAddresses2: Seq[(Dean, Addr)] = {
  for (
    d <- deans;
    v <- addr.filter(a => a.k == d.k)
  ) yield (d, v)
}
A KV pair is an alternative form of a relation, indexed by a key. We can always convert between the two
val relation : Set[Tuple3[Int, Int, String]] = ???
Convert the above relation to a KV pair
val kvpair : Set[Tuple2[Int, Tuple2[Int, String]]] =
  relation.map(x => (x._1, (x._2, x._3)))
Convert the KV pair back to a relation
val relation2: Set[Tuple3[Int, Int, String]] =
  kvpair.map(x => (x._1, x._2._1, x._2._2))
This means that any operation we can do on relations, we can also do on KV pairs.
One of the key characteristics of data processing is that data is never modified in place. Instead, we apply operations that create new versions of the data, without modifying the original version.
Immutability is a general concept that extends across much of the data processing stack.
Image from Helland in ref [1]
Copy-On-Write is a general technique that allows us to share memory for read-only access across processes, and to handle writes, if/when they happen, by copying the modified resource into a private version.
COW is the basis for many operating system mechanisms, such as process creation (forking), while many new filesystems (e.g. BTRFS, ZFS) use it as their storage format.
COW enables systems with multiple readers and few writers to efficiently share resources.
Immutable or persistent data structures always preserve the previous version of themselves when they are modified [2].
With immutable data structures, we can:
They come at a cost of increased memory usage (data is never deleted).
Scala has both mutable and immutable versions of many common data structures. If in doubt, use immutable.
import scala.collection.immutable.TreeSet

val xs = TreeSet[Char]('a', 'b',
                       'c', 'd',
                       'g', 'f',
                       'h')
xs looks like a regular tree here. We can share it between 2 threads without worrying about updates. The effect of immutability becomes apparent only when we try to insert a new node e.
val ys = xs ++ TreeSet[Char]('e')
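Both versions remain usable after the insert; ys shares most of its structure with xs:

xs.contains('e') // false: the original set is unchanged
ys.contains('e') // true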
| ADT   | collection.mutable | collection.immutable |
|-------|--------------------|----------------------|
| Array | ArrayBuffer        | Vector               |
| List  | LinkedList         | List                 |
| Map   | HashMap            | HashMap              |
| Set   | HashSet            | HashSet              |
| Queue | SynchronizedQueue  | Queue                |
| Tree  | —                  | TreeSet              |
This work is (c) 2017, 2018, 2019 - onwards by TU Delft and Georgios Gousios and licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International license.