MPI Reading from a text file

Text isn’t a great format for parallel processing exactly because you don’t know ahead of time where (say) line 25001 begins. So these sorts of problems are often dealt with ahead of time through some preprocessing step, either building an index or partitioning the file into the appropriate number of chunks for each process to read.

If you really want to do it through MPI, I’d suggest using MPI-IO to read in overlapping chunks of the text file onto the various processors, where the overlap is much longer than you expect your longest line to be, and then have each processor agree on where to start; eg, you could say that the first (or last) new line in the overlap region shared by processes N and N+1 is where process N leaves off and N+1 starts.

To follow this up with some code,

#include <stdio.h>
#include <mpi.h>
#include <stdlib.h>
#include <ctype.h>
#include <string.h>
    
void parprocess(MPI_File *in, MPI_File *out, const int rank, const int size, const int overlap) {
    MPI_Offset globalstart;
    int mysize;
    char *chunk;
    
    /* read in relevant chunk of file into "chunk",
     * which starts at location in the file globalstart
     * and has size mysize 
     */
    {
        MPI_Offset globalend;
        MPI_Offset filesize;
    
        /* figure out who reads what */
        MPI_File_get_size(*in, &filesize);
        filesize--;  /* get rid of text file eof */
        mysize = filesize/size;
        globalstart = rank * mysize;
        globalend   = globalstart + mysize - 1;
        if (rank == size-1) globalend = filesize-1;
    
        /* add overlap to the end of everyone's chunk except last proc... */
        if (rank != size-1)
            globalend += overlap;
    
        mysize =  globalend - globalstart + 1;
    
        /* allocate memory */
        chunk = malloc( (mysize + 1)*sizeof(char));
    
        /* everyone reads in their part */
        MPI_File_read_at_all(*in, globalstart, chunk, mysize, MPI_CHAR, MPI_STATUS_IGNORE);
        chunk[mysize] = '\0';
    }
    
    
    /*
     * everyone calculate what their start and end *really* are by going 
     * from the first newline after start to the first newline after the
     * overlap region starts (eg, after end - overlap + 1)
     */
    
    int locstart=0, locend=mysize-1;
    if (rank != 0) {
        while(chunk[locstart] != '\n') locstart++;
        locstart++;
    }
    if (rank != size-1) {
        locend-=overlap;
        while(chunk[locend] != '\n') locend++;
    }
    mysize = locend-locstart+1;
    
    /* "Process" our chunk by replacing non-space characters with '1' for
     * rank 1, '2' for rank 2, etc... 
     */
    
    for (int i=locstart; i<=locend; i++) {
        char c = chunk[i];
        chunk[i] = ( isspace(c) ? c : '1' + (char)rank );
    }

    
    /* output the processed file */
    
    MPI_File_write_at_all(*out, (MPI_Offset)(globalstart+(MPI_Offset)locstart), &(chunk[locstart]), mysize, MPI_CHAR, MPI_STATUS_IGNORE);
    
    return;
}
    
int main(int argc, char **argv) {
    
    MPI_File in, out;
    int rank, size;
    int ierr;
    const int overlap = 100;
    
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    
    if (argc != 3) {
        if (rank == 0) fprintf(stderr, "Usage: %s infilename outfilename\n", argv[0]);
        MPI_Finalize();
        exit(1);
    }
    
    ierr = MPI_File_open(MPI_COMM_WORLD, argv[1], MPI_MODE_RDONLY, MPI_INFO_NULL, &in);
    if (ierr) {
        if (rank == 0) fprintf(stderr, "%s: Couldn't open file %s\n", argv[0], argv[1]);
        MPI_Finalize();
        exit(2);
    }
    
    ierr = MPI_File_open(MPI_COMM_WORLD, argv[2], MPI_MODE_CREATE|MPI_MODE_WRONLY, MPI_INFO_NULL, &out);
    if (ierr) {
        if (rank == 0) fprintf(stderr, "%s: Couldn't open output file %s\n", argv[0], argv[2]);
        MPI_Finalize();
        exit(3);
    }
    
    parprocess(&in, &out, rank, size, overlap);
    
    MPI_File_close(&in);
    MPI_File_close(&out);
    
    MPI_Finalize();
    return 0;
}

Running this on a narrow version of the text of the question, we get

$ mpirun -n 3 ./textio foo.in foo.out
$ paste foo.in foo.out
Hi guys I am learning to            11 1111 1 11 11111111 11
program in MPI and I came           1111111 11 111 111 1 1111
across this question. Lets          111111 1111 111111111 1111
say I have a .txt file with         111 1 1111 1 1111 1111 1111
100,000 rows/lines, how do          1111111 11111111111 111 11
I chunk them for processing         1 11111 1111 111 1111111111
by 4 processors? i.e. I want        22 2 22222222222 2222 2 2222
to let processor 0 take care        22 222 222222222 2 2222 2222
of the processing for lines         22 222 2222222222 222 22222
0-25000, processor 1 to take        22222222 222222222 2 22 2222
care of 25001-50000 and so          2222 22 22222222222 222 22
on. I did some searching and        333 3 333 3333 333333333 333
did came across MPI_File_seek       333 3333 333333 3333333333333
but I am not sure can it work       333 3 33 333 3333 333 33 3333
on .txt and supports fscanf         33 3333 333 33333333 333333
afterwards.                         33333333333

Leave a Comment