Requirements for converting Spark dataframe to Pandas/R dataframe

toPandas (PySpark) / as.data.frame (SparkR)

Data has to be collected before a local data frame is created. For example, the toPandas method looks as follows:

def toPandas(self):
    import pandas as pd
    # collect() pulls every row to the driver before pandas builds the local frame
    return pd.DataFrame.from_records(self.collect(), columns=self.columns)

You need Python, ideally with all the required dependencies, installed on each node.

The SparkR counterpart (as.data.frame) is simply an alias for collect.

To summarize, in both cases data is collected to the driver node and converted to a local data structure (pandas.DataFrame and base::data.frame in Python and R, respectively).
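
For illustration, here is a minimal PySpark sketch of that collect-to-driver behaviour; the local session and the column name x are made up for this example:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.master("local[*]").getOrCreate()

# A small distributed DataFrame, used only for demonstration.
sdf = spark.range(0, 1000).withColumn("x", col("id") * 0.5)

# All rows are pulled to the driver; the result is a plain pandas.DataFrame,
# so it has to fit in the driver's memory.
pdf = sdf.toPandas()
print(type(pdf), pdf.shape)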

Vectorized user defined functions

Since Spark 2.3.0, PySpark also provides a set of pandas_udf variants (SCALAR, GROUPED_MAP, GROUPED_AGG) which operate in parallel on chunks of data defined by:

  • Partitions in case of SCALAR variant
  • Grouping expression in case of GROUPED_MAP and GROUPED_AGG.

Each chunk is represented by

  • One or more pandas.core.series.Series in case of SCALAR and GROUPED_AGG variants.
  • A single pandas.core.frame.DataFrame in case of GROUPED_MAP variant (see the sketch after this list).
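
Below is a minimal sketch of the three variants using the Spark 2.3/2.4-style API (PandasUDFType); the DataFrame, column names and function names are invented for illustration, and the GROUPED_AGG variant assumes Spark 2.4+. In Spark 3.x the same functionality is usually written with type hints and applyInPandas instead, so treat this as one possible form rather than the only one:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf, PandasUDFType

spark = SparkSession.builder.master("local[*]").getOrCreate()
sdf = (spark.range(0, 10)
            .withColumn("key", (col("id") % 2).cast("int"))
            .withColumn("x", col("id") * 0.5))

# SCALAR: receives pandas.Series batches drawn from each partition and
# must return a Series of the same length.
@pandas_udf("double", PandasUDFType.SCALAR)
def plus_one(s):
    return s + 1.0

sdf.select(plus_one(col("x")).alias("x_plus_one")).show()

# GROUPED_MAP: receives one pandas.DataFrame per group and returns a
# DataFrame matching the declared schema.
@pandas_udf("id long, key int, x double", PandasUDFType.GROUPED_MAP)
def center(pdf):
    return pdf.assign(x=pdf.x - pdf.x.mean())

sdf.groupBy("key").apply(center).show()

# GROUPED_AGG: receives pandas.Series per group and returns a single scalar.
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_x(s):
    return float(s.mean())

sdf.groupBy("key").agg(mean_x(col("x")).alias("mean_x")).show()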

Similarly, since Spark 2.0.0, SparkR provides the dapply and gapply functions, which operate on data.frames defined by partitions and grouping expressions, respectively.

The aforementioned functions:

  • Don’t collect to the driver. Unless the data contains only a single partition (e.g. after coalesce(1)) or the grouping expression is trivial (e.g. groupBy(lit(1))), there is no single-node bottleneck.
  • Load each chunk into the memory of the corresponding executor. As a result, processing is limited by the size of individual chunks relative to the memory available on each executor, as sketched below.
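
As a rough, self-contained sketch of both points (again using the Spark 2.3/2.4-style API with invented names), a constant grouping key funnels every row into a single chunk and recreates the single-node bottleneck:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, pandas_udf, PandasUDFType

spark = SparkSession.builder.master("local[*]").getOrCreate()
sdf = spark.range(0, 1000000).withColumn("key", (col("id") % 100).cast("int"))

@pandas_udf("id long, key int", PandasUDFType.GROUPED_MAP)
def identity(pdf):
    # Each call receives one group's rows as a single in-memory pandas.DataFrame.
    return pdf

# Fine: 100 groups are processed in parallel across the executors.
sdf.groupBy("key").apply(identity).count()

# Anti-pattern: a constant key puts every row into one group, so the whole
# dataset has to fit into a single executor's memory.
sdf.groupBy(lit(1)).apply(identity).count()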
