Spark: Inconsistent performance number in scaling number of cores

Theoretical limitations: I assume you are familiar with Amdahl's law, but here is a quick reminder. Theoretical speedup is defined as S(s) = 1 / ((1 - p) + p / s), where: s is the speedup of the parallel part, and p is the fraction of the program that can be parallelized. In practice, theoretical speedup is always limited by the part that … Read more
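
As a quick numeric illustration of that ceiling (my own numbers, not from the original post), plugging a few values into the formula shows how the serial fraction caps the speedup:

```scala
// Hypothetical illustration of Amdahl's law, not taken from the original post.
// p = fraction of the program that can be parallelized, s = speedup of that part.
def amdahlSpeedup(p: Double, s: Double): Double = 1.0 / ((1.0 - p) + p / s)

amdahlSpeedup(0.9, 4.0)   // ~3.08x with 4-way parallelism
amdahlSpeedup(0.9, 16.0)  // ~6.40x with 16-way parallelism
amdahlSpeedup(0.9, 1e6)   // ~10x: the ceiling is 1 / (1 - p) no matter how many cores
```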

Write a file in hdfs with Java

An alternative to @Tariq's answer: you could pass the URI when getting the filesystem. import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.conf.Configuration import java.net.URI import org.apache.hadoop.fs.Path import org.apache.hadoop.util.Progressable import java.io.BufferedWriter import java.io.OutputStreamWriter Configuration configuration = new Configuration(); FileSystem hdfs = FileSystem.get( new URI( "hdfs://localhost:54310" ), configuration ); Path file = new Path("hdfs://localhost:54310/s2013/batch/table.html"); if ( hdfs.exists( file )) { hdfs.delete( … Read more
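
Since the excerpt breaks off at hdfs.delete(…), here is a minimal sketch of the full pattern. It is written in Scala against the same Hadoop FileSystem API; the URI, path, and file contents are placeholders, not values confirmed by the original answer:

```scala
import java.io.{BufferedWriter, OutputStreamWriter}
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
// Assumed NameNode address; replace with your cluster's URI.
val hdfs = FileSystem.get(new URI("hdfs://localhost:54310"), conf)
val file = new Path("hdfs://localhost:54310/s2013/batch/table.html")

// Remove any previous version of the file, then create it and write some content.
if (hdfs.exists(file)) hdfs.delete(file, true)
val out = new BufferedWriter(new OutputStreamWriter(hdfs.create(file)))
try out.write("<html><body>hello hdfs</body></html>")
finally out.close()
```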

hadoop No FileSystem for scheme: file

This is a typical case of the maven-assembly plugin breaking things. Why this happened to us: Different JARs (hadoop-commons for LocalFileSystem, hadoop-hdfs for DistributedFileSystem) each contain a different file called org.apache.hadoop.fs.FileSystem in their META-INF/services directory. This file lists the canonical classnames of the filesystem implementations they want to declare (This is called a Service Provider … Read more
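
One common workaround, sketched here under the assumption that you cannot easily change the build, is to register the implementations on the Configuration explicitly, which sidesteps the merged-away services file:

```scala
import org.apache.hadoop.conf.Configuration

val conf = new Configuration()
// Register the filesystem implementations by hand; normally the
// META-INF/services entries from each JAR would do this automatically.
conf.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)
conf.set("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
```

The cleaner build-side fix is to merge the service files during packaging (for example with the maven-shade-plugin's ServicesResourceTransformer) rather than overriding them in code.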

How to turn off INFO logging in Spark?

Just execute this command in the Spark directory: cp conf/log4j.properties.template conf/log4j.properties Edit log4j.properties: # Set everything to be logged to the console log4j.rootCategory=INFO, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n # Settings to quiet third party logs that are too verbose log4j.logger.org.eclipse.jetty=WARN log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO Then replace the first line: log4j.rootCategory=INFO, console with: … Read more
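
If you would rather not edit log4j.properties at all, an alternative (a sketch; the exact log level is up to you, and sc is assumed to be an existing SparkContext) is to raise the level programmatically from the driver:

```scala
import org.apache.log4j.{Level, Logger}

// Silence Spark's and Akka's INFO chatter from code instead of editing log4j.properties.
Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("akka").setLevel(Level.ERROR)

// Or, once a SparkContext exists, use Spark's own API:
sc.setLogLevel("WARN")
```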

Execute Hive Query with IN clause parameters in parallel

There is no need to read the same data many times in separate queries to achieve better parallelism; instead, tune mapper and reducer parallelism properly. First of all, enable predicate pushdown (PPD) and vectorized execution, and use CBO and Tez: SET hive.optimize.ppd=true; SET hive.optimize.ppd.storage=true; SET hive.vectorized.execution.enabled=true; SET hive.vectorized.execution.reduce.enabled=true; SET hive.cbo.enable=true; SET hive.stats.autogather=true; SET hive.compute.query.using.stats=true; SET … Read more
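
To make the "one query instead of many" point concrete, here is a hedged sketch of folding all the IN-clause values into a single scan and GROUP BY. The table and column names are invented, and it is shown via Spark SQL with Hive support (a SparkSession named spark is assumed); the same HQL applies in plain Hive:

```scala
// Hypothetical sketch: instead of one query per item, scan the table once
// and let a single GROUP BY produce every per-item result.
val items = Seq("itemA", "itemB", "itemC")
val inList = items.map(i => s"'$i'").mkString(", ")

val result = spark.sql(
  s"""SELECT item, year, AVG(price) AS mean_price
     |FROM sales
     |WHERE item IN ($inList)
     |GROUP BY item, year""".stripMargin)
```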

Read whole text files from a compression in Spark

One possible solution is to read data with binaryFiles and extract content manually. Scala: import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream import org.apache.commons.compress.archivers.tar.TarArchiveInputStream import org.apache.spark.input.PortableDataStream import scala.util.Try import java.nio.charset._ def extractFiles(ps: PortableDataStream, n: Int = 1024) = Try { val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open)) Stream.continually(Option(tar.getNextTarEntry)) // Read until the next entry is null .takeWhile(_.isDefined) // flatten .flatMap(x => x) // … Read more
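
The excerpt is truncated; a sketch of how the rest of that approach typically looks follows (buffer size, charset, and the input path are assumptions, and sc is an existing SparkContext):

```scala
import java.nio.charset.StandardCharsets
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import org.apache.spark.input.PortableDataStream
import scala.util.Try

def extractFiles(ps: PortableDataStream, n: Int = 1024) = Try {
  val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open))
  Stream.continually(Option(tar.getNextTarEntry))
    .takeWhile(_.isDefined)   // read until the next entry is null
    .flatten
    .map { _ =>
      // Drain the current entry into a string, n bytes at a time.
      val buffer = new Array[Byte](n)
      val output = new java.io.ByteArrayOutputStream()
      var len = tar.read(buffer)
      while (len != -1) {
        output.write(buffer, 0, len)
        len = tar.read(buffer)
      }
      new String(output.toByteArray, StandardCharsets.UTF_8)
    }
    .toList
}

// Each .tar.gz becomes (archivePath, Try[List[fileContents]]).
val texts = sc.binaryFiles("/path/to/archives/*.tar.gz").mapValues(extractFiles(_))
```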

merge output files after reduce phase

Instead of doing the file merging on your own, you can delegate the entire merging of the reduce output files by calling: hadoop fs -getmerge /output/dir/on/hdfs/ /desired/local/output/file.txt Note: This combines the HDFS files locally, so make sure you have enough disk space before running it.
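
If you need the same merge from code rather than the shell, here is a sketch assuming Hadoop 2.x, where FileUtil.copyMerge is still available (it was removed in Hadoop 3.x); the paths mirror the example command above:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val conf = new Configuration()
val hdfs = FileSystem.get(conf)
val local = FileSystem.getLocal(conf)

// Concatenate every file under the HDFS output dir into one local file.
FileUtil.copyMerge(
  hdfs, new Path("/output/dir/on/hdfs/"),
  local, new Path("/desired/local/output/file.txt"),
  false,        // don't delete the source files
  conf, null)   // no separator string appended between files
```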

While writing to hdfs path getting error java.io.IOException: Failed to rename

You can do all the selects in one single job: run every select and union the results into a single table. Dataset<Row> resultDs = givenItemList.parallelStream().map( item -> { String query = "select $item as itemCol , avg($item) as mean groupBy year"; return sparkSession.sql(query); }).reduce((a, b) -> a.union(b)).get(); saveDsToHdfs(hdfsPath, resultDs);
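
The same union-of-selects idea in Scala, as a sketch: the table and column names are invented (the original query is only pseudo-SQL), and sparkSession, hdfsPath, and saveDsToHdfs are taken from the excerpt above rather than defined here:

```scala
// Hypothetical Scala version of the "one job, union of selects" pattern.
// givenItemList and the sales table are assumptions for illustration.
val givenItemList = Seq("price", "quantity")

val resultDs = givenItemList
  .map(item => sparkSession.sql(
    s"SELECT '$item' AS itemCol, AVG($item) AS mean FROM sales GROUP BY year"))
  .reduce(_ union _)

resultDs.write.mode("overwrite").parquet(hdfsPath)   // or hand it to saveDsToHdfs(...)
```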

Primary keys with Apache Spark

Scala: If all you need is unique numbers you can use zipWithUniqueId and recreate the DataFrame. First some imports and dummy data: import sqlContext.implicits._ import org.apache.spark.sql.Row import org.apache.spark.sql.types.{StructType, StructField, LongType} val df = sc.parallelize(Seq( ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar") Extract schema for further usage: val schema = df.schema Add id field: val rows = … Read more
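
The excerpt stops at the rows step; a sketch of how the zipWithUniqueId approach typically finishes (df, schema, and sqlContext are the values defined in the excerpt above):

```scala
// Pair each row with a unique id, then rebuild the DataFrame
// with an extra LongType "id" column appended to the original schema.
val rows = df.rdd.zipWithUniqueId.map {
  case (row, id) => Row.fromSeq(row.toSeq :+ id)
}

val dfWithId = sqlContext.createDataFrame(
  rows,
  StructType(schema.fields :+ StructField("id", LongType, nullable = false))
)
```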