Parsing multiline records in Scala

By default Spark creates a single element per line, which means that in your case every record is spread over multiple elements that, as stated by Daniel Darabos in the comments, can be processed by different workers. Since it looks like your data is relatively regular and separated by an empty line, you should be … Read more
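For reference, a minimal PySpark sketch of the same idea (the original answer is for Scala): setting Hadoop's textinputformat.record.delimiter to a blank line so that each multiline record arrives as a single element. The input path is hypothetical.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Split records on blank lines instead of "\n", so every multiline
# record becomes a single RDD element.
records = sc.newAPIHadoopFile(
    "/path/to/input",  # hypothetical path
    "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "org.apache.hadoop.io.Text",
    conf={"textinputformat.record.delimiter": "\n\n"}
).map(lambda offset_and_text: offset_and_text[1])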

A list as a key for PySpark’s reduceByKey

Try this: rdd.map(lambda (k, v): (tuple(k), v)).groupByKey() Since Python lists are mutable, they cannot be hashed (they don't provide a __hash__ method): >>> a_list = [1, 2, 3] >>> a_list.__hash__ is None True >>> hash(a_list) Traceback (most recent call last): File "<stdin>", line 1, in <module> TypeError: unhashable type: 'list' Tuples, on the other hand … Read more
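Note that the lambda (k, v) unpacking above is Python 2 syntax. A hedged Python 3 sketch of the same fix, converting list keys to tuples before shuffling by key (the sample pairs are made up):

from operator import add
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

pairs = sc.parallelize([([1, 2], 10), ([1, 2], 5), ([3, 4], 1)])

# Lists are unhashable, so turn each key into a tuple before the shuffle.
summed = pairs.map(lambda kv: (tuple(kv[0]), kv[1])).reduceByKey(add)

print(summed.collect())  # e.g. [((1, 2), 15), ((3, 4), 1)] (order may vary)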

How to extract an element from an array in pyspark

Create sample data: from pyspark.sql import Row x = [Row(col1="xx", col2="yy", col3="zz", col4=[123,234])] rdd = sc.parallelize(x) df = spark.createDataFrame(rdd) df.show() #+----+----+----+----------+ #|col1|col2|col3| col4| #+----+----+----+----------+ #| xx| yy| zz|[123, 234]| #+----+----+----+----------+ Use getItem to extract an element from the array column like this; in your actual case replace col4 with collect_set(TIMESTAMP): df = df.withColumn("col5", … Read more
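A hedged sketch of how the truncated getItem call typically continues, pulling both array positions out into their own columns (the col5/col6 names are illustrative):

from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([Row(col1="xx", col2="yy", col3="zz", col4=[123, 234])])

# Extract individual elements of the array column by position.
df = (df
      .withColumn("col5", col("col4").getItem(0))
      .withColumn("col6", col("col4").getItem(1)))
df.show()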

Spark groupByKey alternative

groupByKey is fine for the case when we want a “smallish” collection of values per key, as in the question. TL;DR The “do not use” warning on groupByKey applies for two general cases: 1) You want to aggregate over the values: DON’T: rdd.groupByKey().mapValues(_.sum) DO: rdd.reduceByKey(_ + _) In this case, groupByKey will waste resources materializing … Read more
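A small PySpark sketch contrasting the two patterns (the excerpt's snippets are Scala). aggregateByKey with a set accumulator stands in for the "collect a smallish set per key" case; the sample pairs are made up:

from operator import add
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

rdd = sc.parallelize([("a", 1), ("a", 2), ("b", 3), ("b", 3)])

# Aggregating over values: reduceByKey combines map-side, so only partial
# sums are shuffled across the network.
sums = rdd.reduceByKey(add)

# Collecting a smallish set of distinct values per key without materializing
# every value: aggregateByKey with a set accumulator.
distinct_per_key = rdd.aggregateByKey(
    set(),
    lambda s, v: s | {v},  # fold a value into the partition-local set
    lambda a, b: a | b)    # merge partial sets across partitions

print(sums.collect())              # e.g. [('a', 3), ('b', 6)] (order may vary)
print(distinct_per_key.collect())  # e.g. [('a', {1, 2}), ('b', {3})]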

How do I get a SQL row_number equivalent for a Spark RDD?

The row_number() over (partition by … order by …) functionality was added to Spark 1.4. This answer uses PySpark/DataFrames. Create a test DataFrame: from pyspark.sql import Row, functions as F testDF = sc.parallelize( (Row(k="key1", v=(1,2,3)), Row(k="key1", v=(1,4,7)), Row(k="key1", v=(2,2,3)), Row(k="key2", v=(5,5,5)), Row(k="key2", v=(5,5,9)), Row(k="key2", v=(7,5,5)) ) ).toDF() Add the partitioned row number: from pyspark.sql.window import … Read more
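A hedged sketch of how the truncated window step usually looks, shown on a simplified test frame with scalar values (the column names follow the excerpt; the rowNum name is illustrative):

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

testDF = spark.createDataFrame(
    [("key1", 1), ("key1", 4), ("key1", 2), ("key2", 5), ("key2", 9), ("key2", 7)],
    ["k", "v"])

# row_number() over (partition by k order by v)
w = Window.partitionBy("k").orderBy("v")
testDF.withColumn("rowNum", F.row_number().over(w)).show()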

Spark parquet partitioning: Large number of files

First, I would really avoid using coalesce, as it is often pushed further up the chain of transformations and may destroy the parallelism of your job (I asked about this issue here: Coalesce reduces parallelism of entire stage (spark)). Writing 1 file per parquet partition is relatively easy (see Spark dataframe write method writing … Read more
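A minimal sketch of the usual alternative to coalesce: repartition by the partition columns before writing, so each output directory is produced by a single task. The frame, column names, and output path below are hypothetical.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical frame with partition columns "year" and "month".
df = spark.createDataFrame(
    [(2020, 1, "a"), (2020, 2, "b"), (2021, 1, "c")],
    ["year", "month", "payload"])

# Repartitioning by the partition columns sends all rows for a given
# (year, month) to one task, yielding one parquet file per partition
# without throttling the parallelism of the upstream stages.
(df.repartition("year", "month")
   .write
   .partitionBy("year", "month")
   .parquet("/tmp/partitioned_output"))  # hypothetical output path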

How to find spark RDD/Dataframe size?

If you are simply looking to count the number of rows in the rdd, do: val distFile = sc.textFile(file) println(distFile.count) If you are interested in the bytes, you can use the SizeEstimator: import org.apache.spark.util.SizeEstimator println(SizeEstimator.estimate(distFile)) https://spark.apache.org/docs/latest/api/java/org/apache/spark/util/SizeEstimator.html

How to read from HBase using Spark

A basic example of reading HBase data using Spark (Scala); you can also write this in Java: import org.apache.hadoop.hbase.client.{HBaseAdmin, Result} import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.spark._ object HBaseRead { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local[2]") val sc = new SparkContext(sparkConf) val conf = HBaseConfiguration.create() val … Read more