# -*- mode: python; coding: utf-8 -*-
# Copyright 2015 Peter Williams <peter@newton.cx> and collaborators
# Licensed under the MIT License.
"""pwkit.slurp - run a program and capture its output."""
from __future__ import absolute_import, division, print_function, unicode_literals
__all__ = str("Event Redirection Slurper").split()
import fcntl, os, signal, subprocess
from select import select, error as selecterror
from . import Holder
try:
from subprocess import DEVNULL as _DEVNULL
except ImportError:
_DEVNULL = subprocess.STDOUT - 1
@Holder
class Event(object):
Stdout = "stdout"
Stderr = "stderr"
ForwardedSignal = "forwarded-signal"
Timeout = "timeout"
@Holder
class Redirection(object):
Pipe = subprocess.PIPE
Stdout = subprocess.STDOUT
DevNull = _DEVNULL
signals_for_child = [
signal.SIGHUP,
signal.SIGINT,
signal.SIGQUIT,
signal.SIGTERM,
signal.SIGUSR1,
signal.SIGUSR2,
]
class SlurperIterator(object):
def __init__(self, parent):
self.parent = parent
def __iter__(self):
return self
def __next__(self): # Python 3
if not len(self.parent._files):
raise StopIteration()
return self.parent._next_lowlevel()
next = __next__ # Python 2
def _decode_streams(event_source, which_events, encoding):
from codecs import getincrementaldecoder
decoders = {}
for etype, edata in event_source:
if etype not in which_events:
yield etype, edata
continue
dec = decoders.get(etype)
if dec is None:
dec = decoders[etype] = getincrementaldecoder(encoding)()
final = not len(edata)
result = dec.decode(edata, final)
if len(result):
yield etype, result # no false EOF indicators
if final:
yield etype, edata # make sure we have an EOF signal
def _linebreak_streams(event_source, which_events):
partials = {}
for etype, edata in event_source:
if etype not in which_events:
yield etype, edata
continue
if not len(edata):
# EOF on this stream.
trailer = partials.get(etype, edata)
if len(trailer):
yield etype, trailer
yield etype, edata
continue
lines = (partials.get(etype, edata * 0) + edata).split(edata.__class__(b"\n"))
for line in lines[:-1]:
yield etype, line
partials[etype] = lines[-1]
[docs]
class Slurper(object):
_chunksize = 1024
def __init__(
self,
argv=None,
env=None,
cwd=None,
propagate_signals=True,
timeout=10,
linebreak=False,
encoding=None,
stdin=Redirection.DevNull,
stdout=Redirection.Pipe,
stderr=Redirection.Pipe,
executable=None,
subproc_factory=None,
):
if subproc_factory is None:
subproc_factory = subprocess.Popen
self.subproc_factory = subproc_factory
self.proc = None
self.argv = argv
self.env = env
self.cwd = cwd
self.propagate_signals = propagate_signals
self.timeout = timeout
self.linebreak = linebreak
self.encoding = encoding
self.stdin = stdin
self.stdout = stdout
self.stderr = stderr
self.executable = executable
def __enter__(self):
self._prev_handlers = {}
self._other_events = []
self._file_event_types = {}
self._files = []
stdin = self.stdin
if stdin == Redirection.DevNull:
stdin = open(os.devnull, "r")
stdout = self.stdout
if stdout == Redirection.DevNull:
stdout = open(os.devnull, "w")
stderr = self.stderr
if stderr == Redirection.DevNull:
stderr = open(os.devnull, "w")
self.proc = self.subproc_factory(
self.argv,
env=self.env,
executable=self.executable,
cwd=self.cwd,
stdin=stdin,
stdout=stdout,
stderr=stderr,
shell=False,
)
if self.propagate_signals:
def handle(signum, frame):
self.proc.send_signal(signum)
self._other_events.insert(0, (Event.ForwardedSignal, signum))
for signum in signals_for_child:
self._prev_handlers[signum] = signal.signal(signum, handle)
if stdout == Redirection.Pipe:
self._file_event_types[self.proc.stdout.fileno()] = Event.Stdout
self._files.append(self.proc.stdout)
if stderr == Redirection.Pipe:
self._file_event_types[self.proc.stderr.fileno()] = Event.Stderr
self._files.append(self.proc.stderr)
for fd in self._files:
fl = fcntl.fcntl(fd.fileno(), fcntl.F_GETFL)
fcntl.fcntl(fd.fileno(), fcntl.F_SETFL, fl | os.O_NONBLOCK)
return self
def _next_lowlevel(self):
if len(self._other_events):
return self._other_events.pop()
while True:
try:
rd, wr, er = select(self._files, [], [], self.timeout)
break
except selecterror as e:
# if EINTR or EAGAIN, try again; we won't get EINTR unless
# we're forwarding signals, since otherwise it'll show up as a
# KeyboardInterrupt. "e.args[0]" is the only way to get errno.
if e.args[0] not in (4, 11):
raise
for fd in rd:
chunk = fd.read(self._chunksize)
if not len(chunk):
self._files.remove(fd)
return (self._file_event_types[fd.fileno()], chunk)
return (Event.Timeout, None)
def __iter__(self):
result = SlurperIterator(self)
if self.encoding is not None:
which = frozenset((Event.Stdout, Event.Stderr))
result = _decode_streams(result, which, self.encoding)
if self.linebreak:
which = frozenset((Event.Stdout, Event.Stderr))
result = _linebreak_streams(result, which)
return result
def __exit__(self, etype, evalue, etb):
self.proc.wait()
for signum, prev_handler in self._prev_handlers.items():
signal.signal(signum, prev_handler)
return False