Framework for easy parallelized processing (pwkit.parallel)

A framework making it easy to write functions that can perform computations in parallel.

Use this framework if you are writing a function that should be able to perform some of its work in parallel, using multiple CPUs at once. First, you must design the parallel part of the function’s operation to be implementable in terms of the standard library map() function. Then, give your function an optional parallel=True keyword argument and use the make_parallel_helper() function from this module like so:

from pwkit.parallel import make_parallel_helper

def my_parallelizable_function(arg1, arg2, parallel=True):
    # Get a "parallel helper" object that can provide us with a parallelized
    # "map" function. The caller specifies how the parallelization is done;
    # we don't have to know the details.
    phelp = make_parallel_helper(parallel)
    ...

    # When used as a context manager, the helper provides a function that
    # acts like the standard library function "map", except it may
    # parallelize its operation.
    with phelp.get_map() as map:
       results1 = map(my_subfunc1, subargs1)
       ...
       results2 = map(my_subfunc2, subargs2)

    ... do stuff with results1 and results2 ...

Passing parallel=True to a function defined this way will cause it to parallelize map calls across all cores. Passing parallel=0.5 will cause it to use about half your machine. Passing parallel=False will cause it to use serial processing. The helper must be used as a context manager (via the with statement) because the parallel computation may involve creating and destroying heavyweight resources (namely, child processes).
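The interpretation of the parallel argument can be sketched with the standard library alone. The following is an illustrative approximation of the dispatch a helper like this performs, not pwkit’s actual implementation; the names get_map and my_parallelizable_function are local to this sketch:

```python
import os
from contextlib import contextmanager
from multiprocessing import Pool

@contextmanager
def get_map(parallel=True):
    """Illustrative stand-in for a ParallelHelper's get_map()."""
    # Decide how many worker processes the caller asked for.
    if parallel is True:
        nproc = os.cpu_count()  # all available cores
    elif parallel is False or parallel == 1:
        nproc = 1  # serial processing
    elif 0 < parallel < 1:
        # A fraction: use about that share of the machine.
        nproc = max(1, int(parallel * os.cpu_count()))
    else:
        nproc = int(parallel)  # an explicit core count

    if nproc == 1:
        yield map  # the builtin suffices for serial work
        return

    # Parallel case: the pool is the heavyweight resource that the
    # context-manager protocol creates and tears down.
    pool = Pool(nproc)
    try:
        yield pool.map
    finally:
        pool.terminate()
        pool.join()

def my_parallelizable_function(args, parallel=True):
    with get_map(parallel) as mapper:
        return list(mapper(abs, args))
```

Note how the fractional and boolean cases collapse onto a single “number of processes” decision, which is why 0.99 (nearly all cores) and 1 (serial) mean such different things.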

Along with the standard ParallelHelper.get_map(), ParallelHelper instances support a “partially-Pickling” map-like function, ParallelHelper.get_ppmap(), that works around Pickle-related limitations in the multiprocessing library.

Main Interface

The most important parts of this module are the make_parallel_helper() function and the interface defined by the abstract ParallelHelper class.

make_parallel_helper(parallel_arg, **kwargs)

Return a ParallelHelper object that can be used for easy parallelization of computations.

ParallelHelper()

Object that helps genericize the setup needed for parallel computations.

pwkit.parallel.make_parallel_helper(parallel_arg, **kwargs)[source]

Return a ParallelHelper object that can be used for easy parallelization of computations. parallel_arg is an object that lets the caller easily specify the kind of parallelization they are interested in. Allowed values are:

False

Serial processing only.

True

Parallel processing using all available cores.

1

Equivalent to False.

Other positive integer

Parallel processing using the specified number of cores.

x, 0 < x < 1

Parallel processing using about x * N cores, where N is the total number of cores in the system. Note that the meanings of 0.99 and 1 as arguments are very different.

ParallelHelper instance

Returns the instance.

The **kwargs are passed on to the appropriate ParallelHelper constructor, if the caller wants to do something tricky.

Expected usage is:

from pwkit.parallel import make_parallel_helper

def sub_operation(arg):
    ... do some computation ...
    return result

def my_parallelizable_function(arg1, arg2, parallel=True):
    phelp = make_parallel_helper(parallel)

    with phelp.get_map() as map:
        op_results = map(sub_operation, args)

    ... reduce "op_results" in some way ...
    return final_result

This means that my_parallelizable_function doesn’t have to worry about all of the various fancy things the caller might want to do in terms of special parallel magic.

Note that sub_operation above must be defined in a stand-alone fashion because of the way Python’s multiprocessing module works. This can be worked around somewhat with the special ParallelHelper.get_ppmap() variant. This returns a “partially-Pickling” map operation — with a different calling signature — that allows un-Pickle-able values to be used. See the documentation for serial_ppmap() for usage information.
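The restriction exists because multiprocessing sends each task to its worker processes by pickling it, and Python can pickle a function only by reference to an importable name. A quick demonstration of the underlying limitation:

```python
import pickle

def standalone(x):
    return x * 2

# A module-level function pickles fine: it is stored by name and
# looked up again when unpickled.
payload = pickle.dumps(standalone)
assert pickle.loads(payload)(3) == 6

# A lambda (or a closure over local state) has no importable name,
# so pickling it fails.
try:
    pickle.dumps(lambda x: x * 2)
except Exception as e:
    print("cannot pickle:", e)
```

This is why sub-operations handed to a parallelized map must be defined at module level, and why get_ppmap() exists for the values that cannot be made pickle-able.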

class pwkit.parallel.ParallelHelper[source]

Object that helps genericize the setup needed for parallel computations. Each method returns a context manager that wraps up any resource allocation and deallocation that may need to occur to make the parallelization happen under the hood.

ParallelHelper objects should be obtained by calling make_parallel_helper(), not direct construction, unless you have special needs. See the documentation of that function for an example of the general usage pattern.

Once you have a ParallelHelper instance, usage should be something like:

with phelp.get_map() as map:
    results_arr = map(my_function, my_args)

The partially-Pickling map works around a limitation in the multiprocessing library. This library spawns subprocesses and executes parallel tasks by sending them to the subprocesses, which means that the data describing the task must be pickle-able. There are hacks so that you can pass functions defined in the global namespace but they’re pretty much useless in production code. The “partially-Pickling map” works around this by using a different method that allows some arguments to the map operation to avoid being pickled. (Instead, they are directly inherited by os.fork()-ed subprocesses.) See the docs for serial_ppmap() for usage information.

get_map()[source]

Get a context manager that yields a function with the same call signature as the standard library function map(). Its results are the same, but it may evaluate the mapped function in parallel across multiple threads or processes — the calling function should not have to particularly care about the details. Example usage is:

with phelp.get_map() as map:
    results_arr = map(my_function, my_args)

The passed function and its arguments must be Pickle-able. The alternate method get_ppmap() relaxes this restriction somewhat.

get_ppmap()[source]

Get a context manager that yields a “partially-pickling map function”. It can be used to perform a parallelized map() operation with some un-pickle-able arguments.

The yielded function has the signature of serial_ppmap(). Its behavior is functionally equivalent to the following code, except that the calls to func may happen in parallel:

def ppmap(func, fixed_arg, var_arg_iter):
    return [func(i, fixed_arg, x) for i, x in enumerate(var_arg_iter)]

The arguments to the ppmap function are:

func

A callable taking three arguments and returning a Pickle-able value.

fixed_arg

Any value, even one that is not pickle-able.

var_arg_iter

An iterable that generates Pickle-able values.

The arguments to your func function, which actually does the interesting computations, are:

index

The 0-based index number of the item being processed; often this can be ignored.

fixed_arg

The same fixed_arg that was passed to ppmap.

var_arg

The index’th item in the var_arg_iter iterable passed to ppmap.

This variant of the standard map() function exists to allow the parallel-processing system to work around pickle-related limitations in the multiprocessing library.
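The serial reference semantics above make the calling convention concrete. In this worked example the fixed argument is a lambda, which multiprocessing could not pickle, yet it passes through ppmap untouched; serial_ppmap here is the reference implementation quoted in this module’s own docs, and scale_and_tag is a hypothetical user function:

```python
def serial_ppmap(func, fixed_arg, var_arg_iter):
    # Reference implementation, as documented for this module.
    return [func(i, fixed_arg, x) for i, x in enumerate(var_arg_iter)]

def scale_and_tag(index, fixed_arg, var_arg):
    # func receives: the item's 0-based index, the shared fixed
    # argument, and the index'th item from the iterable.
    return (index, fixed_arg(var_arg))

doubler = lambda x: x * 2  # un-pickle-able, but fine as fixed_arg
results = serial_ppmap(scale_and_tag, doubler, [10, 20, 30])
print(results)  # [(0, 20), (1, 40), (2, 60)]
```

A parallel implementation preserves exactly this signature and result ordering; only the scheduling of the func calls changes.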

Implementation Details

Some of these classes and functions may be useful for other modules, but in general you need only concern yourself with the make_parallel_helper() function and the ParallelHelper base class.

SerialHelper([chunksize])

A ParallelHelper that actually does serial processing.

serial_ppmap(func, fixed_arg, var_arg_iter)

A serial implementation of the "partially-pickling map" function returned by the ParallelHelper.get_ppmap() interface.

MultiprocessingPoolHelper([chunksize])

A ParallelHelper that parallelizes computations using Python's multiprocessing.Pool with a configurable number of processes.

multiprocessing_ppmap_worker(in_queue, ...)

Worker for the multiprocessing ppmap implementation.

InterruptiblePool([processes, initializer, ...])

A modified version of multiprocessing.pool.Pool that has better behavior with regard to KeyboardInterrupts in the map method.

VacuousContextManager(value)

A context manager that just returns a static value and doesn't do anything clever with exceptions.

class pwkit.parallel.SerialHelper(chunksize=None)[source]

A ParallelHelper that actually does serial processing.

pwkit.parallel.serial_ppmap(func, fixed_arg, var_arg_iter)[source]

A serial implementation of the “partially-pickling map” function returned by the ParallelHelper.get_ppmap() interface. Its arguments are:

func

A callable taking three arguments and returning a Pickle-able value.

fixed_arg

Any value, even one that is not pickle-able.

var_arg_iter

An iterable that generates Pickle-able values.

The functionality is:

def serial_ppmap(func, fixed_arg, var_arg_iter):
    return [func(i, fixed_arg, x) for i, x in enumerate(var_arg_iter)]

Therefore the arguments to your func function, which actually does the interesting computations, are:

index

The 0-based index number of the item being processed; often this can be ignored.

fixed_arg

The same fixed_arg that was passed to ppmap.

var_arg

The index’th item in the var_arg_iter iterable passed to ppmap.

class pwkit.parallel.MultiprocessingPoolHelper(chunksize=None, **pool_kwargs)[source]

A ParallelHelper that parallelizes computations using Python’s multiprocessing.Pool with a configurable number of processes. Actually, we use a wrapped version of multiprocessing.Pool that handles KeyboardInterrupt exceptions more helpfully.

pwkit.parallel.multiprocessing_ppmap_worker(in_queue, out_queue, func, fixed_arg)[source]

Worker for the multiprocessing ppmap implementation. Strongly derived from code posted on StackExchange by “klaus se”: http://stackoverflow.com/a/16071616/3760486.

class pwkit.parallel.InterruptiblePool(processes=None, initializer=None, initargs=(), **kwargs)[source]

A modified version of multiprocessing.pool.Pool that has better behavior with regard to KeyboardInterrupts in the map method. Parameters:

processes

The number of worker processes to use; defaults to the number of CPUs.

initializer

Either None, or a callable that will be invoked by each worker process when it starts.

initargs

Arguments for initializer.

kwargs

Extra arguments. Python 2.7 supports a maxtasksperchild parameter.

Python’s multiprocessing.Pool class doesn’t interact well with KeyboardInterrupt signals, as has been documented elsewhere. Various workarounds have been shared; here, we adapt one proposed by John Reese.

This version is a drop-in replacement for multiprocessing.Pool as long as the map() method is the only one that needs to be interrupt-friendly.

class pwkit.parallel.VacuousContextManager(value)[source]

A context manager that just returns a static value and doesn’t do anything clever with exceptions.
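A class with this behavior takes only a few lines; the following is a sketch consistent with the description above, not necessarily pwkit’s exact code. It lets serial code paths reuse the same `with ... as ...` idiom that the parallel helpers require:

```python
class VacuousContextManager:
    """Wrap a static value in the context-manager protocol."""

    def __init__(self, value):
        self.value = value

    def __enter__(self):
        # Hand the wrapped value to the "as" target.
        return self.value

    def __exit__(self, exc_type, exc_value, traceback):
        return False  # never suppress exceptions

# The serial get_map() case reduces to wrapping the builtin map:
with VacuousContextManager(map) as m:
    squares = list(m(lambda x: x * x, range(4)))
print(squares)  # [0, 1, 4, 9]
```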