`toPandas` (PySpark) / `as.data.frame` (SparkR)

Data has to be collected before a local data frame is created. For example, the `toPandas` method looks as follows:
```python
def toPandas(self):
    import pandas as pd
    return pd.DataFrame.from_records(self.collect(), columns=self.columns)
```
You need Python, ideally with all the dependencies, installed on each node.

The SparkR counterpart (`as.data.frame`) is simply an alias for `collect`.
To summarize, in both cases data is collected to the driver node and converted to a local data structure (`pandas.DataFrame` and `base::data.frame` in Python and R respectively).
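The conversion step can be illustrated with plain pandas: the rows below stand in for the (hypothetical) output of `collect()`, and the construction is the same one `toPandas` performs on the driver:

```python
import pandas as pd

# Illustrative rows, as they might come back from DataFrame.collect()
rows = [(1, "a"), (2, "b"), (3, "c")]
columns = ["id", "value"]

# The same construction toPandas performs on the driver node
local_df = pd.DataFrame.from_records(rows, columns=columns)
print(local_df.shape)  # (3, 2)
```

Because the full row list must exist on the driver before `from_records` is called, driver memory bounds the size of the result.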
Vectorized user-defined functions
Since Spark 2.3.0, PySpark also provides a set of `pandas_udf` variants (`SCALAR`, `GROUPED_MAP`, `GROUPED_AGG`) which operate in parallel on chunks of data defined by:

- partitions, in the case of the `SCALAR` variant;
- grouping expressions, in the case of `GROUPED_MAP` and `GROUPED_AGG`.
Each chunk is represented by:

- one or more `pandas.core.series.Series` in the case of the `SCALAR` and `GROUPED_AGG` variants;
- a single `pandas.core.frame.DataFrame` in the case of the `GROUPED_MAP` variant.
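A `SCALAR` pandas_udf is, at its core, just a function from `pandas.Series` to `pandas.Series` that Spark calls once per chunk. A minimal sketch of the function body, runnable without a cluster (the Spark registration is shown only as a comment, since executing it requires an active session):

```python
import pandas as pd

# In PySpark this function would be wrapped with
#   pyspark.sql.functions.pandas_udf("double", PandasUDFType.SCALAR)
# so that Spark invokes it once per Arrow batch of a partition.
def times_two(s: pd.Series) -> pd.Series:
    # Vectorized: operates on a whole chunk at once, not row by row
    return s * 2

chunk = pd.Series([1.0, 2.0, 3.0])  # one chunk, as Spark would pass it
result = times_two(chunk)
print(result.tolist())  # [2.0, 4.0, 6.0]
```

Because the body receives an entire chunk, it can use pandas/NumPy vectorized operations instead of per-row Python calls, which is where the speedup over row-at-a-time UDFs comes from.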
Similarly, since Spark 2.0.0, SparkR provides `dapply` and `gapply` functions operating on `data.frame`s defined by partitions and grouping expressions respectively.
The aforementioned functions:

- don't collect to the driver. Unless the data contains only a single partition (e.g. after `coalesce(1)`) or the grouping expression is trivial (e.g. `groupBy(lit(1))`), there is no single-node bottleneck;
- load the respective chunks into the memory of the corresponding executor. As a result, the size of each individual chunk is limited by the memory available on each executor.