Combine text from multiple rows in PySpark

One option is to use pyspark.sql.functions.collect_list() as the aggregate function.

    from pyspark.sql.functions import collect_list

    grouped_df = spark_df.groupby('category').agg(collect_list('name').alias("name"))

This will collect the values for name into a list and the resultant output will look like:

    grouped_df.show()
    #+--------+--------+
    #|category|name    |
    #+--------+--------+
    #|A       |[A1, A2]|
    #|B       |[B1, B2]|
    #+--------+--------+

Update 2019-06-10: If you wanted your … Read more
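If it helps, here is a runnable sketch of the same idea end to end; the SparkSession setup, the toy data, and the concat_ws() step that joins the collected list into a single comma-separated string are my own assumptions, not part of the excerpt above.

    # Minimal sketch, assuming a local SparkSession and toy data; the concat_ws
    # step is an assumption, not part of the original answer excerpt.
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import collect_list, concat_ws

    spark = SparkSession.builder.master("local[*]").getOrCreate()

    spark_df = spark.createDataFrame(
        [("A", "A1"), ("A", "A2"), ("B", "B1"), ("B", "B2")],
        ["category", "name"],
    )

    # Collect the names per category into an array column ...
    grouped_df = spark_df.groupby("category").agg(collect_list("name").alias("name"))

    # ... then optionally join the array into one string per category.
    joined_df = grouped_df.withColumn("name", concat_ws(",", "name"))
    joined_df.show()
    # rows come out roughly as (A, "A1,A2") and (B, "B1,B2")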

Manually create a PySpark dataframe

Simple dataframe creation:

    df = spark.createDataFrame(
        [
            (1, "foo"),  # create your data here, be consistent in the types.
            (2, "bar"),
        ],
        ["id", "label"],  # add your column names here
    )

    df.printSchema()
    root
     |-- id: long (nullable = true)
     |-- label: string (nullable = true)

    df.show()
    +---+-----+
    | id|label|
    +---+-----+
    |  1|  foo|
    |  2| … Read more
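If the inferred types are not what you want, a hedged variant is to pass an explicit schema instead of relying on inference; the StructType fields below are illustrative assumptions, not part of the excerpt.

    # Sketch only: creating the same dataframe with an explicit schema so the
    # column types do not depend on inference; field names are illustrative.
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, LongType, StringType

    spark = SparkSession.builder.master("local[*]").getOrCreate()

    schema = StructType([
        StructField("id", LongType(), nullable=True),
        StructField("label", StringType(), nullable=True),
    ])

    df = spark.createDataFrame([(1, "foo"), (2, "bar")], schema=schema)
    df.printSchema()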

Reading parquet files from multiple directories in PySpark

A little late, but I found this while I was searching and it may help someone else… You might also try unpacking the argument list to spark.read.parquet():

    paths = ['foo', 'bar']
    df = spark.read.parquet(*paths)

This is convenient if you want to pass a few blobs into the path argument:

    basePath = "s3://bucket/"
    paths = ['s3://bucket/partition_value1=*/partition_value2=2017-04-*',
             's3://bucket/partition_value1=*/partition_value2=2017-05-*']
    df = spark.read.option("basePath", basePath).parquet(*paths)

This is cool because you don't … Read more
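As a rough illustration of why basePath matters, here is a sketch against a locally partitioned dataset; the directory layout and the year/month partition columns are assumptions, not from the answer.

    # Hedged sketch, assuming parquet data partitioned by year/month under a
    # local base directory; paths and partition columns are illustrative.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[*]").getOrCreate()

    base_path = "/data/events/"
    paths = [
        "/data/events/year=2017/month=04",
        "/data/events/year=2017/month=05",
    ]

    # Setting basePath keeps the partition columns (year, month) in the result
    # even though the reads point directly at subdirectories.
    df = spark.read.option("basePath", base_path).parquet(*paths)
    df.printSchema()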

Applying a Window function to calculate differences in PySpark

You can bring in the previous day's column by using the lag function, and add an additional column that computes the actual day-to-day return from the two columns, but you may have to tell Spark how to partition your data and/or order it to do the lag, something like this:

    from pyspark.sql.window import Window
    import pyspark.sql.functions as func
    from pyspark.sql.functions … Read more
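Since the excerpt is cut off, here is a hedged sketch of what the lag-plus-window approach might look like; the toy dataframe, the ticker/date/price column names, and the day-to-day return formula are assumptions on my part.

    # Sketch of the lag-based approach; column names and the return formula
    # are assumptions, since the original excerpt is truncated.
    from pyspark.sql import SparkSession
    from pyspark.sql.window import Window
    import pyspark.sql.functions as func

    spark = SparkSession.builder.master("local[*]").getOrCreate()

    df = spark.createDataFrame(
        [("AAPL", "2017-01-03", 100.0),
         ("AAPL", "2017-01-04", 101.0),
         ("AAPL", "2017-01-05", 99.5)],
        ["ticker", "date", "price"],
    )

    # Partition by ticker and order by date so lag() sees the previous row.
    w = Window.partitionBy("ticker").orderBy("date")

    df = df.withColumn("prev_price", func.lag("price").over(w))
    df = df.withColumn("return", (df["price"] - df["prev_price"]) / df["prev_price"])
    df.show()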