Source code for pwkit.slurp

# -*- mode: python; coding: utf-8 -*-
# Copyright 2015 Peter Williams <peter@newton.cx> and collaborators
# Licensed under the MIT License.

"""pwkit.slurp - run a program and capture its output."""

from __future__ import absolute_import, division, print_function, unicode_literals

__all__ = str ('Event Redirection Slurper').split ()

import fcntl, os, signal, six, subprocess, sys
from select import select, error as selecterror

from . import Holder

try:
    from subprocss import DEVNULL as _DEVNULL
except ImportError:
    _DEVNULL = subprocess.STDOUT - 1

@Holder
class Event (object):
    Stdout = 'stdout'
    Stderr = 'stderr'
    ForwardedSignal = 'forwarded-signal'
    Timeout = 'timeout'


@Holder
class Redirection (object):
    Pipe = subprocess.PIPE
    Stdout = subprocess.STDOUT
    DevNull = _DEVNULL


signals_for_child = [
    signal.SIGHUP,
    signal.SIGINT,
    signal.SIGQUIT,
    signal.SIGTERM,
    signal.SIGUSR1,
    signal.SIGUSR2,
]


class SlurperIterator (object):
    def __init__ (self, parent):
        self.parent = parent

    def __iter__ (self):
        return self

    def __next__ (self): # Python 3
        if not len (self.parent._files):
            raise StopIteration ()
        return self.parent._next_lowlevel ()

    next = __next__ # Python 2


def _decode_streams (event_source, which_events, encoding):
    from codecs import getincrementaldecoder
    decoders = {}

    for etype, edata in event_source:
        if etype not in which_events:
            yield etype, edata
            continue

        dec = decoders.get (etype)
        if dec is None:
            dec = decoders[etype] = getincrementaldecoder (encoding) ()

        final = not len (edata)
        result = dec.decode (edata, final)
        if len (result):
            yield etype, result # no false EOF indicators

        if final:
            yield etype, edata # make sure we have an EOF signal


def _linebreak_streams (event_source, which_events):
    partials = {}

    for etype, edata in event_source:
        if etype not in which_events:
            yield etype, edata
            continue

        if not len (edata):
            # EOF on this stream.
            trailer = partials.get (etype, edata)
            if len (trailer):
                yield etype, trailer
            yield etype, edata
            continue

        lines = (partials.get (etype, edata * 0) + edata).split (edata.__class__ (b'\n'))
        for line in lines[:-1]:
            yield etype, line
        partials[etype] = lines[-1]


[docs]class Slurper (object): _chunksize = 1024 def __init__ (self, argv=None, env=None, cwd=None, propagate_signals=True, timeout=10, linebreak=False, encoding=None, stdin=Redirection.DevNull, stdout=Redirection.Pipe, stderr=Redirection.Pipe, executable=None, subproc_factory=None): if subproc_factory is None: subproc_factory = subprocess.Popen self.subproc_factory = subproc_factory self.proc = None self.argv = argv self.env = env self.cwd = cwd self.propagate_signals = propagate_signals self.timeout = timeout self.linebreak = linebreak self.encoding = encoding self.stdin = stdin self.stdout = stdout self.stderr = stderr self.executable = executable def __enter__ (self): self._prev_handlers = {} self._other_events = [] self._file_event_types = {} self._files = [] stdin = self.stdin if stdin == Redirection.DevNull: stdin = open (os.devnull, 'r') stdout = self.stdout if stdout == Redirection.DevNull: stdout = open (os.devnull, 'w') stderr = self.stderr if stderr == Redirection.DevNull: stderr = open (os.devnull, 'w') self.proc = self.subproc_factory ( self.argv, env=self.env, executable=self.executable, cwd=self.cwd, stdin=stdin, stdout=stdout, stderr=stderr, shell=False, ) if self.propagate_signals: def handle (signum, frame): self.proc.send_signal (signum) self._other_events.insert (0, (Event.ForwardedSignal, signum)) for signum in signals_for_child: self._prev_handlers[signum] = signal.signal (signum, handle) if stdout == Redirection.Pipe: self._file_event_types[self.proc.stdout.fileno ()] = Event.Stdout self._files.append (self.proc.stdout) if stderr == Redirection.Pipe: self._file_event_types[self.proc.stderr.fileno ()] = Event.Stderr self._files.append (self.proc.stderr) for fd in self._files: fl = fcntl.fcntl (fd.fileno (), fcntl.F_GETFL) fcntl.fcntl (fd.fileno (), fcntl.F_SETFL, fl | os.O_NONBLOCK) return self def _next_lowlevel (self): if len (self._other_events): return self._other_events.pop () while True: try: rd, wr, er = select (self._files, [], [], self.timeout) break except selecterror as e: # if EINTR or EAGAIN, try again; we won't get EINTR unless # we're forwarding signals, since otherwise it'll show up as a # KeyboardInterrupt. "e.args[0]" is the only way to get errno. if e.args[0] not in (4, 11): raise for fd in rd: chunk = fd.read (self._chunksize) if not len (chunk): self._files.remove (fd) return (self._file_event_types[fd.fileno ()], chunk) return (Event.Timeout, None) def __iter__ (self): result = SlurperIterator (self) if self.encoding is not None: which = frozenset ((Event.Stdout, Event.Stderr)) result = _decode_streams (result, which, self.encoding) if self.linebreak: which = frozenset ((Event.Stdout, Event.Stderr)) result = _linebreak_streams (result, which) return result def __exit__ (self, etype, evalue, etb): self.proc.wait () for signum, prev_handler in six.iteritems (self._prev_handlers): signal.signal (signum, prev_handler) return False