##
## Name: scrubby.py
## Purpose: A tag-soup parser for HTML and other similar markup languages.
## Author: M. J. Fromberger
##
## Copyright (c) 2009-2010 Michael J. Fromberger. All Rights Reserved.
##
# Release identifier of this module, kept as a string.
__version__ = "1.021"
import collections, functools, re, sys
# {{ Compatibility definitions
if sys.version_info[0] >= 3:
    def is_string(obj):
        """Return True when obj is a string (Python 3: any str)."""
        return isinstance(obj, str)
else:
    def is_string(obj):
        """Return True when obj is a string (Python 2: any basestring)."""
        return isinstance(obj, basestring)
def is_callable(obj):
    """Return True if obj can be called like a function.

    The builtin callable() is used instead of collections.Callable:
    the ABC aliases were removed from the collections namespace in
    Python 3.10, where the old spelling raises AttributeError.
    """
    return callable(obj)
# }}
# {{ Exception classes
class parse_error (Exception):
    """Root of the exceptions defined by this module."""


class parse_failed (parse_error):
    """Signals that a scan or parse method could not consume a token
    at the position it was asked to start from.
    """
# }}
# {{ @bounded(meth)
def bounded(meth):
    """Decorator for scanner methods that take a starting position and
    return a (tag, endpos) pair.

    The decorated method returns (tag, startpos, endpos) instead, where
    startpos is the position it was called with.  If the underlying
    method returns None, parse_failed is raised with the method name,
    the starting position and any extra positional arguments.
    """
    @functools.wraps(meth)
    def wrapper(self, start, *args, **kw):
        outcome = meth(self, start, *args, **kw)
        if outcome is None:
            raise parse_failed(meth.__name__, start, *args)
        tag, endpos = outcome[0], outcome[1]
        return tag, start, endpos
    return wrapper
# }}
# {{ @unescape(meth)
def unescape(meth):
    """Decorator for methods that return a string: named entities of
    the form &name; in the result are replaced using the mapping
    self.parser.ENTITY_NAME_MAP.

    Entities whose name is not in the map are left exactly as written.
    The owning object must therefore have a .parser attribute that
    carries ENTITY_NAME_MAP.
    """
    def wrapper(self, *args, **kw):
        esc = self.parser.ENTITY_NAME_MAP

        def replace_entity(m):
            # Fall back to the full matched text for unknown names.
            return esc.get(m.group('key'), m.group(0))

        value = meth(self, *args, **kw)
        # The group must be named 'key', since replace_entity looks it
        # up by that name.
        return re.sub(r'&(?P<key>\w+);', replace_entity, value)
    return functools.update_wrapper(wrapper, meth)
# }}
# {{ class markup_scanner
class markup_scanner (object):
"""Implements a permissive lexical analyzer for HTML and
other similar markup languages.
Usage:
s = markup_scanner(input_string)
for tag, start_pos, end_pos, data in s.scan():
process(tag, start_pos, end_pos, data)
The components of each result are:
tag -- a string describing what the token is.
start_pos -- the starting offset of the token.
end_pos -- the offset one past the end of the token.
data -- token-specific data, or None.
The tag values are:
'eof' -- end of input.
'com', 'ucom' -- a comment (unterminated if 'ucom').
'dir', 'udir' -- a markup directive (e.g., <!DOCTYPE html>).
'start_tag' -- an opening tag (e.g., <a href="...">).
'self_tag' -- a self-delimited tag (e.g., <br />).
'end_tag' -- a closing tag (e.g., </a>).
'text' -- running text.
The data values are:
'start_tag' -- a dictionary giving tag name and attributes.
'self_tag' -- a dictionary giving tag name and attributes.
'text' -- a list of entities found in the text.
'dir', 'udir' -- a dictionary giving directive name and content.
Tag names ('name') are given as tuples of (start_pos, end_pos).
Tag attributes ('attribs') are given as a list of tuples, each
having one of the following forms:
('none', 0, 0) -- for bare attributes.
('name', s, e) -- for unquoted attribute values.
('str', s, e) -- for quoted attribute values, terminated.
('ustr', s, e) -- for quoted attribute values, unterminated.
All data values not described above will be None. A string or
comment value is "terminated" if its end marker occurs before the
end of input; otherwise it is unterminated.
"""
def __init__(self, src):
    """Set up a scanner that reads from the string src.

    Raises TypeError if src is not a string.
    """
    if is_string(src):
        self.input = src
    else:
        raise TypeError("input must be a string")
def scan(self, start_pos = 0, **opts):
    """Generate (tag, start_pos, end_pos, data) tuples for the tokens
    of the input, beginning at offset start_pos.

    Runs of adjacent 'text' tokens are merged into a single token
    whose data is the concatenation of their data lists.  The last
    token produced is always 'eof'.  Keyword options are handed to
    parse_one unchanged.

    All scanning state lives in local variables of this generator.
    parse_error propagates if parse_one cannot consume a token.
    """
    pending = None      # token held back in case more text follows it
    cursor = start_pos
    while True:
        kind, first, cursor, payload = self.parse_one(cursor, **opts)
        if (kind == 'text' and pending is not None
                and pending[0] == 'text'):
            # Extend the held-back text token rather than emitting a
            # second one.
            pending[2] = cursor
            pending[3].extend(payload)
            continue
        if pending is not None:
            yield tuple(pending)
        pending = [kind, first, cursor, payload]
        if kind == 'eof':
            break
    if pending is not None:
        yield tuple(pending)
