File: //lib/python3/dist-packages/genshi/input.py
# -*- coding: utf-8 -*-
#
# Copyright (C) 2006-2009 Edgewall Software
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at http://genshi.edgewall.org/wiki/License.
#
# This software consists of voluntary contributions made by many
# individuals. For the exact contribution history, see the revision
# history and logs, available at http://genshi.edgewall.org/log/.
"""Support for constructing markup streams from files, strings, or other
sources.
"""
from itertools import chain
import codecs
from xml.parsers import expat
import six
from six.moves import html_entities as entities, html_parser as html
from genshi.core import Attrs, QName, Stream, stripentities
from genshi.core import START, END, XML_DECL, DOCTYPE, TEXT, START_NS, \
END_NS, START_CDATA, END_CDATA, PI, COMMENT
from genshi.compat import StringIO, BytesIO
__all__ = ['ET', 'ParseError', 'XMLParser', 'XML', 'HTMLParser', 'HTML']
__docformat__ = 'restructuredtext en'
def ET(element):
    """Turn an ElementTree element (and its subtree) into markup events.

    The element is walked depth-first: a ``START`` event, the element's
    leading text, the events of every child, an ``END`` event and finally the
    element's tail text. ElementTree carries no source location, so every
    event gets the position ``(None, -1, -1)``.

    :param element: an ElementTree element
    :return: a generator of ``(kind, data, pos)`` markup events
    """
    # ElementTree spells qualified names as "{namespace}local"; dropping the
    # leading brace leaves the "namespace}local" form handed to `QName`.
    tag_name = QName(element.tag.lstrip('{'))
    attr_pairs = []
    for attr, value in element.items():
        attr_pairs.append((QName(attr.lstrip('{')), value))

    yield START, (tag_name, Attrs(attr_pairs)), (None, -1, -1)

    leading_text = element.text
    if leading_text:
        yield TEXT, leading_text, (None, -1, -1)

    for child in element:
        for event in ET(child):
            yield event

    yield END, tag_name, (None, -1, -1)

    trailing_text = element.tail
    if trailing_text:
        yield TEXT, trailing_text, (None, -1, -1)
class ParseError(Exception):
    """Raised when the parsed input contains a fatal syntax error.

    Besides the usual exception message, instances expose the bare parser
    message as ``msg`` and the error location as ``filename``, ``lineno`` and
    ``offset``.
    """

    def __init__(self, message, filename=None, lineno=-1, offset=-1):
        """Create the exception.

        :param message: the error message from the parser
        :param filename: the path to the file that was parsed; when given it
                         is appended to the exception text
        :param lineno: the number of the line on which the error was
                       encountered
        :param offset: the column number where the error was encountered
        """
        # The text shown by str(exc) mentions the file, `msg` does not.
        described = message + ', in ' + filename if filename else message
        Exception.__init__(self, described)
        self.msg = message
        # Input without a file name is reported as coming from a string.
        self.filename = filename if filename else '<string>'
        self.lineno = lineno
        self.offset = offset
