In this section, we review the basic data types we use when processing data.

**Unstructured**: Data whose format is not known
- Raw text documents
- HTML pages

**Semi-Structured**: Data with a known format.

**Structured**: Data with known formats, linked together in graphs or tables
- SQL or Graph databases
- Images

**D**: *What types of data are more convenient to process?*

**Sequences** or **Lists** or **Arrays** represent consecutive items in memory

In Python:

`a = [1, 2, 3, 4]`

In Scala:

`val l = List(1,2,3,4)`

Basic properties:

- Size is bounded by memory
- Items can be accessed by an index: `a[1]` (Python) or `l(3)` (Scala)
- Items can only be inserted at the end (*append*)
- Can be sorted

Sets store values without any particular order and with no repeated values.

```
scala> val s = Set(1,2,3,4,4)
s: scala.collection.immutable.Set[Int] = Set(1, 2, 3, 4)
```

Basic properties:

- Size is bounded by memory
- Can be queried for containment
- Set operations: union, intersection, difference, subset

**Maps** or **Dictionaries** or **Associative Arrays** are collections of `(k,v)` pairs in which each `k` appears only once.

Some languages have built-in support for dictionaries; in Python:

`a = {'a' : 1, 'b' : 2}`

Basic properties:

- One key always corresponds to one value.
- Accessing a value given a key is very fast (\(\approx O(1)\))

A graph data structure consists of a finite set of vertices or nodes, together with a set of unordered pairs of these vertices for an undirected graph or a set of ordered pairs for a directed graph.

- Nodes can contain attributes
- Edges can contain weights and directions

Graphs are usually represented as `Map[Node, List[Edge]]`, where

```
case class Node[A, B](id: Int, attributes: Map[A, B])
case class Edge[A, B](a: Node[A, B], b: Node[A, B], directed: Option[Boolean],
                      weight: Option[Double])
```
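The same adjacency-list idea can be sketched in Python with a plain dictionary that maps each node id to a list of `(neighbour, weight)` tuples. The helper `add_edge` and the node ids are hypothetical, chosen only for illustration:

```python
from collections import defaultdict

# Adjacency list: node id -> list of (neighbour id, weight) pairs.
graph = defaultdict(list)

def add_edge(g, a, b, weight=None, directed=False):
    """Add an edge a -> b; undirected edges are stored in both directions."""
    g[a].append((b, weight))
    if not directed:
        g[b].append((a, weight))

add_edge(graph, 1, 2, weight=0.5)                 # undirected
add_edge(graph, 2, 3, weight=1.5, directed=True)  # directed: 2 -> 3 only

for node, edges in sorted(graph.items()):
    print(node, edges)
# 1 [(2, 0.5)]
# 2 [(1, 0.5), (3, 1.5)]
```

Node 3 has no outgoing edges, so it never appears as a key; a real implementation might register every node explicitly.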

**Trees** are ordered graphs without loops. Nested data, such as the following JSON document, forms a tree:

```
a = {"id": "5542101946", "type": "PushEvent",
     "actor": {
       "id": 801183,
       "login": "tvansteenburgh"
     },
     "repo": {
       "id": 42362423,
       "name": "juju-solutions/review-queue"
     }}
```

If we parse the above JSON in almost any language, we get a series of nested maps:

```
Map("id" -> 5542101946L,
    "type" -> "PushEvent",
    "actor" -> Map("id" -> 801183.0, "login" -> "tvansteenburgh"),
    "repo" -> Map("id" -> 4.2362423E7, "name" -> "juju-solutions/review-queue")
)
```

An \(n\)-tuple is a sequence of \(n\) elements, whose types are known.

```
val record = Tuple4[Int, String, String, Int](1, "Georgios", "Mekelweg", 4)
```

Scala makes it easy to declare and use tuples by automatically inferring the types of the tuple contents.

```
val a = (1, ("Foo", 2)) // type: Tuple2[Int, Tuple2[String, Int]]
println(a._1) // prints 1
println(a._2._1) // prints Foo
```

A *relation* is a `Set` of \(n\)-tuples \((d_1, d_2, \ldots, d_n)\) of the same type; one of the tuple elements denotes a *key*. Keys cannot be repeated.

Relations are very important for data processing, as they form the theoretical framework (Relational Algebra) for *relational (SQL) databases*.

Typical operations on relations are insert, remove and *join*. Join allows us to compute new relations by joining existing ones on common fields.

```
// addresses: (id, building, street, number)
val addr1 = (1, "Gebouw 35", "Mekelweg", 5)
val addr2 = (2, "Gebouw 36", "Drebbelweg", 4)
val addr = Set(addr1, addr2)
// people: (id, name, address id)
val georgios = (1, "Georgios", 1)
val wouter = (2, "Wouter", 2)
val joris = (3, "Joris", 4)
val people = Set(georgios, wouter, joris)
```

**Q:** How can we get a list of buildings and the people that work there?
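One possible answer is a *join* of `people` and `addr` on the address id. The sketch below is a simple nested-loop join in Python over tuples with the same fields as the Scala example; person ids are assumed to be 1 to 3 (keys must be unique), and `join` is a hypothetical helper, not a library function:

```python
# addr:   (address id, building, street, number)
# people: (person id, name, address id)
addr = {(1, "Gebouw 35", "Mekelweg", 5),
        (2, "Gebouw 36", "Drebbelweg", 4)}
people = {(1, "Georgios", 1), (2, "Wouter", 2), (3, "Joris", 4)}

def join(left, right, left_key, right_key):
    """Nested-loop join: pair every left/right tuple whose keys match."""
    return {(l, r) for l in left for r in right if left_key(l) == right_key(r)}

# Building name and person name for every match. Joris's address id (4)
# has no matching address, so he does not appear in the result.
buildings = sorted((a[1], p[1]) for a, p in
                   join(addr, people, lambda a: a[0], lambda p: p[2]))
print(buildings)  # [('Gebouw 35', 'Georgios'), ('Gebouw 36', 'Wouter')]
```

A nested-loop join compares every pair of tuples; databases typically use hash- or sort-based joins to avoid that quadratic cost.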

A key/value pair (or K/V) is a more general type of a relation, where each key can appear more than once.

```
// We assume that the first Tuple element represents the key
val a = (1, ("Mekelweg", 4))
val b = (1, ("VMB", 6))
val kv = List(a, b)
```

Another way to represent K/V pairs is with a `Map`:

```
val xs: Map[Int, List[(String, Int)]] =
  Map(1 -> List(("Mekelweg", 4), ("VMB", 6)))
```

K and V are flexible: that’s why the Key/Value abstraction is key to NoSQL databases, including MongoDB, DynamoDB, Redis etc.

In this section, we discuss the basics of functional programming, as those apply to data processing with tools like Hadoop, Spark and Flink.

Functional programming is a programming paradigm that treats computation as the evaluation of mathematical functions and avoids changing-state and mutable data (Wikipedia).

Functional programming characteristics:

- Absence of **side-effects**: A function, given an argument, always returns the same result, irrespective of and without modifying its environment.
- **Higher-order functions**: Functions can take functions as arguments to parametrise their behavior.
- **Laziness**: The art of waiting to compute till you can wait no more.

\(foo(x: [A], y: B) \rightarrow C\)

- \(foo\): function name
- \(x\) and \(y\): Names of function arguments
- \([A]\) and \(B\): Types of function arguments.
- \(\rightarrow\): Denotes the return type
- \(C\): Type of the returned result
- \([A]\): Denotes that type \(A\) can be traversed

We read this as: *Function foo takes as arguments an array/list of type A and an argument of type B and returns a value of type C*

**Q**: *What does the following function signature mean?* \(f(x:[A], y:(z: A) \rightarrow B) \rightarrow [B]\)

A function has a side effect if it **modifies** some state outside its scope or has an observable interaction with its calling functions or the outside world besides returning a value.

```
max = -1
def ge(a, b):
    global max
    if a >= b:
        max = a  ## <- Side effect!
        return True
    else:
        max = b
        return False
```
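A side-effect-free alternative, sketched under the assumption that the caller needs both values, returns the maximum alongside the comparison result instead of writing it to a global variable:

```python
def ge(a, b):
    """Pure version: return (a >= b, max(a, b)) without touching any globals."""
    if a >= b:
        return True, a
    return False, b

result, current_max = ge(3, 5)
print(result, current_max)  # False 5
```

The caller now decides what to do with the maximum, and the function can be tested by comparing its return value alone.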

As a general rule, any function that returns nothing (`void` or `Unit`) performs a side effect!

- Setting a field on an object: OO is not FP!
- Modifying a data structure in place: In FP, data structures are always persistent.
- Throwing an exception or halting with an error: In FP, we use types that encapsulate and propagate erroneous behaviour
- Printing to the console, reading user input, or reading from and writing to files or the screen: In FP, we encapsulate external resources into *Monads*.

How can we write code that does something useful given those restrictions?

```
class Cafe {
  def buyCoffee(cc: CreditCard): Coffee = {
    val cup = new Coffee()
    cc.charge(cup.price)
    cup
  }
}
```

Can you spot any problems with this code?

- `charge()` performs a side-effect: we need to contact the credit card company!
- `buyCoffee()` is *not testable*

```
class Cafe {
  def buyCoffee(cc: CreditCard, p: Payments): Coffee = {
    val cup = new Coffee()
    p.charge(cc, cup.price)
    cup
  }
}
```

Slightly better option, *but*:

- `Payments` has to be an interface
- We still need to perform side effects
- We need to inspect state within the `Payments` mock

**Q:** How can we buy 10 coffees?

```
class Cafe {
def buyCoffee(cc: CreditCard, p: Payments): Coffee = { ... }
def buyCoffees(cc: CreditCard, p: Payments, num: Int) = {
for (i <- 1 to num) {
buyCoffee(cc, p)
}
} }
```

Seems to be working, *but*:

- No way to batch payments
- No way to batch checkouts

**Idea**: how about instead of charging in place, we decouple the action of *buying* coffee from that of *charging* for it?

```
case class Charge(cc: CreditCard, amount: Double) {
  def combine(other: Charge) = new Charge(cc, amount + other.amount)
  def pay = cc.charge(amount)
}
class Cafe {
  def buyCoffee(cc: CreditCard): (Coffee, Charge) = {
    val cup = new Coffee()
    (cup, Charge(cc, cup.price))
  }
}
```

Nice! We can now:

- Test
- Combine multiple `Charge`s into one
- Maintain in-flight accounts for all customers

```
case class Charge(cc: CreditCard, amount: Double) {
  def combine(other: Charge) = new Charge(cc, amount + other.amount)
  def pay = cc.charge(amount)
}
class Cafe {
  def buyCoffee(cc: CreditCard): (Coffee, Charge) = { ... }
  def buyCoffees(cc: CreditCard, num: Int): Seq[(Coffee, Charge)] =
    (1 to num).map(_ => buyCoffee(cc))
  def checkout(cc: CreditCard, charges: Seq[Charge]): Seq[Charge] = {
    charges.filter(charge => charge.cc == cc).
      foldLeft(new Charge(cc, 0.0)){(acc, x) => acc.combine(x)}.
      pay // <- side-effect, but once, in one place.
    charges.filter(charge => charge.cc != cc)
  }
}
```

*This example was adapted from the (awesome) FP in Scala book, by Chiusano and Bjarnason*

A higher order function is a function that can take a function as an argument or return a function.

```
class Array[A] {
// Return elements that satisfy f
def filter(f: A => Boolean) : Array[A]
}
```

In the context of BDP, higher-order functions capture common idioms of processing data as enumerated elements, e.g. going over all elements, selectively removing elements and aggregating them.

- `map(xs: List[A], f: A => B) : List[B]`: Applies `f` to all elements and returns a new list.
- `flatMap(xs: List[A], f: A => List[B]) : List[B]`: Like `map`, but flattens the result to a single list.
- `foldL(xs: List[A], f: (B, A) => B, init: B) : B`: Takes `f` of 2 arguments and an init value and combines the elements by applying `f` on the result of each previous application and the next element. AKA `reduce`.
- `groupBy(xs: List[A], f: A => K): Map[K, List[A]]`: Partitions `xs` into a map of traversable collections according to a discriminator function.
- `filter(xs: List[A], f: A => Boolean) : List[A]`: Takes a predicate and returns all elements that satisfy it.
- `scanL(xs: List[A], f: (B, A) => B, init: B) : List[B]`: Like `foldL`, but returns a list of all intermediate results.
- `zip(xs: List[A], ys: List[B]): List[(A,B)]`: Returns an iterable collection formed by iterating over the corresponding items of `xs` and `ys`.
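For comparison, these operations map onto Python built-ins and standard-library functions roughly as follows. This is a sketch: Python has no built-in `flatMap` or list `groupBy`, so those two are written by hand here, and `accumulate(..., initial=...)` requires Python 3.8 or later:

```python
from functools import reduce
from itertools import accumulate

xs = [1, 2, 3, 4]

# map: apply a function to every element
print(list(map(lambda x: x * 10, xs)))          # [10, 20, 30, 40]
# flatMap: map each element to a list, then flatten one level
print([y for x in xs for y in [x, -x]])         # [1, -1, 2, -2, 3, -3, 4, -4]
# foldL / reduce: combine left to right, starting from an initial value
print(reduce(lambda acc, x: acc + x, xs, 0))    # 10
# groupBy: partition elements according to a discriminator function
groups = {}
for x in xs:
    groups.setdefault("even" if x % 2 == 0 else "odd", []).append(x)
print(groups)                                   # {'odd': [1, 3], 'even': [2, 4]}
# filter: keep the elements that satisfy a predicate
print(list(filter(lambda x: x > 2, xs)))        # [3, 4]
# scanL: like foldL, but keep every intermediate result
print(list(accumulate(xs, lambda acc, x: acc + x, initial=0)))  # [0, 1, 3, 6, 10]
# zip: pair up corresponding elements
print(list(zip(xs, "abcd")))                    # [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')]
```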

*Laziness* is an evaluation strategy which delays the evaluation of an expression until its value is needed. This enables:

- Separating a pipeline's construction from its evaluation
- Not having to read datasets in memory: we can process them in lazy-loaded batches
- Generating *infinite* collections
- Optimising execution plans

```
def primes(limit):
    sieve = [False]*limit
    for j in range(2, limit):
        if sieve[j]: continue
        yield j
        if j*j > limit: continue
        for i in range(j, limit, j):
            sieve[i] = True
print([x for x in primes(100)][1:10])
```

`## [3, 5, 7, 11, 13, 17, 19, 23, 29]`
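The generator above is still consumed in full by the list comprehension. To see laziness enable an *infinite* collection, a minimal sketch can combine an endless generator with `itertools.islice`, which pulls only as many items as requested (the name `naturals` is illustrative):

```python
from itertools import islice

def naturals():
    """An infinite, lazily evaluated sequence: 0, 1, 2, ..."""
    n = 0
    while True:
        yield n
        n += 1

# Building the pipeline computes nothing yet;
# islice then evaluates only the first five squares.
squares = (n * n for n in naturals())
print(list(islice(squares, 5)))  # [0, 1, 4, 9, 16]
```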

In tools like Spark and Flink, we always express computations in a lazy manner. This allows for optimizations before the actual computation is executed.

```
# Word count in PySpark
text_file = sc.textFile("words.txt")
counts = text_file \
    .flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("results.txt")
```

**Q:** In the code above, when will Spark compute the result?

**A:** When `saveAsTextFile` is called!

*Monads* are a design pattern that defines how functions can be used together to build generic types. Practically, a monad is a value-wrapping type that:

- Has an `identity` function (named `unit` in the trait below)
- Has a `flatMap` function, that allows data to be transferred between monad types

```
trait Monad[M[_]] {
  def unit[S](a: S): M[S]
  def flatMap[S, T](m: M[S])(f: S => M[T]): M[T]
}
```

Monads are the tool FP uses to deal with (side-)effects:

- Null pointers: `Option[T]`
- Exceptions: `Try[T]`
- Latency in asynchronous actions: `Future[T]`

`flatMap` enables us to join sequences of arbitrary types.

Example: `Future`s wrap values that will eventually be available.

```
def callWebService(): Future[R]
def heavyComputation(r: R): Future[V]
val r = callWebService().flatMap(r => heavyComputation(r))
```

Example: `Option`s wrap values that may be null.

```
def getArgument(k: String): Option[Arg]
def process(a: Arg): Option[Result]
getArgument("foo").
  flatMap(process).
  getOrElse(new Result("default"))
```

```
import scala.util.Try

object Converter extends App {
  def toInt(a: String): Try[Int] = Try{Integer.parseInt(a)}
  def toString(b: Int): Try[String] = Try{b.toString}
  val a = toInt("4").flatMap(x => toString(x))
  println(a) // Success(4)
  val b = toInt("foo").flatMap(x => toString(x))
  println(b) // Failure(java.lang.NumberFormatException: For input string: "foo")
}
```

`Try` is a type that can have 2 instances:

- `Success[T]`, where `T` represents the type of the result
- `Failure[T]`, which instead wraps the error that occurred, usually an exception (a `Throwable`)

The same logic in Java, using exceptions instead of `Try`:

```
public class Converter {
  public static Integer toInt(String a) throws NumberFormatException {
    return Integer.parseInt(a);
  }
  public static String toString(Integer a) throws NullPointerException {
    return a.toString();
  }
  public static void main(String[] args) {
    try {
      Integer five = toInt("5");
      try {
        System.out.println(toString(five));
      } catch (NullPointerException e) {
      } //baaah
    } catch (NumberFormatException r) {
    } //ooofff
  }
}
```

```
class Amazon {
  def login(login: String, passwd: String): Future[Amazon]
  def search(s: String): Future[Seq[String]]
}
class Main extends App {
  val amazon = new Amazon()
  val result =
    amazon.login("uname", "passwd") // Might fail
      .flatMap(a => a.search("foo"))
}
```

We now need to encode error handling, so that the compiler checks that we are doing it properly.

**D**: Which of the following should our new `search` return?

- `Seq[Future[Try]]`
- `Future[Seq[Try]]`
- `Future[Try[Seq]]`

Before starting to process datasets, we need to be able to go over their contents in a systematic way. The process of visiting all items in a dataset is called *traversal*.

In a big data system:

- **Client** code processes data
- A **data source** is a container of data (e.g. array, database, web service)

There are two fundamental techniques for the client to process all available data in the data source:

- **Iteration**: The client *asks* the data source whether there are items left and then *pulls* the next item.
- **Observation**: The data source *pushes* the next available item to a client end point.

**D**: *What are the relative merits of each technique?*

In the context of BDP, iteration allows us to process finite-sized data sets without loading them in memory at once.

```
trait Iterator[A] {
def hasNext: Boolean
def next(): A
}
```

Typical usage

```
val it = Array(1,2,3,4).iterator
while(it.hasNext) {
val i = it.next + 1
println(i)
}
```

The `Iterator` pattern is supported in virtually all programming languages.

Reading from a file, first in Scala:

```
val data = scala.io.Source.fromFile("/big/data").getLines
while (data.hasNext) {
  println(data.next)
}
// Equivalently (an iterator can only be consumed once, so use one loop or the other)
for (line <- data) {
  println(line)
}
```

and then in Python

```
with open("/big/data", "r") as data:
    # The file object implements __iter__() and reads one line at a time;
    # readlines() would instead load the whole file into a list
    for line in data:
        print(line)
```
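In Python, the `hasNext`/`next` pair of the `Iterator` trait corresponds to the iterator protocol: `__next__` returns the next item and raises `StopIteration` when none are left. A minimal sketch with a hypothetical `Countdown` data source:

```python
class Countdown:
    """A data source implementing Python's iterator protocol."""

    def __init__(self, start):
        self.current = start

    def __iter__(self):
        # An iterator returns itself, so it works directly in for loops
        return self

    def __next__(self):
        # Plays the role of hasNext + next: signal exhaustion or pull one item
        if self.current <= 0:
            raise StopIteration
        value = self.current
        self.current -= 1
        return value

print(list(Countdown(3)))  # [3, 2, 1]
```

Items are produced one at a time on request, so the source never needs to hold the whole dataset in memory.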

Observation allows us to process (almost) unbounded size data sets, where the data source controls the processing rate

```
// Consumer
trait Observer[A] {
def onNext(a: A): Unit
def onError(t: Throwable): Unit
def onComplete(): Unit
}
// Producer
trait Observable[A] {
def subscribe(obs: Observer[A]): Unit
}
```

Typical usage

```
Observable.from(1,2,3,4,5).
  map(x => x + 1).
  subscribe(x => println(x))
```
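To make the push model concrete, here is a minimal Python sketch of an observable source. The class and method names are illustrative and are not the RxPY API; the `onError` channel is omitted for brevity, and items are pushed synchronously, whereas real reactive libraries may push them asynchronously:

```python
class Observable:
    """A minimal push-based data source (illustrative, not a library API)."""

    def __init__(self, subscribe_fn):
        # subscribe_fn(on_next, on_complete) pushes items to the callbacks
        self._subscribe_fn = subscribe_fn

    @staticmethod
    def from_iterable(items):
        def subscribe_fn(on_next, on_complete):
            for x in items:
                on_next(x)  # the source, not the client, drives the loop
            on_complete()
        return Observable(subscribe_fn)

    def map(self, f):
        # Wrap the downstream on_next so that every pushed item is transformed
        def subscribe_fn(on_next, on_complete):
            self._subscribe_fn(lambda x: on_next(f(x)), on_complete)
        return Observable(subscribe_fn)

    def subscribe(self, on_next, on_complete=lambda: None):
        self._subscribe_fn(on_next, on_complete)


received = []
(Observable.from_iterable([1, 2, 3, 4, 5])
    .map(lambda x: x + 1)
    .subscribe(received.append, lambda: print("done")))
print(received)  # [2, 3, 4, 5, 6]
```

Note that `map` only composes callbacks; nothing flows until `subscribe` is called.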

- Iteration and Observation are *dual*
- The same set of higher-order functions can be used to process data in both cases

Iterator-based `map`. “Pulls” data out of an array.

`Array(1,2,3,4).map(x => x + 1) // Scala`

`map(lambda x: x + 1, [1,2,3,4]) # Python`

Observation-based (reactive) `map`. Data is “pushed” to it asynchronously, when new data is available.

`Observable.from(1,2,3,4,5).map(x => x + 1) // Scala`

`Observable.from_([1,2,3,4]).map(lambda x: x + 1) # Python`

**D:** (How) Can we convert between the two types of enumeration?

We apply a strategy to visit all individual items in a collection.

```
for i in [1,2,3]:
    print(i)
for k, v in {"x": 1, "y": 2}.items():
    print(k)
```

In case of nested data types (e.g. trees/graphs), we need to decide how to traverse. Common strategies include:

- **Breadth-first traversal**: From any node A, visit its neighbours first, then its children.
- **Depth-first traversal**: From any node A, visit its children first, then its neighbours.

In most programming environments, traversal is implemented by *iterators*.

```
class Tree(object):
    def __init__(self, title, children=None):
        self.title = title
        self.children = children or []
    def __iter__(self):  ## Depth first
        yield self
        for child in self.children:
            for node in child:
                yield node
```

Then, we can iterate using standard language constructs:

```
t = Tree("a", [Tree("b", [Tree("c"), Tree("d")])])
for node in iter(t):
    print(node.title)
```
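The `Tree` iterator above is depth-first. For comparison, a breadth-first traversal can be sketched with a FIFO queue (`collections.deque`). The tree here gets an extra child `e`, an assumption made only so that the two visiting orders differ:

```python
from collections import deque

class Tree:
    def __init__(self, title, children=None):
        self.title = title
        self.children = children or []

def breadth_first(root):
    """Visit nodes level by level: a FIFO queue holds the nodes still to visit."""
    queue = deque([root])
    while queue:
        node = queue.popleft()
        yield node
        queue.extend(node.children)

t = Tree("a", [Tree("b", [Tree("c"), Tree("d")]), Tree("e")])
print([n.title for n in breadth_first(t)])  # ['a', 'b', 'e', 'c', 'd']
```

A depth-first traversal of the same tree would visit `a, b, c, d, e`; replacing the queue with a stack (popping from the same end you push to) turns this sketch into a depth-first traversal.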

Operations are transformations, aggregations or cross-referencing of data stored in data types. All container data types can be iterated.

We generally have two types of operations:

- **Element-wise** ops apply a function to each individual item. This is the equivalent of map or flatMap in batch systems.
- **Aggregations** group multiple items together and apply a reduction (e.g. fold or max) on them.

**Conversion**: Convert values of type \(A\) to type \(B\)
- Celsius to Kelvin
- € to $

**Filtering**: Only present data items that match a condition
- All adults from a list of people
- Remove duplicates

**Projection**: Only present parts of each data item
- From a list of cars, only display their brand

Suppose we have a list of people; each person is identified by a unique identifier, their age, their height in cm, their weight and their gender.

`p = {'id': 10, 'age': 50, 'height': 180, 'weight': 75, 'gender': 'Male'}`

Now, let’s create 1000 random people!


```
from random import randint, seed
seed(42)
people = []
genders = ['Male', 'Female', 'Other']
for i in range(1000):
    p = {'id': i,
         'age': randint(10, 80),
         'height': randint(60, 200),
         'weight': randint(40, 120),
         'gender': genders[randint(0, 2)]}
    people.append(p)
```

`## people[0]: {'id': 0, 'age': 24, 'height': 66, 'weight': 75, 'gender': 'Male'}`

To convert values, we traverse a collection and apply a conversion function to each individual element. This is generalized to the \(map\) function:

\(map(xs: [A], f: (A) \rightarrow B) \rightarrow [B]\)

Let’s convert the persons’ heights to meters

```
def to_m(person):
    # Return a new record instead of modifying the original in place
    return dict(person, height=person['height'] / 100)
people = list(map(lambda x: to_m(x), people))
```

`## people[0]: {'id': 0, 'age': 24, 'height': 0.66, 'weight': 75, 'gender': 'Male'}`

Projection allows us to select parts of a Tuple, Relation or other nested data type for further processing. To implement this, we need to iterate over all items of a collection and apply a *conversion* function.
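In other words, projection is a `map` whose conversion function keeps only some fields. A minimal sketch, where `project` is a hypothetical helper and the two records are made-up samples shaped like the `people` entries:

```python
def project(record, fields):
    """Keep only the given fields of a dict-shaped record."""
    return {f: record[f] for f in fields}

# Sample records with the same shape as `people` (values made up)
sample = [
    {'id': 0, 'age': 24, 'height': 0.66, 'weight': 75, 'gender': 'Male'},
    {'id': 1, 'age': 35, 'height': 1.72, 'weight': 68, 'gender': 'Other'},
]
print(list(map(lambda p: project(p, ['id', 'gender']), sample)))
# [{'id': 0, 'gender': 'Male'}, {'id': 1, 'gender': 'Other'}]
```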

To filter values from a list, we traverse a collection and apply a predicate function to each individual element.

\(filter(xs: [A], f: (A) \rightarrow Boolean) \rightarrow [A]\)

**Q:** How can we implement filter?

```
def filter(xs, pred):
    result = []
    for i in xs:
        if pred(i):
            result.append(i)
    return result
```

Aggregations apply a *combining operator* on a traversable sequence to aggregate the individual items into a single result.

Aggregation is implemented using reduction (or folding). Two variants exist: *left reduction* and *right reduction*.

- With left reduction, we traverse items from the first to last
- With right reduction, we traverse items from the last to first

Our task is to calculate the total weight for our people list. To do so, we need to iterate over the list and add the individual weights. First, we do it with an imperative approach:

```
total_weight = 0
for p in people:
    total_weight = total_weight + p['weight']
print(total_weight)
```

We notice that iteration and reduction are independent. What if we abstract iteration away?

\(reduceL(xs: [A], init: B, f: (acc: B, x: A) \rightarrow B) \rightarrow B\)

Left reduce takes a function `f` with 2 arguments and an initial value and applies `f` on all items, starting from the **leftmost**. `f` combines the result of its previous application with the current element.

```
def total_weight(acc, p):  # acc is for accumulator
    return acc + p['weight']
```

Then we can calculate the total weight of all people in our collection as follows:

```
from functools import reduce
total = reduce(total_weight, people, 0)
# or equivalently, using an anonymous function
total = reduce(lambda x, y: x + y['weight'], people, 0)
```

\(reduceR(xs: [A], init: B, f: (x: A, acc: B) \rightarrow B) \rightarrow B\)

Right reduce takes a function `f` with 2 arguments and an initial value and applies `f` on all items, starting from the **rightmost**. `f` combines the current element with the result of its previous application.

```
from functools import reduce

def reduceR(func, xs, init):
    # Traverse right to left; func receives (x, acc), as in the signature above
    return reduce(lambda acc, x: func(x, acc), reversed(xs), init)
```

`reduceR` and `reduceL`: differences

To see how `reduceR` and `reduceL` are evaluated, we define a reduction function that builds a string showing the intermediate steps.

```
def reduce_pp(acc, x):
    return "(%s + %s)" % (acc, x)
```

How does `reduceL` work?

`print(reduce(reduce_pp, range(1,10), 0))`

`## (((((((((0 + 1) + 2) + 3) + 4) + 5) + 6) + 7) + 8) + 9)`

How does `reduceR` work?

`print(reduceR(reduce_pp, range(1,10), 0))`

`## (1 + (2 + (3 + (4 + (5 + (6 + (7 + (8 + (9 + 0)))))))))`

**Q**: *Can we always apply reduceL instead of reduceR?*

`reduceR` != `reduceL`

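The answer to the previous question is: *it depends on the properties of the reduction operation*. `reduceL` and `reduceR` are guaranteed to produce the same result when the operation is both *associative* and *commutative*.

An op \(\circ\) is commutative iff \(x \circ y = y \circ x\), and associative iff \((x \circ y) \circ z = x \circ (y \circ z)\).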

We can see that if we choose an operator that is neither commutative nor associative, such as division, `reduceL` and `reduceR` produce different results.

`print(reduce(lambda x, y: x * 1.0 / y, range(1,10), 1))`

`## 2.7557319223985893e-06`

`print(reduceR(lambda x, y: x * 1.0 / y, range(1,10), 1))`

`## 2.4609375`

The simplest possible aggregation is counting the number of elements, possibly matching a condition.

\(count(xs: [A], pred: (A) \rightarrow Boolean) \rightarrow Integer\)

```
def count(xs, pred):
    # filter(xs, pred) as defined above
    return len(filter(xs, pred))
```

**Q:** How can we implement this with a reduction?
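One possible answer, sketched in Python: start from 0 and let the combining function add 1 for every element that satisfies the predicate, so no intermediate filtered list is built:

```python
from functools import reduce

def count(xs, pred):
    # The accumulator grows by 1 for each element that satisfies pred
    return reduce(lambda acc, x: acc + (1 if pred(x) else 0), xs, 0)

print(count([1, 2, 3, 4, 5], lambda x: x % 2 == 1))  # 3
```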

Given a sequence of items, produce a new sequence with no duplicates.

\(distinct(xs: [A]): [A]\)

Distinct operations are usually implemented by copying the initial sequence elements to a `set` data structure.

```
a = [1,2,2,3]
set(a)
```

`## {1, 2, 3}`

Distinct assumes that items have unique identities; in Python, sets rely on the `__hash__()` and `__eq__()` methods (the equivalents of `hashCode()` and `equals()` in Java).
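A `set` does not preserve the original order of the items. If order matters, a common approach keeps a `set` of already-seen items while traversing the sequence; a minimal sketch (the name `distinct` is illustrative):

```python
def distinct(xs):
    """Return xs without duplicates, keeping first occurrences in order.
    Items must be hashable, since membership is checked via a set."""
    seen = set()
    result = []
    for x in xs:
        if x not in seen:
            seen.add(x)
            result.append(x)
    return result

print(distinct([3, 1, 3, 2, 1]))  # [3, 1, 2]
```

Since Python 3.7, `list(dict.fromkeys(xs))` gives the same result, because dictionaries preserve insertion order.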

Aggregation functions have the following generic signature:

\(f: [A] \rightarrow Number\)

Their job is to reduce sequences of elements to a single measurement. Some examples are:

- Mathematical functions: `min`, `max`, `count`
- Statistical functions: `mean`, `median`, `stdev`
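In Python, these aggregations are available as built-ins and in the standard-library `statistics` module. The weights below are illustrative values, not taken from the `people` dataset:

```python
from statistics import mean, median, stdev

weights = [75, 60, 82, 90, 68]  # illustrative values

print(min(weights), max(weights), len(weights))  # 60 90 5
print(mean(weights), median(weights))
print(stdev(weights))  # sample standard deviation (divides by n - 1)
```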

Grouping splits a sequence of items into groups given a classification function.

\(groupBy(xs:[A], f: A \rightarrow K): Map[K, [A]]\)

```
def group_by(classifier, xs):
    result = dict()
    for x in xs:
        k = classifier(x)
        if k in result.keys():
            result[k].append(x)
        else:
            result[k] = [x]
    return result
```

```
def number_classifier(x):
    if x % 2 == 0:
        return "even"
    else:
        return "odd"
a = [1,2,3,4,5,6,7]
print(group_by(number_classifier, a))
```

`## {'odd': [1, 3, 5, 7], 'even': [2, 4, 6]}`

How can we get the average age per gender (adults only) in our list of people?

```
from numpy import mean
# filter(xs, pred) and group_by(classifier, xs) as defined above
adults = filter(people, lambda x: x['age'] > 18)
adults_per_gender = group_by(lambda x: x['gender'], adults)
avg_age_per_gender = dict(map(lambda kv: (kv[0], mean([y['age'] for y in kv[1]])),
                              adults_per_gender.items()))
print(avg_age_per_gender)
```

The above is equivalent to the following SQL query:

```
select gender, avg(age)
from people
where age > 18
group by gender
```

**D**: What are the relative strengths and weaknesses of each representation?

KV stores are the most common type of distributed database.

What KV systems enable us to do effectively is to process data locally (e.g. by key) before re-distributing them for further processing. Keys are naturally used to aggregate data before distribution. They also enable (distributed) data joins.

Typical examples of distributed KV stores are Dynamo, MongoDB and Cassandra.

The most common data structure in big data processing is *key-value pairs*.

- A **key** is something that identifies a data record.
- A **value** is the data record. Can be a complex data structure.
- The KV pairs are usually represented as sequences

```
# Python
[ ['EWI', ["Mekelweg", 4]],
  ['TPM', ["Jafaalaan", 5]],
  ['AE', ["Kluyverweg", 1]]
]
```

```
// Scala
List(
  Tuple2("EWI", Tuple2("Mekelweg", 4)),
  Tuple2("TPM", Tuple2("Jafaalaan", 5)),
  Tuple2("AE", Tuple2("Kluyverweg", 1))
)
```

- `mapValues`: Transform the values part. \(mapValues(kv: [(K,V)], f: V \rightarrow U) \rightarrow [(K,U)]\)
- `groupByKey`: Group the values for each key into a single sequence. \(groupByKey(kv: [(K,V)]) \rightarrow [(K, [V])]\)
- `reduceByKey`: Combine all elements mapped by the same key into one. \(reduceByKey(kv: [(K,V)], f: (V,V) \rightarrow V) \rightarrow [(K, V)]\)
- `join`: Return a sequence containing all pairs of elements with matching keys. \(join(kv1: [(K,V)], kv2: [(K,W)]) \rightarrow [(K, (V,W))]\)
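To make these signatures concrete, here is a single-machine Python sketch of the four operations over lists of `(key, value)` tuples. The snake_case names are hypothetical stand-ins; Spark's actual `RDD` methods run distributed and do not guarantee the output order shown here:

```python
from collections import defaultdict

def map_values(kv, f):
    return [(k, f(v)) for k, v in kv]

def group_by_key(kv):
    groups = defaultdict(list)
    for k, v in kv:
        groups[k].append(v)
    return list(groups.items())

def reduce_by_key(kv, f):
    result = {}
    for k, v in kv:
        # First value for a key starts the reduction; later ones are combined
        result[k] = f(result[k], v) if k in result else v
    return list(result.items())

def join(kv1, kv2):
    # Hash join: index kv2 by key, then probe the index with every pair of kv1
    index = dict(group_by_key(kv2))
    return [(k, (v, w)) for k, v in kv1 for w in index.get(k, [])]

pairs = [("EWI", 3), ("TPM", 1), ("EWI", 4)]            # illustrative data
streets = [("EWI", "Mekelweg"), ("AE", "Kluyverweg")]

print(map_values(pairs, lambda v: v * 2))       # [('EWI', 6), ('TPM', 2), ('EWI', 8)]
print(group_by_key(pairs))                      # [('EWI', [3, 4]), ('TPM', [1])]
print(reduce_by_key(pairs, lambda a, b: a + b)) # [('EWI', 7), ('TPM', 1)]
print(join(pairs, streets))  # [('EWI', (3, 'Mekelweg')), ('EWI', (4, 'Mekelweg'))]
```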

`mapValues`: With `mapValues` we can apply a transformation and keep data local.

`def mapValues[U](f: (V) => U): RDD[(K, U)]`

`reduceByKey`: Reducing values by key allows us to avoid moving most of the data among nodes. This is because we can reduce locally and only distribute the *results* of the reduction for further aggregation. This is also why `f` does not allow us to change the type of the values: the partial results must be combined again with the same `f`.

`def reduceByKey(f: (V, V) => V): RDD[(K, V)]`
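This reasoning can be simulated on a single machine: each node first reduces its own pairs, and only the (smaller) partial results are merged. Because `f` has type `(V, V) => V`, the same function can merge the partial results. A Python sketch with hypothetical partitions and helper names:

```python
from operator import add

def reduce_partition(pairs, f):
    """Reduce (key, value) pairs by key within a single partition."""
    acc = {}
    for k, v in pairs:
        acc[k] = f(acc[k], v) if k in acc else v
    return acc

def merge(partials, f):
    """Combine per-partition results; only these would cross the network."""
    return reduce_partition([kv for p in partials for kv in p.items()], f)

# Two hypothetical partitions of a word-count dataset, on two nodes
node1 = [("spark", 1), ("flink", 1), ("spark", 1)]
node2 = [("spark", 1), ("hadoop", 1)]

partials = [reduce_partition(p, add) for p in (node1, node2)]
print(partials)              # [{'spark': 2, 'flink': 1}, {'spark': 1, 'hadoop': 1}]
print(merge(partials, add))  # {'spark': 3, 'flink': 1, 'hadoop': 1}
```

Five input pairs shrink to four partial results before "distribution"; on real data with many repeated keys, the savings are much larger.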

Suppose we have a dataset of addresses

```
case class Addr(k: String, street: String, num: Int)
val addr = List(
Addr("EWI", "Mekelweg", 4),
Addr("EWI", "Van Mourik Broekmanweg", 6),
Addr("TPM", "Jafaalaan", 5),
Addr("AE", "Kluyverweg", 1)
)
```

and a dataset of deans

```
case class Dean(k: String, name: String, surname: String)
val deans = List(
Dean("EWI", "John", "Schmitz"),
Dean("TPM", "Hans", "Wamelink")
)
```

**Q**: Define a method `deanAddresses` to retrieve the list of addresses of each dean.

```
def deanAddresses: List[(Dean, List[Addr])] =
  deans.map { d => (d, addr.filter(a => a.k == d.k)) }
```

In practice, we get the following results

```
// EEEWWWW
List(
(Dean(EWI,John,Schmitz), List(
Addr(EWI,Mekelweg,4),
Addr(EWI,Van Mourik Broekmanweg,6))
),
(Dean(TPM,Hans,Wamelink),List(
Addr(TPM,Jafaalaan,5))
)
)
```

This is OK, but a more practical result would be a `List[(Dean, Addr)]`. For this, we need to *flatten* the internal sequence.

`flatMap`: Map and flatten in one step. \(flatMap(xs: [A], f: A \rightarrow [B]) \rightarrow [B]\)

`flatMap` enables us to combine two data collections and return a new collection with flattened values.

```
def deanAddresses2: Seq[(Dean, Addr)] =
  deans.flatMap(d =>
    addr.filter(a => a.k == d.k).
      map(v => (d, v)))
```

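In Scala, `flatMap` is special: `for` comprehensions are syntactic sugar for `flatMap`, `map` and `withFilter` calls, so the same method can be written as:

```
def deanAddresses2: Seq[(Dean, Addr)] = {
  for (
    d <- deans;
    v <- addr.filter(a => a.k == d.k)
  ) yield (d, v)
}
```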

A KV pair is an alternative form of a relation, indexed by a key. We can always convert between the two

`val relation : Set[Tuple3[Int, Int, String]] = ???`

Convert the above relation to a KV pair:

```
val kvpair: Set[Tuple2[Int, Tuple2[Int, String]]] =
  relation.map(x => (x._1, (x._2, x._3)))
```

Convert the KV pair back to a relation:

```
val relation2: Set[Tuple3[Int, Int, String]] =
  kvpair.map(x => (x._1, x._2._1, x._2._2))
```

This means that any operation we can do on relations, we can also do on KV pairs.

One of the key characteristics of data processing is that data is never modified in place. Instead, we apply operations that create new versions of the data, without modifying the original version.

*Immutability* is a general concept that extends across much of the data processing stack.
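Python dictionaries are mutable, but the same discipline can be sketched by always building a new version of a record and, optionally, handing out read-only views through the standard-library `types.MappingProxyType`:

```python
from types import MappingProxyType

original = {'id': 0, 'height': 66}
# Create a new version instead of modifying the original in place
converted = {**original, 'height': original['height'] / 100}

print(original)   # {'id': 0, 'height': 66}
print(converted)  # {'id': 0, 'height': 0.66}

# A read-only view makes accidental in-place updates fail loudly
frozen = MappingProxyType(original)
try:
    frozen['height'] = 0
except TypeError as e:
    print("cannot modify:", e)
```

The view is read-only but not a copy: changes made through `original` itself remain visible through `frozen`.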