What is streaming?

Big data is big

  • Typical processing scenario for big data:

    • Aggregate data in intermediate storage
    • Run batch job overnight, store results in permanent storage
    • Use Spark for interactive exploration of recent data

This scenario assumes that the value of the data is hidden in it (“needle in haystack”)

Data is NOT static

Running processes generate data continuously, and users need to monitor those processes continuously. The fact that we mostly use static data is due to legacy constraints.

Static data vs time

Bounded and unbounded datasets

  • Bounded Data: A dataset that can be enumerated and/or iterated upon
    • Students in TI2736-B
    • Countries in the world
  • Unbounded Data: A dataset that we can only enumerate given a snapshot. Unbounded data does not have a size property.
    • Natural numbers
    • Facebook/Twitter posts

Q: Most datasets are:

you guessed right, unbounded.

Stream and batch processing

  • Batch processing applies an algorithm on a bounded dataset to produce a single result at the end
    • Unix, Map/Reduce and Spark are batch processing systems
  • Stream processing applies an algorithm on continuously updating data and continuously creates results
    • Flink and Storm are stream processing systems
    • Natural fit for unbounded datasets
    • Bounded data are usually a time-restricted view of unbounded data

Use cases for stream processing

  • Intrusion and fraud detection
  • Algorithmic trading
  • Process monitoring (e.g. production processes, or logs)
  • Traffic monitoring
  • When we can discard raw data and prefer to store aggregates

What changes faster over time; data or code?

If \(\frac{\Delta d}{\Delta t} \gg \frac{\Delta c}{\Delta t}\), this is a streaming problem

If \(\frac{\Delta c}{\Delta t} \gg \frac{\Delta d}{\Delta t}\), this is an exploration problem

— Joe Hellerstein

Data in streaming processing

Some real-world examples of such data include:

[11/Oct/2018:09:02:41 +0000] "GET /pub/developer-testing-in-IDE.pdf HTTP/1.1" 200 8232808 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 7_0 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11A465 Safari/9537.53 (compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm)"
[11/Oct/2018:09:04:36 +0000] "GET /courses/bigdata/spark.html HTTP/1.1" 200 2145339 "-" "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36"
[11/Oct/2018:09:06:20 +0000] "GET /atom.xml HTTP/1.1" 200 255069 "-" "Gwene/1.0 (The gwene.org rss-to-news gateway)"
[11/Oct/2018:09:08:37 +0000] "GET /pub/eval-quality-of-open-source-software.pdf HTTP/1.1" 200 1306751 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"

Unix for stream processing

Suppose we want to calculate the total number of users on our system per hour every hour.

tail -f log.txt |               # Monitor interactions
sed -e …        |               # Extract user from logline
sort | uniq     |               # Unique users
wc -l           |               # Get user count
xargs -I {} echo -n `date` {}   # Print with timestamp

Q When will the above command finish?

Never! There is no way to tell tail that we want it to aggregate data per hour and make the pipeline recompute when tail emits.

What can we learn from Unix?

Unix streaming

Unix has many components required for stream processing:

  • Streaming data acquisition: tail or pipe
  • Intermediate storage: pipes
  • Ways of applying functions on streaming data

It is missing:

  • Splitting streams in batches (windowing)
  • Recomputing when new batches arrive (triggers)
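The two missing pieces can be sketched in a few lines of plain Scala. This is a minimal illustration, not how any particular system implements them: the `Hit` type, the `State` record and the assumption that hits arrive in timestamp order are all hypothetical. Windowing assigns each hit to the hour it falls in; the trigger fires when the first hit of a later hour arrives.

```scala
object HourlyUsers {
  // Hypothetical event: a timestamp (seconds since epoch) and a user name.
  final case class Hit(epochSeconds: Long, user: String)

  // The open window (its hour and the users seen so far) plus the results emitted so far.
  final case class State(hour: Option[Long], users: Set[String], emitted: Vector[(Long, Int)])

  val empty: State = State(None, Set.empty, Vector.empty)

  // Windowing: a hit belongs to the hour its timestamp falls into.
  // Trigger: when a hit from another hour arrives, emit the count of the open hour.
  // Assumes hits arrive in timestamp order.
  def step(state: State, hit: Hit): State = {
    val hour = hit.epochSeconds / 3600
    state.hour match {
      case Some(current) if current == hour =>
        state.copy(users = state.users + hit.user)
      case Some(current) =>
        State(Some(hour), Set(hit.user), state.emitted :+ (current -> state.users.size))
      case None =>
        State(Some(hour), Set(hit.user), state.emitted)
    }
  }
}
```

Folding `step` over an unbounded input never terminates either, but unlike the Unix pipeline it produces one result per hour along the way.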

Stream processing in a nutshell

Stream processing is a set of techniques and corresponding systems that process timestamped events. For a stream processing system to work, we need two major components:

  • A component that acquires events from producers and forwards them to consumers
  • A component that processes events

To make stream processing viable in the real world, both components must be scalable, distributable and fault-tolerant.

Messaging systems

The fundamental entity that stream processing deals with is an event. Events are produced by continuous processes and in order to be processed they must be consumed.

Messaging systems solve the problem of connecting producers to consumers.

Unix again

tail -f log.txt | wc -l

tail is the producer and wc is the consumer. The messaging system is the pipe. A pipe has the following functionality:

  • Reads data from the producer and buffers it
  • Blocks the producer when the buffer is full
  • Notifies the consumer that data is available

Pipes implement the publish / subscribe model for 1 producer to 1 consumer.
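The three pipe behaviours listed above can be imitated with a bounded blocking queue. The sketch below is an analogy under stated assumptions, not how the kernel implements pipes: `PipeSketch` and `transfer` are hypothetical names, and a JVM `ArrayBlockingQueue` stands in for the pipe buffer.

```scala
import java.util.concurrent.ArrayBlockingQueue

object PipeSketch {
  // Moves `items` from a producer thread to the calling (consumer) thread
  // through a bounded buffer, the way a pipe connects two processes.
  def transfer(items: Seq[Int], capacity: Int): Vector[Int] = {
    val buffer = new ArrayBlockingQueue[Int](capacity)
    val producer = new Thread(new Runnable {
      // put() blocks the producer while the buffer is full
      def run(): Unit = items.foreach(i => buffer.put(i))
    })
    producer.start()
    // take() blocks until data is available, which plays the role of the notification
    val received = Vector.fill(items.size)(buffer.take())
    producer.join()
    received
  }
}
```

With a capacity of 4 and 100 items, the producer is repeatedly suspended until the consumer catches up, yet every item arrives in order.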

Publish / Subscribe

Publish/subscribe systems connect multiple producers to multiple consumers.

  • Direct messaging systems use simple network communication (e.g. UDP multicast or direct TCP connections) to deliver messages to multiple consumers without an intermediary. They are fast, but require the producers/consumers to deal with data loss. Example: ZeroMQ

  • Message brokers or queues are centralized systems that sit between producers and consumers and deal with the complexities of reliable message delivery.

Broker-based messaging

The producers send messages in any of the following modes:

  • Fire and forget: The broker acks the message immediately
  • Transaction-based: The broker writes the message to permanent storage prior to ack’ing it.

The broker:

  • Buffers the messages, spilling to disk as necessary
  • Routes the messages to the appropriate queues
  • Notifies consumers when messages have arrived

The consumers:

  • Subscribe to a queue that contains the desired messages
  • Ack the message receipt (or successful processing)

Messaging patterns

Work queue pattern

Competing workers: Multiple consumers read from a single queue, competing for incoming messages

Fan out pattern

Fan out: Each consumer has a queue of its own. Incoming messages are replicated on all queues

Topics pattern

Message routing: The producer assigns keys to message metadata. The consumer creates topic queues by specifying the keys for which it wants to receive messages.
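A rough sketch of topic routing, assuming exact key matches only (real brokers usually also support wildcard patterns). `TopicRouting`, `Message` and `route` are hypothetical names used for illustration.

```scala
object TopicRouting {
  final case class Message(routingKey: String, body: String)

  // Each queue is bound to the set of routing keys its consumer asked for.
  // A message is copied to every queue whose binding contains its key.
  def route(bindings: Map[String, Set[String]],
            messages: Seq[Message]): Map[String, Seq[String]] =
    bindings.map { case (queue, keys) =>
      queue -> messages.filter(m => keys.contains(m.routingKey)).map(_.body)
    }
}
```

Binding every queue to all keys degenerates into the fan out pattern; letting several consumers read from one of the resulting queues gives the work queue pattern.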

Broker-based example: GHTorrent

GHTorrent architecture

GHTorrent uses topic queues to decouple following the GitHub event stream from retrieving the items linked from events. Events are written to the RabbitMQ broker with a routing key according to their event type; a configurable number of data retrieval processes subscribe to those queues.

Drawbacks of broker-based messaging

Broker-based messaging is widely used and well understood. It has, however, one drawback: after a message is received, it disappears. This leads to lost opportunities:

  • We cannot reprocess messages (e.g. when a new application version is installed)
  • We cannot prove that a message was delivered

Q: How can we solve those problems?

A: Instead of just forwarding messages, we can store and forward them.

Log-based messaging systems

A log is an append-only data structure stored on disk. We can exploit logs to implement messaging systems:

  • Producers append messages to the log
  • All consumers connect to the log and pull messages from it. A new client starts processing from the beginning of the log.
  • To maximize performance, the broker partitions and distributes the log to a cluster of machines.
  • The broker keeps track of the current message offset for each consumer per partition
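The points above can be illustrated with a single-partition, in-memory stand-in for a log-based broker. This is a sketch under simplifying assumptions (no disk, no partitioning, no concurrency); `LogBroker`, `Log`, `poll` and `seek` are hypothetical names, not the API of Kafka or any other system.

```scala
import scala.collection.mutable

object LogBroker {
  final class Log[A] {
    private val entries = mutable.ArrayBuffer.empty[A]
    // Current offset per consumer; unknown consumers start at the beginning of the log.
    private val offsets = mutable.Map.empty[String, Int]

    // Producers only ever append.
    def append(message: A): Unit = {
      entries += message
      ()
    }

    // Consumers pull up to `max` messages starting at their stored offset.
    // Reading advances the offset but never removes anything from the log.
    def poll(consumer: String, max: Int): Vector[A] = {
      val from = offsets.getOrElse(consumer, 0)
      val batch = entries.slice(from, from + max).toVector
      offsets.update(consumer, from + batch.size)
      batch
    }

    // Rewinding the offset is what makes reprocessing possible.
    def seek(consumer: String, offset: Int): Unit =
      offsets.update(consumer, offset)
  }
}
```

Because messages stay in the log, a second consumer (or a new version of the first one) can still read everything from offset 0.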

Log-based messaging overview

A generic partitioned log system

Kafka

Kafka use at Uber

Kafka is the best-known log server. It is used both to aggregate and store raw events and as an intermediary between systems.

Programming models for stream processing

Programming models for streams enable the processing of events to derive (some form of) state.

Read also this comprehensive blog post by Martin Kleppmann.

Event sourcing and CQRS

Capture all changes to an application state as a sequence of events.

Instead of mutating the application state (e.g. in a database), we store the event that causes the mutation in an immutable log. The application state is generated by processing the events. This enables us to:

  • Use specialized systems for scaling writes (e.g. Kafka) and reads (e.g. Redis), while the application remains stateless.
  • Provide separate, continuously updated views of the application state (e.g. per user, per location etc)
  • Regenerate the application state at any point in time by reprocessing events
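As a minimal sketch of the idea, assume a hypothetical bank-account domain with two event types. The state (a map of balances) is never stored directly; it is derived by folding over the event log, so replaying a prefix of the log yields the state at an earlier point in time.

```scala
object EventSourcing {
  sealed trait Event
  final case class Deposited(account: String, amount: Long) extends Event
  final case class Withdrawn(account: String, amount: Long) extends Event

  type Balances = Map[String, Long]

  // Applying one event to the current state yields the next state.
  def applyEvent(state: Balances, event: Event): Balances = event match {
    case Deposited(acc, amt) => state.updated(acc, state.getOrElse(acc, 0L) + amt)
    case Withdrawn(acc, amt) => state.updated(acc, state.getOrElse(acc, 0L) - amt)
  }

  // The application state is regenerated by replaying the immutable log.
  def replay(log: Seq[Event]): Balances =
    log.foldLeft(Map.empty[String, Long])(applyEvent)
}
```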

Reactive programming

Reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change (Wikipedia).

Reactive APIs model event sources as infinite collections on which observers subscribe to receive events.

Observable.from(TwitterSource).    // Stream of tweets
filter{_.location == "NL"}.        // Do some filtering
flatMap{t => GeolocateService(t)}. // Precise geolocation
groupBy{loc => loc.city}.          // Group results per city
flatMap{grp => grp.map(v => (grp.key, v))}.
subscribe(println)

The example is in the Reactive Extensions (Rx) formulation. .NET Rx and Java 9 (Flow) include facilities for reactive programming.

The Dataflow model

The Dataflow model was introduced by Akidau et al. [1] as a generic implementation of the MillWheel system [2]. Flink was heavily inspired by it.

The Dataflow model attempts to explain stream processing in four dimensions:

  • \(\color{orange}{\sf{What}}\): results are being computed
  • \(\color{blue}{\sf{Where}}\): in event time they are being computed
  • \(\color{green}{\sf{When}}\): in processing time they are materialized
  • \(\color{red}{\sf{How}}\): earlier results relate to later refinements

Time in stream processing

In streaming systems, we have two notions of time:

  • Processing time: the time at which events are observed in the system
  • Event time: the time at which events occurred

D: Describe a scenario where those are different.

Applications that calculate streaming aggregates (e.g. avg rainfall per country per hour) don’t care much about the event order.

Applications with precise timing requirements (e.g. bank transactions, fraud detection) care about event time. Events may however enter the system delayed or out of order.

Event Time skew

Event time skew

If processing (wall-clock) time is \(t\)

  • Skew is calculated as \(t - s\), where \(s\) is the timestamp of the latest event processed
  • Lag is calculated as \(t - s\), where \(s\) is the actual timestamp of an event

\(\color{orange}{\sf{What:}}\) operations on streams

  • Element-wise ops apply a function to each individual message. This is the equivalent of map or flatMap in batch systems.

  • Aggregations group multiple events together and apply a reduction (e.g. fold or max) on them.

Element-wise operations: map

\(Stream[A].map(x: A \rightarrow B) : Stream[B]\)

Convert types of stream elements

Map

// Rx
Observable.from(List(1,2,3)).map(x => 10 * x)

// Flink
env.fromCollection(List(1,2,3)).map(x => 10 * x)

Element-wise operations: filter

\(Stream[A].filter(x: A \rightarrow Boolean) : Stream[A]\)

Only keep events that satisfy the predicate.

Filtering

// Rx
Observable.from(List(2,30,22,5,60,1)).filter(x => x > 10)

// Flink
env.fromCollection(List(2,30,22,5,60,1)).filter(x => x > 10)

Element-wise operations: merge

\(Stream[A].merge(b: Stream[B >: A]) : Stream[B]\)

Emit a stream that combines all events from both streams.

Merging streams

// Rx
val a = Observable.from(List(20,40,60,80,100))
val b = Observable.from(List(1,1))
a.merge(b)

// Flink
val a = env.fromCollection(List(20,40,60,80,100))
val b = env.fromCollection(List(1,1))
a.union(b)

Element-wise operations: flatMap

\(Stream[A].flatMap(f: A \rightarrow Stream[B]) : Stream[B]\)

Apply f on all elements of Stream[A] and flatten any nested results to a new stream.

// Rx
Observable.from(List("foo", "bar")).
           flatMap(x => Observable.from(x.toCharArray))

// Flink
env.fromCollection(List("foo", "bar")).
    flatMap(x => x.toCharArray)

Element-wise operations: keyBy

\(Stream[A].keyBy(f: A \rightarrow K): Stream[(K, Stream[A])]\)

Partition a stream using a discriminator function and produce (a stream of) streams that emit the partitioned data.

Stream keyBy

keyBy (or groupBy in Rx) creates partitioned streams that can be processed in parallel. Moreover, keys are required for various operations combining data.

Element-wise operations: join

Stream[A].join(b: Stream[B],
               kl: A => K,
               kr: B => K,
               rs: (A,B) => R): Stream[R]

Join streams \(A\) and \(B\). Key selector functions kl and kr extract keys of type \(K\), on which the join operation is performed. On each joined pair, the result selector function rs is applied to derive the result type.

Stream join

Stream joining example

Find commits that caused exceptions and notify the authors.

case class StackEntry(file: String, line: Int)
case class Exception(exception: String, entries: Seq[StackEntry])
case class DiffLine(file: String, line: Int, content: ...)
case class Commit(sha: String, author: String, diff: Seq[DiffLine])

// One (exception, stack entry) pair per stack frame
val logs : Stream[(Exception, StackEntry)] = env.socketTextStream(host, port).
  flatMap(e => for(s <- e.entries) yield (e, s))
// One (commit, changed line) pair per line in the diff
val diffs : Stream[(Commit, DiffLine)] = env.GitRepoSource(...).
  flatMap(c => for(d <- c.diff) yield (c, d))

logs.join(diffs).
     where(log => log._2.file).       // Key: file in the stack entry
     equalTo(diff => diff._2.file).   // Key: file touched by the commit
     apply((log, diff) => (diff._1.author, diff._1.sha, log._1.exception)).
     map(e => sendEmail(...))

Q: How can a stream processor execute this?

A: Presumably, stack traces are a faster stream than commits; joining will require all keys to be kept in memory (per processing node). Theoretically, this requires infinite memory.
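One common way to bound the memory a join needs is to pair only events that fall into the same time window, so that state older than one window can be dropped. The sketch below illustrates this on finite, timestamped sequences in plain Scala; `WindowedJoin` is a hypothetical helper, the tumbling window is an assumption of the sketch, and the parameter names `kl`, `kr` and `rs` follow the join signature above.

```scala
object WindowedJoin {
  // Joins two timestamped inputs on a key, pairing only events whose timestamps
  // fall into the same tumbling window of length `windowSize`.
  def join[A, B, K, R](left: Seq[(Long, A)], right: Seq[(Long, B)], windowSize: Long)
                      (kl: A => K, kr: B => K, rs: (A, B) => R): Seq[R] = {
    // Index the right side by (window number, key).
    val rightIndex: Map[(Long, K), Seq[B]] =
      right.groupBy { case (ts, b) => (ts / windowSize, kr(b)) }
           .map { case (wk, events) => wk -> events.map(_._2) }
    for {
      (ts, a) <- left
      b <- rightIndex.getOrElse((ts / windowSize, kl(a)), Seq.empty[B])
    } yield rs(a, b)
  }
}
```

The price is completeness: a stack trace and the commit that caused it are only matched if they land in the same window, so the window length becomes a correctness parameter.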

Aggregations / Reductions

Stream[A].aggregate(f: AggregationFunction[A, T, B]): Stream[B]

trait AggregationFunction[IN, ACC, OUT] {
  def createAccumulator(): ACC
  def add(value: IN, acc: ACC): ACC // Type conversion IN -> ACC
  def getResult(acc: ACC): OUT      // Type conversion ACC -> OUT
  def merge(a: ACC, b: ACC): ACC
}

Aggregations group multiple events together and apply a reduction on them.

Q: How can we aggregate events, when our event stream is infinite?

Hint: remember the Unix example from before.

A: We can create event groups by time (e.g. every 2 minutes) or by count (e.g., every 100 events).
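To make the trait concrete, here is a sketch of an averaging aggregation applied to count-based groups. The trait is repeated so that the block is self-contained; `Average` and `aggregateEvery` are hypothetical names, and grouping a finite `Seq` stands in for grouping a stream.

```scala
object Aggregation {
  trait AggregationFunction[IN, ACC, OUT] {
    def createAccumulator(): ACC
    def add(value: IN, acc: ACC): ACC
    def getResult(acc: ACC): OUT
    def merge(a: ACC, b: ACC): ACC
  }

  // Average: the accumulator is a (sum, count) pair.
  object Average extends AggregationFunction[Double, (Double, Long), Double] {
    def createAccumulator(): (Double, Long) = (0.0, 0L)
    def add(value: Double, acc: (Double, Long)): (Double, Long) =
      (acc._1 + value, acc._2 + 1)
    def getResult(acc: (Double, Long)): Double =
      if (acc._2 == 0) 0.0 else acc._1 / acc._2
    // merge combines partial accumulators, e.g. ones computed on different nodes
    def merge(a: (Double, Long), b: (Double, Long)): (Double, Long) =
      (a._1 + b._1, a._2 + b._2)
  }

  // Count-based grouping: aggregate every `size` consecutive events.
  def aggregateEvery[IN, ACC, OUT](events: Seq[IN], size: Int)
                                  (f: AggregationFunction[IN, ACC, OUT]): Vector[OUT] =
    events.grouped(size).map { group =>
      f.getResult(group.foldLeft(f.createAccumulator())((acc, e) => f.add(e, acc)))
    }.toVector
}
```

Keeping a (sum, count) accumulator instead of the average itself is what makes `merge` possible: two averages cannot be combined correctly without their counts.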

\(\color{blue}{\textsf{Where:}}\) Streaming Windows

Windowing in streaming systems

Windows are fixed-size (e.g., 1000 events) or fixed-duration (e.g., 10 secs) “batches” of data.

Session windows

Session windows

Session windows are dynamically sized windows that aggregate batches of (typically) user activity. A session window ends when no new event arrives within the session gap time.
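A small sketch of how events could be grouped into sessions, assuming we already hold the event timestamps of a single user. `Sessions.sessionize` is a hypothetical helper; a real stream processor does this incrementally and per key.

```scala
object Sessions {
  // Splits the event timestamps of one user into sessions: a new session
  // starts whenever the gap to the previous event exceeds `gap`.
  def sessionize(timestamps: Seq[Long], gap: Long): Vector[Vector[Long]] =
    timestamps.sorted.foldLeft(Vector.empty[Vector[Long]]) { (sessions, ts) =>
      sessions.lastOption match {
        case Some(current) if ts - current.last <= gap =>
          sessions.init :+ (current :+ ts)   // extend the open session
        case _ =>
          sessions :+ Vector(ts)             // gap exceeded: open a new session
      }
    }
}
```

With a gap of 10, the timestamps 0, 5, 8, 30, 33, 100 form three sessions of sizes 3, 2 and 1; unlike fixed windows, their lengths depend entirely on the data.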

Example: Aggregating via windows

Out of order events

// Count number of tweets per user per hour
tweets.map(t => (t.user_id, 1))
      .keyBy(x => x._1)
      .timeWindow(Time.hours(1))
      .reduce((a, b) => (a._1, a._2 + b._2))

Every hour, this will produce a list of pairs like so:

(323, 1)
(44332, 4)
(212, 32)
...

Example: Using session windows

Session Windows

// Number of clicks per user session
case class Click(id: Integer, link: String, ...)
clickStream.map(c => (c.id, 1))
           .keyBy(x => x._1)
           .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
           .sum(1)

When a session terminates, we get results like:

(323, 1)
(44332, 4)
(212, 32)
...

Windows: Things to remember

There are 2 things to remember when using event-time windows.

  • Buffering: Aggregation functions are applied when the window finishes (see \(\color{green}{\textsf{When}}\)). This means that in-flight events need to be buffered in RAM and spilled to disk.

  • Completeness: Given that events may arrive out of order, how can we know that a window is ready to be materialized and what do we do with out of order events?

\(\color{green}{\textsf{When:}}\) Window Triggers

A trigger defines when, in processing time, the results of a window are materialized / processed. Two types of triggers can be defined:


Watermarks

Event-time processors need to determine when event time has progressed enough so that they can trigger windows. When reprocessing events from storage, a system might process weeks of event-time data in seconds; relying on processing time to trigger windows is not enough.

Watermarks flow as part of the data stream and carry a timestamp. They are a declaration that by a point in the stream, all events carrying a timestamp up to the watermark timestamp should have arrived.

Watermarks allow late messages to be processed up to a specified amount of (event-time) delay (allowed lateness).
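The interplay of watermarks and allowed lateness can be sketched as follows. Several simplifications are assumptions of this sketch rather than the behaviour of any specific system: the watermark is the largest event time seen so far minus a fixed out-of-orderness bound, windows are tumbling, timestamps are non-negative, and a window counts as triggered once the watermark reaches its end. All names are hypothetical.

```scala
object Watermarks {
  sealed trait Verdict
  case object OnTime extends Verdict        // window not yet triggered
  case object LateAccepted extends Verdict  // window triggered, within allowed lateness
  case object Dropped extends Verdict       // beyond allowed lateness

  def classify(eventTimes: Seq[Long], windowSize: Long, maxOutOfOrder: Long,
               allowedLateness: Long): Vector[(Long, Verdict)] = {
    var watermark = Long.MinValue
    eventTimes.toVector.map { ts =>
      // End of the tumbling window this event belongs to
      val windowEnd = (ts / windowSize + 1) * windowSize
      val verdict: Verdict =
        if (windowEnd > watermark) OnTime
        else if (windowEnd + allowedLateness > watermark) LateAccepted
        else Dropped
      // Advance the watermark; it never moves backwards
      watermark = math.max(watermark, ts - maxOutOfOrder)
      (ts, verdict)
    }
  }
}
```

Note that the decision depends only on event timestamps, never on the wall clock, which is why the same logic works when weeks of stored events are replayed in seconds.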

Watermarks in parallel streams

Watermarks

As the watermarks flow through the streaming program, they advance the event time at the operators where they arrive. Whenever an operator advances its event time, it generates a new watermark downstream for its successor operators.

\(\color{red}{\textsf{How:}}\) Window Refinements

In certain complex cases, a combination of triggers and watermarks flowing may cause a window to be materialized multiple times. In such cases, we can discard, accumulate or accumulate and retract the window results.
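The three refinement modes can be compared on a toy example. Assume a window that sums integers and fires several times; `panes` holds the sum of the new events seen between consecutive firings. The object and function names are hypothetical.

```scala
object Refinements {
  // Discarding: every firing reports only what arrived since the previous firing.
  def discarding(panes: Seq[Int]): Vector[Int] = panes.toVector

  // Accumulating: every firing reports the running total of the window.
  def accumulating(panes: Seq[Int]): Vector[Int] =
    panes.scanLeft(0)(_ + _).tail.toVector

  // Accumulating and retracting: every firing reports the new total together
  // with a retraction of the previously reported total (None on the first firing).
  def accumulatingAndRetracting(panes: Seq[Int]): Vector[(Int, Option[Int])] = {
    val totals = accumulating(panes)
    val previous: Vector[Option[Int]] = None +: totals.dropRight(1).map(t => Option(t))
    totals.zip(previous)
  }
}
```

Which mode fits depends on the consumer: a downstream sum wants discarding output, a dashboard that overwrites values wants accumulating output, and a downstream re-grouping needs the retractions to undo what it has already counted.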

For further reading, consult the Beam documentation.

Stream processing systems

Messaging systems are designed to move data from producers to consumers in a scalable and fault-tolerant way.

However, they do not process the moved data; this is the job of dedicated stream processing systems.

The stream as a database

There is nothing that fundamentally disallows event streams from acting as a database.

  • Events can be filtered and transformed
  • Event streams can be joined with other event streams
  • Event streams can be aggregated (given time constraints)
  • Event streams can be replicated on other hosts for scaling and fault tolerance

The main difference between streams and databases is that databases contain state, whereas streams contain state modifications. Therefore, databases can be updated, while streams can only be appended to.

Approaches to processing streams

  • Micro-batching: Aggregate data in batches of configurable (processing-time) duration

  • Event-based streaming: Process events one by one

Event-time systems can emulate micro-batching by setting an aligned delay trigger on keyed windows.

Apache Spark Streaming

Spark Streaming is an example of a micro-batching architecture. Spark Streaming breaks the input data into batches (of x seconds processing time length) and schedules those in the cluster using the exact same mechanisms for fault tolerance as normal RDDs.

ssc.socketTextStream("localhost", 9999).
  flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

Micro-batch

Issues with micro-batching

  • Latency

    • The micro-batch computation is triggered after the batch times out
    • Each batch needs to be scheduled, libraries need to be loaded, connections need to be opened, etc.
  • Programming model

    • No clean separation of mechanism from business logic
    • Changing the micro-batch size leads to different results

More streaming: Kafka Streams cluster

Kafka Stream architecture

Stateful streaming

Imagine that for each item we process, we would like to keep a counter.

val stream: DataStream[(String, Int)] = ...

val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })

The mapWithState operator takes and returns an optional state, which the stream processor must maintain.

Implicit states

In addition, many operators, for example windowing and aggregation ones, are inherently stateful.

Stateful vs Stateless processing

Q: As the processing graph is distributed, we need a consistent, fault-tolerant global view of the counter. How can we implement this?

A: An idea would be to pause all operators, start a 2-phase commit process and restart the processing when all nodes are committed.

But we can do better than that!

The Chandy-Lamport Algorithm

The Chandy-Lamport algorithm [3] can be used to capture consistent global snapshots. It models a distributed system as a graph of processes that have input and output channels, overseen by a snapshot initiator:

  • The snapshot initiator saves its local state and sends a marker to all its output channels
  • All receiving nodes: i) save their local state and the state of the channel that delivered the marker, ii) forward the marker to all outgoing channels
  • When the marker reaches the initiator, the snapshot is done

System view of snapshots

How Flink coordinates snapshots

Event processing guarantees

The following guarantees are offered by streaming systems

  • At most once: an event is processed once or, if it is lost, not at all
  • At least once: an event might flow through the system more than once, e.g. when it is replayed after a failure
  • Exactly once: an event flows through a set of operators exactly once

Flink supports exactly once. To do so, it requires the source to support event replay on request and the sink to be transactional. Both requirements are satisfied by Apache Kafka.
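The difference between at-least-once and exactly-once results can be shown with a toy sink. The sketch below uses a different technique from the replay-plus-transactional-sink approach described above: it makes the sink idempotent by remembering the ids of events it has already applied. Unique event ids and the `Delivery` helper are assumptions of the sketch.

```scala
object Delivery {
  // Each delivery is (event id, amount). After a failure, at-least-once
  // delivery may hand the same event to the sink more than once.
  def naiveTotal(deliveries: Seq[(Long, Int)]): Int =
    deliveries.map(_._2).sum

  // An idempotent sink ignores ids it has already applied, so replays are no-ops.
  def idempotentTotal(deliveries: Seq[(Long, Int)]): Int = {
    val (_, total) = deliveries.foldLeft((Set.empty[Long], 0)) {
      case ((seen, sum), (id, amount)) =>
        if (seen(id)) (seen, sum) else (seen + id, sum + amount)
    }
    total
  }
}
```

The set of seen ids is itself state that must survive failures, which is one reason why consistent snapshots matter for processing guarantees.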

Deployment view of a simple application

Image credits

Bibliography

[1]
T. Akidau et al., “The Dataflow model: A practical approach to balancing correctness, latency, and cost in massive-scale, unbounded, out-of-order data processing,” Proceedings of the VLDB Endowment, vol. 8, no. 12, pp. 1792–1803, 2015.
[2]
T. Akidau et al., “MillWheel: Fault-tolerant stream processing at internet scale,” Proceedings of the VLDB Endowment, vol. 6, no. 11, pp. 1033–1044, 2013.
[3]
K. M. Chandy and L. Lamport, “Distributed snapshots: Determining global states of distributed systems,” ACM Transactions on Computer Systems (TOCS), vol. 3, no. 1, pp. 63–75, 1985.
[4]
P. Carbone, S. Ewen, G. Fóra, S. Haridi, S. Richter, and K. Tzoumas, “State management in Apache Flink: Consistent stateful distributed stream processing,” Proceedings of the VLDB Endowment, vol. 10, no. 12, pp. 1718–1729, 2017.
[5]
M. Kleppmann, Designing data-intensive applications. O’Reilly Media, Inc., 2017.
[6]
T. Akidau, S. Chernyak, and R. Lax, Streaming systems: The what, where, when, and how of large-scale data processing. O’Reilly, 2018.
[7]
M. Zaharia, T. Das, H. Li, T. Hunter, S. Shenker, and I. Stoica, “Discretized streams: Fault-tolerant streaming computation at scale,” in Proceedings of the twenty-fourth ACM symposium on operating systems principles, 2013, pp. 423–438.