class XMLParser(object):
    """Generator-based XML parser based on roughly equivalent code in
    Kid/ElementTree.

    Expat callbacks append ``(kind, data, pos)`` events to an internal queue,
    which `parse` drains while feeding the source to Expat chunk by chunk.

    The parsing is initiated by iterating over the parser object:

    >>> parser = XMLParser(StringIO('<root id="2"><child>Foo</child></root>'))
    >>> for kind, data, pos in parser:
    ...     print('%s %s' % (kind, data))
    START (QName('root'), Attrs([(QName('id'), '2')]))
    START (QName('child'), Attrs())
    TEXT Foo
    END child
    END root
    """

    # One <!ENTITY> declaration per named HTML entity, joined into a DTD that
    # is handed to Expat as the "foreign" external subset (`_build_foreign`).
    _entitydefs = [
        '<!ENTITY %s "&#%d;">' % (name, value)
        for name, value in entities.name2codepoint.items()
    ]
    _external_dtd = u'\n'.join(_entitydefs).encode('utf-8')

    def __init__(self, source, filename=None, encoding=None):
        """Initialize the parser for the given XML input.

        :param source: the XML text as a file-like object
        :param filename: the name of the file, if appropriate
        :param encoding: the encoding of the file; if not specified, the
                         encoding is assumed to be ASCII, UTF-8, or UTF-16, or
                         whatever the encoding specified in the XML declaration
                         (if any)
        """
        self.source = source
        self.filename = filename

        # Setup the Expat parser; '}' separates namespace URI and local name
        # in the names Expat reports.
        parser = expat.ParserCreate(encoding, '}')
        parser.buffer_text = True
        # Python 3 does not have returns_unicode
        if hasattr(parser, 'returns_unicode'):
            parser.returns_unicode = True
        parser.ordered_attributes = True

        callbacks = (
            ('StartElementHandler', self._handle_start),
            ('EndElementHandler', self._handle_end),
            ('CharacterDataHandler', self._handle_data),
            ('StartDoctypeDeclHandler', self._handle_doctype),
            ('StartNamespaceDeclHandler', self._handle_start_ns),
            ('EndNamespaceDeclHandler', self._handle_end_ns),
            ('StartCdataSectionHandler', self._handle_start_cdata),
            ('EndCdataSectionHandler', self._handle_end_cdata),
            ('ProcessingInstructionHandler', self._handle_pi),
            ('XmlDeclHandler', self._handle_xml_decl),
            ('CommentHandler', self._handle_comment),
            # Tell Expat that we'll handle non-XML entities ourselves
            # (in _handle_other)
            ('DefaultHandler', self._handle_other),
        )
        for attribute, callback in callbacks:
            setattr(parser, attribute, callback)

        # Make Expat ask for an external DTD even when the document names
        # none; `_build_foreign` answers with the HTML entity definitions.
        parser.SetParamEntityParsing(expat.XML_PARAM_ENTITY_PARSING_ALWAYS)
        parser.UseForeignDTD()
        parser.ExternalEntityRefHandler = self._build_foreign

        self.expat = parser
        self._queue = []

    def parse(self):
        """Generator that parses the XML source, yielding markup events.

        :return: a markup event stream
        :raises ParseError: if the XML text is not well formed
        """
        def _generate():
            chunk_size = 4 * 1024  # 4K
            finished = False
            try:
                while True:
                    # Feed Expat until it has produced at least one event or
                    # the source is exhausted.
                    while not finished and not self._queue:
                        chunk = self.source.read(chunk_size)
                        if chunk:
                            if isinstance(chunk, six.text_type):
                                chunk = chunk.encode('utf-8')
                            self.expat.Parse(chunk, False)
                        else:  # end of data
                            if hasattr(self, 'expat'):
                                self.expat.Parse('', True)
                                # get rid of circular references
                                del self.expat
                            finished = True
                    for event in self._queue:
                        yield event
                    self._queue = []
                    if finished:
                        break
            except expat.ExpatError as err:
                raise ParseError(str(err), self.filename, err.lineno,
                                 err.offset)
        return Stream(_generate()).filter(_coalesce)

    def __iter__(self):
        return iter(self.parse())

    def _build_foreign(self, context, base, sysid, pubid):
        # Whatever external entity Expat asks for, answer with the built-in
        # DTD of HTML entity definitions; nothing is ever fetched.
        dtd_parser = self.expat.ExternalEntityParserCreate(context)
        dtd_parser.ParseFile(BytesIO(self._external_dtd))
        return 1

    def _enqueue(self, kind, data=None, pos=None):
        # Queue an event, defaulting its position to Expat's current one.
        if pos is None:
            pos = self._getpos()
        if kind is TEXT:
            # Expat reports the *end* of the text event as current position. We
            # try to fix that up here as much as possible. Unfortunately, the
            # offset is only valid for single-line text. For multi-line text,
            # it is apparently not possible to determine at what offset it
            # started
            if '\n' in data:
                lineno = pos[1] - len(data.splitlines()) + 1
                offset = -1
            else:
                lineno, offset = pos[1], pos[2] - len(data)
            pos = (pos[0], lineno, offset)
        self._queue.append((kind, data, pos))

    def _getpos_unknown(self):
        return (self.filename, -1, -1)

    def _getpos(self):
        current = self.expat
        return (self.filename, current.CurrentLineNumber,
                current.CurrentColumnNumber)

    def _handle_start(self, tag, attrib):
        # With `ordered_attributes` set, `attrib` is the flat sequence
        # [name1, value1, name2, value2, ...]; zipping one iterator with
        # itself pairs the items up again.
        flat = iter(attrib)
        attrs = Attrs([(QName(name), value) for name, value in zip(flat, flat)])
        self._enqueue(START, (QName(tag), attrs))

    def _handle_end(self, tag):
        self._enqueue(END, QName(tag))

    def _handle_data(self, text):
        self._enqueue(TEXT, text)

    def _handle_xml_decl(self, version, encoding, standalone):
        self._enqueue(XML_DECL, (version, encoding, standalone))

    def _handle_doctype(self, name, sysid, pubid, has_internal_subset):
        # Note the event data order: public id before system id.
        self._enqueue(DOCTYPE, (name, pubid, sysid))

    def _handle_start_ns(self, prefix, uri):
        self._enqueue(START_NS, (prefix or '', uri))

    def _handle_end_ns(self, prefix):
        self._enqueue(END_NS, prefix or '')

    def _handle_start_cdata(self):
        self._enqueue(START_CDATA)

    def _handle_end_cdata(self):
        self._enqueue(END_CDATA)

    def _handle_pi(self, target, data):
        self._enqueue(PI, (target, data))

    def _handle_comment(self, text):
        self._enqueue(COMMENT, text)

    def _handle_other(self, text):
        # Only entity references are of interest here; everything else that
        # reaches the default handler is dropped.
        if not text.startswith('&'):
            return
        # deal with undefined entities
        try:
            text = six.unichr(entities.name2codepoint[text[1:-1]])
            self._enqueue(TEXT, text)
        except KeyError:
            filename, lineno, offset = self._getpos()
            error = expat.error('undefined entity "%s": line %d, column %d'
                                % (text, lineno, offset))
            error.code = expat.errors.XML_ERROR_UNDEFINED_ENTITY
            error.lineno = lineno
            error.offset = offset
            raise error
