#!/usr/bin/env python
# -*- coding: utf-8 -*-
##
## Name:     ASIN.py
## Purpose:  Look up ISBN/ASIN numbers at Amazon.COM via screen-scraping.
## Author:   M. J. Fromberger
## Info:     $Id: ASIN.py 522 2007-11-14 17:07:04Z sting $
##

import inspect
import re
import urllib
from decimal import Decimal as D

from BeautifulSoup import BeautifulSoup as HTMLParser
from ISBN import fix_isbn, convert_to_isbn10

# {{ class Lookup


class Lookup (urllib.FancyURLopener):
    """A simple interface to look up ASIN/ISBN data online, using the
    web interface at Amazon.COM.  Results are extracted from the HTML
    results.  Subclasses of this class may override certain aspects of
    its default behaviour.

    Interface:
    .lookup(asin)      -- look up the given ASIN; returns a dictionary.
    .raw_lookup(asin)  -- raw interface to the above; no processing.

    Override protocol:

    The .lookup() method extracts key/value pairs from the result
    returned by the server; the keys are converted into method names
    of the form convert_<key>, where <key> is a variation of the key
    string generated by the .key_name() method.

    Each key/value pair is processed by the corresponding method, if
    defined; the results are collected into a dictionary and returned
    to the user.  If the processing method returns None, the key/value
    pair is discarded.  If no method is defined, the key/value pair is
    kept unchanged.

    The method .default_convert(), if defined, will be called for any
    keys for which processing methods are not otherwise defined.

    The .raw_lookup() interface bypasses the processing steps.

    Other fields:
    .url_templates -- collection of base URLs into which query strings
                      are injected.  Use '%s' to indicate where the
                      query string should go.
    .tag_re        -- regex defining the appearance of keys in HTML
    """

    # Substitute the ISBN into these templates to form lookup URLs
    url_templates = ('http://www.amazon.com/o/ASIN/%s',    # t: 0061056308
                     'http://www.amazon.co.uk/o/ASIN/%s',  # t: 0571233139
                     'http://www.amazon.de/o/ASIN/%s')     # t: 3938484772

    # Expression matching a tag embedded in the HTML results.
    # Group 1 is the key (any trailing colon is kept, since the greedy
    # match consumes it; .key_name() strips it later), group 2 the value.
    # NOTE(review): the <b>...</b> literals were missing from the copy of
    # this file under review (markup had been stripped from it), which
    # made group 2 always empty.  They are restored here on the
    # assumption that detail items look like "<b>Key:</b> value" --
    # confirm against a live page.
    tag_re = re.compile(r'(?ui)<b>\s*(.+):?\s*</b>\s*(.*)$')

    def http_error_404(self, *args):
        """[private] Convert "page not found" responses into
        exceptions for the benefit of the programming interface.  See
        FancyURLopener for more information.
        """
        raise KeyError("ASIN not found")

    def http_error_401(self, *args):
        """[private] Convert a response requiring an authentication
        response into an exception for the benefit of the programming
        interface.  See FancyURLopener for more information.
        """
        raise TypeError("Request requires authentication")

    @staticmethod
    def key_name(rawkey):
        """Convert a string key into a name suitable for use as a
        Python identifier.  This is used to flatten keys for
        processing.

        The key is stripped and lowercased, runs of hyphens and
        whitespace become a single underscore, and any remaining
        non-word characters are deleted.
        """
        key = re.sub(r'[-\s]+', '_', rawkey.strip().lower())
        return re.sub(r'\W+', '', key)

    def process_field(self, key, value):
        """Given a key and value, call any processing methods that may
        apply.  The key is flattened to <key> and a method named
        .convert_<key>(...) is called if defined; if not, a method
        called .default_convert(...) is called if defined.  If neither
        of these is defined, the key and value are returned unchanged.

        Returns whatever the chosen method returns (by convention a
        list of (key, value) pairs, or None to discard the field), or
        [(key, value)] when no method applies.
        """
        # Try the specific converter first, then the catch-all.
        for name in ('convert_' + self.key_name(key), 'default_convert'):
            handler = getattr(self, name, None)
            if inspect.ismethod(handler):
                return handler(key, value)
        return [(key, value)]

    def prep_search_query(self, raw):
        """Process a query string for submission to the search
        facility; raises ValueError if the query appears to be
        invalid.  Passing this method does not guarantee the query
        will return any hits, but it should at least be legal.

        Hyphens and spaces are removed; the result must consist of
        exactly 10 or 13 letters/digits.  A 13-character query is
        passed through convert_to_isbn10().
        """
        asin = re.sub(r'[- ]+', '', raw)
        if len(asin) not in (10, 13) or \
           not re.match(r'(?i)[a-z0-9]{10,13}$', asin):
            raise ValueError("Not a valid ASIN: %s" % raw)

        if len(asin) == 13:
            return convert_to_isbn10(asin)
        return asin

    def get_search_data(self, asin):
        """Return the raw data from an ASIN query.  Each entry of
        .url_templates is tried in order and the body of the first
        page that can be fetched is returned.

        Raises KeyError if the ASIN could not be found; raises
        TypeError if the query failed for another reason (e.g.,
        required authentication).  When every URL fails, the error
        from the last attempt is the one re-raised.  Raises ValueError
        (from .prep_search_query) if the query is malformed.
        """
        query = self.prep_search_query(asin)
        err = None

        for url in self.url_templates:
            full_url = url % query
            try:
                u = self.open(full_url)
                try:
                    return u.read()
                finally:
                    u.close()
            except (TypeError, KeyError, IOError) as exc:
                # Remember the failure and fall through to the next
                # site.  The explicit rebinding keeps the exception
                # reachable after the handler finishes.
                err = exc

        if err is not None:
            raise err

    def parse_search_data(self, data):
        """Parse the web page returned by an ASIN query to extract
        product information.  Returns a list of (key, value) pairs of
        text keys and values, in the order they were found.  If no
        information could be extracted, for example due to a format
        mismatch, raises ValueError.
        """
        parser = HTMLParser(data)
        out = []

        # Extract the author(s) from the title tag.  I cannot believe
        # that Amazon doesn't make it any easier than this.
        authors = ()
        cur = parser.first('title')
        if cur is not None:
            # The author list is whatever follows the last colon.
            tail = re.split(r':\s*', cur.renderContents())[-1]
            authors = [s.strip() for s in re.split(r',\s*', tail)]
            out.append(('Author', ', '.join(authors)))

        # Extract the title from the keywords tag.  I cannot believe
        # that Amazon doesn't make it any easier than this.
        cur = parser.first('meta', {'name': 'keywords'})
        if cur is not None and cur.has_key('content'):
            fields = re.split(r',\s*', cur.get('content'))
            # The title is every keyword up to the first author name;
            # if no author appears, take just the first keyword.
            for pos, elt in enumerate(fields):
                if elt in authors:
                    break
            else:
                pos = 1
            out.append(('Title', ', '.join(fields[:pos])))

        # Look for an anchor named "productDetails", then search its
        # immediate siblings for a table.
        cur = parser.find('a', {'name': 'productDetails'})
        if cur is None:
            raise ValueError("Unable to locate productDetails")

        while True:
            cur = cur.findNextSibling()
            if cur is None:
                raise ValueError("Unable to locate details table")
            elif cur.name == 'table':
                break

        # Look for a division containing an unordered list; the list
        # should contain the attributes we want.
        cur = cur.find('div', {'class': 'content'})
        if cur is None:
            raise ValueError("Unable to locate details table content")

        cur = cur.first('ul')
        if cur is None:
            raise ValueError("Unable to locate details table content list")

        for tag in cur.findAll('li'):
            match = self.tag_re.search(tag.renderContents())
            if match:
                out.append((match.group(1).strip(),
                            match.group(2).strip()))

        return out

    def process_raw_results(self, res):
        """Process the raw field results created by
        .parse_search_data() through class-defined filters, if any;
        returns a new dictionary with the filtered results.

        Later pairs overwrite earlier pairs that share a key.
        """
        out = {}
        for key, val in res:
            pairs = self.process_field(key, val)
            if pairs is not None:      # None means "discard this field"
                for k, v in pairs:
                    out[k] = v
        return out

    def process_search_data(self, data):
        """Given raw HTML data returned from the search engine,
        convert it into raw key/value pairs and then process the
        results according to the standard protocol.
        """
        return self.process_raw_results(self.parse_search_data(data))

    def lookup(self, asin):
        """Primary interface to ASIN lookup; returns a key/value
        dictionary."""
        return self.process_search_data(self.get_search_data(asin))

    def raw_lookup(self, asin):
        """Look up ASIN; returns raw key/value dictionary, not
        processed."""
        return dict(self.parse_search_data(self.get_search_data(asin)))

