# -*- mode: python; coding: utf-8 -*-
# Copyright 2014 Peter Williams <peter@newton.cx> and collaborators.
# Licensed under the MIT License.
"""A framework making it easy to write functions that can perform computations
in parallel.
Use this framework if you are writing a function that you would like to
perform some of its work in parallel, using multiple CPUs at once. First, you
must design the parallel part of the function's operation to be implementable
in terms of the standard library :func:`map` function. Then, give your
function an optional ``parallel=True`` keyword argument and use the
:func:`make_parallel_helper` function from this module like so::

    from pwkit.parallel import make_parallel_helper

    def my_parallelizable_function(arg1, arg2, parallel=True):
        # Get a "parallel helper" object that can provide us with a parallelized
        # "map" function. The caller specifies how the parallelization is done;
        # we don't have to know the details.
        phelp = make_parallel_helper(parallel)
        ...

        # When used as a context manager, the helper provides a function that
        # acts like the standard library function "map", except it may
        # parallelize its operation.
        with phelp.get_map() as map:
            results1 = map(my_subfunc1, subargs1)
            ...
            results2 = map(my_subfunc2, subargs2)

        ... do stuff with results1 and results2 ...

Passing ``parallel=True`` to a function defined this way will cause it to
parallelize ``map`` calls across all cores. Passing ``parallel=0.5`` will
cause it to use about half your machine. Passing ``parallel=False`` will cause
it to use serial processing. The helper must be used as a context manager (via
the ``with`` statement) because the parallel computation may involve creating
and destroying heavyweight resources (namely, child processes).
Along with standard :meth:`ParallelHelper.get_map`, :class:`ParallelHelper`
instances support a "partially-Pickling" `map`-like function
:meth:`ParallelHelper.get_ppmap` that works around Pickle-related limitations
in the :mod:`multiprocessing` library.
"""
from __future__ import absolute_import, division, print_function, unicode_literals
__all__ = str('make_parallel_helper').split()
import functools, signal
from multiprocessing.pool import Pool
from multiprocessing import Process, Queue, TimeoutError
import six
from six.moves import range
def _initializer_wrapper(actual_initializer, *rest):
    """Make the calling process ignore SIGINT, then run the real initializer.

    Ignoring SIGINT is deliberate: in the usual case of ``^C`` typed at a
    terminal, it is the parent's job to kill us. If somebody sends us that
    signal by hand, well... nothing will happen.

    actual_initializer
      Either None, or a callable to invoke once the signal disposition has
      been changed.
    rest
      Positional arguments forwarded verbatim to *actual_initializer*.

    """
    signal.signal(signal.SIGINT, signal.SIG_IGN)

    if actual_initializer is None:
        return

    actual_initializer(*rest)
