Parallel processing of a large .csv file in Python

This might be too late, but I’ll post it anyway for future readers. Another poster mentioned using multiprocessing; I can vouch for it and go into more detail. We deal with files in the hundreds of MB to several GB every day using Python, so it’s definitely up to the task. Some of the files we deal with aren’t CSVs, so the parsing can be fairly complex and take longer than the disk access. However, the methodology is the same regardless of file type.

You can process pieces of a large file concurrently. Here’s pseudo code showing how we do it:

import os
import multiprocessing as mp

# process either the whole file or just the byte range [start, stop)
def processfile(filename, start=0, stop=0):
    if start == 0 and stop == 0:
        # ... process entire file ...
        pass
    else:
        # binary mode so the byte offsets from the parent process line up
        with open(filename, 'rb') as fh:
            fh.seek(start)
            lines = fh.read(stop - start).splitlines()
            # ... process these lines ...

    return results

if __name__ == "__main__":

    # get the file size and set the chunk size
    filesize = os.path.getsize(filename)
    split_size = 100 * 1024 * 1024  # 100 MB per chunk

    # determine if it needs to be split
    if filesize > split_size:

        # create pool, initialize chunk start location (cursor)
        pool = mp.Pool(mp.cpu_count())
        cursor = 0
        results = []
        with open(filename, 'rb') as fh:

            # walk through the file one chunk at a time
            while cursor < filesize:

                # determine where the chunk nominally ends (the last chunk
                # may be shorter than split_size)
                end = min(cursor + split_size, filesize)

                # seek to the end of the chunk and read to the next newline
                # so that only whole lines are passed to processfile
                fh.seek(end)
                fh.readline()

                # get current file location
                end = fh.tell()

                # add chunk to process pool, save reference to get results
                proc = pool.apply_async(processfile, args=[filename, cursor, end])
                results.append(proc)

                # setup next chunk
                cursor = end

        # close and wait for pool to finish
        pool.close()
        pool.join()

        # iterate through results
        for proc in results:
            processfile_result = proc.get()

    else:
        # small enough to process in one go
        processfile_result = processfile(filename)

Like I said, that’s only pseudo code, but it should get anyone who needs to do something similar started. I don’t have the code in front of me; I’m just writing it from memory.
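To make the elided processing step concrete, here’s one hypothetical processfile body, purely as an illustration. It assumes a UTF-8 CSV with no quoted newlines and simply counts the rows in its byte range; swap in your own parsing:

import csv
import io

def processfile(filename, start=0, stop=0):
    # read either the whole file or just the byte range [start, stop)
    with open(filename, 'rb') as fh:
        if stop > start:
            fh.seek(start)
            data = fh.read(stop - start)
        else:
            data = fh.read()

    # hypothetical work: count the CSV rows in this chunk
    rows = 0
    for _row in csv.reader(io.StringIO(data.decode('utf-8'))):
        rows += 1
    return rows

With a worker like that, the proc.get() loop at the end would just sum the per-chunk counts to get a total row count.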

But we got more than a 2x speed-up from this on the first run, without fine-tuning it. You can tune the number of processes in the pool and the chunk size to get an even bigger speed-up, depending on your setup. If you have multiple files, as we do, you can also create a pool to read several files in parallel, as sketched below. Just be careful not to overload the box with too many processes.
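For the multiple-file case, the sketch below shows the idea; the glob pattern, the pool size, and the per-file worker are assumptions, not our actual code:

import glob
import os
import multiprocessing as mp

def process_one_file(path):
    # hypothetical per-file worker -- in practice it could run the
    # chunked processfile logic from above on each file
    return path, os.path.getsize(path)

if __name__ == "__main__":
    paths = glob.glob("data/*.csv")        # assumed location of the input files
    with mp.Pool(processes=4) as pool:     # tune the process count for your box
        for path, size in pool.map(process_one_file, paths):
            print(path, size)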

Note: You need to put the pool setup inside an “if __name__ == '__main__'” block. On platforms where workers are started by spawning a fresh interpreter (Windows, for example), each worker re-imports the script, and without that guard it would keep creating new processes.
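In practice that means keeping module level to imports and definitions, with all pool creation under the guard. A minimal sketch (the work function is just a stand-in):

import multiprocessing as mp

# module level: only imports and definitions -- on spawn-based platforms
# this code runs again in every worker process
def work(x):
    return x * x

if __name__ == "__main__":
    # runs only in the parent process, so the pool is created exactly once
    with mp.Pool() as pool:
        print(pool.map(work, range(8)))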