# }}

# {{ class DataLookup


class DataLookup (Lookup):
    """An implementation of Lookup that handles some of the more
    interesting data fields returned for books.
    """

    # How to identify the portion of the Publisher field that Amazon
    # uses to store the publication date.  Approximate only.
    date_re = re.compile(r'\(([\w\d, ]+)\)\s*$')

    # How to pick a series name and optional volume number out of a
    # parenthetical title comment, e.g. "Some Series, Book 3".
    series_re = re.compile(r'(?P<tag>[^,]+)(?:,\s*'
                           r'(?:b(?:oo)?k|vol(?:ume)?)\.?'
                           r'\s*(?P<volnum>\d+))?$', re.IGNORECASE)

    # All keys not otherwise overridden or on this list are discarded
    keep_keys = set(('author', 'title', 'language', 'sprache', 'isbn_13'))

    def default_convert(self, key, value):
        """Discard all keys not specifically listed in keep_keys.
        Kept keys are returned in their flattened (.key_name) form.
        """
        key = self.key_name(key)
        if key in self.keep_keys:
            return [(key, value)]
        return None

    def convert_title(self, key, value):
        """Try to extract series and volume number information from
        the title, if present.

        Always yields the title (with parenthetical comments removed)
        under the lowercased key.  If the last comment looks like a
        series name, also yields 'series' and, when a volume number is
        given, 'volume' (as an int).  Comments ending in "edition" are
        not treated as a series.
        """
        title, comments = extract_comments(value)
        out = [(key.lower(), title)]

        if comments:
            m = self.series_re.match(comments[-1].strip('()'))
            if m:
                series = re.sub(r'\s+', ' ', m.group('tag').lower())
                if not series.endswith('edition'):
                    out.append(('series', series))
                    # A volume is only reported alongside its series.
                    if m.group('volnum'):
                        out.append(('volume', int(m.group('volnum'))))

        return out

    def convert_isbn_10(self, key, value):
        """Report the ISBN-10 value under the key 'isbn'."""
        return [('isbn', value)]

    def convert_publisher(self, key, value):
        """Convert publisher to factor out publication date, if
        possible, and to separate edition from publisher.  Result may
        contain fields 'publisher', 'date', and 'edition' as
        appropriate.
        """
        out = []

        m = self.date_re.search(value)
        if m:
            out.append(('date', m.group(1)))
            value = value[:m.start()].strip()

        if ';' in value:
            value, _, rest = value.partition(';')
            ed = re.sub(r'(?i) +edition', '', rest).strip()

            # Strip off an ordinal marker, if present.  Only a suffix
            # that follows a digit counts, so "2nd" becomes "2" but
            # words such as "First" are left intact.
            ed = re.sub(r'(?<=\d)(?:st|nd|rd|th)$', '', ed)
            out.append(('edition', ed))
            value = value.strip()

        if value:
            out.append(('publisher', value))

        return out

    convert_verlag = convert_publisher  # for German site

    def convert_format(self, key, value):
        """Generate "format" and "pages" fields.  The format is the
        flattened key (e.g. 'hardcover'); the page count is taken from
        a leading "<n> pages" / "<n> Seiten" in the value, if present.
        """
        key = self.key_name(key)
        np = re.match(r'(?i)(?P<pages>\d+)\s*(pages|seiten)', value)

        res = [('format', key)]
        if np:
            res.append(('pages', int(np.group('pages'))))
        return res

    convert_hardcover = convert_format
    convert_paperback = convert_format
    convert_mass_market_paperback = convert_format
    convert_textbook_binding = convert_format
    convert_unknown_binding = convert_format
    convert_gebundene_ausgabe = convert_format  # for German site
    convert_taschenbuch = convert_format        # for German site

    def convert_product_dimensions(self, key, value):
        """Convert product dimensions into a sequence of values
        measured in centimeters.  This may vary in size, as some
        products list a weight here instead of length.  Where
        possible, lengths are converted to centimeters.

        If only one dimension unit is found, e.g., "a x b x c inches",
        it is assumed to apply to all the other unmarked dimensions.
        The order of dimensions is assumed to measure the book in its
        shelved orientation, height x depth x width.

             ____
            /   /|
           /___/ |
           | s | |<-- height
           | p | |
           | i | |
           | n | /
           |_e_|/<-- depth
             ^-- width

        Returns a list of (name, 'n.n') pairs, or None if the value
        cannot be interpreted (unknown unit, no unit at all, or a
        non-numeric measurement).  At most three dimensions are
        reported.
        """
        parts = re.split(r'\s*[xX]\s*', value.strip())

        dims = []      # (name, amount, factor-or-None) per dimension
        shared = None  # most recent explicitly stated unit factor
        for name, part in zip(('height', 'depth', 'width'), parts):
            tokens = part.split()
            if not tokens:
                return None
            try:
                amount = D(tokens[0])
            except ArithmeticError:  # decimal.InvalidOperation
                return None

            factor = None
            if len(tokens) > 1:
                unit = tokens[-1]
                if re.match(r'(?i)in(ch(es)?|\.)$', unit):
                    factor = D('2.54')  # convert inches to cm
                elif re.match(r'(?i)c(m\.?|entimet(er|re)s?)$', unit):
                    factor = D('1')     # convert cm to cm (noop)
                else:
                    # Unable to determine dimensions, bail out.  This
                    # might not be the best possible solution, but it
                    # will have to do for now.
                    return None
                # Only an explicit unit updates the shared factor, so
                # a later unit-less dimension cannot erase it.
                shared = factor

            dims.append((name, amount, factor))

        if shared is None:
            # No unit anywhere; there is nothing to scale by.
            return None

        # Correct dimensions that were missing their units and
        # generate the output
        out = []
        for name, amount, factor in dims:
            if factor is None:
                factor = shared
            out.append((name, '%.1f' % (amount * factor)))
        return out

    def convert_gre_undoder_gewicht(self, key, value):
        """Convert German dimensions."""
        # This is kind of a greasy hack; rather than mess around with
        # locale settings, I'm just going to convert decimal-commas to
        # points, and use the English logic.
        return self.convert_product_dimensions(key, value.replace(',', '.'))

    def convert_shipping_weight(self, key, value):
        """Convert shipping weight into an amount in pounds.

        A trailing parenthetical note is ignored.  Ounces, kilograms
        and grams are converted (approximately); any other unit is
        taken to be pounds already.  Returns [('weight', 'n.nn')], or
        None if the value is not of the form "<number> <unit>".
        """
        base = re.sub(r'\(.+\)\s*$', '', value).strip()
        pieces = re.split(r'\s+', base, 1)
        if len(pieces) != 2:
            return None

        amt, unit = pieces
        try:
            amt = D(amt)
        except ArithmeticError:  # decimal.InvalidOperation
            return None

        # Order matters: the kilogram test must precede the gram test,
        # which accepts anything starting with "g".
        if re.match(r'(?i)o(unces|z\.?)', unit):
            amt /= D('16')
        elif re.match(r'(?i)k(ilo(gram)?s?|g\.?)', unit):
            amt *= D('2.2')
        elif re.match(r'(?i)g(rams?|\.)?', unit):
            amt *= D('0.0022')

        return [('weight', '%.2f' % amt)]

    def convert_author(self, key, value):
        """Convert author string into a list of names.  A trailing
        suffix such as "Jr." or "Ph.D." is re-attached to the name it
        follows rather than being listed as a separate author.
        """
        names = []
        for elt in (s.strip() for s in re.split(r', *', value)):
            if names and re.match(r'(?i)(jr|sr|esq|ph\.?d)\.$', elt):
                # Put back the comma that the split removed.
                names[-1] += ', ' + elt
            else:
                names.append(elt)

        return [(self.key_name(key), names)]