class InterruptiblePool(Pool):
    """A modified version of `multiprocessing.pool.Pool` that has better
    behavior with regard to KeyboardInterrupts in the `map` method. Parameters:

    processes
      The number of worker processes to use; defaults to the number of CPUs.
    initializer
      Either None, or a callable that will be invoked by each worker
      process when it starts.
    initargs
      Arguments for `initializer`.
    kwargs
      Extra arguments. Python 2.7 supports a `maxtasksperchild` parameter.

    Python's multiprocessing.Pool class doesn't interact well with
    KeyboardInterrupt signals, as documented in places such as:

    - `<http://stackoverflow.com/questions/1408356/>`_
    - `<http://stackoverflow.com/questions/11312525/>`_
    - `<http://noswap.com/blog/python-multiprocessing-keyboardinterrupt>`_

    Various workarounds have been shared. Here, we adapt the one proposed in
    the last link above, by John Reese, and shared as

    - `<https://github.com/jreese/multiprocessing-keyboardinterrupt/>`_

    This version is a drop-in replacement for multiprocessing.Pool ... as long
    as the map() method is the only one that needs to be interrupt-friendly.

    """
    # Timeout, in seconds, handed to each ``get()`` call made by map(). When
    # it expires the wait is simply retried, so it bounds a single blocking
    # call rather than the whole computation.
    wait_timeout = 3600

    def __init__(self, processes=None, initializer=None, initargs=(), **kwargs):
        # Every worker runs _initializer_wrapper first, which makes it ignore
        # SIGINT and then calls the user-supplied initializer (if any) with
        # *initargs*.
        new_initializer = functools.partial(_initializer_wrapper, initializer)
        super(InterruptiblePool, self).__init__(processes, new_initializer,
                                                initargs, **kwargs)

    def map(self, func, iterable, chunksize=None):
        """Equivalent of `map` built-in, without swallowing KeyboardInterrupt.

        func
          The function to apply to the items.
        iterable
          An iterable of items that will have `func` applied to them.
        chunksize
          Passed through unchanged to ``map_async()``.

        Returns whatever the asynchronous result's ``get()`` returns. On
        KeyboardInterrupt the pool is terminated and joined before the
        exception is re-raised.

        """
        # The key magic is that we must call r.get() with a timeout, because a
        # Condition.wait() without a timeout swallows KeyboardInterrupts.
        r = self.map_async(func, iterable, chunksize)

        while True:
            try:
                return r.get(self.wait_timeout)
            except TimeoutError:
                # Not finished yet; go around and wait again.
                pass
            except KeyboardInterrupt:
                self.terminate()
                self.join()
                raise
            # Other exceptions propagate up.
class ParallelHelper(object):
    """Object that helps genericize the setup needed for parallel computations.
    Each method returns a context manager that wraps up any resource
    allocation and deallocation that may need to occur to make the
    parallelization happen under the hood.

    :class:`ParallelHelper` objects should be obtained by calling
    :func:`make_parallel_helper`, not direct construction, unless you have
    special needs. See the documentation of that function for an example of
    the general usage pattern.

    Once you have a :class:`ParallelHelper` instance, usage should be
    something like::

        with phelp.get_map() as map:
            results_arr = map(my_function, my_args)

    The partially-Pickling map works around a limitation in the
    multiprocessing library. This library spawns subprocesses and executes
    parallel tasks by sending them to the subprocesses, which means that the
    data describing the task must be pickle-able. There are hacks so that you
    can pass functions defined in the global namespace but they're pretty much
    useless in production code. The "partially-Pickling map" works around this
    by using a different method that allows some arguments to the map
    operation to avoid being pickled. (Instead, they are directly inherited by
    :func:`os.fork`-ed subprocesses.) See the docs for :func:`serial_ppmap` for
    usage information.

    This base class implements neither method; both raise
    :exc:`NotImplementedError` and are meant to be overridden by subclasses.

    """
    def get_map(self):
        """Get a *context manager* that yields a function with the same call signature
        as the standard library function :func:`map`. Its results are the
        same, but it may evaluate the mapped function in parallel across
        multiple threads or processes --- the calling function should not have
        to particularly care about the details. Example usage is::

            with phelp.get_map() as map:
                results_arr = map(my_function, my_args)

        The passed function and its arguments must be Pickle-able. The alternate
        method :meth:`get_ppmap` relaxes this restriction somewhat.

        """
        raise NotImplementedError('get_map() not available')

    def get_ppmap(self):
        """Get a *context manager* that yields a "partially-pickling map function". It
        can be used to perform a parallelized :func:`map` operation with some
        un-pickle-able arguments.

        The yielded function has the signature of :func:`serial_ppmap`. Its
        behavior is functionally equivalent to the following code, except that
        the calls to ``func`` may happen in parallel::

            def ppmap(func, fixed_arg, var_arg_iter):
                return [func(i, fixed_arg, x) for i, x in enumerate(var_arg_iter)]

        The arguments to the ``ppmap`` function are:

        *func*
          A callable taking three arguments and returning a Pickle-able value.
        *fixed_arg*
          Any value, even one that is not pickle-able.
        *var_arg_iter*
          An iterable that generates Pickle-able values.

        The arguments to your ``func`` function, which actually does the
        interesting computations, are:

        *index*
          The 0-based index number of the item being processed; often this can
          be ignored.
        *fixed_arg*
          The same *fixed_arg* that was passed to ``ppmap``.
        *var_arg*
          The *index*'th item in the *var_arg_iter* iterable passed to
          ``ppmap``.

        This variant of the standard :func:`map` function exists to allow the
        parallel-processing system to work around :mod:`pickle`-related
        limitations in the :mod:`multiprocessing` library.

        """
        raise NotImplementedError('get_ppmap() not available')
