Framework for easy parallelized processing (pwkit.parallel)¶
A framework making it easy to write functions that can perform computations in parallel.
Use this framework if you are writing a function that you would like to be able to perform some of its work in parallel, using multiple CPUs at once. First, design the parallel part of the function’s operation so that it can be implemented in terms of the standard library map() function. Then, give your function an optional parallel=True keyword argument and use the make_parallel_helper() function from this module like so:
    from pwkit.parallel import make_parallel_helper

    def my_parallelizable_function(arg1, arg2, parallel=True):
        # Get a "parallel helper" object that can provide us with a parallelized
        # "map" function. The caller specifies how the parallelization is done;
        # we don't have to know the details.
        phelp = make_parallel_helper(parallel)
        ...

        # When used as a context manager, the helper provides a function that
        # acts like the standard library function "map", except it may
        # parallelize its operation.
        with phelp.get_map() as map:
            results1 = map(my_subfunc1, subargs1)
            ...
            results2 = map(my_subfunc2, subargs2)

        ... do stuff with results1 and results2 ...
Passing parallel=True to a function defined this way will cause it to parallelize map calls across all cores. Passing parallel=0.5 will cause it to use about half of your machine’s cores. Passing parallel=False will cause it to use serial processing. The helper must be used as a context manager (via the with statement) because the parallel computation may involve creating and destroying heavyweight resources (namely, child processes).
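The calling pattern above can be illustrated with a minimal, serial-only stand-in for the helper object. Note that `_SerialHelperSketch` and `square_all` below are hypothetical names invented for this sketch; real code would call pwkit’s `make_parallel_helper()` instead of constructing the stand-in directly:

```python
import contextlib

class _SerialHelperSketch:
    """A hypothetical serial-only stand-in mimicking the helper interface."""

    @contextlib.contextmanager
    def get_map(self):
        # In the serial case, the yielded "map" is just the builtin,
        # wrapped so callers get a concrete list rather than a lazy iterator.
        yield lambda func, iterable: list(map(func, iterable))

def square_all(values, parallel=True):
    # Real code would do: phelp = make_parallel_helper(parallel)
    phelp = _SerialHelperSketch()
    with phelp.get_map() as map_fn:
        return map_fn(lambda x: x * x, values)

print(square_all([1, 2, 3]))  # [1, 4, 9]
```

Because the function only ever talks to the helper’s context-managed map, swapping in a parallel helper changes nothing in the function body itself.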
Along with the standard ParallelHelper.get_map(), ParallelHelper instances support a “partially-Pickling” map-like function, ParallelHelper.get_ppmap(), that works around Pickle-related limitations in the multiprocessing library.
Main Interface¶
The most important parts of this module are the make_parallel_helper() function and the interface defined by the abstract ParallelHelper class.
- pwkit.parallel.make_parallel_helper(parallel_arg, **kwargs)[source]¶
Return a ParallelHelper object that can be used for easy parallelization of computations. parallel_arg is an object that lets the caller easily specify the kind of parallelization they are interested in. Allowed values are:
- False
Serial processing only.
- True
Parallel processing using all available cores.
- 1
Equivalent to False.
- Other positive integer
Parallel processing using the specified number of cores.
- x, 0 < x < 1
Parallel processing using about x * N cores, where N is the total number of cores in the system. Note that the meanings of 0.99 and 1 as arguments are very different.
Returns a ParallelHelper instance.
The **kwargs are passed on to the appropriate ParallelHelper constructor, if the caller wants to do something tricky.
Expected usage is:
    from pwkit.parallel import make_parallel_helper

    def sub_operation(arg):
        ... do some computation ...
        return result

    def my_parallelizable_function(arg1, arg2, parallel=True):
        phelp = make_parallel_helper(parallel)

        with phelp.get_map() as map:
            op_results = map(sub_operation, args)

        ... reduce "op_results" in some way ...
        return final_result
This means that my_parallelizable_function doesn’t have to worry about all of the various fancy things the caller might want to do in terms of special parallel magic.
Note that sub_operation above must be defined in a stand-alone fashion because of the way Python’s multiprocessing module works. This can be worked around somewhat with the special ParallelHelper.get_ppmap() variant. This returns a “partially-Pickling” map operation, with a different calling signature, that allows un-Pickle-able values to be used. See the documentation for serial_ppmap() for usage information.
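The meaning of the different parallel_arg values can be sketched as a core-count calculation. The function below is illustrative only; the exact rounding pwkit applies to fractional arguments is an assumption here, not a statement of its implementation:

```python
import multiprocessing

def cores_for(parallel_arg):
    """Interpret parallel_arg per the documented rules (illustrative sketch)."""
    n = multiprocessing.cpu_count()
    if parallel_arg is True:
        return n  # all available cores
    if parallel_arg is False or parallel_arg == 1:
        return 1  # serial processing
    if 0 < parallel_arg < 1:
        # "about x * N cores"; the rounding choice here is an assumption
        return max(1, int(round(parallel_arg * n)))
    return int(parallel_arg)  # explicit core count

print(cores_for(True), cores_for(0.5), cores_for(False))
```

Note that True must be tested before the `== 1` comparison, since `True == 1` in Python; this is exactly why the docs warn that 0.99 and 1 mean very different things.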
- class pwkit.parallel.ParallelHelper[source]¶
Object that helps genericize the setup needed for parallel computations. Each method returns a context manager that wraps up any resource allocation and deallocation that may need to occur to make the parallelization happen under the hood.
ParallelHelper objects should be obtained by calling make_parallel_helper(), not by direct construction, unless you have special needs. See the documentation of that function for an example of the general usage pattern.
Once you have a ParallelHelper instance, usage should be something like:

    with phelp.get_map() as map:
        results_arr = map(my_function, my_args)
The partially-Pickling map works around a limitation in the multiprocessing library. This library spawns subprocesses and executes parallel tasks by sending them to the subprocesses, which means that the data describing the task must be pickle-able. There are hacks so that you can pass functions defined in the global namespace, but they’re pretty much useless in production code. The “partially-Pickling map” works around this by using a different method that allows some arguments to the map operation to avoid being pickled. (Instead, they are directly inherited by os.fork()-ed subprocesses.) See the docs for serial_ppmap() for usage information.

- get_map()[source]¶
Get a context manager that yields a function with the same call signature as the standard library function map(). Its results are the same, but it may evaluate the mapped function in parallel across multiple threads or processes; the calling function should not have to particularly care about the details. Example usage is:

    with phelp.get_map() as map:
        results_arr = map(my_function, my_args)

The passed function and its arguments must be Pickle-able. The alternate method get_ppmap() relaxes this restriction somewhat.
- get_ppmap()[source]¶
Get a context manager that yields a “partially-pickling map function”. It can be used to perform a parallelized map() operation with some un-pickle-able arguments.
The yielded function has the signature of serial_ppmap(). Its behavior is functionally equivalent to the following code, except that the calls to func may happen in parallel:

    def ppmap(func, fixed_arg, var_arg_iter):
        return [func(i, fixed_arg, x) for i, x in enumerate(var_arg_iter)]

The arguments to the ppmap function are:
- func
A callable taking three arguments and returning a Pickle-able value.
- fixed_arg
Any value, even one that is not pickle-able.
- var_arg_iter
An iterable that generates Pickle-able values.
The arguments to your func function, which actually does the interesting computations, are:
- index
The 0-based index number of the item being processed; often this can be ignored.
- fixed_arg
The same fixed_arg that was passed to ppmap.
- var_arg
The index’th item in the var_arg_iter iterable passed to ppmap.
This variant of the standard map() function exists to allow the parallel-processing system to work around pickle-related limitations in the multiprocessing library.
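To make the calling convention concrete, here is the documented serial equivalent of ppmap used with a fixed argument that cannot be pickled (a lock). This is a sketch using the serial fallback only; a real ParallelHelper would obtain ppmap from get_ppmap() and might run the calls in subprocesses:

```python
import threading

def ppmap(func, fixed_arg, var_arg_iter):
    # The documented serial equivalent of the partially-pickling map.
    return [func(i, fixed_arg, x) for i, x in enumerate(var_arg_iter)]

lock = threading.Lock()  # un-pickle-able, but fine as fixed_arg

def tally(index, fixed_arg, var_arg):
    # index: 0-based position; fixed_arg: the shared lock; var_arg: one item.
    with fixed_arg:
        return index + var_arg

results = ppmap(tally, lock, [10, 20, 30])
print(results)  # [10, 21, 32]
```

Passing `lock` through a plain multiprocessing map would fail at pickling time; the ppmap convention routes it around the pickling step instead.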
Implementation Details¶
Some of these classes and functions may be useful for other modules, but in general you need only concern yourself with the make_parallel_helper() function and the ParallelHelper base class.
- class pwkit.parallel.SerialHelper(chunksize=None)[source]¶
A ParallelHelper that actually does serial processing.
- pwkit.parallel.serial_ppmap(func, fixed_arg, var_arg_iter)[source]¶
A serial implementation of the “partially-pickling map” function returned by the ParallelHelper.get_ppmap() interface. Its arguments are:
- func
A callable taking three arguments and returning a Pickle-able value.
- fixed_arg
Any value, even one that is not pickle-able.
- var_arg_iter
An iterable that generates Pickle-able values.
The functionality is:

    def serial_ppmap(func, fixed_arg, var_arg_iter):
        return [func(i, fixed_arg, x) for i, x in enumerate(var_arg_iter)]
Therefore the arguments to your func function, which actually does the interesting computations, are:
- index
The 0-based index number of the item being processed; often this can be ignored.
- fixed_arg
The same fixed_arg that was passed to ppmap.
- var_arg
The index’th item in the var_arg_iter iterable passed to ppmap.
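The index argument is occasionally useful in its own right, for example to weight each item by its position. A small self-contained sketch, with serial_ppmap redefined locally from its documented functionality so the example stands alone:

```python
def serial_ppmap(func, fixed_arg, var_arg_iter):
    # Functionality exactly as documented above.
    return [func(i, fixed_arg, x) for i, x in enumerate(var_arg_iter)]

def scale(index, factor, value):
    # Use the 0-based index to perturb each scaled item differently.
    return factor * value + index

out = serial_ppmap(scale, 2, [1, 2, 3])
print(out)  # [2, 5, 8]
```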
- class pwkit.parallel.MultiprocessingPoolHelper(chunksize=None, **pool_kwargs)[source]¶
A ParallelHelper that parallelizes computations using Python’s multiprocessing.Pool with a configurable number of processes. Actually, we use a wrapped version of multiprocessing.Pool that handles KeyboardInterrupt exceptions more helpfully.
- pwkit.parallel.multiprocessing_ppmap_worker(in_queue, out_queue, func, fixed_arg)[source]¶
Worker for the multiprocessing ppmap implementation. Strongly derived from code posted on Stack Overflow by “klaus se”: http://stackoverflow.com/a/16071616/3760486.
- class pwkit.parallel.InterruptiblePool(processes=None, initializer=None, initargs=(), **kwargs)[source]¶
A modified version of multiprocessing.pool.Pool that has better behavior with regard to KeyboardInterrupts in the map method. Parameters:
- processes
The number of worker processes to use; defaults to the number of CPUs.
- initializer
Either None, or a callable that will be invoked by each worker process when it starts.
- initargs
Arguments for initializer.
- kwargs
Extra arguments. Python 2.7 supports a maxtasksperchild parameter.
Python’s multiprocessing.Pool class doesn’t interact well with KeyboardInterrupt signals, as has been documented in various places online. Various workarounds have been shared; here, we adapt one proposed by John Reese. This version is a drop-in replacement for multiprocessing.Pool … as long as the map() method is the only one that needs to be interrupt-friendly.