RDDs only see binary blobs with an attached type
Databases can do many optimizations because they know the data types for each field
D: Say that we have an RDD of 100k people and another of 300k addresses. We need to get the details of all people that live in Delft, along with their addresses. Which of the following joins will be faster?
Join 1 is the fastest. However, why do we have to do this optimization ourselves? Because Spark knows neither the schema of the data nor what the \(\lambda\)s passed as arguments to its operators do.
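A frequent reason why two equivalent joins differ in cost is where the filter sits relative to the join. The sketch below models two pair RDDs with plain Scala collections. It uses hypothetical Person and Address types and a naive join helper, and scales the data down to 1k people and 3k addresses. It shows how much less data the join has to match when we filter for Delft first. Spark cannot reorder these steps for us, because the filtering logic is hidden inside an opaque function.

```scala
// Toy model: Seq[(key, value)] stands in for a pair RDD keyed by person id.
case class Person(id: Int, name: String)
case class Address(personId: Int, city: String)

// Naive equi-join on the key (hypothetical helper, same idea as a pair-RDD join)
def join[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, B))] = {
  val rightByKey = right.groupBy(_._1)
  for {
    (k, a) <- left
    (_, b) <- rightByKey.getOrElse(k, Seq.empty)
  } yield (k, (a, b))
}

val people = (1 to 1000).map(i => (i, Person(i, s"person-$i")))
// Every person has 3 addresses; 1 in 10 addresses is in Delft
val addresses = (1 to 3000).map { i =>
  val owner = i % 1000 + 1
  (owner, Address(owner, if (i % 10 == 0) "Delft" else "Amsterdam"))
}

// Plan A: join everything, then filter
val joinedAll = join(people, addresses)
val planA = joinedAll.filter { case (_, (_, address)) => address.city == "Delft" }

// Plan B: filter the addresses first, then join the much smaller input
val delftAddresses = addresses.filter { case (_, address) => address.city == "Delft" }
val planB = join(people, delftAddresses)

println(s"join input: ${addresses.size} vs ${delftAddresses.size} addresses")
println(s"rows produced by the join: ${joinedAll.size} vs ${planB.size}")
```

Both plans return the same people. Plan B simply never materializes the 2700 non-Delft matches.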
In Spark SQL, we trade some of the freedom provided by the RDD API to enable:
The price we have to pay is to bring our data to a (semi-)tabular format and describe its schema. Then, we let relational algebra work for us.
SparkSQL is a library built on top of Spark RDDs. It provides two main abstractions:
DataFrames: essentially a Dataset[Row], where a Row is roughly an Array[Object]. Equivalent to R or Pandas DataFrames.
It offers a query optimizer (Catalyst) and an off-heap data cache (Tungsten).
It can directly connect to and use structured data sources (e.g. SQL databases), and can import CSV, JSON, Parquet, Avro and other data formats, inferring their schema.
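As an illustration of schema inference, the sketch below writes a small, hypothetical CSV file to a temporary directory and asks Spark to read its header and infer the column types. A local SparkSession is created so that the snippet is self-contained; in spark-shell one already exists as spark.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Local session for a self-contained example (spark-shell already provides `spark`)
val spark = SparkSession.builder().master("local[*]").appName("sparksql-notes").getOrCreate()

// Hypothetical sample file, written to a temporary directory
val csvPath = Files.createTempDirectory("sparksql").resolve("people.csv")
Files.write(csvPath, "name,age\nAlice,34\nBob,17\n".getBytes(StandardCharsets.UTF_8))

// Use the first line as column names and infer column types from the values
val people = spark.read.
  option("header", "true").
  option("inferSchema", "true").
  csv(csvPath.toString)

people.printSchema()
```

Schema inference requires an extra pass over the data, which is one more reason to provide a schema explicitly for large inputs.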
Running the query on MySQL took 19 minutes.
On the same server, they used Spark SQL to connect to MySQL, partitioned the DataFrame that resulted from the connection and ran the query in Spark SQL. It took 192 seconds!
This was the result of Catalyst rewriting the SQL query: instead of 1 complex query, SparkSQL ran 24 parallel ones, using range conditions to restrict the volume of data each one examined. MySQL cannot do this!
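The idea of splitting one query into range-restricted ones can be sketched as follows. The whereClauses helper and the orders table are hypothetical. Spark's JDBC data source performs a similar split when it is given the partitionColumn, lowerBound, upperBound and numPartitions options, but this is a simplified model, not its actual code.

```scala
// Hypothetical helper: split a query over a numeric column into numPartitions
// range-restricted WHERE clauses that can run in parallel.
// Loosely modeled on how Spark's JDBC source partitions reads; not its actual code.
def whereClauses(column: String, lower: Long, upper: Long, numPartitions: Int): Seq[String] = {
  require(numPartitions >= 1, "need at least one partition")
  require(upper - lower >= numPartitions, "range too small for this many partitions")
  val stride = (upper - lower) / numPartitions
  (0 until numPartitions).map { i =>
    val start = lower + i * stride
    val end = start + stride
    if (numPartitions == 1) "1 = 1"                          // no restriction needed
    else if (i == 0) s"$column < $end OR $column IS NULL"    // first range also catches NULLs
    else if (i == numPartitions - 1) s"$column >= $start"    // last range is open-ended
    else s"$column >= $start AND $column < $end"
  }
}

// Each clause becomes its own, much smaller query against the database
whereClauses("id", 0, 100, 4).foreach(c => println(s"SELECT * FROM orders WHERE $c"))
```

Because the first and last ranges are open-ended, rows outside [lower, upper) are still read; the bounds only control how the work is divided.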
Similarly to normal Spark, SparkSQL needs a context object to invoke its functionality. This is the SparkSession. If a SparkContext object exists, it is straightforward to get a SparkSession:
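A minimal sketch of obtaining a SparkSession from an existing SparkContext. Here the SparkContext is created locally to keep the snippet self-contained; in spark-shell both sc and spark already exist.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

// An existing SparkContext (spark-shell provides one as `sc`)
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("sparksql-notes"))

// Reuse its configuration; getOrCreate() attaches to the already running SparkContext
val spark = SparkSession.builder().config(sc.getConf).getOrCreate()

// Brings in implicit conversions such as rdd.toDF() and the $"column" syntax
import spark.implicits._
```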
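Given an RDD[(String, Int, String)], Spark can derive the column types from the tuple type. For an RDD[Row], such as one built from raw text lines, we describe the schema explicitly: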
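import java.sql.Date
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("level", StringType, nullable = true),
  StructField("date", DateType, nullable = true),
  StructField("client_id", IntegerType, nullable = true),
  StructField("stage", StringType, nullable = true),
  StructField("msg", StringType, nullable = true)
))

// Split into at most 5 fields so that spaces inside the message are kept.
// DateType values must be java.sql.Date; valueOf assumes yyyy-mm-dd dates.
val rowRDD = sc.textFile("ghtorrent-log.txt").
  map(_.split(" ", 5)).
  map(r => Row(r(0), Date.valueOf(r(1)), r(2).toInt, r(3), r(4)))

val logDF = spark.createDataFrame(rowRDD, schema)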
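For a tuple-typed RDD such as RDD[(String, Int, String)], no explicit schema is needed: with spark.implicits._ in scope, toDF derives the column types from the tuple and only needs column names. The sample data below is hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Local session for a self-contained example (spark-shell already provides `spark`)
val spark = SparkSession.builder().master("local[*]").appName("sparksql-notes").getOrCreate()
import spark.implicits._

// Hypothetical sample data of type RDD[(String, Int, String)]
val rdd = spark.sparkContext.parallelize(Seq(
  ("Alice", 34, "Delft"),
  ("Bob", 28, "Rotterdam")
))

// Column types come from the tuple type; we only name the columns
val peopleDF = rdd.toDF("name", "age", "city")
peopleDF.printSchema()
```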
Both RDDs and Datasets:
The main difference is that Datasets use special Encoders to convert the data into compact internal formats that Spark can use directly for many operations, without first deserializing them into JVM objects.
The internal format is very efficient; it is not uncommon for in-memory data to use less space than its on-disk format.
Moral: when in doubt, provide a schema!
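A short sketch of a typed Dataset. The Person case class and its data are hypothetical; Spark derives an Encoder for it (through spark.implicits._), and that Encoder also determines the schema.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical record type; Spark derives an Encoder for case classes
case class Person(name: String, age: Int)

// Local session for a self-contained example (spark-shell already provides `spark`)
val spark = SparkSession.builder().master("local[*]").appName("sparksql-notes").getOrCreate()
import spark.implicits._

val people: Dataset[Person] = Seq(Person("Alice", 34), Person("Bob", 17)).toDS()

// The schema comes from the case class fields
people.printSchema()

// Typed, lambda-based operations, as with RDDs
val adultNames: Dataset[String] = people.filter(p => p.age >= 18).map(p => p.name)
```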
Columns in DataFrames/Datasets can be accessed by name:
or in Scala
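Column references can be written in several equivalent ways. The sketch below uses a hypothetical two-column DataFrame; the $"..." syntax is only available after importing spark.implicits._.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Local session for a self-contained example (spark-shell already provides `spark`)
val spark = SparkSession.builder().master("local[*]").appName("sparksql-notes").getOrCreate()
import spark.implicits._

val df = Seq(("Ajax", 3), ("PSV", 1)).toDF("team", "goals")

val c1 = df("team")   // bound to this specific DataFrame
val c2 = col("team")  // resolved against whichever DataFrame it is used on
val c3 = $"team"      // Scala-only shorthand, enabled by spark.implicits._

df.select(c1, $"goals").show()
```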
Columns are defined by expressions. The API overrides language operators to return expression objects. For example, the following:
is syntactic sugar for
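To see operator overriding at work, the sketch below builds a predicate from Column operators and prints the Catalyst expression behind it. It assumes Spark 3.x, where Column.expr exposes that expression; the data is hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Local session for a self-contained example (spark-shell already provides `spark`)
val spark = SparkSession.builder().master("local[*]").appName("sparksql-notes").getOrCreate()
import spark.implicits._

val df = Seq(("Ajax", 3), ("PSV", 1)).toDF("team", "goals")

// `>`, `&&` and `===` are methods on Column: they build an expression, they do not compare values
val predicate = $"goals" > 2 && $"team" === "Ajax"

// The expression tree hidden behind the Column object (Spark 3.x API)
println(predicate.expr)

df.filter(predicate).show()
```

Note that === is used for equality because == on Column objects would be Scala's ordinary object equality.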
DataFrames/Datasets include all typical relational algebra operations:
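A sketch chaining a few of these operations (projection, selection, sorting, limiting) on a hypothetical DataFrame of players:

```scala
import org.apache.spark.sql.SparkSession

// Local session for a self-contained example (spark-shell already provides `spark`)
val spark = SparkSession.builder().master("local[*]").appName("sparksql-notes").getOrCreate()
import spark.implicits._

val players = Seq(
  ("Alice", "Ajax", 12),
  ("Bob", "PSV", 7),
  ("Carol", "Ajax", 3)
).toDF("name", "team", "goals")

val topScorers = players.
  select($"name", $"team", $"goals").  // projection
  filter($"goals" > 5).                // selection
  orderBy($"goals".desc).              // sorting
  limit(10)

topScorers.show()
```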
DataFrames can be joined irrespective of their underlying data source, as long as they share a key.
All types of joins are supported:
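The join type is passed as a string argument. The sketch below joins two hypothetical DataFrames on a shared id column using several join types.

```scala
import org.apache.spark.sql.SparkSession

// Local session for a self-contained example (spark-shell already provides `spark`)
val spark = SparkSession.builder().master("local[*]").appName("sparksql-notes").getOrCreate()
import spark.implicits._

val people = Seq((1, "Alice"), (2, "Bob"), (3, "Carol")).toDF("id", "name")
val addresses = Seq((1, "Delft"), (3, "Rotterdam"), (4, "Leiden")).toDF("id", "city")

val inner = people.join(addresses, Seq("id"))               // ids present on both sides: 1, 3
val left = people.join(addresses, Seq("id"), "left_outer")  // every person: 1, 2, 3
val full = people.join(addresses, Seq("id"), "full_outer")  // ids from either side: 1 to 4
val semi = people.join(addresses, Seq("id"), "left_semi")   // people that have an address
val anti = people.join(addresses, Seq("id"), "left_anti")   // people without an address

full.orderBy($"id").show()
```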
When groupBy is called on a DataFrame, it is (conceptually) split into a key/value structure, where the key is each distinct value of the grouped column and the value is the rows that contain that value.
As in SQL, the only operations we can apply to a grouped DataFrame are aggregate functions.
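A sketch of grouping and aggregating a hypothetical DataFrame of players per team. In the Scala API, groupBy returns a RelationalGroupedDataset, and agg turns it back into a DataFrame.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, count, max}

// Local session for a self-contained example (spark-shell already provides `spark`)
val spark = SparkSession.builder().master("local[*]").appName("sparksql-notes").getOrCreate()
import spark.implicits._

val players = Seq(
  ("Alice", "Ajax", 12),
  ("Bob", "PSV", 7),
  ("Carol", "Ajax", 3)
).toDF("name", "team", "goals")

// One output row per team, with aggregates computed over that team's rows
val perTeam = players.
  groupBy($"team").
  agg(count("*").as("players"), avg($"goals").as("avg_goals"), max($"goals").as("top"))

perTeam.show()
```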
Why is SparkSQL so fast? Because it uses optimisation and code generation.
The key to both features is that the code passed to operators such as filter (e.g. a column predicate) is syntactic sugar for generating expression trees. Plain Scala lambdas, in contrast, remain opaque to Spark.
is converted to
This corresponds to an Abstract Syntax Tree
The optimizer uses tree patterns to simplify the AST. For example, the following tree:
can be simplified to:
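Rule-based simplification can be illustrated with a toy model. The classes below (Expr, Literal, Attribute, Add) are hypothetical stand-ins, not Catalyst's real classes, and the tree x + (1 + 2) mirrors the constant-folding example in the Spark SQL paper. Rules are Scala partial functions, applied bottom-up by a small transform method that is similar in spirit to Catalyst's.

```scala
// Toy expression tree; these are NOT Catalyst's real classes.
sealed trait Expr {
  // Apply `rule` bottom-up: rewrite the children first, then this node
  def transform(rule: PartialFunction[Expr, Expr]): Expr = {
    val withNewChildren = this match {
      case Add(l, r) => Add(l.transform(rule), r.transform(rule))
      case other     => other
    }
    rule.applyOrElse(withNewChildren, identity[Expr])
  }
}
case class Literal(value: Int) extends Expr
case class Attribute(name: String) extends Expr
case class Add(left: Expr, right: Expr) extends Expr

// Constant folding and removal of "+ 0", written as tree patterns
val simplify: PartialFunction[Expr, Expr] = {
  case Add(Literal(a), Literal(b)) => Literal(a + b)
  case Add(e, Literal(0))          => e
  case Add(Literal(0), e)          => e
}

// x + (1 + 2)  becomes  x + 3
val tree = Add(Attribute("x"), Add(Literal(1), Literal(2)))
val simplified = tree.transform(simplify)
println(simplified)
```

Catalyst groups many such rules into batches and, according to the paper, runs each batch until the tree stops changing; the sketch performs a single pass.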
Given an expression tree and a context (the DataFrame), Catalyst uses Scala quasiquotes, a compiler feature that builds Scala ASTs from code templates written like interpolated strings.
The previous expression tree is converted (roughly!) to
and gets compiled at runtime to JVM code.
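The translation from tree to code can be sketched with a toy generator. Per the Spark SQL paper, Catalyst builds real ASTs with quasiquotes and compiles them at runtime. This hypothetical compile function only emits Scala source text, which is enough to show the shape of the result. The columns map stands in for the context that tells the generator where each attribute lives in a row.

```scala
// Toy code generator; these are NOT Catalyst's real classes or functions.
sealed trait Expr
case class Literal(value: Int) extends Expr
case class Attribute(name: String) extends Expr
case class Add(left: Expr, right: Expr) extends Expr

// `columns` maps attribute names to row positions (hypothetical context)
def compile(e: Expr, columns: Map[String, Int]): String = e match {
  case Literal(v)   => v.toString
  case Attribute(n) => s"row.getInt(${columns(n)})"
  case Add(l, r)    => s"(${compile(l, columns)} + ${compile(r, columns)})"
}

// The simplified tree x + 3, with x stored in the first column of each row
val code = compile(Add(Attribute("x"), Literal(3)), Map("x" -> 0))
println(code)
```

Generating straight-line code like this avoids walking the tree and dispatching on node types for every row, which is where the speed-up of code generation comes from.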
Catalyst performs the following steps:
Int(and would also check whether
The end result is runtime-generated JVM bytecode that executes the optimized plan on top of RDDs.
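To inspect these steps for a concrete query, Dataset.explain(true) prints the parsed, analyzed and optimized logical plans followed by the selected physical plan. The debugCodegen() helper from org.apache.spark.sql.execution.debug (present in Spark 2.x and 3.x, to our knowledge) additionally prints the Java code Spark generates for the query. The query itself is a hypothetical example over spark.range.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.debug._

// Local session for a self-contained example (spark-shell already provides `spark`)
val spark = SparkSession.builder().master("local[*]").appName("sparksql-notes").getOrCreate()
import spark.implicits._

val numbers = spark.range(0, 1000).toDF("id")
val query = numbers.filter($"id" % 2 === 0).select(($"id" * 10).as("scaled"))

// Parsed, analyzed and optimized logical plans, then the physical plan
query.explain(true)

// Generated code for the whole-stage-codegen parts of the physical plan
query.debugCodegen()
```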
M. Armbrust et al., “Spark SQL: Relational data processing in Spark,” in Proceedings of the 2015 ACM SIGMOD International Conference on Management of Data, 2015, pp. 1383–1394.
This work is (c) 2017, 2018, 2019, 2020 - onwards by TU Delft and Georgios Gousios and licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International license.