Random numbers generation in PySpark

So the actual problem here is relatively simple. Each subprocess in Python inherits its state from its parent:

    len(set(sc.parallelize(range(4), 4).map(lambda _: random.getstate()).collect()))
    # 1

Since the parent state has no reason to change in this particular scenario and workers have a limited lifespan, the state of every child will be exactly the same on each run.
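One way around the shared state, sketched here under the assumption that a deterministic per-partition seed is acceptable, is to give every partition its own generator instead of relying on the module-level one. The names `add_random_column` and `BASE_SEED` are hypothetical; `mapPartitionsWithIndex` is the RDD method that passes the partition index to the function.

```python
import random

BASE_SEED = 42  # hypothetical base seed, chosen only for illustration


def add_random_column(partition_index, rows):
    """Yield (row, random float) pairs from a generator seeded per partition."""
    # A private Random instance avoids the module-level state that every
    # worker inherits unchanged from its parent process.
    rng = random.Random(BASE_SEED + partition_index)
    for row in rows:
        yield row, rng.random()


# Local check without Spark: two partitions draw from different streams.
part0 = list(add_random_column(0, range(3)))
part1 = list(add_random_column(1, range(3)))

# With a SparkContext `sc`, the same function could be applied as:
#   sc.parallelize(range(4), 4).mapPartitionsWithIndex(add_random_column).collect()
```

Seeding from the partition index keeps runs reproducible; if reproducibility is not wanted, the seed could instead come from a source such as `os.urandom`.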

SparkSQL on pyspark: how to generate time series?

EDIT This creates a dataframe with one row containing an array of consecutive dates:

    from pyspark.sql.functions import sequence, to_date, explode, col

    spark.sql("SELECT sequence(to_date('2018-01-01'), to_date('2018-03-01'), interval 1 month) as date")

    +------------------------------------------+
    | date                                     |
    +------------------------------------------+
    | ["2018-01-01","2018-02-01","2018-03-01"] |
    +------------------------------------------+

You can use the explode function to "pivot" this array into rows:

    spark.sql("SELECT sequence(to_date('2018-01-01'), to_date('2018-03-01'), interval 1 …
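To make the result of the month-stepped sequence concrete, here is a plain-Python sketch that computes the same three dates without Spark. It is not how Spark implements `sequence`; the helper name `month_sequence` is hypothetical, and it assumes the start day exists in every month (for example, day 1).

```python
from datetime import date


def month_sequence(start, stop):
    """Return dates from start to stop (inclusive), one calendar month apart."""
    dates = []
    current = start
    months = 0
    while current <= stop:
        dates.append(current)
        months += 1
        # Count months from the start date so the year rolls over correctly.
        total = start.month - 1 + months
        current = date(start.year + total // 12, total % 12 + 1, start.day)
    return dates


dates = month_sequence(date(2018, 1, 1), date(2018, 3, 1))
print([d.isoformat() for d in dates])
# prints ['2018-01-01', '2018-02-01', '2018-03-01']
```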

Apache Spark — Assign the result of UDF to multiple dataframe columns

It is not possible to create multiple top-level columns from a single UDF call, but you can create a new struct. It requires a UDF with a specified returnType:

    from pyspark.sql.functions import udf
    from pyspark.sql.types import StructType, StructField, FloatType

    schema = StructType([
        StructField("foo", FloatType(), False),
        StructField("bar", FloatType(), False)
    ])

    def udf_test(n):
        return (n / 2, …
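As a rough illustration of the shape such a function needs, the sketch below returns one Python float per struct field. The function `halve_and_square`, its second return value, and the column name `x` are made up for this example and are not taken from the truncated original. The Spark wiring is shown only in comments so the snippet runs without a cluster.

```python
def halve_and_square(n):
    """Return two floats, one for each field of a two-field struct schema."""
    # Both values must be Python floats to match FloatType fields.
    return (n / 2, float(n * n))


foo, bar = halve_and_square(3)

# With pyspark available and a two-field FloatType schema (fields "foo" and
# "bar"), the function could be wrapped and the struct expanded roughly as:
#   test_udf = udf(halve_and_square, schema)
#   df.withColumn("out", test_udf("x")).select("x", "out.foo", "out.bar")
```

Selecting `out.foo` and `out.bar` is what turns the single struct column into separate top-level columns.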

spark.ml StringIndexer throws ‘Unseen label’ on fit()

Unseen label is a generic message which doesn't correspond to a specific column. The most likely problem is with the following stage:

    StringIndexer(inputCol="lang", outputCol="lang_idx")

with pl-PL present in train("lang") and not present in test("lang"). You can correct it using setHandleInvalid with skip:

    from pyspark.ml.feature import StringIndexer

    train = sc.parallelize([(1, "foo"), (2, "bar")]).toDF(["k", "v"])
    test = sc.parallelize([(3, …
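The effect of the skip policy can be pictured with a small plain-Python sketch: labels are indexed from the data seen during fitting, and rows whose label was never seen are dropped at transform time. This is a simplified model, not the StringIndexer implementation; the helper names and the test rows are hypothetical, and ties in frequency are assumed to be broken alphabetically.

```python
from collections import Counter


def fit_index(labels):
    """Map each label to an index, most frequent label first."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(position) for position, label in enumerate(ordered)}


def transform_skip(rows, index):
    """Index the label of each (key, label) row, dropping unseen labels."""
    return [(key, label, index[label]) for key, label in rows if label in index]


train = [(1, "foo"), (2, "bar")]
test = [(3, "foo"), (4, "baz")]  # hypothetical rows; "baz" never appears in train

index = fit_index(label for _, label in train)
print(transform_skip(test, index))
# prints [(3, 'foo', 1.0)]
```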

How to add a SparkListener from pySpark in Python?

It is possible, although it is a bit involved. We can use the Py4j callback mechanism to pass messages from a SparkListener. First, let's create a Scala package with all required classes. Directory structure:

    .
    ├── build.sbt
    └── src
        └── main
            └── scala
                └── net
                    └── zero323
                        └── spark
                            └── examples
                                └── listener
                                    ├── Listener.scala
                                    ├── …
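On the Python side, a Py4J callback object is an ordinary class with a nested `Java` class that lists the Java interfaces it implements. The sketch below shows only that convention; the class name `PythonListener`, the method `notify`, and the interface name are hypothetical and not taken from the truncated original. The call is simulated locally, so no JVM is involved.

```python
class PythonListener(object):
    """Python-side callback object that JVM code could call through Py4J."""

    def __init__(self):
        self.messages = []

    def notify(self, message):
        # In a real setup the JVM would call this; here it only records the message.
        self.messages.append(message)

    class Java:
        # Py4J reads this list to learn which Java interfaces the object implements.
        # The interface name below is hypothetical.
        implements = ["example.listener.MessageHandler"]


listener = PythonListener()
listener.notify("task finished")  # simulated call, no JVM involved

# With a running SparkContext `sc`, the Py4J callback server would need to be
# started before handing `listener` to JVM code, for example:
#   sc._gateway.start_callback_server()
```

Note that `sc._gateway` is a private attribute of the SparkContext, so its use here is an assumption that may not hold across versions.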