Graphs and other hierarchical data structures appear whenever a system models a dependency relationship. Common examples of big graphs include:
A graph (\(G\)) comprises nodes (\(V\)) and edges (\(E\)). Both can carry metadata. We represent graphs either as an adjacency list,
List[(V, List[V])]
where each tuple represents a node and its connections to other nodes, or as an edge list,
List[(V, V)]
of node pairs, where each pair represents an edge.
Graph components: subgraphs in which any two vertices are connected to each other by paths.
Strongly connected component: The largest sub-graph with a path from each node to every other node
Triangles or polygons: subgraphs with 3 or more nodes, where each node is connected to exactly 2 others
Spanning trees: A sub-graph that contains all nodes and the minimum number of edges
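To make the two representations above concrete, here is a small sketch (in Python, with a hypothetical three-node graph) showing the same graph as an adjacency list and as the equivalent edge list:

```python
# The same directed graph in the two representations described above.
# In practice, nodes and edges could additionally carry metadata.

# Adjacency list: List[(V, List[V])] -- one entry per node and its neighbours.
adjacency = [
    ("a", ["b", "c"]),
    ("b", ["c"]),
    ("c", []),
]

# Edge list: List[(V, V)] -- one pair per edge.
edges = [(src, dst) for src, neighbours in adjacency for dst in neighbours]

print(edges)  # [('a', 'b'), ('a', 'c'), ('b', 'c')]
```

The adjacency list makes neighbour lookups cheap; the edge list is the natural fit for relational storage, as in the SQL schema below.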
To process graphs, we can: store them in a (relational or graph) database and query them, compress them to fit in memory, or distribute the computation across a cluster (e.g. with BSP/Pregel).
For really big graphs, our options are somewhat limited.
Not all applications need to process billions of nodes and trillions of edges. For small to medium-sized graphs (< 500M edges), existing tools can go a long way.
CREATE TABLE nodes (
  id INTEGER PRIMARY KEY,
  metadata ...
)

CREATE TABLE edges (
  src INTEGER,
  target INTEGER,
  metadata ...,
  CONSTRAINT src_fkey
    FOREIGN KEY (src) REFERENCES nodes(id),
  CONSTRAINT target_fkey
    FOREIGN KEY (target) REFERENCES nodes(id)
)
We model graphs as node pairs. Nodes and edges have metadata.
WITH RECURSIVE transitive_closure (a, b, distance, path) AS (
  SELECT a, b, 1 AS distance, a || '->' || b AS path
  FROM edges
  UNION ALL
  SELECT tc.a, e.b, tc.distance + 1, tc.path || '->' || e.b
  FROM edges e
  JOIN transitive_closure tc ON e.a = tc.b
  WHERE e.metadata = ... -- Traversal filters on nodes/edges
    AND tc.path NOT LIKE '%->' || e.b || '->%'
)
SELECT * FROM transitive_closure
Recursive queries have a base clause, which is evaluated once, and a recursion clause, which is applied repeatedly to the results of the previous step until no new rows are produced.
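The base-plus-recursion evaluation can be sketched outside SQL as well. The following Python loop is a hypothetical in-memory analogue of the transitive-closure query (over an acyclic example, so the path-based cycle check is omitted):

```python
# Toy edge list: a -> b -> c -> d
edges = [("a", "b"), ("b", "c"), ("c", "d")]

# Base clause: every edge is a path of length 1.
closure = {(a, b, 1) for (a, b) in edges}

# Recursion clause: extend known paths by one edge until a fixpoint.
while True:
    new = {(a, d, dist + 1)
           for (a, b, dist) in closure
           for (c, d) in edges if b == c}
    if new <= closure:
        break  # no new paths found: the closure is complete
    closure |= new

print(sorted(closure))
```

Each iteration of the loop corresponds to one evaluation of the recursive `SELECT` joined against the rows produced so far.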
Given that we (blue node) are direct friends with the yellow nodes, we could recommend second level (red) friends as potential new connections.
WITH RECURSIVE transitive_closure (a, b, distance, path) AS (
  -- Find the yellow nodes
  SELECT a, b, 1 AS distance, a || '->' || b AS path
  FROM edges
  WHERE a = src -- the blue node
  UNION ALL
  -- Find the red nodes
  SELECT tc.a, e.b, tc.distance + 1, tc.path || '->' || e.b
  FROM edges e
  JOIN transitive_closure tc ON e.a = tc.b
  WHERE tc.path NOT LIKE '%->' || e.b || '->%'
    AND tc.distance < 2 -- don't recurse into white nodes
)
SELECT a, b FROM transitive_closure
GROUP BY a, b
HAVING MIN(distance) = 2 -- only report red nodes
The base expression finds all directly connected nodes (yellow), while the recursive expression recurses into their first-level descendants (red).
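The same recommendation logic can be sketched as a two-level traversal. This Python sketch (with hypothetical node names matching the colour scheme above) collects friends-of-friends that are not already direct friends:

```python
# Undirected friendship edges (hypothetical example data):
# "blue" is the starting user, "y*" its direct friends, "r*" their friends.
friends = {
    "blue": {"y1", "y2"},
    "y1": {"blue", "r1"},
    "y2": {"blue", "r1", "r2"},
    "r1": {"y1", "y2"},
    "r2": {"y2"},
}

def recommend(graph, start):
    direct = graph[start]
    # Friends of friends, excluding the start node and its direct friends.
    second = {fof for f in direct for fof in graph[f]}
    return second - direct - {start}

print(sorted(recommend(friends, "blue")))  # ['r1', 'r2']
```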
Graph databases are specialized databases for storing recursive data structures and supporting CRUD operations on them, while maintaining transactional consistency (ACID or otherwise).
The most commonly used language for graph databases is Cypher, the query language of Neo4j.
MATCH (n:User {name: 'foo'})
MATCH (n)-[:FRIENDS_WITH*2]-(m)
WHERE NOT (n)-[:FRIENDS_WITH]-(m)
RETURN m
Graphs are inherently recursive data structures, which means that computations may depend on previous computation steps (and thus are not trivially parallelizable).
PageRank is a centrality measure based on the idea that nodes are important if multiple important nodes point to them. For node \(p_i\), its PageRank is recursively defined as
\[ PR(p_i; 0) = \frac{1}{N} \\ PR(p_i; t+1) = \frac{1-d}{N} + d \sum_{p_j \in M(p_i)} \frac{PR (p_j; t)}{L(p_j)} \]
where \(d\) is a damping factor (usually set to 0.85) and \(M(p_i)\) is the set of nodes pointing to \(p_i\). Notice that each node updates other nodes by propagating its state.
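The formula transcribes almost directly to code. This Python sketch (a hypothetical in-memory implementation; `d`, `M`, and `L` correspond to the symbols above) iterates the update a fixed number of times:

```python
def pagerank(out_links, d=0.85, iters=20):
    nodes = list(out_links)
    n = len(nodes)
    pr = {p: 1.0 / n for p in nodes}  # PR(p; 0) = 1/N
    for _ in range(iters):
        nxt = {}
        for p in nodes:
            # M(p): nodes linking to p; L(q): out-degree of q
            incoming = sum(pr[q] / len(out_links[q])
                           for q in nodes if p in out_links[q])
            nxt[p] = (1 - d) / n + d * incoming
        pr = nxt
    return pr

# Toy graph: b links to both a and c; a and c link back to each other via a.
ranks = pagerank({"a": ["b"], "b": ["a", "c"], "c": ["a"]})
```

Since every node here has outgoing links, the ranks remain a probability distribution (they sum to 1) across iterations.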
val links: RDD[(V, List[V])] = ....cache()
var ranks = links.mapValues(v => 1.0)

for (i <- 1 to iters) {
  val contribs = links.join(ranks).values.flatMap {
    case (links, rank) =>
      val size = links.size
      links.map(url => (url, rank / size))
  }
  ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
}
The computation is iterative: each step depends on the results of the previous one, so the steps themselves cannot run in parallel. To make the computation side-effect free, we would need to write each step's results to external storage.
Graph compression attempts to exploit graph characteristics to create an in-memory compressed edge representation.
Can compress billion edge graphs to ~3 bits / edge (down from 64).
Succinct representation enables good locality properties and fast traversals.
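One core trick behind such compression (among several used by WebGraph-style formats) can be sketched: sort each adjacency list, store the gaps between consecutive neighbour ids rather than the ids themselves, and encode the resulting small numbers with a variable-length code. This hypothetical Python sketch uses a simple varint rather than the actual codes:

```python
def encode_varint(n):
    # 7 payload bits per byte; the high bit marks continuation.
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)

def compress_adjacency(neighbours):
    # Gap-encode a sorted adjacency list, then varint-encode each gap.
    gaps, prev = [], 0
    for v in sorted(neighbours):
        gaps.append(v - prev)
        prev = v
    return b"".join(encode_varint(g) for g in gaps)

# Clustered neighbour ids yield tiny gaps: after the first id, each edge
# costs one byte here, versus 8 bytes as a raw 64-bit integer.
data = compress_adjacency([1000000, 1000001, 1000003, 1000007])
```

Web and social graphs exhibit exactly this locality (neighbour ids cluster), which is why the real formats get down to a few bits per edge.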
For more information look at this presentation by Paolo Boldi and Sebastiano Vigna.
The BSP model is a general model for parallel algorithms.
It assumes that a system has: a number of processors with fast local memory, a network connecting them, and a mechanism for barrier synchronization.
BSP computation is organized in supersteps. A superstep comprises three phases: concurrent local computation on each processor, message exchange between processors, and barrier synchronization, where each processor waits until all others have finished the superstep.
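The superstep structure can be sketched in a single-process simulation (a hypothetical Python analogue: real BSP systems run partitions on separate machines). Messages produced in superstep t only become visible in superstep t + 1, which is the effect of the barrier:

```python
def bsp_run(states, compute, supersteps):
    # states: {pid: local_state}; compute returns (new_state, [(dst, msg)])
    inbox = {pid: [] for pid in states}
    for step in range(supersteps):
        outbox = {pid: [] for pid in states}
        for pid in states:
            # Phase 1: local computation; Phase 2: message generation
            states[pid], msgs = compute(pid, states[pid], inbox[pid])
            for dst, msg in msgs:
                outbox[dst].append(msg)
        inbox = outbox  # Phase 3: barrier -- deliver all messages at once
    return states

# Toy program on 3 "processors": each adds incoming values to its state
# and forwards its state to the next processor in a ring.
def compute(pid, value, messages):
    value += sum(messages)
    return value, [((pid + 1) % 3, value)]

result = bsp_run({0: 1, 1: 1, 2: 1}, compute, supersteps=2)
```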
Pregel (by Google) is a distributed graph processing framework
Open source implementations: Apache Giraph and GraphX.
BSP programs run until they stop themselves. Termination works as follows: a vertex votes to halt when it has no more work to do; an incoming message reactivates it; the program terminates when all vertices are inactive and no messages are in transit.
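The vote-to-halt protocol can be sketched as follows (a hypothetical Python simulation; the toy program propagates the maximum value along a chain, a classic Pregel example):

```python
def run_until_halt(vertices, step_fn, max_steps=100):
    # vertices: {vid: (value, active)}; step_fn returns (value, msgs, halt)
    inbox = {v: [] for v in vertices}
    for _ in range(max_steps):
        outbox = {v: [] for v in vertices}
        for vid, (value, active) in list(vertices.items()):
            # An inactive vertex is only woken up by incoming messages.
            if not active and not inbox[vid]:
                continue
            value, msgs, halt = step_fn(vid, value, inbox[vid])
            vertices[vid] = (value, not halt)
            for dst, msg in msgs:
                outbox[dst].append(msg)
        inbox = outbox
        # Global termination: all vertices halted, no messages in flight.
        if (all(not active for _, active in vertices.values())
                and not any(outbox.values())):
            break
    return {vid: value for vid, (value, _) in vertices.items()}

# Toy program: propagate the maximum value along the chain 0 -> 1 -> 2.
neighbours = {0: [1], 1: [2], 2: []}

def step_fn(vid, value, messages):
    new = max([value] + messages)
    if new != value or not messages:   # changed (or first superstep): notify
        return new, [(dst, new) for dst in neighbours[vid]], True
    return new, [], True               # unchanged: stay silent, vote to halt

result = run_until_halt({0: (5, True), 1: (1, True), 2: (3, True)}, step_fn)
```

Every vertex votes to halt after each step, yet the computation keeps running as long as messages keep reactivating vertices.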
Graphs are stored as adjacency lists, partitioned (using hash partitioning) and distributed using a network filesystem
Leader: Maintains a mapping between data partitions and cluster nodes. Implements the BSP barrier.
Worker: For each vertex, it maintains the following in memory: the vertex's current value, its outgoing edges, a queue of incoming messages, and a flag indicating whether the vertex is active.
The worker applies all computationally intensive operations.
def pregel[A](
  // Initialization message
  initialMsg: A,
  // Max super steps
  maxIter: Int = Int.MaxValue,
  activeDir: EdgeDirection = EdgeDirection.Out,
  // Program to update the vertex
  vprog: (VertexId, VD, A) => VD,
  // Program to determine edges to send a message to
  sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
  // Program to combine incoming messages
  mergeMsg: (A, A) => A
): Graph[VD, ED]
Spark uses its underlying fault tolerance, checkpointing, partitioning and communication mechanisms to store the graph. Halting is determined by examining whether a vertex is sending/receiving messages.
val pagerankGraph: Graph[Double, Double] = graph
  .mapVertices((id, attr) => 1.0) // Initial Pagerank for nodes

def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
  resetProb + (1.0 - resetProb) * msgSum

def sendMessage(id: VertexId, edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =
  Iterator((edge.dstId, edge.srcAttr * edge.attr))

def messageCombiner(a: Double, b: Double): Double = a + b

val initialMessage = 0.0

// Execute Pregel for a fixed number of iterations.
Pregel(pagerankGraph, initialMessage, numIter)(
  vertexProgram, sendMessage, messageCombiner)
From Pregel.scala in Apache Spark
The fault tolerance model is reminiscent of Spark.
Periodically, the leader instructs the workers to save the state of their in-memory data to persistent storage
Worker failure detected through keep-alive messages the leader issues to workers
In case of failure, the leader reassigns graph partitions to live workers; they reload their partition state from the most recently available checkpoint
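The checkpoint-and-replay recovery scheme can be sketched in miniature (a hypothetical Python simulation; real systems persist partition state to a distributed filesystem and reassign partitions across workers):

```python
import copy

def run_with_checkpoints(state, superstep_fn, total_steps, interval):
    # state: {partition_id: data}; checkpoints map superstep -> saved state
    checkpoints = {0: copy.deepcopy(state)}
    step = 0
    while step < total_steps:
        try:
            state = superstep_fn(state, step)
            step += 1
            if step % interval == 0:
                # Stand-in for persisting partition state to stable storage.
                checkpoints[step] = copy.deepcopy(state)
        except RuntimeError:
            # Worker failure: roll back to the most recent checkpoint and
            # replay the lost supersteps on the reassigned partitions.
            step = max(s for s in checkpoints if s <= step)
            state = copy.deepcopy(checkpoints[step])
    return state

# Toy workload: each superstep increments every partition's counter.
# One simulated failure occurs at superstep 3.
failed = {"done": False}

def superstep(state, step):
    if step == 3 and not failed["done"]:
        failed["done"] = True
        raise RuntimeError("worker lost")
    return {p: v + 1 for p, v in state.items()}

final = run_with_checkpoints({0: 0, 1: 0}, superstep, total_steps=5, interval=2)
# Despite the failure, each partition observes exactly 5 supersteps.
```

The cost trade-off is checkpoint frequency: longer intervals mean less I/O during normal operation but more supersteps to replay after a failure.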