def XML(text):
    """Parse the given XML source and return a markup stream.

    The events produced by `XMLParser` are collected into a list first, so
    unlike with `XMLParser` the returned stream is reusable, meaning it can be
    iterated over multiple times:

    >>> xml = XML('<doc><elem>Foo</elem><elem>Bar</elem></doc>')
    >>> print(xml)
    <doc><elem>Foo</elem><elem>Bar</elem></doc>
    >>> print(xml.select('elem'))
    <elem>Foo</elem><elem>Bar</elem>
    >>> print(xml.select('elem/text()'))
    FooBar

    :param text: the XML source
    :return: the parsed XML event stream
    :raises ParseError: if the XML text is not well-formed
    """
    parser = XMLParser(StringIO(text))
    # Materialise the events so the stream does not depend on the one-shot
    # parser generator.
    events = list(parser)
    return Stream(events)
class HTMLParser(html.HTMLParser, object):
    """Parser for HTML input based on the Python `HTMLParser` module.

    This class provides the same interface for generating stream events as
    `XMLParser`, and attempts to automatically balance tags.

    The parsing is initiated by iterating over the parser object:

    >>> parser = HTMLParser(BytesIO(u'<UL compact><LI>Foo</UL>'.encode('utf-8')), encoding='utf-8')
    >>> for kind, data, pos in parser:
    ...     print('%s %s' % (kind, data))
    START (QName('ul'), Attrs([(QName('compact'), 'compact')]))
    START (QName('li'), Attrs())
    TEXT Foo
    END li
    END ul
    """

    # Elements that never have content: an END event is emitted right after
    # their START event and their end tags are ignored.
    _EMPTY_ELEMS = frozenset(['area', 'base', 'basefont', 'br', 'col', 'frame',
                              'hr', 'img', 'input', 'isindex', 'link', 'meta',
                              'param'])

    # `HTMLParseError` was removed from the standard library's HTML parser
    # module in Python 3.5. Referencing it directly in an ``except`` clause
    # would raise AttributeError as soon as *any* exception reached that
    # clause, hiding the real error. An empty tuple is a valid ``except``
    # target that simply matches nothing.
    _PARSE_ERRORS = getattr(html, 'HTMLParseError', ())

    def __init__(self, source, filename=None, encoding=None):
        """Initialize the parser for the given HTML input.

        :param source: the HTML text as a file-like object
        :param filename: the name of the file, if known
        :param encoding: encoding of the file; ignored if the input is unicode
        """
        html.HTMLParser.__init__(self)
        self.source = source
        self.filename = filename
        self.encoding = encoding
        self._queue = []
        # Names of the elements that are currently open, innermost last.
        self._open_tags = []

    def parse(self):
        """Generator that parses the HTML source, yielding markup events.

        Elements still open when the input ends are closed automatically.

        :return: a markup event stream
        :raises ParseError: if the HTML text is not well formed
        :raises UnicodeError: if the source yields bytes and no encoding was
                              specified
        """
        def _generate():
            if self.encoding:
                reader = codecs.getreader(self.encoding)
                source = reader(self.source)
            else:
                source = self.source
            try:
                bufsize = 4 * 1024 # 4K
                done = False
                while 1:
                    # Feed the parser until it has produced at least one
                    # event or the source is exhausted.
                    while not done and len(self._queue) == 0:
                        data = source.read(bufsize)
                        if not data: # end of data
                            self.close()
                            done = True
                        else:
                            if not isinstance(data, six.text_type):
                                raise UnicodeError("source returned bytes, but no encoding specified")
                            self.feed(data)
                    for kind, data, pos in self._queue:
                        yield kind, data, pos
                    self._queue = []
                    if done:
                        # Close whatever is still open, innermost first,
                        # reusing the position of the last event. Popping
                        # (instead of reversing in place) leaves the stack
                        # empty, so a repeated parse() cannot emit these END
                        # events a second time.
                        while self._open_tags:
                            yield END, QName(self._open_tags.pop()), pos
                        break
            except self._PARSE_ERRORS as e:
                msg = '%s: line %d, column %d' % (e.msg, e.lineno, e.offset)
                raise ParseError(msg, self.filename, e.lineno, e.offset)
        return Stream(_generate()).filter(_coalesce)

    def __iter__(self):
        return iter(self.parse())

    def _enqueue(self, kind, data, pos=None):
        # Queue an event, defaulting its position to the parser's current one.
        if pos is None:
            pos = self._getpos()
        self._queue.append((kind, data, pos))

    def _getpos(self):
        lineno, column = self.getpos()
        return (self.filename, lineno, column)

    def handle_starttag(self, tag, attrib):
        fixed_attrib = []
        for name, value in attrib: # Fixup minimized attributes
            if value is None:
                value = name
            fixed_attrib.append((QName(name), stripentities(value)))

        self._enqueue(START, (QName(tag), Attrs(fixed_attrib)))
        if tag in self._EMPTY_ELEMS:
            self._enqueue(END, QName(tag))
        else:
            self._open_tags.append(tag)

    def handle_endtag(self, tag):
        # Close every open element up to and including the innermost one
        # matching `tag`; if none matches, all open elements get closed.
        if tag not in self._EMPTY_ELEMS:
            while self._open_tags:
                open_tag = self._open_tags.pop()
                self._enqueue(END, QName(open_tag))
                if open_tag.lower() == tag.lower():
                    break

    def handle_data(self, text):
        self._enqueue(TEXT, text)

    def handle_charref(self, name):
        # `name` is the reference without "&#" and ";", either decimal or
        # hexadecimal with a leading "x"/"X".
        if name.lower().startswith('x'):
            text = six.unichr(int(name[1:], 16))
        else:
            text = six.unichr(int(name))
        self._enqueue(TEXT, text)

    def handle_entityref(self, name):
        try:
            text = six.unichr(entities.name2codepoint[name])
        except KeyError:
            # Unknown entity: pass the reference through unchanged.
            text = '&%s;' % name
        self._enqueue(TEXT, text)

    def handle_pi(self, data):
        # Drop the "?" of an XML-style "?>" terminator, then split the
        # remainder into the target and its (possibly empty) data.
        if data.endswith('?'):
            data = data[:-1]
        try:
            target, data = data.split(None, 1)
        except ValueError:
            # PI with no data
            target = data
            data = ''
        self._enqueue(PI, (target.strip(), data.strip()))

    def handle_comment(self, text):
        self._enqueue(COMMENT, text)
