Spark Datasets

RDDs and structure

RDDs only see binary blobs with an attached type

Databases can do many optimizations because they know the data types for each field

How to efficiently join?

D: Say that we have an RDD of 100k people and another of 300k addresses. We need to get the details of all people that live in Delft, along with their addresses. Which of the following joins will be faster?

val people : RDD[Person]
val addresses: RDD[(Int, Address)]

//Option 1
people.keyBy(_.id).join(addresses.filter(x => x._2.city == "Delft"))

//Option 2
people.keyBy(_.id).join(addresses).filter(x => x._2._2.city == "Delft")

//Option 3
people.keyBy(_.id).cartesian(addresses).filter(x => x._2._2.city == "Delft")

Option 1 is the fastest: filtering the addresses before the join means that only the Delft addresses are shuffled, whereas options 2 and 3 first move all 300k addresses (or the full cross product) across the network. However, why do we have to do this optimization ourselves? Because Spark knows neither the schema of the data nor what the \(\lambda\)s passed to filter do!

Spark SQL

In Spark SQL, we trade some of the freedom provided by the RDD API to enable:

  • declarativity, in the form of SQL
  • automatic optimizations, similar to ones provided by databases
    • execution plan optimizations
    • data movement/partitioning optimizations

The price we have to pay is to bring our data to a (semi-)tabular format and describe its schema. Then, we let relational algebra work for us.

Spark SQL basics

SparkSQL is a library built on top of Spark RDDs. It provides two main abstractions, along with an SQL syntax (the three are contrasted in the sketch below):

  • Datasets, collections of strongly-typed objects. Scala/Java only!
  • Dataframes, essentially a Dataset[Row], where Row \(\approx\) Array[Object]. Equivalent to R or Pandas Dataframes.
  • SQL syntax, for querying Datasets/Dataframes declaratively.
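
A minimal sketch contrasting the three interfaces; it assumes a SparkSession named spark and a made-up Person case class.

import org.apache.spark.sql.{DataFrame, Dataset}
import spark.implicits._

case class Person(name: String, age: Int)   // hypothetical record type

val ds: Dataset[Person] = Seq(Person("Alice", 25), Person("Bob", 42)).toDS()
val df: DataFrame = ds.toDF()                // same data, as untyped rows

ds.filter(p => p.age > 30)                   // field access checked at compile time
df.filter($"age" > 30)                       // column expression, checked at runtime

df.createOrReplaceTempView("people")         // plain SQL over the same data
spark.sql("SELECT name FROM people WHERE age > 30").show()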

It offers a query optimizer (Catalyst) and an off-heap memory manager (Tungsten).

It can directly connect to and use structured data sources (e.g. SQL databases), and can import CSV, JSON, Parquet, Avro and other data formats by inferring their schema.

How efficient is Spark SQL?

A blog post by the MySQL experts at Percona set out to find the number of delayed flights per airline using the air traffic dataset.

Running the query on MySQL took 19 mins.

On the same server, they used Spark SQL to connect to MySQL, partitioned the Dataframe that resulted from the connection and ran the query in Spark SQL. It took 192 secs!

This was the result of Catalyst rewriting the SQL query: instead of 1 complex query, SparkSQL ran 24 parallel ones, using range conditions to restrict the examined data volumes. MySQL cannot do this!
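
Such a partitioned read can be sketched with Spark's JDBC data source; the table name, partition column and bounds below are illustrative, not the exact ones from the blog post.

// Hypothetical connection details for an on-time flight performance table.
// partitionColumn/lowerBound/upperBound/numPartitions make Spark issue
// numPartitions parallel queries, each with a WHERE range condition on "year".
val ontime = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost/ontime")
  .option("dbtable", "ontime")
  .option("partitionColumn", "year")
  .option("lowerBound", "1988")
  .option("upperBound", "2015")
  .option("numPartitions", "24")
  .load()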

The SparkSession

Similarly to normal Spark, SparkSQL needs a context object to invoke its functionality. This is the SparkSession.

If a SparkContext object exists, it is straightforward to get a SparkSession:

// SparkSession's constructor is not public; go through the builder instead
val spark = SparkSession.builder().config(sc.getConf).getOrCreate()

Creating Data Frames and Datasets

  1. From RDDs containing tuples, e.g. RDD[(String, Int, String)]
import spark.implicits._

val df = rdd.toDF("name", "id", "address")
  2. From RDDs with known complex types, e.g. RDD[Person]
val df = persons.toDF() // Column names/types are inferred!
  3. From RDDs, with manual schema definition
import java.sql.Date
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("level", StringType, nullable = true),
  StructField("date", DateType, nullable = true),
  StructField("client_id", IntegerType, nullable = true),
  StructField("stage", StringType, nullable = true),
  StructField("msg", StringType, nullable = true)
))

val rowRDD = sc.textFile("ghtorrent-log.txt").
             map(_.split(" ")).
             map(r => Row(r(0), Date.valueOf(r(1)), r(2).toInt, // assumes yyyy-MM-dd dates
                          r(3), r(4)))

val logDF = spark.createDataFrame(rowRDD, schema)

  4. By reading (semi-)structured data files
val df = spark.read.json("examples/src/main/resources/people.json")

or, in Python:

df = sqlContext.read.csv("/datasets/pullreqs.csv", sep=",", header=True,
                          inferSchema=True)
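
For comparison, a Scala sketch of the same read; passing an explicit schema (here, a hypothetical csvSchema of type StructType) avoids the extra pass over the data that inferSchema needs.

// csvSchema is assumed to be a StructType describing the CSV columns
val pullreqs = spark.read
  .option("header", "true")
  .schema(csvSchema)
  .csv("/datasets/pullreqs.csv")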

RDDs and Datasets

Both RDDs and Datasets:

  1. Are strongly typed (they include a type parameter, i.e. RDD[T])
  2. Contain objects that need to be serialized

The main difference is that Datasets use special Encoders to convert the data into a compact internal format that Spark can use directly when applying operations such as map or filter.
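
A minimal sketch of an encoder in action, reusing the hypothetical Person case class from the earlier sketch; import spark.implicits._ would normally supply such encoders implicitly.

import org.apache.spark.sql.{Encoder, Encoders}

// Encoders.product derives an encoder (and thus a schema) for a case class
val personEncoder: Encoder[Person] = Encoders.product[Person]
val people = spark.createDataset(Seq(Person("Alice", 25)))(personEncoder)

people.schema.printTreeString()   // the schema carried by the encoder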

The internal format is very efficient; it is not uncommon for in-memory data to take up less space than its on-disk format.

Moral: when in doubt, provide a schema!

Columns

Columns in Dataframes/Datasets can be accessed by name, e.g. in Python:

df["team_size"]
df.team_size

or in Scala

df("team_size")
$"team_size" //scala only

Columns are defined by expressions. The API overrides language operators to return expression objects. For example, the following:

df("team_size") + 1

is (roughly) syntactic sugar for

org.apache.spark.sql.catalyst.expressions.Add(df("team_size").expr, lit(1).expr)

Data frame operations

DataFrames/Datasets include all typical relational algebra operations:

  • Projection
df.select("project_name").show()
df.drop("project_name", "pullreq_id").show()
  • Selection
df.filter(df.team_size.between(1,4)).show()

Joins

Dataframes can be joined irrespective of their underlying data source, as long as they share a key.

people = sqlContext.read.csv("people.csv")
department = sqlContext.read.jdbc("jdbc:mysql://company", table="departments")

people.filter(people.age > 30).\
       join(department, people.deptId == department.id)

All types of joins are supported:

  • Left outer:
people.join(department, people.deptId == department.id,
            how="left_outer")
  • Full outer:
people.join(department, people.deptId == department.id,
            how="full_outer")

Grouping and Aggregations

When groupBy is called on a Dataframe, it is (conceptually) split into a key/value structure, where the key is each distinct value of the column grouped upon and the value is the set of rows having that value.

As in SQL, we can only apply aggregate functions to grouped Dataframes:

df.groupBy(df.project_name).mean("lifetime_minutes").show()
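
Multiple aggregates over the same grouping can be computed in one pass with agg; a Scala sketch over the same hypothetical df (column names as used above).

import org.apache.spark.sql.functions.{avg, max}

df.groupBy("project_name")
  .agg(avg("lifetime_minutes"), max("team_size"))
  .show()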

SparkSQL under the covers

Why is SparkSQL so fast? Because it uses optimisation and code generation.

The key to both features is that code passed to higher-order functions (e.g. the predicate to filter) is syntactic sugar for generating expression trees.

df.filter(df("team_size") > (3 + 1))

is converted to

df.filter(GreaterThan(
            UnresolvedAttribute("team_size"),
            Add(Literal(3), Literal(1))))

This corresponds to an abstract syntax tree (AST).

Optimization

The optimizer uses tree patterns to simplify the AST. For example, the following tree:

val ast = GreaterThan(
            UnresolvedAttribute("team_size"),
            Add(Literal(3), Literal(1)))

can be simplified by a rule such as:

val opt = ast.transform {
  case Add(Literal(c1), Literal(c2)) => Literal(c1 + c2)
}

// opt is now:
GreaterThan(UnresolvedAttribute("team_size"),
            Literal(4))

Catalyst knows tens (hundreds?) of such optimizations. The purpose is to minimize the work done when the query is evaluated on real data.

Code generation

Given an expression tree and a context (the Dataframe), Catalyst exploits the Scala compiler's ability to construct ASTs programmatically from string templates (quasiquotes):

def compile(node: Node): AST = node match {
  case Literal(value) => q"$value"
  case Attribute(name) => q"row.get($name)"
  case GreaterThan(left, right) =>
        q"${compile(left)} > ${compile(right)}"
}

The previous expression tree is converted (roughly!) to

df.filter(row => row.get("team_size") > 4)

and gets compiled at runtime to JVM code.

Deriving an execution plan

Catalyst performs the following steps:

  • Analysis, to get to know the types of column expressions. This will, e.g., resolve UnresolvedAttribute("team_size") to Attribute("team_size") of type Int (and will also check whether team_size exists in df)
  • Rule optimization, as described above
  • Physical optimization, to minimize data movement
  • Code generation, as described before

The end result is native code that runs in optimal locations on top of an RDD.
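
Each of these stages can be inspected from the API: explain(true) prints the parsed, analyzed and optimized logical plans along with the chosen physical plan (a sketch over the hypothetical df used earlier).

// prints the parsed, analyzed, optimized logical plans and the physical plan
df.filter(df("team_size") > 4).explain(true)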