class VacuousContextManager(object):
    """A context manager that just returns a static value and doesn't do anything
    clever with exceptions.

    value
      The object handed back by ``__enter__``, i.e. what the ``as`` clause of
      a ``with`` statement is bound to.

    """
    def __init__(self, value):
        self.value = value

    def __enter__(self):
        return self.value

    def __exit__(self, etype, evalue, etb):
        # Returning False means any exception raised inside the ``with`` body
        # propagates to the caller.
        return False
def serial_ppmap(func, fixed_arg, var_arg_iter):
    """A serial implementation of the "partially-pickling map" function returned
    by the :meth:`ParallelHelper.get_ppmap` interface. Its arguments are:

    *func*
      A callable taking three arguments and returning a Pickle-able value.
    *fixed_arg*
      Any value, even one that is not pickle-able.
    *var_arg_iter*
      An iterable that generates Pickle-able values.

    The functionality is::

        def serial_ppmap(func, fixed_arg, var_arg_iter):
            return [func(i, fixed_arg, x) for i, x in enumerate(var_arg_iter)]

    Therefore the arguments to your ``func`` function, which actually does the
    interesting computations, are:

    *index*
      The 0-based index number of the item being processed; often this can
      be ignored.
    *fixed_arg*
      The same *fixed_arg* that was passed to ``ppmap``.
    *var_arg*
      The *index*'th item in the *var_arg_iter* iterable passed to
      ``ppmap``.

    Returns a list holding the result of each call, in input order.

    """
    return [func(i, fixed_arg, x) for i, x in enumerate(var_arg_iter)]
class SerialHelper(ParallelHelper):
    """A :class:`ParallelHelper` that actually does serial processing."""

    def __init__(self, chunksize=None):
        # We accept and discard some of the multiprocessing kwargs that turn
        # into noops so that we can present a uniform API.
        pass

    def get_map(self):
        """Return a context manager yielding the built-in :func:`map`.

        NOTE: the built-in is handed back untouched, so on Python 3 calling
        it produces a lazy iterator rather than a list.

        """
        return VacuousContextManager(map)

    def get_ppmap(self):
        """Return a context manager yielding :func:`serial_ppmap`."""
        return VacuousContextManager(serial_ppmap)
def multiprocessing_ppmap_worker(in_queue, out_queue, func, fixed_arg):
    """Worker for the :mod:`multiprocessing` ppmap implementation. Strongly
    derived from code posted on StackExchange by "klaus se":
    `<http://stackoverflow.com/a/16071616/3760486>`_.

    *in_queue*
      Queue-like object whose ``get()`` yields ``(index, var_arg)`` pairs. A
      pair whose index is None tells the worker to stop.
    *out_queue*
      Queue-like object that receives one ``(index, result)`` pair per
      processed item.
    *func*
      Callable invoked as ``func(index, fixed_arg, var_arg)``.
    *fixed_arg*
      Value passed unchanged as the second argument of every *func* call.

    """
    while True:
        i, var_arg = in_queue.get()
        if i is None:
            # Shutdown sentinel: stop pulling work.
            break
        out_queue.put((i, func(i, fixed_arg, var_arg)))