# --- Private methods ----------------------------------------------
# Conventions:
# scan_* methods return a tuple of "tag", start, end
# parse_* methods return a tuple of "tag", start, end, data
#
# All methods should return a valid tuple or throw parse_failed.
# The @bounded decorator can help you keep track of this.
def get_char(self, p):
    """Return the input character at offset p, or the empty string if
    p lies outside the input.
    """
    if p < 0 or p >= len(self.input):
        return ''
    return self.input[p]
@bounded
def scan_string(self, p):
    """Scan a quoted string whose opening quote character is at p.

    Yields tag 'str' ending one past the matching quote, or 'ustr'
    ending at end of input when no closing quote is found.
    """
    text = self.input
    quote = text[p]
    pos = p + 1
    while True:
        ch = self.get_char(pos)
        if ch == '' or ch == quote:
            break
        pos += 1
    if pos >= len(text):
        return 'ustr', pos  # Unterminated string
    return 'str', pos + 1
@bounded
def scan_name(self, p):
    """Scan a name starting at p: an unquoted run that additionally
    stops at '='.  Fails (via scan_unquoted) when the run is empty.
    """
    stop = self.scan_unquoted(p, '=')[2]
    return 'name', stop
@bounded
def scan_unquoted(self, p, end_marks = '', allow_blank = False):
    """Scan an unquoted run of characters starting at p.

    The run stops at end of input, at whitespace, at any character
    with code below 32, at NUL, a quote mark or '>', and at any
    character listed in end_marks.  An empty run is a failure unless
    allow_blank is true.
    """
    stops = '\x00"\'>' + end_marks

    def ends_run(ch):
        return not ch or ch in stops or ch.isspace() or ord(ch) < 32

    pos = p
    while not ends_run(self.get_char(pos)):
        pos += 1
    if allow_blank:
        accepted = p <= pos
    else:
        accepted = p < pos
    if accepted:
        return 'unq', pos
    return None
@bounded
def scan_space(self, p):
    """Skip whitespace starting at p; the run may be empty."""
    pos = p
    while True:
        if not self.get_char(pos).isspace():
            break
        pos += 1
    return 'ws', pos
@bounded
def scan_literal(self, p, value):
    """Match the exact text value at offset p; fail if the input does
    not contain it there.
    """
    stop = p + len(value)
    matched = self.input[p:stop] == value
    return ('lit', stop) if matched else None
@bounded
def scan_entity(self, p):
    """Scan forward from p to the end of an entity.

    Yields 'ent' ending one past the first ';', or 'uent' ending at
    the first whitespace character or end of input, whichever comes
    first.
    """
    pos = p
    ch = self.get_char(pos)
    while ch and not ch.isspace():
        if ch == ';':
            return 'ent', pos + 1
        pos += 1
        ch = self.get_char(pos)
    return 'uent', pos
