Build a hierarchy from a relational data-set using Pyspark

Shortest Path with Pyspark

The input data can be interpreted as a graph with the connections between currentnode and childnode. Then the question is what is the shortest path between the root node and all leaf nodes and is called single source shortest path.

Spark has Graphx to handle parallel computations of graphs. Unfortunately, GraphX does not provide a Python API (more details can be found here). A graph library with Python support is GraphFrames. GraphFrames uses parts of GraphX.

Both GraphX and GraphFrames provide an solution for sssp. Unfortunately again, both implementations return only the length of the shortest paths, not the paths themselves (GraphX and GraphFrames). But this answer provides an implementation of the algorithm for GraphX and Scala that also returns the paths. All three solutions use Pregel.

Translating the aforementioned answer to GraphFrames/Python:

1. Data preparation

Provide unique IDs for all nodes and change the column names so that they fit to the names described here

import pyspark.sql.functions as F

df = ...

vertices = df.select("currentnode").withColumnRenamed("currentnode", "node").union(df.select("childnode")).distinct().withColumn("id", F.monotonically_increasing_id()).cache()

edges = df.join(vertices, df.currentnode == vertices.node).drop(F.col("node")).withColumnRenamed("id", "src")\
        .join(vertices, df.childnode== vertices.node).drop(F.col("node")).withColumnRenamed("id", "dst").cache() 
Nodes                   Edges
+------+------------+   +-----------+---------+------------+------------+
|  node|          id|   |currentnode|childnode|         src|         dst|
+------+------------+   +-----------+---------+------------+------------+
| leaf2| 17179869184|   |     child1|    leaf4| 25769803776|249108103168|
|child1| 25769803776|   |     child1|   child3| 25769803776| 68719476736|
|child3| 68719476736|   |     child1|    leaf2| 25769803776| 17179869184|
| leaf6|103079215104|   |     child3|    leaf6| 68719476736|103079215104|
|  root|171798691840|   |     child3|    leaf5| 68719476736|214748364800|
| leaf5|214748364800|   |       root|   child1|171798691840| 25769803776|
| leaf4|249108103168|   +-----------+---------+------------+------------+
+------+------------+   

2. Create the GraphFrame

from graphframes import GraphFrame
graph = GraphFrame(vertices, edges)

3. Create UDFs that will form the single parts of the Pregel algorithm

The message type:

from pyspark.sql.types import *
vertColSchema = StructType()\
      .add("dist", DoubleType())\
      .add("node", StringType())\
      .add("path", ArrayType(StringType(), True))

The vertex program:

def vertexProgram(vd, msg):
    if msg == None or vd.__getitem__(0) < msg.__getitem__(0):
        return (vd.__getitem__(0), vd.__getitem__(1), vd.__getitem__(2))
    else:
        return (msg.__getitem__(0), vd.__getitem__(1), msg.__getitem__(2))
vertexProgramUdf = F.udf(vertexProgram, vertColSchema)

The outgoing messages:

def sendMsgToDst(src, dst):
    srcDist = src.__getitem__(0)
    dstDist = dst.__getitem__(0)
    if srcDist < (dstDist - 1):
        return (srcDist + 1, src.__getitem__(1), src.__getitem__(2) + [dst.__getitem__(1)])
    else:
        return None
sendMsgToDstUdf = F.udf(sendMsgToDst, vertColSchema)

Message aggregation:

def aggMsgs(agg):
    shortest_dist = sorted(agg, key=lambda tup: tup[1])[0]
    return (shortest_dist.__getitem__(0), shortest_dist.__getitem__(1), shortest_dist.__getitem__(2))
aggMsgsUdf = F.udf(aggMsgs, vertColSchema)

4. Combine the parts

from graphframes.lib import Pregel
result = graph.pregel.withVertexColumn(colName = "vertCol", \
    initialExpr = F.when(F.col("node")==(F.lit("root")), F.struct(F.lit(0.0), F.col("node"), F.array(F.col("node")))) \
    .otherwise(F.struct(F.lit(float("inf")), F.col("node"), F.array(F.lit("")))).cast(vertColSchema), \
    updateAfterAggMsgsExpr = vertexProgramUdf(F.col("vertCol"), Pregel.msg())) \
    .sendMsgToDst(sendMsgToDstUdf(F.col("src.vertCol"), Pregel.dst("vertCol"))) \
    .aggMsgs(aggMsgsUdf(F.collect_list(Pregel.msg()))) \
    .setMaxIter(10) \
    .setCheckpointInterval(2) \
    .run()
result.select("vertCol.path").show(truncate=False)   

Remarks:

  • maxIter should be set to a value at least as large as the longest path. If the value is higher, the result will stay unchanged, but the computation time becomes longer. If the value is too small, the longer paths will be missing in the result. The current version of GraphFrames (0.8.0) does not support stopping the loop when no more new messages are sent.
  • checkpointInterval should be set to a value smaller than maxIter. The actual value depends on the data and the available hardware. When OutOfMemory exception occur or the Spark session hangs for some time, the value could be reduced.

The final result is a regular dataframe with the content

+-----------------------------+
|path                         |
+-----------------------------+
|[root, child1]               |
|[root, child1, leaf4]        |
|[root, child1, child3]       |
|[root]                       |
|[root, child1, child3, leaf6]|
|[root, child1, child3, leaf5]|
|[root, child1, leaf2]        |
+-----------------------------+

If necessary the non-leaf nodes could be filtered out here.

Leave a Comment