class MultiprocessingPoolHelper(ParallelHelper):
    """A :class:`ParallelHelper` that parallelizes computations using Python's
    :class:`multiprocessing.Pool` with a configurable number of processes.
    Actually, we use a wrapped version of :class:`multiprocessing.Pool` that
    handles :exc:`KeyboardInterrupt` exceptions more helpfully.

    chunksize
      Forwarded as the ``chunksize`` keyword of the pool's ``map`` method.
    pool_kwargs
      Keyword arguments used to construct the :class:`InterruptiblePool`. The
      ``processes`` entry, if present, also sets the number of workers used
      by the partially-Pickling map.

    """
    class InterruptiblePoolContextManager(object):
        """Context manager that creates an :class:`InterruptiblePool` on entry,
        yields one of its methods with some keyword arguments pre-bound, and
        terminates and joins the pool on exit.

        methodname
          Name of the pool method to yield.
        methodkwargs
          Optional dict of keyword arguments to pre-bind to that method.
        kwargs
          Keyword arguments for the :class:`InterruptiblePool` constructor.

        """
        def __init__(self, methodname, methodkwargs=None, **kwargs):
            self.methodname = methodname
            # Take a private copy: this avoids both a shared mutable default
            # and aliasing of a dict the caller may go on to modify.
            self.methodkwargs = {} if methodkwargs is None else dict(methodkwargs)
            self.kwargs = kwargs

        def __enter__(self):
            self.pool = InterruptiblePool(**self.kwargs)
            func = getattr(self.pool, self.methodname)
            return functools.partial(func, **self.methodkwargs)

        def __exit__(self, etype, evalue, etb):
            self.pool.terminate()
            self.pool.join()
            return False  # never suppress exceptions from the with-body

    def __init__(self, chunksize=None, **pool_kwargs):
        self.chunksize = chunksize
        self.pool_kwargs = pool_kwargs

    def get_map(self):
        """Return a context manager yielding the ``map`` method of a freshly
        created :class:`InterruptiblePool`, with ``chunksize`` pre-bound.

        """
        return self.InterruptiblePoolContextManager('map',
                                                    {'chunksize': self.chunksize},
                                                    **self.pool_kwargs)

    def _ppmap(self, func, fixed_arg, var_arg_iter):
        """The multiprocessing implementation of the partially-Pickling "ppmap"
        function. This doesn't use a Pool like map() does, because the whole
        problem is that Pool chokes on un-Pickle-able values. Strongly derived
        from code posted on StackExchange by "klaus se":
        `<http://stackoverflow.com/a/16071616/3760486>`_.

        This implementation could definitely be improved -- that's basically
        what the Pool class is all about -- but this gets us off the ground
        for those cases where the Pickle limitation is important.

        Returns a list with one entry per item of *var_arg_iter*, in input
        order. If an exception escapes in the parent process (for example
        while iterating *var_arg_iter*), any workers still running are
        terminated before it propagates.

        XXX This deadlocks if a child process crashes!!! XXX

        """
        n_procs = self.pool_kwargs.get('processes')
        if n_procs is None:
            # Logic copied from multiprocessing.pool.Pool.__init__()
            try:
                from multiprocessing import cpu_count
                n_procs = cpu_count()
            except NotImplementedError:
                n_procs = 1

        # The input queue holds a single item, so the feeding loop below
        # blocks until a worker is free to take the next one.
        in_queue = Queue(1)
        out_queue = Queue()
        procs = [Process(target=multiprocessing_ppmap_worker,
                         args=(in_queue, out_queue, func, fixed_arg))
                 for _ in range(n_procs)]

        try:
            for p in procs:
                p.daemon = True
                p.start()

            n_items = 0
            for i, var_arg in enumerate(var_arg_iter):
                in_queue.put((i, var_arg))
                n_items = i + 1

            # One shutdown sentinel per worker.
            for _ in procs:
                in_queue.put((None, None))

            # Results arrive in completion order; the index carried with each
            # one puts it back in its input position.
            result = [None] * n_items
            for _ in range(n_items):
                i, value = out_queue.get()
                result[i] = value

            for p in procs:
                p.join()
        finally:
            # No-op after a normal run (every worker has been joined). On an
            # error path, don't leave workers blocked on the input queue.
            for p in procs:
                if p.is_alive():
                    p.terminate()
                    p.join()

        return result

    def get_ppmap(self):
        """Return a context manager yielding this helper's partially-Pickling
        map function.

        """
        return VacuousContextManager(self._ppmap)