@bounded
def scan_comment(self, p):
ig, u, v = self.scan_literal(p, '':
return 'com', v + 3
v += 1
# Unterminated comment
return 'ucom', v
def parse_attrib(self, p, **opts):
    """Parse one attribute (a name with an optional '=value' part)
    starting at p.

    Returns ('attr', p, end, info), where info has 'name' as a
    (start, end) pair and 'value' as a (kind, start, end) triple.
    The value kind is 'none' when no value could be read.

    Options consulted: allow_unquoted (default True), end_marks
    (extra terminators for unquoted values) and allow_blank (default
    True).  parse_failed propagates if no name is found at p.
    """
    name_start, name_end = self.scan_name(p)[1:]
    value = ('none', name_end, name_end)
    try:
        pos = self.scan_space(name_end)[2]
        pos = self.scan_literal(pos, '=')[2]
        pos = self.scan_space(pos)[2]
        if self.get_char(pos) in ('"', "'"):
            value = self.scan_string(pos)
        elif opts.get('allow_unquoted', True):
            value = self.scan_unquoted(
                pos,
                '=' + opts.get('end_marks', ''),
                opts.get('allow_blank', True))
    except parse_failed:
        # No usable value; the attribute keeps whatever was set last.
        pass
    return 'attr', p, value[2], dict(
        name = (name_start, name_end),
        value = value)
def parse_attribs(self, p, **opts):
    """Parse as many whitespace-separated attributes as possible
    starting at p.

    Returns ('attrs', p, end, items), where each item is a
    (start, end, info) triple taken from parse_attrib.  The end
    position is just past any whitespace that follows the last
    attribute.
    """
    found = []
    pos = p
    while True:
        pos = self.scan_space(pos)[2]
        try:
            attr = self.parse_attrib(pos, **opts)
        except parse_failed:
            break
        first, pos, info = attr[1:]
        found.append((first, pos, info))
    return 'attrs', p, pos, found
def parse_starttag(self, p, **opts):
    """Parse an opening or self-delimited tag whose '<' is at p.

    Returns (kind, p, end, info), where kind is 'self_tag' if a '/'
    precedes the closing '>' and 'start_tag' otherwise.  info has
    'name' as a (start, end) pair and 'attribs' as a
    (start, end, items) triple.  Raises parse_failed if the tag is not
    closed by '>'.
    """
    after_lt = self.scan_literal(p, '<')[2]
    name_start, name_end = self.scan_name(after_lt)[1:]
    attr_start, pos, attrs = self.parse_attribs(name_end, **opts)[1:]
    if self.get_char(pos) == '/':
        kind = 'self_tag'
        pos += 1
    else:
        kind = 'start_tag'
    if self.get_char(pos) != '>':
        raise parse_failed("parse_starttag", p)
    return kind, p, pos + 1, dict(
        name = (name_start, name_end),
        attribs = (attr_start, pos, attrs))
def parse_endtag(self, p, **opts):
    """Parse a closing tag whose '</' begins at p.

    Returns ('end_tag', p, end, info), where info has 'name' as the
    (start, end) span of the tag name.  Whitespace is allowed between
    the name and the closing '>'.  Raises parse_failed if the input at
    p is not a well-formed closing tag.
    """
    # Consume the '</' opener first, so that the name span starts at
    # the first character of the name and not at the '<'.
    ig, u, v = self.scan_literal(p, '</')
    ntag, ns, ne = self.scan_name(v)
    ig, u, v = self.scan_space(ne)
    if self.get_char(v) != '>':
        raise parse_failed("parse_endtag", p)
    return 'end_tag', p, v + 1, dict(
        name = (ns, ne))
def parse_directive(self, p, end_mark, **opts):
ig, ns, ne = self.scan_name(p + 2) # skip past = len(t):
return 'eof', p, p, None
elif t[p] == '<':
np = self.get_char(p + 1)
try:
if np == '!':
try:
tag, u, v = self.scan_comment(p)
return tag, u, v, None
except parse_failed: pass
return self.parse_directive(p, '>', **opts)
elif np == '?':
return self.parse_directive(p, '?>', **opts)
elif np == '/':
return self.parse_endtag(p, **opts)
else:
return self.parse_starttag(p, **opts)
except parse_failed:
c += 1 # skip past an unconsumed "<"
# Falling through to here means we're in text data
while c < len(t) and t[c] != '<':
c += 1
return 'text', p, c, self.find_entities(p, c)
# }}
# {{ class markup_object
class markup_object (object):
    """A wrapper for one markup object (a tag, a comment, a run of
    text, ...) found by the markup parser.

    Key attributes:
    .obj_type  -- string describing the type of markup object.
    .start_pos -- offset of the beginning of the object in the source.
    .end_pos   -- offset of the end of the object in the source.
    .source    -- the literal text of the object.
    .next_obj  -- the next object in the input (or None).
    .prev_obj  -- the previous object in the input (or None).
    .next_sib  -- the right sibling of this object (or None).
    .prev_sib  -- the left sibling of this object (or None).
    .parent    -- the smallest enclosing tag (or None).
    .partner   -- the matching tag (for start/end tags).
    .contents  -- list of objects between this and its partner.
    .siblings  -- list of all the parent's children.
    .path      -- the tag path from the root to this object.
    .linepos   -- the (line, column) of this object in the source.

    The parent of an object is the closest tag of which the object is
    a child, if any.

    The .partner attribute is writable; the partner of any object may
    be set to any other object.  The relationship is not mutual:
    object A may have partner B without B having partner A.  Setting
    partner to None, or deleting it, removes the relationship.

    Partnerships between start/end tags are computed by the parser,
    but may be changed freely.  The .contents of an object are those
    objects occurring between (but not including) the object and its
    partner; the list is empty for objects with no partner.
    """

    def __init__(self, parser, id):
        self.obj_id = id
        self.parser = parser

    def __reprtag__(self):
        # Middle section of the repr; subclasses put their name here.
        return '|'

    def __repr__(self):
        fields = (type(self).__name__, self.obj_type,
                  self.__reprtag__(), self.start_pos, self.end_pos)
        return '#<%s type=%r %s %d..%d>' % fields

    def __str__(self):
        return self.source

    @property
    def obj_type(self):
        return self.parser.get_object(self.obj_id)[0]

    @property
    def start_pos(self):
        return self.parser.get_object(self.obj_id)[1]

    @property
    def end_pos(self):
        return self.parser.get_object(self.obj_id)[2]

    @property
    def source(self):
        first, last = self.start_pos, self.end_pos
        return self.parser.get_substring(first, last)

    @property
    def next_obj(self):
        following = self.obj_id + 1
        if following < len(self.parser):
            return self.parser.get_wrapper(following)
        return None

    @property
    def prev_obj(self):
        if self.obj_id > 0:
            return self.parser.get_wrapper(self.obj_id - 1)
        return None

    @property
    def next_sib(self):
        family = self.siblings
        if not family:
            return None
        at = family.index(self) + 1
        return family[at] if at < len(family) else None

    @property
    def prev_sib(self):
        family = self.siblings
        if not family:
            return None
        at = family.index(self)
        return family[at - 1] if at > 0 else None

    @property
    def parent(self):
        try:
            return self._parent     # cached by an earlier lookup
        except AttributeError:
            pass

        def encloses(tag):
            # True if this object, and its partner when it has one,
            # both lie within the contents of tag.
            inside = tag.contents
            if self not in inside:
                return False
            mate = self.partner
            return mate is None or mate in inside

        # Walk outward from this object in both directions at once,
        # one step each way per round, stopping at either end.
        parser = self.parser
        before = self.obj_id - 1
        after = self.obj_id + 1
        while before >= 0 or after < len(parser):
            if before >= 0:
                cand = parser.get_wrapper(before)
                if cand.obj_type == 'start_tag' and encloses(cand):
                    self._parent = cand
                    return cand
                before -= 1
            if after < len(parser):
                cand = parser.get_wrapper(after)
                if cand.obj_type == 'end_tag' and encloses(cand):
                    self._parent = cand.partner
                    return self._parent
                after += 1

        # A failed search is not cached: source added later might
        # create a viable parent.
        return None

    @property
    def siblings(self):
        owner = self.parent
        if owner is not None:
            return owner.children
        return [w for w in self.parser.wrappers if w.parent is None]

    @property
    def partner(self):
        try:
            mate_id = self.parser.matches[self.obj_id]
        except KeyError:
            return None
        return self.parser.wrappers[mate_id]

    @partner.setter
    def partner(self, other):
        target = other.obj_id if other else other
        self.parser.set_partner(self.obj_id, target)

    @partner.deleter
    def partner(self):
        self.parser.set_partner(self.obj_id, None)

    @property
    def contents(self):
        mate = self.partner
        if mate is None:
            return []   # default, no contents
        lo, hi = sorted((self.obj_id, mate.obj_id))
        return [self.parser.get_wrapper(i) for i in range(lo + 1, hi)]

    @property
    def path(self):
        chain = []
        node = self
        while node is not None:
            chain.append(node)
            node = node.parent
        return chain[::-1]

    @property
    def linepos(self):
        return self.parser.get_linepos(self.start_pos)

    def get_object_data(self):
        """Return the token-specific data recorded for this object."""
        return self.parser.get_object(self.obj_id)[3]
# {{ class markup_dir
class markup_dir (markup_object):
    """An SGML or XML style processing directive.

    Key attributes (in addition to those of markup_object):
    .dir_name    -- a string giving the directive name.
    .innersource -- the source between the name and the end marker.
    """

    @property
    def dir_name(self):
        first, last = self.get_object_data()['name']
        return self.parser.get_substring(first, last)

    @property
    def innersource(self):
        first, last = self.get_object_data()['content']
        return self.parser.get_substring(first, last)
# }}
# {{ class markup_tag
class markup_tag (markup_object):
    """A start tag, an end tag, or a self-contained tag.

    Key attributes (in addition to those of markup_object):
    .tag_name    -- string giving the name of the tag.
    .children    -- a list of direct descendants of this tag.
    .innersource -- the source text between this tag and its partner.

    The contents of a start tag are the objects between the end of the
    start tag and the start of its partner; a start tag with no
    partner has no contents.

    The contents of an end tag are those of its partner; an end tag
    with no partner has no contents.

    Self-delimiting tags do not have contents.

    The children of a tag are those contents of the tag whose parent
    is the tag itself (i.e., only direct descendants are children).
    """

    def __reprtag__(self):
        return 'name=%r' % self.tag_name

    def find(self, **opts):
        """As markup_parser.find(...), restricted to the contents of
        this tag, if any.
        """
        return self.parser.find(search_inside = self, **opts)

    def first(self, **opts):
        """As markup_parser.first(...), restricted to the contents of
        this tag, if any.
        """
        return self.parser.first(search_inside = self, **opts)

    @property
    def tag_name(self):
        first, last = self.get_object_data()['name']
        return self.parser.get_substring(first, last)

    @property
    def children(self):
        return [item for item in self.contents if item.parent is self]

    @property
    def innersource(self):
        kind = self.obj_type
        if kind not in ('start_tag', 'end_tag'):
            return ''   # default, no content
        mate = self.partner
        if mate is None:
            return ''
        if kind == 'end_tag':
            # An end tag reports whatever its partner reports.
            return mate.innersource
        return self.parser.get_substring(self.end_pos, mate.start_pos)
# }}
# {{ class markup_atag
class markup_atag (markup_tag):
    """A start tag or self-contained tag, which may carry attributes.

    Key attributes (in addition to those of markup_tag):
    .attributes -- a list of markup_attribute objects.

    Indexing (tag[name]) and membership (name in tag) look attributes
    up by name, ignoring case.
    """

    def __getitem__(self, key):
        return self.getattr(key)

    def __contains__(self, key):
        return key.lower() in self.keys()

    @property
    def attributes(self):
        cached = getattr(self, "_attribs", None)
        if cached is None:
            # Built once on first access, one wrapper per recorded
            # attribute.
            count = len(self.get_object_data()['attribs'][2])
            cached = [markup_attribute(self.parser, self.obj_id, slot)
                      for slot in range(count)]
            self._attribs = cached
        return cached

    def keys(self):
        """Return the lower-cased names of the attributes on this tag."""
        return [attr.name.lower() for attr in self.attributes]

    def getattr(self, name, *default):
        """Return the attribute with the given name (case-insensitive).

        If there is none, the first extra positional argument is
        returned when one was supplied; otherwise KeyError is raised.
        """
        for attr in self.attributes:
            if attr.name.lower() == name.lower():
                return attr
        if default:
            return default[0]
        raise KeyError(name)
# }}
# {{ class markup_text
class markup_text (markup_object):
    """A run of text found in the markup.

    Key attributes (in addition to those of markup_object):
    .entities -- a list of markup_entity objects found in the text.
    """

    @property
    def entities(self):
        cached = getattr(self, '_entities', None)
        if cached is None:
            # Built once on first access, one wrapper per recorded
            # entity.
            count = len(self.get_object_data())
            cached = [markup_entity(self.parser, self.obj_id, slot)
                      for slot in range(count)]
            self._entities = cached
        return cached
# }}
# {{ class markup_datum
class markup_datum (markup_object):
    """Base class for entities, attributes, and other data that hang
    from another markup object.

    A datum is identified by the id of the object that owns it plus
    an index into that object's data.

    Key attributes (in addition to those of markup_object):
    .parent -- the markup object that owns this datum.
    """

    def __init__(self, parser, id, index):
        super(markup_datum, self).__init__(parser, id)
        self.obj_index = index

    def __repr__(self):
        fields = (type(self).__name__, self.obj_id,
                  self.__reprtag__(), self.start_pos, self.end_pos)
        return '#<%s tag=%d %s %d..%d>' % fields

    @property
    def parent(self):
        return self.parser.get_wrapper(self.obj_id)

    def get_item_data(self):
        """Return this datum's entry in the owning object's data."""
        items = self.get_object_data()
        assert 0 <= self.obj_index < len(items)
        return items[self.obj_index]
# }}
# {{ class markup_entity
class markup_entity (markup_datum):
    """An escaped entity found inside a run of text.

    The item data of an entity is a (kind, start, end) triple.
    """

    @property
    def start_pos(self):
        return self.get_item_data()[1]

    @property
    def end_pos(self):
        return self.get_item_data()[2]

    @property
    def source(self):
        first, last = self.get_item_data()[1:]
        return self.parser.get_substring(first, last)

    @property
    def siblings(self):
        return self.parent.entities
# }}
# {{ class markup_attribute
class markup_attribute (markup_datum):
    """An attribute defined on a start tag.

    Key attributes (in addition to those of markup_object):
    .name            -- string giving the name of the attribute.
    .value           -- string giving the content of the attribute.
    .name_start_pos  -- start position of the name in the source text.
    .name_end_pos    -- end position of the name in the source text.
    .value_start_pos -- start position of the value in the source text.
    .value_end_pos   -- end position of the value in the source text.

    The name and value positions are not the same as .start_pos and
    .end_pos of the attribute as a whole, except that .name_start_pos
    will usually equal .start_pos and .value_end_pos will usually
    equal .end_pos, modulo the removal of quotation marks.
    """

    def __reprtag__(self):
        return 'name=%r' % self.name

    def get_item_data(self):
        """Return the (start, end, info) entry recorded for this
        attribute on its owning tag.
        """
        entries = self.get_object_data()['attribs'][2]
        assert 0 <= self.obj_index < len(entries)
        return entries[self.obj_index]

    @property
    def start_pos(self):
        return self.get_item_data()[0]

    @property
    def end_pos(self):
        return self.get_item_data()[1]

    @property
    def name(self):
        first, last = self.get_item_data()[2]['name']
        return self.parser.get_substring(first, last)

    @property
    @unescape
    def value(self):
        kind, first, last = self.get_item_data()[2]['value']
        text = self.parser.get_substring(first, last)
        if kind == 'str':
            return text[1:-1]   # remove quotation marks
        if kind == 'ustr':
            return text[1:]     # remove left quotation mark
        return text

    @property
    def name_start_pos(self):
        return self.get_item_data()[2]['name'][0]

    @property
    def name_end_pos(self):
        return self.get_item_data()[2]['name'][1]

    @property
    def value_start_pos(self):
        return self.get_item_data()[2]['value'][1]

    @property
    def value_end_pos(self):
        return self.get_item_data()[2]['value'][2]

    @property
    def siblings(self):
        return self.parent.attributes
# }}
# {{ class markup_parser
class markup_parser (object):
    """A tag-soup parser for HTML and other similar markup languages.

    The parser scans its input with markup_scanner, wraps each token
    in a markup_object (or a subclass of it), and pairs up start and
    end tags.  The class-level tables below control the pairing and
    are meant to be overridden by subclasses.
    """

    # A set of tag names that should be considered singular by default,
    # even if they are not syntactically singular.
    SINGULAR_TAGS = set()

    # A map of tag names to sets of tag names.  If an open tag in the
    # set is seen while the key tag is open, the key tag will be
    # implicitly closed before the open tag is processed.
    CLOSED_BY_OPEN = {}

    # A map of tag names to sets of tag names.  If a close tag in the
    # set is seen while the key tag is open, the key tag will be
    # implicitly closed before the close tag is processed.
    CLOSED_BY_CLOSE = {}

    # A set of tag names that should be implicitly closed if the end
    # of input is discovered while the tag is open.
    CLOSED_BY_EOF = set()

    # A map of entity names to values, used when reading attribute
    # values.
    ENTITY_NAME_MAP = dict(amp = '&', lt = '<', gt = '>', quot = '"')

    def __init__(self, src = None):
        """Initialize a new parser with the specified source text.  If
        the source is omitted, input may be given to the .parse()
        method.
        """
        self.input = None
        if src is not None:
            self.parse(src)

    def parse(self, src):
        """Parse the specified source text.  Modifies the internal
        data structures of the parser, discarding any previous state.

        Raises TypeError if src is not a string.
        """
        if not is_string(src):
            raise TypeError("input must be a string")
        self.input = src
        scanner = markup_scanner(src)
        self.objects = list(scanner.scan())
        self.wrappers = self.make_wrappers()
        self.find_partners()

    def feed(self, src):
        """Add the specified source text to the existing parse.
        Extends the internal data structures of the parser.  If
        nothing has been parsed yet, this is the same as .parse(src).
        """
        if self.input is None:
            self.parse(src)
        else:
            # Force the scanner to reconsider any trailing text in
            # light of the new data we are about to add: drop every
            # trailing object that is not a tag and rescan from the
            # start of the earliest one dropped.
            sp = 0
            while self.wrappers:
                if not self.wrappers[-1].obj_type.endswith('_tag'):
                    sp = self.wrappers[-1].start_pos
                    self.wrappers.pop()
                    self.objects.pop()
                else:
                    break
            op = len(self.wrappers)
            self.input += src
            scanner = markup_scanner(self.input)
            self.objects.extend(scanner.scan(start_pos = sp))
            self.wrappers.extend(self.make_wrappers(start_pos = op))
            self.find_partners()

    def find(self, **opts):
        """Returns a generator that produces the objects matching the
        specified criteria, in nondecreasing order of start position.

        Search criteria are specified with keyword arguments chosen
        from the following set.

        obj_type    -- select objects whose object type matches.
                       See markup_scanner for possible values.
        tag_name    -- select tags whose tag name matches.
        has_attr    -- select tags which have the named attribute.
        has_attrs   -- as has_attr, but with multiple attributes.
        attr_match  -- select tags whose named attribute(s) match.
        source      -- select objects whose source text matches.
        innersource -- select objects whose innersource text matches.
        predicate   -- f(obj) => bool; select objects for which the
                       given function returns a true value.

        The above names may be prefixed with 'not_' to negate the sense
        of the individual comparison described, e.g., 'not_has_attr'.

        match_any     -- if true, at least one criterion must match.
        match_all     -- if true, all criteria must match.
        search_after  -- consider objects occurring after this object.
        search_before -- consider objects occurring before this object.
        search_inside -- consider the contents of this object.
        case_matters  -- if true, string comparisons are case sensitive.
        negate        -- if true, negate the sense of each criterion.

        For matches, string arguments are compared for equality; such
        comparisons are case-insensitive by default, but can be made
        case-sensitive by setting case_matters to true.  If a regular
        expression object is given, its search method is used.  If a
        sequence of string/regex objects is given, each is tried in
        sequence and any match selects the object.  If a function is
        given, it is called with the test key, and returns a true
        value to select that key, false to reject.

        For attr_match, specify (name, expr) to test a single
        attribute.  For multiple attributes, specify a list of
        (name, expr) tuples or a dictionary with name keys and expr
        values.  The values None and True match if the attribute
        exists, regardless of its value; the value False matches if
        the attribute does not exist.

        By default, all criteria must match.  Setting match_any to true
        or match_all to false means at least one criterion must match.
        If you set both to true, match_all takes priority.

        If no criteria are specified, all objects are selected when
        match_all is true; otherwise no objects are selected.

        When any of the search_* options is given, only the objects it
        selects are considered, even if that selection is empty.
        """
        negate = opts.get('negate', False)
        match_any = opts.get('match_any', negate)
        match_all = opts.get('match_all', not match_any)
        case_matters = opts.get('case_matters', False)

        def tsense(f):
            # Wrap a test so that a 'not_' prefix on the test name
            # inverts its result; f sees the name without the prefix.
            def w(candidate, test_name, test_value):
                if test_name.startswith('not_'):
                    return not f(candidate, test_name[4:], test_value)
                else:
                    return f(candidate, test_name, test_value)
            return w

        def raw_match(value, exp):
            # The un-negated comparison.  Recursion over a collection
            # of alternatives stays in this function, so that 'negate'
            # is applied exactly once, to the final answer.
            if is_string(exp):
                if case_matters:
                    return value == exp
                else:
                    return value.lower() == exp.lower()
            elif hasattr(exp, 'search') and is_callable(exp.search):
                return exp.search(value)
            elif is_callable(exp):
                return bool(exp(value))
            elif exp in (None, True):
                return True     # the attribute exists
            elif exp is False:
                return False    # no such attribute

            # If we get here, the only other allowable option is
            # for exp to be a collection of things to try.  If that
            # fails, the TypeError is propagated back to the caller.
            for e in iter(exp):
                if raw_match(value, e):
                    return True
            return False

        def do_match(value, exp):
            matched = raw_match(value, exp)
            return not matched if negate else matched

        all_tests = []

        # Order matters here...
        for key in ('obj_type', 'tag_name', 'has_attr', 'has_attrs',
                    'attr_match', 'source', 'innersource', 'predicate'):
            for k in (key, 'not_' + key):
                if k in opts: all_tests.append((k, opts[k]))

        # Collect the candidate objects.  'restricted' records whether
        # the caller narrowed the search at all, so that an empty
        # restriction yields nothing instead of the whole document.
        candidates = []
        restricted = False
        if 'search_after' in opts:
            restricted = True
            pos = opts['search_after'].obj_id
            candidates.extend(self.wrappers[pos + 1:])
        if 'search_before' in opts:
            restricted = True
            pos = opts['search_before'].obj_id
            candidates.extend(self.wrappers[:pos])
        if 'search_inside' in opts:
            restricted = True
            candidates.extend(opts['search_inside'].contents)

        if restricted:
            candidates.sort(key = lambda obj: obj.start_pos)
        else:
            candidates = self.wrappers

        @tsense
        def t_obj_type(candidate, test_name, test_value):
            return do_match(candidate.obj_type, test_value)

        @tsense
        def t_tag_name(candidate, test_name, test_value):
            return (hasattr(candidate, 'tag_name') and
                    do_match(candidate.tag_name, test_value))

        @tsense
        def t_has_attr(candidate, test_name, test_value):
            if test_name == 'has_attr':
                test_value = [test_value]

            if not hasattr(candidate, 'keys'):
                return False

            # Every requested attribute must be matched by some key.
            for tv in test_value:
                for key in candidate.keys():
                    if do_match(key, tv):
                        break
                else:
                    return False
            return True

        @tsense
        def t_attr_match(candidate, test_name, test_value):
            if not hasattr(candidate, 'getattr'):
                return False
            if isinstance(test_value, tuple):
                pairs = [test_value]
            elif isinstance(test_value, dict):
                pairs = test_value.items()
            else:
                # A list (or other iterable) of (name, expr) tuples.
                pairs = test_value

            for a_name, a_exp in pairs:
                try:
                    if not do_match(candidate[a_name].value, a_exp):
                        return False
                except KeyError:
                    # A missing attribute only satisfies 'False'.
                    if a_exp is not False:
                        return False
            return True

        @tsense
        def t_predicate(candidate, test_name, test_value):
            return test_value(candidate)

        @tsense
        def t_source(candidate, test_name, test_value):
            # test_name is 'source' or 'innersource'; compare against
            # the attribute of that same name.
            return (hasattr(candidate, test_name) and
                    do_match(getattr(candidate, test_name), test_value))

        def t_fail(candidate, test_name, test_value):
            return False

        test_map = dict(
            obj_type = t_obj_type,
            not_obj_type = t_obj_type,
            tag_name = t_tag_name,
            not_tag_name = t_tag_name,
            has_attr = t_has_attr,
            not_has_attr = t_has_attr,
            has_attrs = t_has_attr,
            not_has_attrs = t_has_attr,
            attr_match = t_attr_match,
            not_attr_match = t_attr_match,
            source = t_source,
            not_source = t_source,
            innersource = t_source,
            not_innersource = t_source,
            predicate = t_predicate,
            not_predicate = t_predicate,
            )
        for candidate in candidates:
            ok = match_all
            for test_name, test_value in all_tests:
                tfunc = test_map.get(test_name, t_fail)
                ok = tfunc(candidate, test_name, test_value)

                # Stop at the first result that settles the outcome.
                if not match_all and ok: break
                if match_all and not ok: break

            if ok:
                yield candidate

    def first(self, **opts):
        """As .find(...), but returns only the first matching result,
        or throws KeyError to indicate nothing was found.
        """
        try:
            return next(self.find(**opts))
        except StopIteration:
            raise KeyError("no matching objects")

    def __len__(self):
        """Returns the number of tokens found in the input."""
        return len(self.wrappers)

    def __getitem__(self, itm):
        """Index into the parser's token list."""
        return self.wrappers[itm]

    def __iter__(self):
        """Iterate over the tokens in nondecreasing order of start
        position.
        """
        return iter(self.wrappers)

    def get_substring(self, start, end):
        """Return the substring of the parser's input from start up to
        but not including end.  Negative offsets count from the end of
        the input.
        """
        return self.input[start:end]

    def get_linepos(self, pos):
        """Given an integer offset, return a tuple (line, col) giving
        the line number and column number of the character at that
        position in the parser's current input.

        Lines and columns are numbered from 1, so subtract one from
        each value if you want 0-indexed values.  A newline character
        counts as the last column of the line it ends.

        Raises ValueError if pos is not a valid offset into the input.
        """
        t = self.input
        if pos < 0 or pos >= len(t):
            raise ValueError("position %d not in 0..%d" % (pos, len(t)))

        # The line number is one more than the number of newlines
        # before pos; the column is the distance from the last such
        # newline (rfind gives -1 when there is none, i.e. line 1).
        lnum = t.count('\n', 0, pos) + 1
        cnum = pos - t.rfind('\n', 0, pos)
        return lnum, cnum

    # --- Private methods ----------------------------------------------

    def make_wrappers(self, start_pos = 0):
        """Build a wrapper object for each token from index start_pos
        onward; the end user will see these instead of the tuples.
        """
        kinds = {
            'start_tag': markup_atag,
            'self_tag': markup_atag,
            'end_tag': markup_tag,
            'text': markup_text,
            'dir': markup_dir,
            'udir': markup_dir,
            }
        return [kinds.get(self.objects[pos][0], markup_object)(self, pos)
                for pos in range(start_pos, len(self.objects))]

    def find_partners(self):
        """Identify partners for start and end tags, if possible.

        Each end tag is matched with the closest matching start tag
        that has not already been matched.  Tags matched in this way
        are matched symmetrically.

        In addition, a start tag t whose name is in SINGULAR_TAGS will
        not be matched, even if a close tag exists for it.

        In addition, a start tag t will close any open tag u for which
        t is in CLOSED_BY_OPEN[u].

        In addition, an end tag t will close any open tag u for which
        t is in CLOSED_BY_CLOSE[u].

        In addition, EOF will close any open tag t for which t is in
        CLOSED_BY_EOF.

        Tags matched by the latter three rules are asymmetric matches;
        the open tag u has t as a partner, but not vice versa.

        All partnership relationships can be manually overridden by
        assigning to or deleting the .partner property of a given
        object.  Only tags are assigned partners by default, but any
        object can be given a partner by manual assignment.
        """
        # When we find the end of a tag, we preemptively define that
        # tag as the parent of all unclaimed tags inside it, that are
        # either single or properly paired.  This saves an expensive
        # computation later.
        #
        # Unbalanced tags are NOT claimed.
        def parentify(lo, hi, obj):
            for p in range(lo, hi):
                t = self.wrappers[p]
                if (not hasattr(t, '_parent') and
                    (t.obj_type not in ('start_tag', 'end_tag') or
                     (p in matches and lo <= matches[p] < hi))):
                    t._parent = obj

        # 'queue' holds the (position, wrapper) pairs of start tags
        # that are still open.  It is walked from the most recently
        # opened tag backward, which keeps pop(q) safe mid-walk.
        queue = [] ; matches = {}
        for pos, obj in enumerate(self.wrappers):
            if (obj.obj_type == 'start_tag' and
                obj.tag_name.lower() not in self.SINGULAR_TAGS):
                # If the new start tag implicitly ends its predecessor,
                # make a one-way link from the predecessor to this tag.
                for q in reversed(range(len(queue))):
                    qpos, qobj = queue[q]
                    if (obj.tag_name.lower() in
                        self.CLOSED_BY_OPEN.get(qobj.tag_name.lower(), ())):
                        matches[qpos] = pos
                        parentify(qpos + 1, pos, qobj)
                        queue.pop(q)
                queue.append((pos, obj))
            elif obj.obj_type == 'end_tag':
                for q in reversed(range(len(queue))):
                    qpos, qobj = queue[q]

                    # If we find the open tag to balance, make a
                    # two-way link between the two, and clear the open
                    # tag from the queue.
                    if qobj.tag_name == obj.tag_name:
                        matches[qpos] = pos
                        matches[pos] = qpos
                        parentify(qpos + 1, pos, qobj)
                        queue.pop(q)
                        break

                    # If the ending tag implicitly ends its
                    # predecessor, make a one-way link from the
                    # predecessor to this tag.
                    elif (obj.tag_name.lower() in
                          self.CLOSED_BY_CLOSE.get(qobj.tag_name.lower(), ())):
                        matches[qpos] = pos
                        parentify(qpos + 1, pos, qobj)
                        queue.pop(q)
            elif obj.obj_type == 'eof':
                # Close any remaining tags that are still open at end
                # of input and which are deemed worthy of that action.
                for q in reversed(range(len(queue))):
                    qpos, qobj = queue[q]
                    if qobj.tag_name.lower() in self.CLOSED_BY_EOF:
                        matches[qpos] = pos
                        parentify(qpos + 1, pos, qobj)
                        queue.pop(q)
        self.matches = matches

    def get_object(self, id):
        """Return the raw token tuple with the given index."""
        assert 0 <= id < len(self.objects)
        return self.objects[id]

    def get_wrapper(self, id):
        """Return the wrapper object with the given index."""
        assert 0 <= id < len(self.wrappers)
        return self.wrappers[id]

    def set_partner(self, src_id, dst_id, mutual = False):
        """Record dst_id as the partner of src_id, or remove src_id's
        partner if dst_id is None.  With mutual true, src_id is also
        recorded as the partner of dst_id.
        """
        m = self.matches
        if dst_id is None:
            m.pop(src_id, None)
        else:
            m[src_id] = dst_id
            if mutual:
                m[dst_id] = src_id
# {{ class html_parser
class html_parser (markup_parser):
    """A markup_parser preloaded with some of the tag rules of HTML:
    which tags stand alone, and which open tags are implicitly closed
    by another open tag, by a close tag, or by end of input.
    """

    SINGULAR_TAGS = set('br embed hr img input link param'.split())

    CLOSED_BY_OPEN = dict(
        p = set(['p']),
        li = set(['li']),
        dt = set(['dt', 'dd']),
        dd = set(['dt', 'dd']),
        th = set(['th', 'td']),
        td = set(['th', 'td']),
        thead = set(['tbody', 'tfoot']),
        tbody = set(['tbody', 'tfoot']),
        head = set(['body']),
        )

    CLOSED_BY_CLOSE = dict(
        li = set(['ul', 'ol']),
        dt = set(['dl']),
        dd = set(['dl']),
        th = set(['table']),
        td = set(['table']),
        tbody = set(['table']),
        tfoot = set(['table']),
        body = set(['html']),
        )

    CLOSED_BY_EOF = set(['body', 'html'])
# }}
# Names exported by "from scrubby import *".
__all__ = (
    'markup_scanner',
    'markup_parser',
    'markup_object',
    'markup_tag',
    'markup_text',
    'markup_dir',
    'markup_entity',
    'markup_attribute',
    'parse_error',
    'html_parser',
    )
# Here there be dragons