Reading file as single record in hadoop

Take a look at this input format. It is an input format for reading multiple files in a single map task. Exactly one (unsplit) file is read for each record passed to the mapper. The WholeFileRecordReader takes care of sending the content of one file as one value. The key returned is NullWritable and the value is the content of each file as a whole. You can now use this to run your MapReduce job, see how many mappers actually run, and check whether the output you are getting is correct.

Records are constructed from WholeFileRecordReaders.

    /**
     * Input format that packs several files into one split and hands each file to the
     * mapper as a single record, read by a {@link WholeFileRecordReader}.
     */
    public class WholeFileInputFormat extends CombineFileInputFormat<NullWritable, Text> {

        /**
         * Reports that files handled by this format must not be split.
         *
         * @param context The job context.  Unused.
         * @param file The file being considered.  Unused.
         * @return always {@code false}.
         */
        @Override
        protected boolean isSplitable(JobContext context, Path file) {
            return false;
        }

        /**
         * Builds the reader for one split.  The split has to be a CombineFileSplit (which may
         * list several files); the returned CombineFileRecordReader reads each of those files
         * with its own WholeFileRecordReader.
         *
         * @param split The InputSplit to read.
         * @param context The context for this task.
         * @return a CombineFileRecordReader that delegates every file in the split to a
         *         WholeFileRecordReader.
         * @throws IOException if there is an error.
         * @throws IllegalArgumentException if split is not a CombineFileSplit.
         */
        @Override
        public RecordReader<NullWritable, Text> createRecordReader(
                InputSplit split, TaskAttemptContext context) throws IOException {
            if (split instanceof CombineFileSplit) {
                CombineFileSplit combinedSplit = (CombineFileSplit) split;
                return new CombineFileRecordReader<NullWritable, Text>(
                        combinedSplit, context, WholeFileRecordReader.class);
            }
            throw new IllegalArgumentException("split must be a CombineFileSplit");
        }

    }

As you can see above, WholeFileRecordReader is used; it is defined as follows:

/**
 * Record reader that turns one file of a CombineFileSplit into exactly one record:
 * the key is always NullWritable and the value is the whole content of the file as Text.
 */
public class WholeFileRecordReader extends RecordReader<NullWritable, Text> {
    private static final Logger LOG = Logger.getLogger(WholeFileRecordReader.class);

      /** The path to the file to read. */
      private final Path mFileToRead;
      /** The length of this file, as reported by the split. */
      private final long mFileLength;

      /** The Configuration. */
      private final Configuration mConf;

      /** Whether this file has been processed. */
      private boolean mProcessed;
      /** Single Text to store the content of this file (the value) when it is read. */
      private final Text mFileText;