# }}

# {{ Public functions


def lookup(asin):
    """A simple wrapper around DataLookup.lookup()."""
    return DataLookup().lookup(asin)


def raw_lookup(asin):
    """A simple wrapper around DataLookup.raw_lookup()."""
    return DataLookup().raw_lookup(asin)


def process_asin(asin):
    """A simple wrapper around DataLookup.prep_search_query()."""
    return DataLookup().prep_search_query(asin)


def extract_comments(input):
    """Extract parenthetical comments from a string, taking account of
    nested parentheses.  Returns a tuple (t, p) where t is the string
    with the parenthetical comments removed, and p is a list (maybe
    empty) of top-level comments.

    Notes:
    Leading and trailing whitespace are removed from t, and each group
    of whitespace is collapsed to a single space character.

    Comments retain their parentheses.  However, if the input ends
    with an open comment still in effect, the function behaves as if a
    close parenthesis were inserted just after the string, and takes
    the rest of the string as a comment.  You can detect this
    situation by examining the last character of the last comment; if
    it is not ')', then the string was unclosed.

    Similarly, any occurrence of ')' in t is unbalanced.  This is not
    reported as an error, but can be detected by scanning t for ')'.
    """
    text = []      # characters outside any comment
    comments = []  # completed top-level comments
    stack = []     # positions of currently open parentheses

    for pos, ch in enumerate(input):
        if stack:
            if ch == '(':
                stack.append(pos)
            elif ch == ')':
                start = stack.pop()
                if not stack:  # just closed a top-level comment
                    comments.append(input[start:pos + 1])
        elif ch == '(':
            stack.append(pos)
        else:
            text.append(ch)

    if stack:
        # Unclosed comment: take everything from the outermost open
        # parenthesis to the end of the string.
        comments.append(input[stack[0]:])

    return re.sub(r'\s+', ' ', ''.join(text).strip()), comments

# }}

__all__ = ('Lookup', 'DataLookup', 'lookup', 'raw_lookup',
           'process_asin', 'extract_comments')

# Here there be dragons