def HTML(text, encoding=None):
    """Parse the given HTML source and return a markup stream.

    The events produced by `HTMLParser` are collected into a list first, so
    unlike with `HTMLParser` the returned stream is reusable, meaning it can be
    iterated over multiple times:

    >>> html = HTML('<body><h1>Foo</h1></body>', encoding='utf-8')
    >>> print(html)
    <body><h1>Foo</h1></body>
    >>> print(html.select('h1'))
    <h1>Foo</h1>
    >>> print(html.select('h1/text()'))
    Foo

    :param text: the HTML source
    :param encoding: the encoding used to decode `text` when it is a byte
                     string; not used when `text` is unicode
    :return: the parsed XML event stream
    :raises ParseError: if the HTML text is not well-formed, and error recovery
                        fails
    """
    if isinstance(text, six.text_type):
        # If it's unicode text the encoding should be set to None.
        # The option to pass in an incorrect encoding is for ease
        # of writing doctests that work in both Python 2.x and 3.x.
        source, encoding = StringIO(text), None
    else:
        source = BytesIO(text)
    parser = HTMLParser(source, encoding=encoding)
    return Stream(list(parser))
def _coalesce(stream):
    """Merge each run of adjacent TEXT events into a single TEXT event.

    The merged event carries the position of the first event of its run;
    all other events pass through unchanged and in order.

    :param stream: an iterable of ``(kind, data, pos)`` events
    :return: a generator over the coalesced events
    """
    pending = []
    pending_pos = None
    for kind, data, pos in stream:
        if kind is TEXT:
            # Keep collecting; nothing is emitted until the run ends.
            if pending_pos is None:
                pending_pos = pos
            pending.append(data)
            continue
        if pending:
            yield TEXT, ''.join(pending), pending_pos
            pending = []
            pending_pos = None
        if kind:
            yield kind, data, pos
    # Flush text that runs up to the very end of the stream.
    if pending:
        yield TEXT, ''.join(pending), pending_pos