      /**
       * Implementation detail: This constructor is built to be called via
       * reflection from within CombineFileRecordReader.
       *
       * @param fileSplit The CombineFileSplit that this will read from.
       * @param context The context for this task.
       * @param pathToProcess The path index from the CombineFileSplit to process in this record.
       */
      public WholeFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext context,
          Integer pathToProcess) {
        mProcessed = false;
        mFileToRead = fileSplit.getPath(pathToProcess);
        mFileLength = fileSplit.getLength(pathToProcess);
        mConf = context.getConfiguration();
        mFileText = new Text();

        // Whole files are expected, so the split must start at the beginning of the file.
        assert 0 == fileSplit.getOffset(pathToProcess);
        if (LOG.isDebugEnabled()) {
          LOG.debug("FileToRead is: " + mFileToRead.toString());
          LOG.debug("Processing path " + pathToProcess + " out of " + fileSplit.getNumPaths());

          try {
            // Resolve the file system from the path itself, exactly as nextKeyValue() does;
            // the default file system of the configuration is not necessarily the one that
            // owns this path.
            FileSystem fs = mFileToRead.getFileSystem(mConf);
            assert fs.getFileStatus(mFileToRead).getLen() == mFileLength;
          } catch (IOException ioe) {
            // This is only a best-effort sanity check, so do not fail -- but do not hide it.
            LOG.debug("Could not verify the length of " + mFileToRead, ioe);
          }
        }
      }

      /**
       * Releases the buffered file content.
       *
       * @throws IOException never.
       */
      @Override
      public void close() throws IOException {
        mFileText.clear();
      }

      /**
       * Returns the key of the current record, which is always the NullWritable singleton.
       *
       * @return NullWritable.get().
       * @throws IOException never.
       * @throws InterruptedException never.
       */
      @Override
      public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
      }

      /**
       * <p>Returns the current value.  If the file has been read with a call to nextKeyValue(),
       * this returns the contents of the file as Text.  Otherwise, it returns an
       * empty Text.</p>
       *
       * @return A Text containing the contents of the file to read.
       * @throws IOException never.
       * @throws InterruptedException never.
       */
      @Override
      public Text getCurrentValue() throws IOException, InterruptedException {
        return mFileText;
      }

      /**
       * Returns whether the file has been processed or not.  Since only one record
       * will be generated for a file, progress will be 0.0 if it has not been processed,
       * and 1.0 if it has.
       *
       * @return 0.0 if the file has not been processed.  1.0 if it has.
       * @throws IOException never.
       * @throws InterruptedException never.
       */
      @Override
      public float getProgress() throws IOException, InterruptedException {
        return (mProcessed) ? (float) 1.0 : (float) 0.0;
      }

      /**
       * All of the internal state is already set on instantiation.  This is a no-op.
       *
       * @param split The InputSplit to read.  Unused.
       * @param context The context for this task.  Unused.
       * @throws IOException never.
       * @throws InterruptedException never.
       */
      @Override
      public void initialize(InputSplit split, TaskAttemptContext context)
          throws IOException, InterruptedException {
        // no-op.
      }

      /**
       * <p>If the file has not already been read, this reads it into memory, so that a call
       * to getCurrentValue() will return the entire contents of this file as Text
       * (getCurrentKey() keeps returning NullWritable).  Then, returns true.  If it has
       * already been read, then returns false without updating any internal state.</p>
       *
       * @return Whether the file was read or not.
       * @throws IOException if the file is longer than Integer.MAX_VALUE bytes or there is
       *         an error reading the file.
       * @throws InterruptedException never.
       */
      @Override
      public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!mProcessed) {
          // The content is buffered in a single byte array, which caps the supported size.
          if (mFileLength > (long) Integer.MAX_VALUE) {
            throw new IOException("File is longer than Integer.MAX_VALUE.");
          }
          byte[] contents = new byte[(int) mFileLength];

          FileSystem fs = mFileToRead.getFileSystem(mConf);
          FSDataInputStream in = null;
          try {
            // Set the contents of this file.
            in = fs.open(mFileToRead);
            IOUtils.readFully(in, contents, 0, contents.length);
            mFileText.set(contents, 0, contents.length);

          } finally {
            IOUtils.closeStream(in);
          }
          // Only mark the file as done once it has been read successfully.
          mProcessed = true;
          return true;
        }
        return false;
      }

}

The following is your driver code:

/**
 * Configures and runs the "WholeFile" job, which reads its input through
 * WholeFileInputFormat and uses WholeFileMapper with zero reduce tasks.
 *
 * <p>NOTE(review): this listing is truncated -- the return statement and the closing
 * brace of the method are not shown.</p>
 *
 * @param arg command-line arguments: arg[0] is the input path, arg[1] is the output path.
 * @return not visible in this excerpt.
 * @throws Exception with the message "Job Failed" if waitForCompletion reports failure;
 *         anything thrown by the calls below is propagated as well.
 */
public int run(String[] arg) throws Exception {
    Configuration conf=getConf();
    FileSystem fs = FileSystem.get(conf);
    // Build the job; no reducer estimation happens here -- the reducer count is fixed to 0 below.
    // NOTE(review): new Job(conf) may be deprecated in the Hadoop version in use -- confirm.
    Job job = new Job(conf);
    job.setJarByClass(WholeFileDriver.class);
    job.setJobName("WholeFile");
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(WholeFileInputFormat.class);
    job.setMapperClass(WholeFileMapper.class);
    // Zero reduce tasks: the job is configured without a reduce phase.
    job.setNumReduceTasks(0);

    FileInputFormat.addInputPath(job, new Path(arg[0]));
    Path output=new Path(arg[1]);
    // Remove a pre-existing output path before the job runs; a failure here is only
    // logged (the message calls it a "temporary path", but it is the job output path).
    try {
        fs.delete(output, true);
    } catch (IOException e) {
        LOG.warn("Failed to delete temporary path", e);
    }
    FileOutputFormat.setOutputPath(job, output);

    // Block until the job finishes and turn a failed job into an exception.
    boolean ret=job.waitForCompletion(true);
    if(!ret){
        throw new Exception("Job Failed");
    }

Leave a Comment