Tuning parameters for implicit pyspark.ml ALS matrix factorization model through pyspark.ml CrossValidator

Ignoring technical issues, strictly speaking neither method is correct given the input generated by ALS with implicit feedback. You cannot use RegressionEvaluator because, as you already know, the prediction can be interpreted as a confidence value and is represented as a floating-point number in the range [0, 1], while the label column is just an unbounded integer. … Read more
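For reference, a minimal sketch (not part of the original answer) of the setup being criticized: ALS with implicitPrefs=True wired into CrossValidator with a RegressionEvaluator. The column names user, item and rating are placeholders, not taken from the question.

from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Hypothetical column names -- adjust to your ratings DataFrame
als = ALS(implicitPrefs=True, userCol="user", itemCol="item", ratingCol="rating")

# RMSE here compares [0, 1] confidences against raw counts -- exactly the
# mismatch described above
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

grid = (ParamGridBuilder()
        .addGrid(als.regParam, [0.01, 0.1])
        .addGrid(als.alpha, [1.0, 40.0])
        .build())

cv = CrossValidator(estimator=als, estimatorParamMaps=grid,
                    evaluator=evaluator, numFolds=3)
# model = cv.fit(ratings)  # runs, but the resulting metric is not meaningful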

How to map features from the output of a VectorAssembler back to the column names in Spark ML?

As of today Spark doesn't provide any method that can do it for you, so you have to create your own. Let's say your data looks like this:

import random
random.seed(1)

df = sc.parallelize([(
    random.choice([0.0, 1.0]),
    random.choice(["a", "b", "c"]),
    random.choice(["foo", "bar"]),
    random.randint(0, 100),
    random.random(),
) for _ in range(100)]).toDF(["label", "x1", "x2", "x3", "x4"])

and … Read more
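One common way to build such a mapping (a sketch, not the continuation of the truncated answer) is to read the ML attribute metadata that VectorAssembler attaches to its output column. Here the numeric columns x3 and x4 from the example above are assembled:

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=["x3", "x4"], outputCol="features")
assembled = assembler.transform(df)

# VectorAssembler records per-slot attributes in the column metadata;
# flatten the attribute groups into an index -> source column mapping.
attrs = assembled.schema["features"].metadata["ml_attr"]["attrs"]
index_to_name = {
    attr["idx"]: attr["name"]
    for group in attrs.values()   # "numeric", "binary", "nominal", ...
    for attr in group
}
print(index_to_name)              # e.g. {0: 'x3', 1: 'x4'}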

How to find mean of grouped Vector columns in Spark SQL?

Spark >= 2.4

You can use Summarizer:

import org.apache.spark.ml.stat.Summarizer

val dfNew = df.as[(Int, org.apache.spark.mllib.linalg.Vector)]
  .map { case (group, v) => (group, v.asML) }
  .toDF("group", "features")

dfNew
  .groupBy($"group")
  .agg(Summarizer.mean($"features").alias("means"))
  .show(false)

+-----+--------------------------------------------------------------------+
|group|means                                                               |
+-----+--------------------------------------------------------------------+
|1    |[8.740630742016827E12,2.6124956666260462E14,3.268714653521495E14]  |
|6    |[2.1153266920139112E15,2.07232483974322592E17,6.2715161747245427E17]|
|3    |[6.3781865566442836E13,8.359124419656149E15,1.865567821598214E14]  |
|5    |[4.270201403521642E13,6.561211706745676E13,8.395448246737938E15]   |
|9    |[3.577032684241448E16,2.5432362841314468E16,2.3744826986293008E17] |
|4    |[2.339253775419023E14,8.517531902022505E13,3.055115780965264E15]   |
|8    |[8.029924756674456E15,7.284873600992855E17,3.08621303029924E15]    |
|7    |[3.2275104122699105E15,7.5472363442090208E16,7.022556624056291E14] … Read more
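The answer is in Scala; a rough PySpark equivalent (a sketch with made-up data, assuming the column already holds ml Vectors so no asML conversion is needed) looks like this on Spark 2.4+:

from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Summarizer

df = spark.createDataFrame(
    [(1, Vectors.dense([1.0, 2.0])),
     (1, Vectors.dense([3.0, 4.0])),
     (2, Vectors.dense([5.0, 6.0]))],
    ["group", "features"],
)

# Summarizer aggregates ml Vector columns without any UDF (Spark >= 2.4)
df.groupBy("group") \
  .agg(Summarizer.mean(df["features"]).alias("means")) \
  .show(truncate=False)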

How to define a custom aggregation function to sum a column of Vectors?

Spark >= 3.0

You can use Summarizer with sum:

import org.apache.spark.ml.stat.Summarizer

df
  .groupBy($"id")
  .agg(Summarizer.sum($"vec").alias("vec"))

Spark < 3.0

Personally I wouldn't bother with UDAFs. They are more than verbose and not exactly fast (Spark UDAF with ArrayType as bufferSchema performance issues). Instead I would simply use reduceByKey / foldByKey:

import org.apache.spark.sql.Row
import breeze.linalg.{DenseVector => BDV}
import … Read more
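A rough PySpark analogue of the reduceByKey idea (a sketch using numpy in place of breeze; the id and vec column names mirror the snippet above):

import numpy as np
from pyspark.ml.linalg import Vectors, DenseVector

df = spark.createDataFrame(
    [(1, Vectors.dense([1.0, 2.0])),
     (1, Vectors.dense([3.0, 4.0])),
     (2, Vectors.dense([5.0, 6.0]))],
    ["id", "vec"],
)

summed = (df.rdd
    .map(lambda row: (row["id"], row["vec"].toArray()))  # Vector -> numpy array
    .reduceByKey(lambda a, b: a + b)                      # element-wise sum per key
    .mapValues(lambda arr: DenseVector(arr))
    .toDF(["id", "vec"]))

summed.show(truncate=False)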

MatchError while accessing vector column in Spark 2.0

This has nothing to do with sparsity. Since Spark 2.0.0, ML Transformers no longer generate o.a.s.mllib.linalg.VectorUDT but o.a.s.ml.linalg.VectorUDT, and are mapped locally to subclasses of o.a.s.ml.linalg.Vector. These are not compatible with the old MLlib API, which is moving towards deprecation in Spark 2.0.0. You can convert to the "old" type using Vectors.fromML:

import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import … Read more
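The same conversion is available from PySpark; a small sketch (the DataFrame and column name are made up for illustration):

from pyspark.ml.linalg import Vectors as MLVectors
from pyspark.mllib.linalg import Vectors as MLlibVectors
from pyspark.mllib.util import MLUtils

df = spark.createDataFrame([(0, MLVectors.dense([1.0, 2.0]))], ["id", "features"])

# Convert a whole ml Vector column back to the old mllib representation...
old_df = MLUtils.convertVectorColumnsFromML(df, "features")

# ...or convert a single value
old_vec = MLlibVectors.fromML(MLVectors.dense([1.0, 2.0]))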

Create a custom Transformer in PySpark ML

Can I extend the default one? Not really. The default Tokenizer is a subclass of pyspark.ml.wrapper.JavaTransformer and, same as other transformers and estimators from pyspark.ml.feature, delegates actual processing to its Scala counterpart. Since you want to use Python you should extend pyspark.ml.pipeline.Transformer directly.

import nltk

from pyspark import keyword_only  ## < 2.0 -> pyspark.ml.util.keyword_only
from pyspark.ml … Read more
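For context, a minimal self-contained sketch of such a Python-side Transformer on Spark 2.0+ (the class name and the lower-casing logic are made-up examples, not the NLTK-based transformer from the full answer):

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.sql import functions as F


class LowerCaseTransformer(Transformer, HasInputCol, HasOutputCol):
    """Hypothetical example: copies a string column, lower-cased."""

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(LowerCaseTransformer, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        return dataset.withColumn(
            self.getOutputCol(), F.lower(F.col(self.getInputCol())))


# Usage: LowerCaseTransformer(inputCol="text", outputCol="text_lc").transform(df)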

Dropping a nested column from Spark DataFrame

It is just a programming exercise but you can try something like this:

import org.apache.spark.sql.{DataFrame, Column}
import org.apache.spark.sql.types.{StructType, StructField}
import org.apache.spark.sql.{functions => f}
import scala.util.Try

case class DFWithDropFrom(df: DataFrame) {

  def getSourceField(source: String): Try[StructField] = {
    Try(df.schema.fields.filter(_.name == source).head)
  }

  def getType(sourceField: StructField): Try[StructType] = {
    Try(sourceField.dataType.asInstanceOf[StructType])
  }

  def genOutputCol(names: Array[String], source: String): Column = … Read more
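On recent Spark versions there is a much shorter route than the hand-rolled rebuild above; a sketch in PySpark (Spark 3.1+, with a made-up nested schema):

from pyspark.sql import functions as F

df = spark.createDataFrame(
    [(1, ("keep me", "drop me"))],
    "id INT, nested STRUCT<keep: STRING, remove_me: STRING>",
)

# Column.dropFields rebuilds the struct without the unwanted field (Spark >= 3.1)
result = df.withColumn("nested", F.col("nested").dropFields("remove_me"))
result.printSchema()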

How to access element of a VectorUDT column in a Spark DataFrame?

Convert output to float:

from pyspark.sql.types import DoubleType
from pyspark.sql.functions import lit, udf

def ith_(v, i):
    try:
        return float(v[i])
    except ValueError:
        return None

ith = udf(ith_, DoubleType())

Example usage:

from pyspark.ml.linalg import Vectors

df = sc.parallelize([
    (1, Vectors.dense([1, 2, 3])),
    (2, Vectors.sparse(3, [1], [9]))
]).toDF(["id", "features"])

df.select(ith("features", lit(1))).show()
## +-----------------+
## |ith_(features, 1)|
## +-----------------+
… Read more
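On Spark 3.0+ there is also a built-in alternative to the UDF; a quick sketch reusing the same example frame:

from pyspark.ml.functions import vector_to_array

# Convert the vector to an array column, then index it like a normal array
df.select(vector_to_array("features").getItem(1).alias("second")).show()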

How to handle categorical features with spark-ml?

I just wanted to complete Holden's answer. Since Spark 2.3.0, OneHotEncoder has been deprecated and will be removed in 3.0.0. Please use OneHotEncoderEstimator instead. In Scala:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{OneHotEncoderEstimator, StringIndexer}

val df = Seq((0, "a", 1), (1, "b", 2), (2, "c", 3), (3, "a", 4), (4, "a", 4), (5, "c", 3))
  .toDF("id", "category1", "category2")

val … Read more
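A PySpark counterpart, as a sketch: on Spark 2.3/2.4 the estimator class is called OneHotEncoderEstimator, while on 3.0+ it was renamed back to OneHotEncoder with the same estimator behaviour. The example data is simplified here to a single categorical column.

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer  # OneHotEncoderEstimator on 2.3/2.4

df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"],
)

indexer = StringIndexer(inputCol="category", outputCol="category_index")
encoder = OneHotEncoder(inputCols=["category_index"], outputCols=["category_vec"])

model = Pipeline(stages=[indexer, encoder]).fit(df)
model.transform(df).show()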