# -*- mode: python; coding: utf-8 -*-
# Copyright 2012-2014 Peter Williams <peter@newton.cx> and collaborators.
# Licensed under the MIT License.
"""A simple parser for ini-style files that's better than Python's
ConfigParser/configparser.
Functions:
read
Generate a stream of `pwkit.Holder` instances from an ini-format file.
mutate
Rewrite an ini file chunk by chunk.
write
Write a stream of `pwkit.Holder` instances to an ini-format file.
mutate_stream
Lower-level version; only operates on streams, not path names.
read_stream
Lower-level version; only operates on streams, not path names.
write_stream
Lower-level version; only operates on streams, not path names.
mutate_in_place
Rewrite an ini file specififed by its path name, in place.
"""
from __future__ import absolute_import, division, print_function, unicode_literals
__all__ = str(
"""FileChunk InifileError mutate_in_place mutate_stream
mutate read_stream read write_stream write"""
).split()
import io, os, re
from . import Holder, PKError
sectionre = re.compile(r"^\[(.*)]\s*$")
keyre = re.compile(r"^(\S+)\s*=(.*)$") # leading space chomped later
escre = re.compile(r'^(\S+)\s*=\s*"(.*)"\s*$')
[docs]
class InifileError(PKError):
pass
[docs]
def read_stream(stream):
"""Python 3 compat note: we're assuming `stream` gives bytes not unicode."""
section = None
key = None
data = None
for fullline in stream:
line = fullline.split("#", 1)[0]
m = sectionre.match(line)
if m is not None:
# New section
if section is not None:
if key is not None:
section.set_one(key, data.strip())
key = data = None
yield section
section = Holder()
section.section = m.group(1)
continue
if len(line.strip()) == 0:
if key is not None:
section.set_one(key, data.strip())
key = data = None
continue
m = escre.match(fullline)
if m is not None:
if section is None:
raise InifileError("key seen without section!")
if key is not None:
section.set_one(key, data.strip())
key = m.group(1)
data = (
m.group(2).replace(r"\"", '"').replace(r"\n", "\n").replace(r"\\", "\\")
)
section.set_one(key, data)
key = data = None
continue
m = keyre.match(line)
if m is not None:
if section is None:
raise InifileError("key seen without section!")
if key is not None:
section.set_one(key, data.strip())
key = m.group(1)
data = m.group(2)
if not len(data):
data = " "
elif not data[-1].isspace():
data += " "
continue
if line[0].isspace() and key is not None:
data += line.strip() + " "
continue
raise InifileError("unparsable line: " + line[:-1])
if section is not None:
if key is not None:
section.set_one(key, data.strip())
yield section
def read(stream_or_path):
if isinstance(stream_or_path, str):
return read_stream(io.open(stream_or_path, "rt"))
return read_stream(stream_or_path)
# Writing
[docs]
def write_stream(stream, holders, defaultsection=None):
"""Very simple writing in ini format. The simple stringification of each value
in each Holder is printed, and no escaping is performed. (This is most
relevant for multiline values or ones containing pound signs.) `None` values are
skipped.
Arguments:
stream
A text stream to write to.
holders
An iterable of objects to write. Their fields will be
written as sections.
defaultsection=None
Section name to use if a holder doesn't contain a
`section` field.
"""
anybefore = False
for h in holders:
if anybefore:
print("", file=stream)
s = h.get("section", defaultsection)
if s is None:
raise ValueError("cannot determine section name for item <%s>" % h)
print("[%s]" % s, file=stream)
for k in sorted(x for x in h.__dict__.keys() if x != "section"):
v = h.get(k)
if v is None:
continue
print("%s = %s" % (k, v), file=stream)
anybefore = True
[docs]
def write(stream_or_path, holders, **kwargs):
"""Very simple writing in ini format. The simple stringification of each value
in each Holder is printed, and no escaping is performed. (This is most
relevant for multiline values or ones containing pound signs.) `None` values are
skipped.
Arguments:
stream
A text stream to write to.
holders
An iterable of objects to write. Their fields will be
written as sections.
defaultsection=None
Section name to use if a holder doesn't contain a
`section` field.
"""
if isinstance(stream_or_path, str):
return write_stream(io.open(stream_or_path, "wt"), holders, **kwargs)
else:
return write_stream(stream_or_path, holders, **kwargs)
# Parsing plus inline modification, preserving the file as much as possible.
#
# I'm pretty sure that this code gets the corner cases right, but it hasn't
# been thoroughly tested, and it's a little hairy ...
class FileChunk(object):
def __init__(self):
self.data = Holder()
self._lines = []
def _addLine(self, line, assoc):
self._lines.append((assoc, line))
def set(self, name, value):
newline = (("%s = %s" % (name, value)) + os.linesep).encode("utf8")
first = True
for i in range(len(self._lines)):
assoc, line = self._lines[i]
if assoc != name:
continue
if first:
self._lines[i] = (assoc, newline)
first = False
else:
# delete the line
self._lines[i] = (None, None)
if first:
# Need to append the line to the last block
for i in range(len(self._lines) - 1, -1, -1):
if self._lines[i][0] is not None:
break
self._lines.insert(i + 1, (name, newline))
def emit(self, stream):
for assoc, line in self._lines:
if line is None:
continue
stream.write(line)
[docs]
def mutate_stream(instream, outstream):
"""Python 3 compat note: we're assuming `stream` gives bytes not unicode."""
chunk = None
key = None
data = None
misclines = []
for fullline in instream:
line = fullline.split("#", 1)[0]
m = sectionre.match(line)
if m is not None:
# New chunk
if chunk is not None:
if key is not None:
chunk.data.set_one(key, data.strip())
key = data = None
yield chunk
chunk.emit(outstream)
chunk = FileChunk()
for miscline in misclines:
chunk._addLine(miscline, None)
misclines = []
chunk.data.section = m.group(1)
chunk._addLine(fullline, None)
continue
if len(line.strip()) == 0:
if key is not None:
chunk.data.set_one(key, data.strip())
key = data = None
if chunk is not None:
chunk._addLine(fullline, None)
else:
misclines.append(fullline)
continue
m = escre.match(fullline)
if m is not None:
if chunk is None:
raise InifileError("key seen without section!")
if key is not None:
chunk.data.set_one(key, data.strip())
key = m.group(1)
data = (
m.group(2).replace(r"\"", '"').replace(r"\n", "\n").replace(r"\\", "\\")
)
chunk.data.set_one(key, data)
chunk._addLine(fullline, key)
key = data = None
continue
m = keyre.match(line)
if m is not None:
if chunk is None:
raise InifileError("key seen without section!")
if key is not None:
chunk.data.set_one(key, data.strip())
key = m.group(1)
data = m.group(2)
if not data[-1].isspace():
data += " "
chunk._addLine(fullline, key)
continue
if line[0].isspace() and key is not None:
data += line.strip() + " "
chunk._addLine(fullline, key)
continue
raise InifileError("unparsable line: " + line[:-1])
if chunk is not None:
if key is not None:
chunk.data.set_one(key, data.strip())
yield chunk
chunk.emit(outstream)
def mutate(instream_or_path, outstream_or_path, outmode="wb"):
if isinstance(instream_or_path, str):
instream_or_path = io.open(instream_or_path, "rb")
if isinstance(outstream_or_path, str):
outstream_or_path = io.open(outstream_or_path, outmode)
return mutate_stream(instream_or_path, outstream_or_path)
def mutate_in_place(inpath):
from sys import exc_info
from os import rename, unlink
tmppath = inpath + ".new"
with io.open(inpath, "rb") as instream:
try:
with io.open(tmppath, "wb") as outstream:
for item in mutate_stream(instream, outstream):
yield item
rename(tmppath, inpath)
except:
try:
os.unlink(tmppath)
except Exception:
pass
raise