PySpark Evaluation

It happens because of Python late binding and is not (Py)Spark specific. i will be looked up when lambda p: int(p) + i is used, not when it is defined. Typically that means when it is called, but in this particular context it is when it is serialized to be sent to the workers. You … Read more
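
The usual remedy for this kind of late binding is to capture the current value of i at definition time, for example with a default argument. A minimal sketch in plain Python (no Spark involved, values are made up):

    # Each lambda closes over the variable i, not its value at definition time,
    # so every lambda sees the final value of i.
    adders = [lambda p: int(p) + i for i in range(3)]
    print([f("10") for f in adders])   # [12, 12, 12]

    # Binding i as a default argument captures its value when the lambda is defined.
    adders = [lambda p, i=i: int(p) + i for i in range(3)]
    print([f("10") for f in adders])   # [10, 11, 12]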

Running custom Java class in PySpark

In PySpark try the following:

    from py4j.java_gateway import java_import

    java_import(sc._gateway.jvm, "org.foo.module.Foo")

    func = sc._gateway.jvm.Foo()
    func.fooMethod()

Make sure that you have compiled your Java code into a runnable jar and submit the Spark job like so:

    spark-submit --driver-class-path "name_of_your_jar_file.jar" --jars "name_of_your_jar_file.jar" name_of_your_python_file.py
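
As a side note, the gateway's JVM view can usually resolve a fully qualified class name directly, so a java_import-free variant is possible (a sketch, assuming the jar is on the driver classpath as above):

    # Resolve the class through the JVM view without an explicit java_import.
    func = sc._jvm.org.foo.module.Foo()
    func.fooMethod()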

Scala-Spark Dynamically call groupby and agg with parameter values

Your code is almost correct, with two issues:

- The return type of your function is DataFrame, but the last line is aggregated.show(), which returns Unit. Remove the call to show to return aggregated itself, or just return the result of agg immediately.
- DataFrame.groupBy expects arguments as follows: col1: String, cols: String*, so you … Read more
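
For readers working in PySpark, the same pattern looks roughly like this (a sketch with hypothetical names; agg_map maps column names to aggregate function names):

    # Dynamic groupBy/agg driven by parameters. Note that the aggregated
    # DataFrame itself is returned rather than the result of .show(),
    # which would be None.
    def aggregate(df, group_cols, agg_map):
        # e.g. group_cols = ["country"], agg_map = {"price": "avg", "qty": "sum"}
        return df.groupBy(*group_cols).agg(agg_map)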

PySpark: compute row maximum of the subset of columns and add to an existing dataframe

Let's start with a couple of imports:

    from pyspark.sql.functions import col, lit, coalesce, greatest

Next define the minus infinity literal:

    minf = lit(float("-inf"))

Map columns and pass the result to greatest:

    rowmax = greatest(*[coalesce(col(x), minf) for x in ["v2", "v3", "v4"]])

Finally withColumn:

    df1.withColumn("rowmax", rowmax)

with result:

    +---+---+---+----+------+
    | v1| v2| v3|  v4|rowmax|
    +---+---+---+----+------+
    |foo|1.0|3.0|null|   3.0|
    |bar|2.0|2.0| -10| … Read more
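
Put together as a self-contained sketch (the toy frame mirrors the result above; the session setup is assumed, not part of the original answer):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, lit, coalesce, greatest

    spark = SparkSession.builder.getOrCreate()

    # Toy frame with the column names used above; v4 contains a null.
    df1 = spark.createDataFrame(
        [("foo", 1.0, 3.0, None), ("bar", 2.0, 2.0, -10.0)],
        ["v1", "v2", "v3", "v4"],
    )

    minf = lit(float("-inf"))
    rowmax = greatest(*[coalesce(col(x), minf) for x in ["v2", "v3", "v4"]])
    df1.withColumn("rowmax", rowmax).show()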

How to vectorize DataFrame columns for ML algorithms?

You can simply foldLeft over the Array of columns:

    val transformed: DataFrame = df.columns.foldLeft(df)((df, arg) => str(arg, df))

Still, I would argue that it is not a good approach. Since str discards the StringIndexerModel, it cannot be used when you get new data. Because of that I would recommend using a Pipeline:

    import org.apache.spark.ml.Pipeline

    val transformers: Array[org.apache.spark.ml.PipelineStage] … Read more
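
The equivalent idea in PySpark, for comparison (a sketch; assumes df is a DataFrame whose listed columns are string-typed):

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer

    # One StringIndexer stage per column. Fitting the Pipeline keeps the
    # StringIndexerModels around, so the same mapping can be applied to new data.
    indexers = [StringIndexer(inputCol=c, outputCol=c + "_idx") for c in df.columns]
    model = Pipeline(stages=indexers).fit(df)
    transformed = model.transform(df)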

Pyspark 2.4.0, read avro from kafka with read stream - Python

You can include the spark-avro package, for example using --packages (adjust versions to match your Spark installation):

    bin/pyspark --packages org.apache.spark:spark-avro_2.11:2.4.0

and provide your own wrappers:

    from pyspark import SparkContext
    from pyspark.sql.column import Column, _to_java_column

    def from_avro(col, jsonFormatSchema):
        sc = SparkContext._active_spark_context
        avro = sc._jvm.org.apache.spark.sql.avro
        f = getattr(getattr(avro, "package$"), "MODULE$").from_avro
        return Column(f(_to_java_column(col), jsonFormatSchema))

    def to_avro(col):
        sc = SparkContext._active_spark_context
        avro = sc._jvm.org.apache.spark.sql.avro
        f = … Read more
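
A possible usage sketch on top of these wrappers (topic name, bootstrap servers, and schema file are placeholders, not taken from the original answer):

    # Read the Avro-encoded Kafka value column and decode it with the wrapper.
    avro_schema = open("schema.avsc").read()  # Avro schema as a JSON string

    stream = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "some_topic")
        .load()
        .select(from_avro("value", avro_schema).alias("value")))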

How to calculate Median in spark sqlContext for column of data type double

For non-integral values you should use the percentile_approx UDF:

    import org.apache.spark.mllib.random.RandomRDDs

    val df = RandomRDDs.normalRDD(sc, 1000, 10, 1).map(Tuple1(_)).toDF("x")
    df.registerTempTable("df")

    sqlContext.sql("SELECT percentile_approx(x, 0.5) FROM df").show

    // +--------------------+
    // |                 _c0|
    // +--------------------+
    // |0.035379710486199915|
    // +--------------------+

On a side note, you should use GROUP BY, not PARTITION BY. The latter is used for window functions and … Read more
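
For completeness, a PySpark sketch of the same computation on newer Spark versions (assumes a DataFrame df with a numeric column x):

    from pyspark.sql import functions as F

    # Via the SQL percentile_approx function ...
    median = df.agg(F.expr("percentile_approx(x, 0.5)")).first()[0]

    # ... or via the DataFrame API (the last argument is the relative error).
    median = df.approxQuantile("x", [0.5], 0.01)[0]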