def make_parallel_helper(parallel_arg, **kwargs):
    """Return a :class:`ParallelHelper` object that can be used for easy
    parallelization of computations. *parallel_arg* is an object that lets the
    caller easily specify the kind of parallelization they are interested in.
    Allowed values are:

    False
      Serial processing only.
    True
      Parallel processing using all available cores.
    1
      Equivalent to ``False``.
    Other positive integer
      Parallel processing using the specified number of cores.
    x, 0 < x < 1
      Parallel processing using about ``x * N`` cores, where N is the total
      number of cores in the system; at least one process is always used.
      Note that the meanings of ``0.99`` and ``1`` as arguments are very
      different.
    :class:`ParallelHelper` instance
      Returns the instance.

    Any other value, including zero and negative numbers, raises
    :exc:`ValueError`.

    The ``**kwargs`` are passed on to the appropriate :class:`ParallelHelper`
    constructor, if the caller wants to do something tricky.

    Expected usage is::

        from pwkit.parallel import make_parallel_helper

        def sub_operation(arg):
            ... do some computation ...
            return result

        def my_parallelizable_function(arg1, arg2, parallel=True):
            phelp = make_parallel_helper(parallel)

            with phelp.get_map() as map:
                op_results = map(sub_operation, args)

            ... reduce "op_results" in some way ...
            return final_result

    This means that ``my_parallelizable_function`` doesn't have to worry about
    all of the various fancy things the caller might want to do in terms of
    special parallel magic.

    Note that ``sub_operation`` above must be defined in a stand-alone fashion
    because of the way Python's :mod:`multiprocessing` module works. This can
    be worked around somewhat with the special
    :meth:`ParallelHelper.get_ppmap` variant. This returns a
    "partially-Pickling" map operation --- with a different calling signature
    --- that allows un-Pickle-able values to be used. See the documentation
    for :func:`serial_ppmap` for usage information.

    """
    # This must come before any numeric comparison: on Python 3, ordering a
    # helper instance against an int raises TypeError.
    if isinstance(parallel_arg, ParallelHelper):
        return parallel_arg

    if parallel_arg is True:  # note: (True == 1) is True
        return MultiprocessingPoolHelper(**kwargs)

    if parallel_arg is False or parallel_arg == 1:
        return SerialHelper(**kwargs)

    try:
        is_fraction = 0 < parallel_arg < 1
    except TypeError:
        # Unorderable value (e.g. None or a string on Python 3); fall through
        # to the ValueError at the bottom.
        is_fraction = False

    if is_fraction:
        from multiprocessing import cpu_count
        try:
            n_cpus = cpu_count()
        except NotImplementedError:
            n_cpus = 1
        # A small fraction can round down to zero, which is not a usable
        # process count, so always ask for at least one.
        n = max(1, int(round(parallel_arg * n_cpus)))
        return MultiprocessingPoolHelper(processes=n, **kwargs)

    if isinstance(parallel_arg, six.integer_types) and parallel_arg > 0:
        return MultiprocessingPoolHelper(processes=parallel_arg, **kwargs)

    # The one-element tuple keeps the formatting correct when parallel_arg is
    # itself a tuple.
    raise ValueError('don\'t understand make_parallel_helper() argument %r'
                     % (parallel_arg,))