## ## Name: scrubby.py ## Purpose: A tag-soup parser for HTML and other similar markup languages. ## Author: M. J. Fromberger ## ## Copyright (c) 2009-2010 Michael J. Fromberger. All Rights Reserved. ## __version__ = "1.021" import collections, functools, re, sys # {{ Compatibility definitions if sys.version_info[0] < 3: def is_string(obj): return isinstance(obj, basestring) else: def is_string(obj): return isinstance(obj, str) def is_callable(obj): return isinstance(obj, collections.Callable) # }} # {{ Exception classes class parse_error (Exception): pass class parse_failed (parse_error): pass # }} # {{ @bounded(meth) def bounded(meth): """Convert a scanner method taking a starting position and returning a tuple of (tag, endpos), so that the resulting method returns (tag, startpos, endpos). Abstracts the common bookkeeping for several of the methods of markup_scanner. """ def wrapper(self, start, *args, **kw): res = meth(self, start, *args, **kw) if res is None: raise parse_failed(meth.__name__, start, *args) return res[0], start, res[1] return functools.update_wrapper(wrapper, meth) # }} # {{ @unescape(meth) def unescape(meth): def wrapper(self, *args, **kw): esc = self.parser.ENTITY_NAME_MAP def replace_entity(m): return esc.get(m.group('key'), m.group(0)) value = meth(self, *args, **kw) return re.sub(r'&(?P\w+);', replace_entity, value) return functools.update_wrapper(wrapper, meth) # }} # {{ class markup_scanner class markup_scanner (object): """Implements a permissive lexical analyzer for HTML and other similar markup languages. Usage: s = markup_scanner(input_string) for tag, start_pos, end_pos, data in s.scan(): process(tag, start_pos, end_pos, data) The components of each result are: tag -- a string describing what the token is. start_pos -- the starting offset of the token. end_pos -- the offset one past the end of the token. data -- token-specific data, or None. The tag values are: 'eof' -- end of input. 'com', 'ucom' -- a comment (unterminated if 'ucom'). 'dir', 'udir' -- a markup directive (e.g., ). 'start_tag' -- an opening tag (e.g., ). 'self_tag' -- a self-delimited tag (e.g.,
). 'end_tag' -- a closing tag (e.g., ). 'text' -- running text. The data values are: 'start_tag' -- a dictionary giving tag name and attributes. 'self_tag' -- a dictionary giving tag name and attributes. 'text' -- a list of entities found in the text. 'dir', 'udir' -- a dictionary giving directive name and content. Tag names ('name') are given as tuples of (start_pos, end_pos). Tag attributes ('attribs') are given as a list of tuples, each having one of the following forms: ('none', 0, 0) -- for bare attributes. ('name', s, e) -- for unquoted attribute values. ('str', s, e) -- for quoted attribute values, terminated. ('ustr', s, e) -- for quoted attribute values, unterminated. All data values not described above will be None. A string or comment value is "terminated" if its end marker occurs before the end of input; otherwise it is unterminated. """ def __init__(self, src): """Create a new scanner using the string src as input.""" if not is_string(src): raise TypeError("input must be a string") self.input = src def scan(self, start_pos = 0, **opts): """Produces a generator for each token found in the input, starting at the specified starting offset. Thread-safe. Throws parse_error if for some reason a token could not be consumed from the input. This should not happen under ordinary circumstances. """ end = start_pos otag, ostart, oend, odata = '', 0, 0, None while True: tag, start, end, data = self.parse_one(end, **opts) # Fold multiple consecutive text tokens into one. if tag == otag == 'text': oend = end odata.extend(data) else: if otag: yield otag, ostart, oend, odata otag, ostart, oend, odata = tag, start, end, data if tag == 'eof': break if otag: yield otag, ostart, oend, odata # --- Private methods ---------------------------------------------- # Conventions: # scan_* methods return a tuple of "tag", start, end # parse_* methods return a tuple of "tag", start, end, data # # All methods should return a valid tuple or throw parse_failed. # The @bounded decorator can help you keep track of this. def get_char(self, p): return self.input[p] if 0 <= p < len(self.input) else '' @bounded def scan_string(self, p): t = self.input q = t[p] p += 1 while self.get_char(p) not in (q, ''): p += 1 if p < len(t): return 'str', p + 1 else: return 'ustr', p # Unterminated string @bounded def scan_name(self, p): ig, u, v = self.scan_unquoted(p, '' + end_marks while True: c = self.get_char(p) if (not c or c in ends or c.isspace() or ord(c) < 32): break p += 1 ok = s <= p if allow_blank else s < p if ok: return 'unq', p @bounded def scan_space(self, p): while self.get_char(p).isspace(): p += 1 return 'ws', p @bounded def scan_literal(self, p, value): end = p + len(value) if self.input[p:end] == value: return 'lit', end @bounded def scan_entity(self, p): while True: c = self.get_char(p) if not c or c.isspace(): return 'uent', p elif c == ';': return 'ent', p + 1 p += 1 @bounded def scan_comment(self, p): ig, u, v = self.scan_literal(p, '': return 'com', v + 3 v += 1 # Unterminated comment return 'ucom', v def parse_attrib(self, p, **opts): ntag, ns, ne = self.scan_name(p) vtag, vs, ve = 'none', ne, ne try: ig, u, v = self.scan_space(ne) ig, u, v = self.scan_literal(v, '=') ig, u, v = self.scan_space(v) if self.get_char(v) in ('"', "'"): vtag, vs, ve = self.scan_string(v) elif opts.get('allow_unquoted', True): end_marks = '=' + opts.get('end_marks', '') vtag, vs, ve = self.scan_unquoted( v, end_marks, opts.get('allow_blank', True)) except parse_failed: pass return 'attr', p, ve, dict( name = (ns, ne), value = (vtag, vs, ve)) def parse_attribs(self, p, **opts): data = [] v = p while True: ig, u, v = self.scan_space(v) try: ig, u, v, info = self.parse_attrib(v, **opts) data.append((u, v, info)) except parse_failed: break return 'attrs', p, v, data def parse_starttag(self, p, **opts): ig, u, v = self.scan_literal(p, '<') ntag, ns, ne = self.scan_name(v) atag, atts, atte, data = self.parse_attribs(ne, **opts) kind = 'start_tag' if self.get_char(atte) == '/': kind = 'self_tag' atte += 1 if self.get_char(atte) != '>': raise parse_failed("parse_starttag", p) return kind, p, atte + 1, dict( name = (ns, ne), attribs = (atts, atte, data)) def parse_endtag(self, p, **opts): ig, u, v = self.scan_literal(p, '': raise parse_failed("parse_endtag", p) return 'end_tag', p, v + 1, dict( name = (ns, ne)) def parse_directive(self, p, end_mark, **opts): ig, ns, ne = self.scan_name(p + 2) # skip past = len(t): return 'eof', p, p, None elif t[p] == '<': np = self.get_char(p + 1) try: if np == '!': try: tag, u, v = self.scan_comment(p) return tag, u, v, None except parse_failed: pass return self.parse_directive(p, '>', **opts) elif np == '?': return self.parse_directive(p, '?>', **opts) elif np == '/': return self.parse_endtag(p, **opts) else: return self.parse_starttag(p, **opts) except parse_failed: c += 1 # skip past an unconsumed "<" # Falling through to here means we're in text data while c < len(t) and t[c] != '<': c += 1 return 'text', p, c, self.find_entities(p, c) # }} # {{ class markup_object class markup_object (object): """Represents a markup object such as a tag, a comment, or a run of text discovered by the markup parser. Key attributes: .obj_type -- string describing the type of markup object. .start_pos -- offset of the beginning of the object in the source. .end_pos -- offset of the end of the object in the source. .source -- the literal text of the object. .next_obj -- the next object in the input (or None). .prev_obj -- the previous object in the input (or None). .next_sib -- the right sibling of this object (or None). .prev_sib -- the left sibling of this object (or None). .parent -- the smallest enclosing tag (or None). .partner -- the matching tag (for start/end tags). .contents -- list of objects between this and its partner. .siblings -- list of all the parent's children. .path -- the tag path from the root to this object. .linepos -- the (line, column) of this object in the source. The parent of an object is defined as the closest tag of which the object is a child, if any. The .partner attribute is writable; you may set the partner of any object to any other object. The partner relationship is not mutual; object A may have partner B, but B may not have partner A. Setting partner to None, or deleting partner, removes the relationship. Partnerships between start/end tags are computed by the parser, but you are free to manipulate them if you wish. The .contents of are those objects occurring between (but not including) the object and its partner; this list is empty for objects with no partner. """ def __init__(self, parser, id): self.parser = parser self.obj_id = id def __reprtag__(self): return '|' def __repr__(self): return '#<%s type=%r %s %d..%d>' % ( type(self).__name__, self.obj_type, self.__reprtag__(), self.start_pos, self.end_pos) def __str__(self): return self.source @property def obj_type(self): obj = self.parser.get_object(self.obj_id) return obj[0] @property def start_pos(self): obj = self.parser.get_object(self.obj_id) return obj[1] @property def end_pos(self): obj = self.parser.get_object(self.obj_id) return obj[2] @property def source(self): return self.parser.get_substring( self.start_pos, self.end_pos) @property def next_obj(self): if self.obj_id < len(self.parser) - 1: return self.parser.get_wrapper(self.obj_id + 1) else: return None @property def prev_obj(self): if self.obj_id <= 0: return None else: return self.parser.get_wrapper(self.obj_id - 1) @property def next_sib(self): sibs = self.siblings if sibs: me = sibs.index(self) if me < len(sibs) - 1: return sibs[me + 1] return None @property def prev_sib(self): sibs = self.siblings if sibs: me = sibs.index(self) if me > 0: return sibs[me - 1] return None @property def parent(self): if hasattr(self, '_parent'): return self._parent # This is just one-dimensional breadth-first search, # saturating at the ends of the line. pre = self.obj_id - 1 post = self.obj_id + 1 while pre >= 0 or post < len(self.parser): if pre >= 0: t = self.parser.get_wrapper(pre) if (t.obj_type == 'start_tag' and self in t.contents and (self.partner is None or self.partner in t.contents)): self._parent = t ; return t pre -= 1 if post < len(self.parser): t = self.parser.get_wrapper(post) if (t.obj_type == 'end_tag' and self in t.contents and (self.partner is None or self.partner in t.contents)): self._parent = t.partner return self._parent post += 1 # If we don't find a parent, we won't cache it, since adding # source later might create a viable parent. return None @property def siblings(self): if self.parent is None: return list(s for s in self.parser.wrappers if s.parent is None) else: return self.parent.children @property def partner(self): m = self.parser.matches w = self.parser.wrappers if self.obj_id in m: return w[m[self.obj_id]] else: return None @partner.setter def partner(self, other): self.parser.set_partner(self.obj_id, other and other.obj_id) @partner.deleter def partner(self): self.parser.set_partner(self.obj_id, None) @property def contents(self): p = self.partner if p is not None: lo = min(self.obj_id, p.obj_id) hi = max(self.obj_id, p.obj_id) return list(self.parser.get_wrapper(id) for id in range(lo + 1, hi)) return [] # default, no contents @property def path(self): path = [] cur = self while cur is not None: path.append(cur) cur = cur.parent path.reverse() return path @property def linepos(self): return self.parser.get_linepos(self.start_pos) def get_object_data(self): obj = self.parser.get_object(self.obj_id) return obj[3] # }} # {{ class markup_dir class markup_dir (markup_object): """Represents an SGML or XML style processing directive. Key attributes (in addition to those of markup_object): .dir_name -- a string giving the directive name. .innersource -- the source between the name and the end marker. """ @property def dir_name(self): data = self.get_object_data() start, end = data['name'] return self.parser.get_substring(start, end) @property def innersource(self): data = self.get_object_data() start, end = data['content'] return self.parser.get_substring(start, end) # }} # {{ class markup_tag class markup_tag (markup_object): """Represents a start tag, an end tag, or a self-contained tag. Key attributes (in addition to those of markup_object): .tag_name -- string giving the name of the tag. .attributes -- a list of markup_attribute objects (for start tags). .children -- a list of direct descendants of this tag. .innersource -- the source text between this tag and its partner. The contents of a start tag are defined as those objects fully contained between the end point of the start tag and the start point of its partner, if any; if no partner is defined, the tag is considered to have no contents. The contents of an end tag are the contents of its corresponding partner, if any; if no partner is defined, the tag is considered to have no contents. Self-delimiting tags do not have contents. The children of a tag are those contents of the tag whose parent is the tag itself (i.e., only direct descendants are children). """ def __reprtag__(self): return 'name=%r' % self.tag_name def find(self, **opts): """As markup_parser.find(...) but considering only the contents of this tag, if any. """ return self.parser.find(search_inside = self, **opts) def first(self, **opts): """As markup_parser.first(...) but considering only the contents of this tag, if any. """ return self.parser.first(search_inside = self, **opts) @property def tag_name(self): data = self.get_object_data() start, end = data['name'] return self.parser.get_substring(start, end) @property def children(self): return list(obj for obj in self.contents if obj.parent is self) @property def innersource(self): if self.obj_type == 'start_tag': p = self.partner if p is not None: return self.parser.get_substring(self.end_pos, p.start_pos) elif self.obj_type == 'end_tag': p = self.partner if p is not None: return p.innersource return '' # default, no content # }} # {{ class markup_atag class markup_atag (markup_tag): """Represents a start tag or self-contained tag, which may have attributes associated with it. Key attributes (in addition to those of markup_tag): .attributes -- a list of markup_attribute objects. """ def __getitem__(self, key): return self.getattr(key) def __contains__(self, key): return key.lower() in self.keys() @property def attributes(self): if getattr(self, "_attribs", None) is None: data = self.get_object_data() u, v, elts = data['attribs'] self._attribs = list( markup_attribute(self.parser, self.obj_id, p) for p in range(len(elts))) return self._attribs def keys(self): """Return a list of the attribute names defined on this tag.""" return list(s.name.lower() for s in self.attributes) def getattr(self, name, *default): """Get the attribute whose name is given. If absent, and a default value is given, the default value is returned; otherwise KeyError is raised. """ for attr in self.attributes: if attr.name.lower() == name.lower(): return attr else: if default: return default[0] raise KeyError(name) # }} # {{ class markup_text class markup_text (markup_object): """Defines a run of text found in the markup. Key attributes (in addition to those of markup_object): .entities -- a list of markup_entity objects found in the text. """ @property def entities(self): if getattr(self, '_entities', None) is None: data = self.get_object_data() self._entities = list( markup_entity(self.parser, self.obj_id, p) for p in range(len(data))) return self._entities # }} # {{ class markup_datum class markup_datum (markup_object): """Base class for entities, attributes, and other data that hang from other markup objects. Key attributes (in addition to those of markup_object): .parent -- the markup object that owns this datum. """ def __repr__(self): return '#<%s tag=%d %s %d..%d>' % ( type(self).__name__, self.obj_id, self.__reprtag__(), self.start_pos, self.end_pos) def __init__(self, parser, id, index): super(markup_datum, self).__init__(parser, id) self.obj_index = index @property def parent(self): return self.parser.get_wrapper(self.obj_id) def get_item_data(self): data = self.get_object_data() assert 0 <= self.obj_index < len(data) return data[self.obj_index] # }} # {{ class markup_entity class markup_entity (markup_datum): """Represents an escaped entity.""" @property def start_pos(self): kind, u, v = self.get_item_data() return u @property def end_pos(self): kind, u, v = self.get_item_data() return v @property def source(self): kind, u, v = self.get_item_data() return self.parser.get_substring(u, v) @property def siblings(self): return self.parent.entities # }} # {{ class markup_attribute class markup_attribute (markup_datum): """Represents an attribute defined on a start tag. Key attributes (in addition to those of markup_object): .name -- string giving the name of the attribute .value -- string giving the content of the attribute. .name_start_pos -- start position of the name in the source text. .name_end_pos -- end position of the name in the source text. .value_start_pos -- start position of the value in the source text. .value_end_pos -- end position of the value in the source text. Note that the name and value start/end positions are not the same as the .start_pos and .end_pos for the overall attribute except in that .name_start_pos will usually be the same as .start_pos, and .value_end_pos will usually be the same as .end_pos, modulo the removal of quotation marks. """ def __reprtag__(self): return 'name=%r' % self.name def get_item_data(self): data = self.get_object_data() u, v, lst = data['attribs'] assert 0 <= self.obj_index < len(lst) return lst[self.obj_index] @property def start_pos(self): u, v, data = self.get_item_data() return u @property def end_pos(self): u, v, data = self.get_item_data() return v @property def name(self): u, v, data = self.get_item_data() start, end = data['name'] return self.parser.get_substring(start, end) @property @unescape def value(self): u, v, data = self.get_item_data() kind, start, end = data['value'] raw = self.parser.get_substring(start, end) if kind == 'str': return raw[1:-1] # remove quotation marks elif kind == 'ustr': return raw[1:] # remove left quotation mark else: return raw @property def name_start_pos(self): u, v, data = self.get_item_data() start, end = data['name'] return start @property def name_end_pos(self): u, v, data = self.get_item_data() start, end = data['name'] return end @property def value_start_pos(self): u, v, data = self.get_item_data() kind, start, end = data['value'] return start @property def value_end_pos(self): u, v, data = self.get_item_data() kind, start, end = data['value'] return end @property def siblings(self): return self.parent.attributes # }} # {{ class markup_parser class markup_parser (object): """A tag-soup parser for HTML and other similar markup langauges. """ # A set of tag names that should be considered singular by default, # even if they are not syntactically singular. SINGULAR_TAGS = set() # A map of tag names to sets of tag names. If an open tag in the # set is seen while the key tag is open, the key tag will be implicitly # closed before the open tag is processed. CLOSED_BY_OPEN = {} # A map of tag names to sets of tag names. If a close tag in the set # is seen while the key tag is open, the key tag will be implicitly # closed before the close tag is processed. CLOSED_BY_CLOSE = {} # A set of tag names that should be implicitly closed if the end # of input is discovered while the tag is open. CLOSED_BY_EOF = set() # A map of entity names to values, used when reading attribute values. ENTITY_NAME_MAP = dict(amp = '&', lt = '<', gt = '>', quot = '"') def __init__(self, src = None): """Initialize a new parser with the specified source text. If the source is omitted, input may be given to the .parse() method. """ self.input = None if src is not None: self.parse(src) def parse(self, src): """Parse the specified source text. Modifies the internal data structures of the parser, discarding any previous state. """ if not is_string(src): raise TypeError("input must be a string") self.input = src scanner = markup_scanner(src) self.objects = list(scanner.scan()) self.wrappers = self.make_wrappers() self.find_partners() def feed(self, src): """Add the specified source text to the existing parse. Extends the internal data structures of the parser. """ if self.input is None: self.parse(src) else: # Force the scanner to reconsider any trailing text in # light of the new data we are about to add. sp = 0 while self.wrappers: if not self.wrappers[-1].obj_type.endswith('_tag'): sp = self.wrappers[-1].start_pos self.wrappers.pop() self.objects.pop() else: break op = len(self.wrappers) self.input += src scanner = markup_scanner(self.input) self.objects.extend(scanner.scan(start_pos = sp)) self.wrappers.extend(self.make_wrappers(start_pos = op)) self.find_partners() def find(self, **opts): """Returns a generator that produces the objects matching the specified criteria, in nondecreasing order of start position. Search criteria are specified with keyword arguments chosen from the following set. obj_type -- select objects whose object type matches. See markup_scanner for possible values. tag_name -- select tags whose tag name matches. has_attr -- select tags which have the named attribute. has_attrs -- as has_attr, but with multiple attributes. attr_match -- select tags whose named attribute(s) match. source -- select objects whose source text matches. innersource -- select objects whose innersource text matches. predicate -- f(obj) => bool select objects for which the given function returns a true value. The above names may be prefixed with 'not_' to negate the sense of the individual comparison described, e.g., 'not_has_attr'. match_any -- if true, at least one criterion must match. match_all -- if true, all criteria must match. search_after -- consider objects occurring after this object. search_before -- consider objects occurring before this object. search_inside -- consider the contents of this object. case_matters -- if true, string comparisons are case sensitive. negate -- if true, negate the sense of each criterion. For matches, string arguments are compared for equality; such comparisons are case-insensitive by default, but can be made case-sensitive by setting case_matters to true. If a regular expression object is given, its search method is used. If a sequence of string/regex objects are given, each is tried in sequence and any match selects the object. If a function is given, it is called with the test key, and returns a true value to select that key, false to reject. For attr_match, specify (name, expr) to test a single attribute. For multiple attributes, specify a list of (name, expr) tuples or a dictionary with name keys and expr values. The values None and True match if the attribute exists, regardless of its value; the value False matches if the attribute does not exist. By default, all criteria must match. Setting match_any to true or match_all to false means at least one criterion must match. If you set both to true, match_all takes priority. If no criteria are specified, all objects are selected when match_all is true; otherwise no objects are selected. """ negate = opts.get('negate', False) match_any = opts.get('match_any', negate) match_all = opts.get('match_all', not match_any) case_matters = opts.get('case_matters', False) def gsense(f): def w(value, exp): if negate: return not f(value, exp) else: return f(value, exp) return w def tsense(f): def w(candidate, test_name, test_value): if test_name.startswith('not_'): return not f(candidate, test_name[4:], test_value) else: return f(candidate, test_name, test_value) return w @gsense def do_match(value, exp): if is_string(exp): if case_matters: return value == exp else: return value.lower() == exp.lower() elif hasattr(exp, 'search') and is_callable(exp.search): return exp.search(value) elif is_callable(exp): return bool(exp(value)) elif exp in (None, True): return True # the attribute exists elif exp is False: return False # no such attribute # If we get here, the only other allowable option is # for exp to be a collection of things to try. If that # fails, the TypeError is propagated back to the caller. for e in iter(exp): if do_match(value, e): return True else: return False all_tests = [] # Order matters here... for key in ('obj_type', 'tag_name', 'has_attr', 'has_attrs', 'attr_match', 'source', 'innersource', 'predicate'): for k in (key, 'not_' + key): if k in opts: all_tests.append((k, opts[k])) candidates = [] if 'search_after' in opts: pos = opts['search_after'].obj_id candidates.extend(self.wrappers[pos + 1:]) if 'search_before' in opts: pos = opts['search_before'].obj_id candidates.extend(self.wrappers[:pos]) if 'search_inside' in opts: candidates.extend(opts['search_inside'].contents) if candidates: candidates.sort(key = lambda obj: obj.start_pos) else: candidates = self.wrappers @tsense def t_obj_type(candidate, test_name, test_value): return do_match(candidate.obj_type, test_value) @tsense def t_tag_name(candidate, test_name, test_value): return (hasattr(candidate, 'tag_name') and do_match(candidate.tag_name, test_value)) @tsense def t_has_attr(candidate, test_name, test_value): if test_name == 'has_attr': test_value = [test_value] if not hasattr(candidate, 'keys'): return False for tv in test_value: for key in candidate.keys(): if do_match(key, tv): break else: return False else: return True @tsense def t_attr_match(candidate, test_name, test_value): if not hasattr(candidate, 'getattr'): return False elif isinstance(test_value, tuple): tv = [test_value] elif isinstance(test_value, dict): tv = test_value.items() for a_name, a_exp in tv: try: if not do_match(candidate[a_name].value, a_exp): return False except KeyError: if a_exp is not False: return False else: return True @tsense def t_predicate(candidate, test_name, test_value): return test_value(candidate) @tsense def t_source(candidate, test_name, test_value): return (hasattr(candidate, test_name) and do_match(candidate.source, test_value)) def t_fail(candidate, test_name, test_value): return False test_map = dict( obj_type = t_obj_type, not_obj_type = t_obj_type, tag_name = t_tag_name, not_tag_name = t_tag_name, has_attr = t_has_attr, not_has_attr = t_has_attr, has_attrs = t_has_attr, not_has_attrs = t_has_attr, attr_match = t_attr_match, not_attr_match = t_attr_match, source = t_source, not_source = t_source, innersource = t_source, not_innersource = t_source, predicate = t_predicate, not_predicate = t_predicate, ) for candidate in candidates: ok = match_all for test_name, test_value in all_tests: tfunc = test_map.get(test_name, t_fail) ok = tfunc(candidate, test_name, test_value) if not match_all and ok: break if match_all and not ok: break if ok: yield candidate def first(self, **opts): """As .find(...), but returns only the first matching result, or throws KeyError to indicate nothing was found. """ try: return next(self.find(**opts)) except StopIteration: raise KeyError("no matching objects") def __len__(self): """Returns the number of tokens found in the input.""" return len(self.wrappers) def __getitem__(self, itm): """Index into the parser's token list.""" return self.wrappers[itm] def __iter__(self): """Iterate over the tokens in nondecreasing order of start position.""" return iter(self.wrappers) def get_substring(self, start, end): """Return the substring of the parser's input from start up to but not including end. Negative offsets count from the end of the input. """ return self.input[start:end] def get_linepos(self, pos): """Given an integer offset, return a tuple (line, col) giving the line number and column number of the character at that position in the parser's current input. Lines and columns are numbered from 1, so subtract one from each value if you want 0-indexed values. """ t = self.input if pos < 0 or pos >= len(t): raise ValueError("position %d not in 0..%d" % (pos, len(t))) lnum = cnum = 0 nl = True for c in range(pos + 1): if nl: lnum += 1 cnum = 1 nl = False else: cnum += 1 if t[c] == '\n': nl = True return lnum, cnum # --- Private methods ---------------------------------------------- def make_wrappers(self, start_pos = 0): # Build a wrapper object for each token; the end user will see # these instead of the tuples. wraps = [None] * (len(self.objects) - start_pos) pos = start_pos while pos < len(self.objects): tag, u, v, data = self.objects[pos] if tag in ('start_tag', 'self_tag'): wraps[pos - start_pos] = markup_atag(self, pos) elif tag == 'end_tag': wraps[pos - start_pos] = markup_tag(self, pos) elif tag == 'text': wraps[pos - start_pos] = markup_text(self, pos) elif tag in ('dir', 'udir'): wraps[pos - start_pos] = markup_dir(self, pos) else: wraps[pos - start_pos] = markup_object(self, pos) pos += 1 return wraps def find_partners(self): """Identify partners for start and end tags, if possible. Each end tag is matched with the closest matching start tag that has not already been matched. Tags matched in this way are matched symmetrically. In addition, a start tag t whose name is in SINGULAR_TAGS will not be matched, even if a close tag exists for it. In addition, a start tag t will close any open tag u for which t is in CLOSED_BY_OPEN[u]. In addition, an end tag t will close any open tag u for which t is in CLOSED_BY_CLOSE[u] In addition, EOF will close any open tag t for which t is in CLOSED_BY_EOF. Tags matched by the latter three rules are asymmetric matches; the open tag u has t as a partner, but not vice versa. All partnership relationships can be manually overridden by assigning to or deleting the .partner property of a given object. Only tags are assigned partners by default, but any object can be given a partner by manual assignment. """ # When we find the end of a tag, we preemptively define that # tag as the parent of all unclaimed tags inside it, that are # either single or properly paired. This saves an expensive # computation later. # # Unbalanced tags are NOT claimed. def parentify(lo, hi, obj): for p in range(lo, hi): t = self.wrappers[p] if (not hasattr(t, '_parent') and (t.obj_type not in ('start_tag', 'end_tag') or (p in matches and lo <= matches[p] < hi))): t._parent = obj queue = [] ; matches = {} for pos, obj in enumerate(self.wrappers): if (obj.obj_type == 'start_tag' and obj.tag_name.lower() not in self.SINGULAR_TAGS): # If the new start tag implicitly ends its predecessor, make # a one-way link from the predecessor to this tag. for q in reversed(range(len(queue))): qpos, qobj = queue[q] if (obj.tag_name.lower() in self.CLOSED_BY_OPEN.get(qobj.tag_name.lower(), ())): matches[qpos] = pos parentify(qpos + 1, pos, qobj) queue.pop(q) queue.append((pos, obj)) elif obj.obj_type == 'end_tag': for q in reversed(range(len(queue))): qpos, qobj = queue[q] # If we find the open tag to balance, make a two-way link # between the two, and clear the open tag from the queue. if qobj.tag_name == obj.tag_name: matches[qpos] = pos matches[pos] = qpos parentify(qpos + 1, pos, qobj) queue.pop(q) break # If the ending tag implicitly ends its predecessor, make # a one-way link from the predecessor to this tag. elif (obj.tag_name.lower() in self.CLOSED_BY_CLOSE.get(qobj.tag_name.lower(), ())): matches[qpos] = pos parentify(qpos + 1, pos, qobj) queue.pop(q) elif obj.obj_type == 'eof': # Close any remaining tags that are still open at end # of input and which are deemed worthy of that action. for q in reversed(range(len(queue))): qpos, qobj = queue[q] if qobj.tag_name.lower() in self.CLOSED_BY_EOF: matches[qpos] = pos parentify(qpos + 1, pos, qobj) queue.pop(q) self.matches = matches def get_object(self, id): assert 0 <= id < len(self.objects) return self.objects[id] def get_wrapper(self, id): assert 0 <= id < len(self.wrappers) return self.wrappers[id] def set_partner(self, src_id, dst_id, mutual = False): m = self.matches if dst_id is None: m.pop(src_id, None) else: m[src_id] = dst_id if mutual: m[dst_id] = src_id # }} # {{ class html_parser class html_parser (markup_parser): """A specialization of markup_parser that knows some of the tag rules for HTML. """ SINGULAR_TAGS = set(( 'br', 'embed', 'hr', 'img', 'input', 'link', 'param')) CLOSED_BY_OPEN = { 'p': set(('p',)), 'li': set(('li',)), 'dt': set(('dt', 'dd')), 'dd': set(('dt', 'dd')), 'th': set(('th', 'td')), 'td': set(('th', 'td')), 'thead': set(('tbody', 'tfoot')), 'tbody': set(('tbody', 'tfoot')), 'head': set(('body',)), } CLOSED_BY_CLOSE = { 'li': set(('ul', 'ol')), 'dt': set(('dl',)), 'dd': set(('dl',)), 'th': set(('table',)), 'td': set(('table',)), 'tbody': set(('table',)), 'tfoot': set(('table',)), 'body': set(('html',)), } CLOSED_BY_EOF = set(('body', 'html')) # }} __all__ = ( 'markup_scanner', 'markup_parser', 'markup_object', 'markup_tag', 'markup_text', 'markup_dir', 'markup_entity', 'markup_attribute', 'parse_error', 'html_parser') # Here there be dragons