Encode and assemble multiple features in PySpark

Spark >= 2.3, >= 3.0

Since Spark 2.3, OneHotEncoder is deprecated in favor of OneHotEncoderEstimator. If you use a recent release, modify the encoder code accordingly:

from pyspark.ml.feature import OneHotEncoderEstimator

encoder = OneHotEncoderEstimator(
    inputCols=["gender_numeric"],  
    outputCols=["gender_vector"]
)

In Spark 3.0 this variant has been renamed to OneHotEncoder:

from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(
    inputCols=["gender_numeric"],  
    outputCols=["gender_vector"]
)

Additionally StringIndexer has been extended to support multiple input columns:

StringIndexer(inputCols=["gender"], outputCols=["gender_numeric"])

Spark < 2.3

Well, you can write a UDF, but why would you? There are already quite a few tools designed to handle this category of tasks:

from pyspark.sql import Row
from pyspark.ml.linalg import DenseVector

row = Row("gender", "foo", "bar")

df = sc.parallelize([
  row("0", 3.0, DenseVector([0, 2.1, 1.0])),
  row("1", 1.0, DenseVector([0, 1.1, 1.0])),
  row("1", -1.0, DenseVector([0, 3.4, 0.0])),
  row("0", -3.0, DenseVector([0, 4.1, 0.0]))
]).toDF()

First of all, StringIndexer:

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="gender", outputCol="gender_numeric").fit(df)
indexed_df = indexer.transform(df)
indexed_df.drop("bar").show()

## +------+----+--------------+
## |gender| foo|gender_numeric|
## +------+----+--------------+
## |     0| 3.0|           0.0|
## |     1| 1.0|           1.0|
## |     1|-1.0|           1.0|
## |     0|-3.0|           0.0|
## +------+----+--------------+
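To make the mapping above concrete, here is a plain-Python sketch of how labels can be mapped to indices by descending frequency, which is StringIndexer's default ordering. It is an illustration only, not Spark's implementation: the helper name fit_string_index is hypothetical, and the alphabetical tie-break is an assumption (it is documented for Spark 3.0+, but older releases do not specify it).

```python
from collections import Counter


def fit_string_index(labels):
    """Map each distinct label to a float index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending frequency; break ties alphabetically (assumption).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# The "gender" column from the example DataFrame.
genders = ["0", "1", "1", "0"]
mapping = fit_string_index(genders)
indexed = [mapping[g] for g in genders]
print(mapping)
print(indexed)
```

Both labels occur twice here, so the order is decided purely by the tie-break, which is why "0" ends up with index 0.0.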

Next, OneHotEncoder:

from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(inputCol="gender_numeric", outputCol="gender_vector")
encoded_df = encoder.transform(indexed_df)
encoded_df.drop("bar").show()

## +------+----+--------------+-------------+
## |gender| foo|gender_numeric|gender_vector|
## +------+----+--------------+-------------+
## |     0| 3.0|           0.0|(1,[0],[1.0])|
## |     1| 1.0|           1.0|    (1,[],[])|
## |     1|-1.0|           1.0|    (1,[],[])|
## |     0|-3.0|           0.0|(1,[0],[1.0])|
## +------+----+--------------+-------------+
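The empty vectors (1,[],[]) in the output may look surprising. They come from the encoder's dropLast parameter, which is enabled by default: the last category is dropped and represented as an all-zero vector, so two categories need only one slot. The following plain-Python sketch mimics that behavior; the function one_hot_sparse and its (size, indices, values) return triple are hypothetical and only mirror the way Spark prints a sparse vector.

```python
def one_hot_sparse(index, num_categories, drop_last=True):
    """Return (size, indices, values), mirroring a SparseVector's printed form."""
    i = int(index)
    if not 0 <= i < num_categories:
        raise ValueError("index %r is outside [0, %d)" % (index, num_categories))
    size = num_categories - 1 if drop_last else num_categories
    if i < size:
        return (size, [i], [1.0])
    # The dropped last category is encoded as an all-zero vector.
    return (size, [], [])


# The gender_numeric column from the example, with two categories.
encoded = [one_hot_sparse(i, num_categories=2) for i in [0.0, 1.0, 1.0, 0.0]]
for vector in encoded:
    print(vector)
```

With drop_last=False, the same index 1.0 would instead be encoded as (2, [1], [1.0]).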

VectorAssembler:

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["gender_vector", "bar", "foo"], outputCol="features")

final_df = assembler.transform(encoded_df)

If bar contained categorical variables you could use VectorIndexer to set the required metadata:

from pyspark.ml.feature import VectorIndexer

vector_indexer = VectorIndexer(inputCol="bar", outputCol="bar_indexed")

encoded_df_with_indexed_bar = (vector_indexer
    .fit(encoded_df)
    .transform(encoded_df))

but that is not the case here.
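Conceptually, the VectorAssembler step just concatenates its input columns, scalars and vectors alike, into one flat vector in the order given by inputCols. A minimal plain-Python sketch of that idea follows; the assemble helper is hypothetical and ignores the column metadata that the real transformer also carries along.

```python
def assemble(*columns):
    """Concatenate scalars and sequences, in order, into one flat list of floats."""
    features = []
    for value in columns:
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)
        else:
            features.append(float(value))
    return features


# First and third rows of the example: gender_vector (written densely), bar, foo.
first = assemble([1.0], [0, 2.1, 1.0], 3.0)
third = assemble([0.0], [0, 3.4, 0.0], -1.0)
print(first)
print(third)
```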

Finally you can wrap all of that using pipelines:

from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[indexer, encoder, vector_indexer, assembler])
model = pipeline.fit(df)
transformed = model.transform(df)

Arguably it is a much more robust and cleaner approach than writing everything from scratch. There are some caveats, especially when you need consistent encoding between different datasets. You can read more in the official documentation for StringIndexer and VectorIndexer.
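The consistency caveat is easiest to see with a frequency-based index like the one StringIndexer builds by default. In the plain-Python sketch below (hypothetical helper fit_labels, made-up toy data, alphabetical tie-breaking assumed), refitting on a second dataset silently swaps the meaning of the indices, while reusing the mapping learned from the first dataset keeps them stable.

```python
from collections import Counter


def fit_labels(values):
    """Learn a label -> index mapping, most frequent label first."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {v: float(i) for i, v in enumerate(ordered)}


train = ["a", "a", "b"]  # toy data, not from the example DataFrame
test = ["b", "b", "a"]

train_mapping = fit_labels(train)  # fit once, on the training data
refit_mapping = fit_labels(test)   # what refitting on the new data would learn

consistent = [train_mapping[v] for v in test]
inconsistent = [refit_mapping[v] for v in test]
print(consistent)
print(inconsistent)
```

In Spark terms this corresponds to calling fit once and reusing the fitted model (or the fitted PipelineModel) to transform every dataset, rather than fitting again on each one.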

Another way to get a comparable output is RFormula, which the documentation describes as follows:

RFormula produces a vector column of features and a double or string column of label. Like when formulas are used in R for linear regression, string input columns will be one-hot encoded, and numeric columns will be cast to doubles. If the label column is of type string, it will be first transformed to double with StringIndexer. If the label column does not exist in the DataFrame, the output label column will be created from the specified response variable in the formula.

from pyspark.ml.feature import RFormula

rf = RFormula(formula="~ gender +  bar + foo - 1")
final_df_rf = rf.fit(df).transform(df)

As you can see, it is much more concise, but it is harder to compose and doesn't allow much customization. Nevertheless, the result for a simple pipeline like this one will be identical:

final_df_rf.select("features").show(4, False)

## +----------------------+
## |features              |
## +----------------------+
## |[1.0,0.0,2.1,1.0,3.0] |
## |[0.0,0.0,1.1,1.0,1.0] |
## |(5,[2,4],[3.4,-1.0])  |
## |[1.0,0.0,4.1,0.0,-3.0]|
## +----------------------+


final_df.select("features").show(4, False)

## +----------------------+
## |features              |
## +----------------------+
## |[1.0,0.0,2.1,1.0,3.0] |
## |[0.0,0.0,1.1,1.0,1.0] |
## |(5,[2,4],[3.4,-1.0])  |
## |[1.0,0.0,4.1,0.0,-3.0]|
## +----------------------+
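Note that the third row is printed in sparse form, (5,[2,4],[3.4,-1.0]), while the others are dense. The assembled vector is stored in whichever representation is more compact. The sketch below reproduces that choice in plain Python. The threshold 1.5 * (nnz + 1) < size reflects my understanding of how Spark's vector compression decides, so treat it as an assumption rather than a guaranteed contract; compressed_repr is a hypothetical helper.

```python
def compressed_repr(values):
    """Pick a sparse or dense representation, whichever is expected to be smaller."""
    size = len(values)
    nonzero = [(i, v) for i, v in enumerate(values) if v != 0.0]
    # Assumed rule: go sparse only when few enough entries are non-zero.
    if 1.5 * (len(nonzero) + 1.0) < size:
        return ("sparse", size, [i for i, _ in nonzero], [v for _, v in nonzero])
    return ("dense", list(values))


# The four assembled feature rows from the example.
rows = [
    [1.0, 0.0, 2.1, 1.0, 3.0],
    [0.0, 0.0, 1.1, 1.0, 1.0],
    [0.0, 0.0, 3.4, 0.0, -1.0],
    [1.0, 0.0, 4.1, 0.0, -3.0],
]
kinds = [compressed_repr(r)[0] for r in rows]
print(kinds)
print(compressed_repr(rows[2]))
```

Only the third row has few enough non-zero entries (two out of five) to make the sparse form smaller, which matches the output shown above.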

Regarding your questions:

make a UDF with similar functionality that I can use in a Spark SQL query (or some other way, I suppose)

It is just a UDF like any other. Make sure you use supported types; beyond that, everything should work just fine.

take the RDD resulting from the map described above and add it as a new column to the user_data dataframe?

from pyspark.sql import Row
from pyspark.ml.linalg import DenseVector, VectorUDT
from pyspark.sql.types import StructType, StructField

schema = StructType([StructField("features", VectorUDT(), True)])
row = Row("features")
# `result` is the RDD produced by the map described in the question
result.map(lambda x: row(DenseVector(x))).toDF(schema)

Note:

For Spark 1.x replace pyspark.ml.linalg with pyspark.mllib.linalg.
