How does Hadoop process records split across block boundaries?

Interesting question. I spent some time looking at the code for the details, and here are my thoughts. The splits are computed on the client side by InputFormat.getSplits, so a look at FileInputFormat gives the following info:

  • For each input file, get the file length and the block size, and calculate the split size as max(minSize, min(maxSize, blockSize)), where maxSize corresponds to mapred.max.split.size and minSize to mapred.min.split.size (see the small sketch after the code below).
  • Divide the file into different FileSplits based on the split size calculated above. What’s important here is that each FileSplit is initialized with a start parameter corresponding to the offset in the input file. There is still no handling of the lines at that point. The relevant part of the code looks like this:

    // Keep creating full-size splits as long as more than SPLIT_SLOP (1.1)
    // times a split's worth of bytes remains; the tail becomes a smaller split.
    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
      // start offset = length - bytesRemaining; hosts come from the block index.
      splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
                               blkLocations[blkIndex].getHosts()));
      bytesRemaining -= splitSize;
    }
    

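As a side note, the split-size computation itself is just a one-liner in FileInputFormat. A minimal sketch of it (based on the new-API method; the exact signature may differ between Hadoop versions) looks like this:

    // splitSize = max(minSize, min(maxSize, blockSize))
    protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
      return Math.max(minSize, Math.min(maxSize, blockSize));
    }

With the default minSize and maxSize this simply returns the block size, so you normally get one split per block.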
After that, if you look at the LineRecordReader created by the TextInputFormat, that's where the lines are handled:

  • When you initialize your LineRecordReader, it instantiates a LineReader, which is an abstraction for reading lines over an FSDataInputStream. There are two cases:
  • If there is a CompressionCodec defined, then this codec is responsible for handling boundaries. Probably not relevant to your question.
  • If there is no codec, however, that's where things get interesting: if the start of your InputSplit is different from 0, you backtrack 1 byte and then skip the first line you encounter, identified by \n or \r\n (Windows). The backtrack is important: in case your line boundaries coincide with the split boundaries, it ensures you do not skip a valid line. Here is the relevant code (a standalone illustration follows right after it):

    boolean skipFirstLine = false;
    if (codec != null) {
      // Compressed input: the codec handles boundaries, read to end of stream.
      in = new LineReader(codec.createInputStream(fileIn), job);
      end = Long.MAX_VALUE;
    } else {
      if (start != 0) {
        // Not the first split: back up one byte so a split boundary that
        // falls exactly on a newline still keeps the following line.
        skipFirstLine = true;
        --start;
        fileIn.seek(start);
      }
      in = new LineReader(fileIn, job);
    }
    if (skipFirstLine) {  // skip first line and re-establish "start".
      start += in.readLine(new Text(), 0,
                           (int)Math.min((long)Integer.MAX_VALUE, end - start));
    }
    this.pos = start;
    

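To see why the one-byte backtrack matters, here is a minimal standalone sketch (illustration only, not Hadoop code; the class name, sample data and offsets are made up) of the same rule: for a non-initial split, back up one byte and discard everything up to and including the first newline, and the split's first record starts right after it.

    import java.nio.charset.StandardCharsets;

    /** Illustration only (not Hadoop code): the "skip first partial line" rule. */
    public class SkipFirstLineDemo {

      /** Returns the offset of the first record a split starting at 'start' should emit. */
      static int firstRecordOffset(byte[] data, int start) {
        if (start == 0) {
          return 0;                 // first split: keep its first line
        }
        int pos = start - 1;        // back up one byte, like --start; fileIn.seek(start)
        while (pos < data.length && data[pos] != '\n') {
          pos++;                    // discard up to and including the next '\n'
        }
        return pos + 1;             // first byte after that newline
      }

      public static void main(String[] args) {
        byte[] data = "aaaa\nbbbb\ncccc\n".getBytes(StandardCharsets.UTF_8);

        // Split boundary exactly on a line boundary (offset 5 = start of "bbbb"):
        // thanks to the backtrack we see the '\n' at offset 4 and keep "bbbb".
        System.out.println(firstRecordOffset(data, 5));   // prints 5

        // Split boundary in the middle of "bbbb" (offset 7): the partial line is
        // discarded and this split's first record is "cccc" at offset 10.
        System.out.println(firstRecordOffset(data, 7));   // prints 10
      }
    }

In the first case the previous mapper stops after reading "aaaa" because its position has reached the split end, so without the backtrack "bbbb" would be skipped by both mappers; with it, the line is read exactly once.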
So since the splits are calculated in the client, the mappers don't need to run in sequence; every mapper already knows whether it needs to discard the first line or not.

So basically, say you have 2 lines of 100MB each in the same file, and to simplify let's say the split size is 64MB. Then when the input splits are calculated, we will have the following scenario (a small sketch reproducing this arithmetic follows the list):

  • Split 1 containing the path and the hosts of this block, initialized at start 200-200=0MB, length 64MB.
  • Split 2 initialized at start 200-200+64=64MB, length 64MB.
  • Split 3 initialized at start 200-200+128=128MB, length 64MB.
  • Split 4 initialized at start 200-200+192=192MB, length 8MB.
  • Mapper A will process split 1; start is 0, so it doesn't skip the first line, and it reads a full line that goes beyond the 64MB limit, so it needs a remote read.
  • Mapper B will process split 2; start is != 0, so it skips the first line after 64MB-1 byte, which ends at 100MB (the end of line 1), still within split 2. It then reads line 2, of which 28MB is in split 2, so the remaining 72MB are read remotely.
  • Mapper C will process split 3; start is != 0, so it skips the first line after 128MB-1 byte, which ends at 200MB (the end of line 2). That is the end of the file, so it does nothing further.
  • Mapper D is the same as mapper C, except it looks for a newline after 192MB-1 byte.
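If you want to reproduce that arithmetic, here is a tiny standalone sketch (illustration only; the class name is made up, the 200MB/64MB numbers are the ones from the example above, and SPLIT_SLOP is the 1.1 constant from FileInputFormat) of the split loop for this scenario:

    /** Illustration only: reproduces the split offsets of the 200MB / 64MB example. */
    public class SplitArithmeticDemo {
      public static void main(String[] args) {
        final double SPLIT_SLOP = 1.1;        // same slop factor FileInputFormat uses
        long length = 200L << 20;             // 200MB file (two 100MB lines)
        long splitSize = 64L << 20;           // computed split size

        long bytesRemaining = length;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
          System.out.printf("split: start=%dMB, length=%dMB%n",
              (length - bytesRemaining) >> 20, splitSize >> 20);
          bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {            // the last, smaller split
          System.out.printf("split: start=%dMB, length=%dMB%n",
              (length - bytesRemaining) >> 20, bytesRemaining >> 20);
        }
      }
    }

This prints the four splits at 0, 64, 128 and 192MB, the last one 8MB long, matching the list above.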
