Typical processing scenario for big data:
Assumes that the value of the data is hidden in it (“needle in a haystack”)
Running processes generate data continuously, and users need to continuously monitor those processes. The fact that we mostly use static data is due to legacy constraints.
Q: Most datasets are:
A: You guessed right: unbounded.
What changes faster over time: data or code?
If \(\frac{\Delta d}{\Delta t} \gg \frac{\Delta c}{\Delta t}\), this is a streaming problem
If \(\frac{\Delta c}{\Delta t} \gg \frac{\Delta d}{\Delta t}\), this is an exploration problem
— Joe Hellerstein
Some real-world examples of such data include:
[11/Oct/2018:09:02:41 +0000] "GET /pub/developer-testing-in-IDE.pdf HTTP/1.1" 200 8232808 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 7_0 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11A465 Safari/9537.53 (compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm)"
[11/Oct/2018:09:04:36 +0000] "GET /courses/bigdata/spark.html HTTP/1.1" 200 2145339 "-" "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36"
[11/Oct/2018:09:06:20 +0000] "GET /atom.xml HTTP/1.1" 200 255069 "-" "Gwene/1.0 (The gwene.org rss-to-news gateway)"
[11/Oct/2018:09:08:37 +0000] "GET /pub/eval-quality-of-open-source-software.pdf HTTP/1.1" 200 1306751 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
Suppose we want to calculate the total number of users on our system per hour every hour.
tail -f log.txt | # Monitor interactions
sed -e … | # Extract user from logline
sort | uniq | # Unique users
wc -l | # Get user count
xargs -I {} echo -n `date` {} # Print with timestamp
Q: When will the above command finish?
A: Never! There is no way to tell tail that we want it to aggregate data per hour and to make the pipeline recompute when tail emits new lines.
Unix has many of the components required for stream processing, such as tail for monitoring files and pipes for connecting processes. It is missing: a way to aggregate events (e.g., per time window) and a way to trigger recomputation when new events arrive.
Stream processing is a set of techniques and corresponding systems that process timestamped events. For a stream processing system to work, we need two major components:
A component that acquires and transports events: a messaging system
A component that processes events: a stream processing engine
To make stream processing viable in the real world, both components must be scalable, distributable and fault-tolerant.
The fundamental entity that stream processing deals with is an event. Events are produced by continuous processes and in order to be processed they must be consumed.
Messaging systems solve the problem of connecting producers to consumers.
tail -f log.txt | wc -l
tail is the producer and wc is the consumer. The messaging system is the pipe. A pipe has the following functionality: it transfers bytes from the producer to the consumer, buffering them in between, and it blocks the producer when the consumer cannot keep up (back-pressure).
Pipes implement the publish / subscribe model for 1 producer to 1 consumer.
Publish/subscribe systems connect multiple producers to multiple consumers.
Direct messaging systems use simple network communication (usually UDP) to broadcast messages to multiple consumers. They are fast, but require the producers/consumers to deal with data loss. Example: ZeroMQ
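A minimal sketch of direct pub/sub with the JeroMQ binding of ZeroMQ (the endpoint and topic prefix are assumptions made for the example); note that a slow or absent subscriber simply misses messages:

import org.zeromq.{SocketType, ZContext}

val ctx = new ZContext()

// Producer: publishes messages prefixed with a topic
val pub = ctx.createSocket(SocketType.PUB)
pub.bind("tcp://*:5556")
pub.send("evt.push new commit arrived")

// Consumer: only receives messages whose prefix matches its subscription
val sub = ctx.createSocket(SocketType.SUB)
sub.connect("tcp://localhost:5556")
sub.subscribe("evt.push".getBytes)
println(sub.recvStr())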
Message brokers or queues are centralized systems that sit between producers and consumers and deal with the complexities of reliable message delivery.
The producers send messages in one of the following modes:
Fire-and-forget: the producer does not wait for an acknowledgment
Transactional: the broker acknowledges the message once it has been safely stored
The broker buffers messages in queues, optionally persists them, and redelivers them when a consumer fails to acknowledge.
The consumers subscribe to queues, receive messages (pushed by the broker or polled on demand) and acknowledge each message once it has been processed.
Competing workers: Multiple consumers read from a single queue, competing for incoming messages
Fan out: Each consumer has a queue of its own. Incoming messages are replicated on all queues
Message routing: The producer assigns keys in the message metadata. The consumer creates topic queues by specifying the keys it is interested in receiving messages for.
GHTorrent uses topic queues to decouple following the GitHub event stream from the retrieval of items linked from events. Events are written to the RabbitMQ broker with a routing key according to their event type; a configurable number of data retrieval processes subscribe to those queues.
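A sketch of message routing over a topic exchange in the spirit of the GHTorrent setup, using the RabbitMQ Java client from Scala (the exchange name and routing keys are made up for the example):

import com.rabbitmq.client.{ConnectionFactory, Delivery}

val factory = new ConnectionFactory()
factory.setHost("localhost")
val channel = factory.newConnection().createChannel()
channel.exchangeDeclare("events", "topic")

// Producer: the routing key encodes the event type
channel.basicPublish("events", "evt.push", null, "payload".getBytes("UTF-8"))

// Consumer: a topic queue that only receives push events
val queue = channel.queueDeclare().getQueue
channel.queueBind(queue, "events", "evt.push")
channel.basicConsume(queue, true,
  (_: String, d: Delivery) => println(new String(d.getBody, "UTF-8")),
  (_: String) => ())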
Broker-based messaging is widely used and well understood. It has however one drawback: after a message is acknowledged, it disappears. This leads to lost opportunities: new consumers cannot see past messages, and already-consumed data cannot be reprocessed (e.g., after fixing a bug in a consumer).
Q: How can we solve those problems?
A: Instead of just forwarding messages, we can store and forward them.
A log is an append-only data structure stored on disk. We can exploit logs to implement messaging systems: producers append messages to the log, while each consumer maintains its own offset into it. Since messages are not deleted when read, consumers can rewind their offset and replay the log at will.
Kafka is the best-known log server. It is used both to aggregate and store raw events and as an intermediary between systems.
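A sketch of log-based messaging with the Kafka Java clients (the broker address, topic and group names are assumptions): the producer appends to the log; the consumer tracks its own offset, so the same events can be re-read at any time.

import java.util.Properties
import java.time.Duration
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.jdk.CollectionConverters._

val str = "org.apache.kafka.common.serialization.String"

// Producer: append an event to the 'events' log
val pProps = new Properties()
pProps.put("bootstrap.servers", "localhost:9092")
pProps.put("key.serializer", str + "Serializer")
pProps.put("value.serializer", str + "Serializer")
val producer = new KafkaProducer[String, String](pProps)
producer.send(new ProducerRecord("events", "user1", "GET /atom.xml"))

// Consumer: reads sequentially, tracking its own offset in the log
val cProps = new Properties()
cProps.put("bootstrap.servers", "localhost:9092")
cProps.put("group.id", "analytics")
cProps.put("key.deserializer", str + "Deserializer")
cProps.put("value.deserializer", str + "Deserializer")
val consumer = new KafkaConsumer[String, String](cProps)
consumer.subscribe(List("events").asJava)
for (r <- consumer.poll(Duration.ofSeconds(1)).asScala)
  println(s"offset ${r.offset}: ${r.value}")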
We now examine programming models for streams that enable the processing of events to derive (some form of) state.
Read also this comprehensive blog post by Martin Kleppmann.
Capture all changes to an application state as a sequence of events.
Instead of mutating the application state (e.g., in a database), we store the event that causes the mutation in an immutable log. The application state is generated by processing the events. This enables us to:
Rebuild the application state at any point in time, by replaying the log
Keep a complete audit trail of all changes
Recover from errors, by fixing the processing code and replaying the events
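A minimal sketch of the idea in Scala (the bank account domain is made up): the state is never stored directly; it is a fold over the immutable event log.

// Events capture state changes; they are appended, never mutated
sealed trait Event
case class Deposited(amount: Int) extends Event
case class Withdrawn(amount: Int) extends Event

// The application state is derived by replaying the log
def replay(log: Seq[Event]): Int =
  log.foldLeft(0) {
    case (balance, Deposited(a)) => balance + a
    case (balance, Withdrawn(a)) => balance - a
  }

val log = List(Deposited(100), Withdrawn(30), Deposited(5))
println(replay(log))         // current state: 75
println(replay(log.take(2))) // state at any point in the past: 70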
Reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change (Wikipedia).
Reactive APIs model event sources as infinite collections on which observers subscribe to receive events.
Observable.from(TwitterSource).      // List of tweets
  filter{_.location == "NL"}.        // Do some filtering
  flatMap{t => GeolocateService(t)}. // Precise geolocation
  groupBy{loc => loc.city}.          // Group results per city
  flatMap{grp => grp.map(v => (grp.key, v))}.
  subscribe(println)
The example is in the Reactive Extensions (Rx) formulation. .NET (Rx) and Java 9 (Flow) include facilities for reactive programming.
The Dataflow model was introduced by Akidau et al. [1] as a generic implementation of the MillWheel system [2]. Flink was heavily inspired by it.
The DataFlow model attempts to explain stream processing along four dimensions:
What results are being computed
Where in event time they are computed
When in processing time they are materialized
How earlier results relate to later refinements
In streaming systems, we have two notions of time:
Processing time: the (wall-clock) time at which the system observes an event
Event time: the time at which the event actually occurred at its source
D: Describe a scenario where those are different.
Applications that calculate streaming aggregates (e.g. avg rainfall per country per hour) don’t care much about the event order.
Applications with precise timing requirements (e.g. bank transactions, fraud detection) care about event time. Events may however enter the system delayed or out of order.
If processing (wall-clock) time is \(t\), events with event time up to \(t\) may still be in flight: they reach the processor with a variable delay (skew), or possibly not at all.
Element-wise ops apply a function to each individual message. This is the equivalent of map or flatMap in batch systems.
Aggregations group multiple events together and apply a reduction (e.g. fold or max) on them.
map
\(Stream[A].map(x: A \rightarrow B) : Stream[B]\)
Convert types of stream elements
// Rx
Observable.from(List(1,2,3)).map(x => 10 * x)
// Flink
env.fromCollection(List(1,2,3)).map(x => 10 * x)
filter
\(Stream[A].filter(x: A \rightarrow Boolean) : Stream[A]\)
Only keep events that satisfy the predicate.
// Rx
Observable.from(List(2,30,22,5,60,1)).filter(x => x > 10)
// Flink
env.fromCollection(List(2,30,22,5,60,1)).filter(x => x > 10)
merge
\(Stream[A].merge(b: Stream[B >: A]) : Stream[B]\)
Emit a stream that combines all events from both streams.
// Rx
val a = Observable.from(List(20,40,60,80,100))
val b = Observable.from(List(1,1))
a.merge(b)
// Flink
val a = env.fromCollection(List(20,40,60,80,100))
val b = env.fromCollection(List(1,1))
a.union(b)
flatMap
\(Stream[A].flatMap(f: A \rightarrow Stream[B]) : Stream[B]\)
Apply f on all elements of Stream[A] and flatten any nested results to a new stream.
// Rx
Observable.from(List("foo", "bar")).
  flatMap(x => Observable.from(x.toCharArray))
// Flink
env.fromCollection(List("foo", "bar")).
  flatMap(x => x.toCharArray)
keyBy
\(Stream[A].keyBy(f: A \rightarrow K): Stream[(K, Stream[A])]\)
Partition a stream using a discriminator function and produce (a stream of) streams that emit the partitioned data.
keyBy (or groupBy in Rx) creates partitioned streams that can be processed in parallel. Moreover, keys are required for various operations that combine data.
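For example, in the Flink formulation used above (a sketch; the rolling sum emits an updated value for each incoming event):

// Flink
env.fromCollection(List(("a", 1), ("b", 2), ("a", 3)))
   .keyBy(x => x._1)
   .sum(1) // emits ("a",1), ("b",2), ("a",4)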
join
Stream[A].join(b: Stream[B],
kl: A => K,
kr: B => K,
rs: (A,B) => R): Stream[R]
Join streams \(A\) and \(B\). Key selector functions kl and kr extract keys of type \(K\), on which the join operation is performed. On each joined pair, the result selector function rs is applied to derive the result type.
Find commits that caused exceptions and notify the authors.
case class StackEntry(file: String, line: Int)
case class Exception(exception: String, entries: Seq[StackEntry])
case class DiffLine(file: String, line: Int, content: ...)
case class Commit(author: String, diff: Seq[DiffLine])
val logs : Stream[(Exception, StackEntry)] = env.socketTextStream(host, port).
  flatMap(e => for (s <- e.entries) yield (e, s))

val diffs : Stream[(Commit, DiffLine)] = env.GitRepoSource(...).
  flatMap(c => for (d <- c.diff) yield (c, d))

logs.join(diffs).
  where(log => log._2.file).
  equalTo(diff => diff._2.file).
  apply((log, diff) => (diff._1.author, log._1.exception)).
  map(e => sendEmail(...))
Q: How can a stream processor execute this?
A: Presumably, the stack trace stream is faster than the commit stream. Joining requires all keys to be kept in memory (per processing node); theoretically, this requires infinite memory.
Stream[A].aggregate(f: AggregationFunction[A, T, B]): Stream[B]
trait AggregationFunction[IN, ACC, OUT] {
def createAccumulator(): ACC
def add(value: IN, acc: ACC): ACC // Type conversion IN -> ACC
def getResult(acc: ACC): OUT // Type conversion ACC -> OUT
def merge(a: ACC, b: ACC): ACC
}
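For example, a running average can be expressed with the trait above (a sketch; the accumulator carries a (sum, count) pair so that partial aggregates can also be merged):

case class AvgAcc(sum: Double, count: Long)

object Average extends AggregationFunction[Double, AvgAcc, Double] {
  def createAccumulator(): AvgAcc = AvgAcc(0.0, 0L)
  def add(value: Double, acc: AvgAcc): AvgAcc = AvgAcc(acc.sum + value, acc.count + 1)
  def getResult(acc: AvgAcc): Double = if (acc.count == 0) 0.0 else acc.sum / acc.count
  def merge(a: AvgAcc, b: AvgAcc): AvgAcc = AvgAcc(a.sum + b.sum, a.count + b.count)
}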
Aggregations group multiple events together and apply a reduction on them.
Q: How can we aggregate events, when our event stream is infinite?
Hint: remember the Unix example from before.
A: We can create event groups by time (e.g. every 2 minutes) or by count (e.g., every 100 events).
Windows are fixed-size (e.g., 1000 events) or fixed-duration (e.g., 10 secs) “batches” of data.
Session windows are dynamically sized windows that aggregate batches of (typically) user activity. Windows end after session gap time.
// Count number of tweets per user per minute
tweets.map(t => (t.user_id, 1))
      .keyBy(x => x._1)
      .timeWindow(Time.minutes(1))
      .reduce((a, b) => (a._1, a._2 + b._2))
Every minute, this will produce a list of pairs like so:
(323, 1)
(44332, 4)
(212, 32)
...
// Number of clicks per user session
case class Click(id: Integer, link: String, ...)
clickStream.map(c => (c.id, 1))
           .keyBy(x => x._1)
           .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
           .sum(1)
When a session terminates, we get results like:
(323, 1)
(44332, 4)
(212, 32)
...
There are two things to remember when using event-time windows.
Buffering: Aggregation functions are applied when the window finishes (see \(\color{green}{\textsf{When}}\)). This means that in-flight events need to be buffered in RAM and spilled to disk.
Completeness: Given that events may arrive out of order, how can we know that a window is ready to be materialized, and what do we do with out-of-order events?
A trigger defines when in processing time the results of a window are materialized / processed. The following types of triggers can be defined:
Per-record triggers fire after \(x\) records in a window have been encountered.
Aligned delay triggers fire after a specified amount of time has passed across all active windows (aka micro-batching).
Unaligned delay triggers fire after a specified amount of time has passed after the first event in a single window.
See Akidau’s excellent visualizations of how each trigger type behaves.
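In Flink, the default trigger of a window can be overridden. A sketch of a per-record-style trigger using Flink's built-in CountTrigger, reusing the tweet-counting example from before (the threshold of 100 is arbitrary):

import org.apache.flink.streaming.api.windowing.triggers.CountTrigger

tweets.map(t => (t.user_id, 1))
      .keyBy(x => x._1)
      .timeWindow(Time.minutes(1))
      .trigger(CountTrigger.of(100)) // fire every 100 records in a window
      .reduce((a, b) => (a._1, a._2 + b._2))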
Event-time processors need to determine when event time has progressed enough so that they can trigger windows. When reprocessing events from storage, a system might process weeks of event-time data in seconds; relying on processing time to trigger windows is not enough.
Watermarks flow as part of the data stream and carry a timestamp. They are a declaration that by a point in the stream, all events carrying a timestamp up to the watermark timestamp should have arrived.
Watermarks allow late messages to be processed up to a specified amount of (event-time) delay (allowed lateness).
As the watermarks flow through the streaming program, they advance the event time at the operators where they arrive. Whenever an operator advances its event time, it generates a new watermark downstream for its successor operators.
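A sketch of watermark generation in Flink's (pre-1.11) API, matching the style of the earlier examples; the 5-second out-of-order bound, the Tweet type and its timestamp field are assumptions:

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// Watermark = max event time seen so far minus 5 seconds:
// we declare that events arrive at most 5 seconds out of order
tweets.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[Tweet](Time.seconds(5)) {
          override def extractTimestamp(t: Tweet): Long = t.timestamp
        })
      .map(t => (t.user_id, 1))
      .keyBy(x => x._1)
      .timeWindow(Time.minutes(1))
      .allowedLateness(Time.minutes(1)) // accept events up to 1 min past the watermark
      .sum(1)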
In certain complex cases, a combination of triggers and watermarks flowing may cause a window to be materialized multiple times. In such cases, we can discard, accumulate or accumulate and retract the window results.
For further reading, consult the Beam documentation.
Messaging systems are purposed to move data from producers to consumers in a scalable and fault-tolerant way.
However, they do not process the moved data; this is the job of dedicated, stream processing systems.
There is nothing that fundamentally disallows event streams from acting as a database.
The main difference between streams and databases is that databases contain state, whereas streams contain state modifications. Therefore, databases can be updated in place, while streams can only be appended to.
Micro-batching: Aggregate data in batches of configurable (processing-time) duration
Event-based streaming: Process events one by one
Event-time systems can emulate micro-batching by setting an aligned delay trigger to keyed windows.
Spark Streaming is an example of a micro-batching architecture. Spark Streaming breaks the input data into batches (of x seconds of processing time) and schedules those in the cluster using the exact same fault-tolerance mechanisms as normal RDDs.
ssc.socketTextStream("localhost", 9999).
   flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
The two architectures differ mainly in latency (a micro-batch adds at least its batch duration to the end-to-end latency) and in programming model (batch-like transformations vs. per-event processing).
In Flink, a program is first compiled to a data-flow graph. Each node in the DFG represents a task, and can be scheduled within a task manager (essentially, a JVM instance).
A DFG consists of a Source, a Sink and intermediate computations. A sink cannot be a source: this means that two Flink computations cannot exchange data directly.
Imagine that for each item we process, we would like to keep a counter.
val stream: DataStream[(String, Int)] = ...
val counts: DataStream[(String, Int)] = stream
.keyBy(_._1)
.mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ((in._1, c), Some(c + in._2))
      case None    => ((in._1, 0), Some(in._2))
    })
The mapWithState operator takes and returns an optional state, which the stream processor must maintain.
In addition, many operators, for example windowing and aggregation ones, are inherently stateful.
Q: As the processing graph is distributed, we need a consistent, fault-tolerant global view of the counter. How can we implement this?
A: An idea would be to pause all operators, start a 2-phase commit process and restart the processing when all nodes are committed.
But we can do better than that!
The Chandy-Lamport algorithm [3] can be used to capture consistent global snapshots. It models a distributed system as a graph of processes that have input and output channels, overseen by a snapshot initiator:
The initiator records its own state and emits a marker on all its outgoing channels
On the first receipt of a marker, a process records its own state, marks the incoming channel as empty and forwards the marker on all its outgoing channels
From then on, the process records all messages arriving on its other input channels, until markers have arrived on all of them
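A sketch of the per-process marker logic in Scala (the Channel and message types are simplified stand-ins, not a real network implementation):

val Marker = "MARKER"
case class Channel(name: String) { def send(m: String): Unit = () }

class Node(in: Set[Channel], out: Set[Channel]) {
  var state: Int = 0                            // the process' own state
  var snapshot: Option[Int] = None              // recorded local state
  var recording: Set[Channel] = Set()           // channels still being recorded
  val inFlight = collection.mutable.Map[Channel, List[String]]().withDefaultValue(Nil)

  def receive(from: Channel, m: String): Unit =
    if (m == Marker) {
      if (snapshot.isEmpty) {                   // first marker: snapshot own state
        snapshot = Some(state)
        out.foreach(_.send(Marker))             // forward the marker downstream
        recording = in - from                   // 'from' carried no in-flight messages
      } else
        recording -= from                       // marker seen: stop recording 'from'
      // the local snapshot is complete when 'recording' becomes empty
    } else {
      if (recording(from)) inFlight(from) :+= m // in-flight message: part of the snapshot
      state += m.length                         // apply the message to the local state
    }
}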
Flink takes incremental snapshots by interleaving epoch markers with messages [4]. It assumes that input streams are durably logged and repeatable (e.g., with Kafka). Operators wait for the same epoch markers from all channels before they take a snapshot.
The following guarantees are offered by streaming systems:
At most once: an event may be lost on failure, but is never processed twice
At least once: an event is never lost, but may be processed more than once (e.g., when replayed after a failure)
Exactly once: an event is processed exactly once; results are as if no failure had occurred
Flink supports exactly once. To do so, it requires the source to support event replay on request and the sink to be transactional. Both requirements are satisfied by Apache Kafka.
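A sketch of what a transactional sink looks like with Kafka's transactional producer (the broker address, topic and transactional id are assumptions): either all results of a processing step become visible, or none, so replaying after a failure does not produce duplicate output.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("transactional.id", "stream-sink-1") // enables transactional writes
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
producer.initTransactions()
try {
  producer.beginTransaction()
  producer.send(new ProducerRecord("user-counts", "323", "1"))
  producer.send(new ProducerRecord("user-counts", "44332", "4"))
  producer.commitTransaction() // both records appear, or neither
} catch {
  case e: Exception => producer.abortTransaction(); throw e
}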
This work is (c) 2017, 2018, 2019, 2020 - onwards by TU Delft and Georgios Gousios and licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International license.