

CHAPTER TWENTY

MULTIPROCESSING

The multiprocessing module lets us send work units out as new Python processes on our local machine (it won’t send jobs over a network). For jobs that require little or no interprocess communication it is ideal.
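As a minimal illustration of the idea (this standalone snippet is not part of the book's Mandelbrot code and uses a made-up square function), independent work units can be pushed through a Pool like this:

import multiprocessing

def square(x):
    return x * x

if __name__ == "__main__":
    # by default the Pool creates one worker process per CPU
    p = multiprocessing.Pool()
    print p.map(square, range(10)) # each item is squared in a worker process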

We need to split our input lists into shorter work lists which can be sent to the new processes, and then combine the results back into a single output list.

We have to split our q and z lists into shorter chunks, making one sub-list per CPU. On my MacBook I have two cores, so we'll split the 250,000 items into two 125,000-item lists. If you only have one CPU you can hard-code nbr_chunks to e.g. 2 or 4 to see the effect.

In the code below we use a list comprehension to make sub-lists for q and z; the if test before it handles the case where the number of work chunks would leave a remainder of work (e.g. with 100 items and nbr_chunks = 3 we'd have 33 items of work per chunk with one left over without the if handler).

# split the work list into contiguous chunks, one per CPU
# build this into chunks which we'll apply to map_async
nbr_chunks = multiprocessing.cpu_count() # or hard-code e.g. 4
chunk_size = len(q) / nbr_chunks

# split our long work list into smaller chunks
# make sure we handle the edge case where nbr_chunks doesn't evenly divide len(q)
import math
if len(q) % nbr_chunks != 0:
    # make sure we get the last few items of data when we have
    # an odd-sized final chunk (e.g. len(q) == 100 and nbr_chunks == 3)
    nbr_chunks += 1
chunks = [(q[x * chunk_size:(x + 1) * chunk_size], maxiter,
           z[x * chunk_size:(x + 1) * chunk_size])
          for x in xrange(nbr_chunks)]
print chunk_size, len(chunks), len(chunks[0][0])
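As a quick extra check (a hypothetical addition, not part of the original listing), we can confirm that no work items were lost or duplicated by the chunking:

# hypothetical sanity check: the q slices in the chunks should add back up to len(q)
total_items = sum(len(chunk_q) for chunk_q, chunk_maxiter, chunk_z in chunks)
assert total_items == len(q), "chunking lost or duplicated work items"
print total_items, "items split into", len(chunks), "chunks"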

Before setting up sub-processes we should verify that the chunks of work still produce the expected output. We'll iterate over each chunk in sequence, run the calculate_z calculation and then join the returned result with the growing output list. This lets us confirm that the numerical progression occurs exactly as before (if it doesn't - there's a bug in your code!). This is a useful sanity check before the possible complications of race conditions and ordering come into play with multi-processing code.

You could try to run the chunks in reverse (and join the output list in reverse too!) to confirm that there aren’t any order-dependent bugs in the code.

# just use this to verify the chunking code, we'll replace it in a moment
output = []
for chunk in chunks:
    res = calculate_z_serial_purepython(chunk)
    output += res
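The reverse-order check suggested above might look like this sketch (not from the original listing); it processes the chunks back to front and prepends each result, so the final list should match the forward run:

# hypothetical order-independence check: process the chunks in reverse and
# prepend each result; the final list should equal the forward-order output
reversed_output = []
for chunk in reversed(chunks):
    res = calculate_z_serial_purepython(chunk)
    reversed_output = res + reversed_output
assert reversed_output == output, "results depend on chunk ordering!"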

Now we'll run the same calculations in parallel (so the execution time will roughly halve on my dual-core machine). First we create p = multiprocessing.Pool(), a pool of Python worker processes (by default we get as many worker processes in the Pool as we have CPUs). Next we use p.map_async to send our function out to the workers, with each chunk passed as a tuple of input arguments.

Remember that we have to receive a tuple of input arguments in calculate_z (shown in the example below) so we have to unpack them first.

Finally we call po.get(), which is a blocking operation - it returns a list of result lists, one per chunk, once all the work has completed. We then join these sub-lists into output to get our full output list as before.

import datetime
import multiprocessing
...
def calculate_z_serial_purepython(chunk): # NOTE we receive a tuple of input arguments
    q, maxiter, z = chunk
    ...
...
# use this to run the chunks in parallel
# create a Pool which will create Python processes
p = multiprocessing.Pool()
start_time = datetime.datetime.now()
# send out the work chunks to the Pool
# po is a multiprocessing.pool.MapResult
po = p.map_async(calculate_z_serial_purepython, chunks)
# we get a list of lists back, one per chunk, so we have to
# flatten them back together
# po.get() will block until results are ready and then
# return a list of lists of results
results = po.get() # [[ints...], [ints...], []]
output = []
for res in results:
    output += res
end_time = datetime.datetime.now()

Note that we may not achieve a 2x speed-up on a dual-core CPU, as there is an overhead in the first (serial) process when creating the work chunks, a second overhead when the input data is sent to the new processes, and another when the results are sent back. The sending of data involves a pickle operation, which adds extra overhead. On our 8MB problem we can see a small slowdown.
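To get a feel for that pickling cost you could time the serialisation of a single chunk; this rough sketch (not part of the original listing) uses the standard cPickle module:

# rough, hypothetical measurement of the serialisation cost for one chunk
import cPickle
import time

t1 = time.time()
pickled_chunk = cPickle.dumps(chunks[0], cPickle.HIGHEST_PROTOCOL)
t2 = time.time()
print "chunk 0 pickles to %d bytes in %.4f seconds" % (len(pickled_chunk), t2 - t1)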

If you refer back to the speed timings at the start of the report you'll see that we don't achieve a doubling of speed; indeed, the ParallelPython example (next) runs faster. This is down to how the multiprocessing module safely prepares the remote execution environment, which reduces the speed-up you can achieve when your jobs are short-lived.
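One way to get a feel for that per-job overhead (a hypothetical experiment, not from the book) is to push a do-nothing job through the same Pool: almost all of the measured time is then spent pickling the chunks, shuttling them to the worker processes and collecting the (empty) results.

# hypothetical experiment: a trivial job exposes the overhead of pickling
# the chunks, dispatching them to the workers and collecting results
def do_nothing(chunk):
    return []

t1 = datetime.datetime.now()
p.map_async(do_nothing, chunks).get()
t2 = datetime.datetime.now()
print "dispatch and data-transfer overhead:", t2 - t1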