#!/usr/bin/env python
# -*- coding: utf-8 -*-
##
## Name: ASIN.py
## Purpose: Look up ISBN/ASIN numbers at Amazon.COM via screen-scraping.
## Author: M. J. Fromberger
## Info: $Id: ASIN.py 522 2007-11-14 17:07:04Z sting $
##
import inspect, re, urllib
from decimal import Decimal as D
from BeautifulSoup import BeautifulSoup as HTMLParser
from ISBN import fix_isbn, convert_to_isbn10
# {{ class Lookup
class Lookup(urllib.FancyURLopener):
    """A simple interface to look up ASIN/ISBN data online, using the
    web interface at Amazon.COM.  Results are extracted from the HTML
    results.  Subclasses of this class may override certain aspects of
    its default behaviour.

    Interface:
    .lookup(asin)     -- look up the given ASIN; returns a dictionary.
    .raw_lookup(asin) -- raw interface to the above; no processing.

    Override protocol:
    The .lookup() method extracts key/value pairs from the result
    returned by the server; the keys are converted into method names
    of the form convert_<key>, where <key> is a variation of the key
    string generated by the .key_name() method.  Each key/value pair
    is processed by the corresponding method, if defined; the results
    are collected into a dictionary and returned to the user.  If the
    processing method returns None, the key/value pair is discarded.
    If no method is defined, the key/value pair is kept unchanged.

    A processing method receives (key, value) and returns either None
    or a sequence of (key, value) pairs, so one input field may expand
    into several output fields.

    The method .default_convert(), if defined, will be called for any
    keys for which processing methods are not otherwise defined.

    The .raw_lookup() interface bypasses the processing steps.

    Other fields:
    .url_templates -- collection of base URLs into which query strings
                      are injected.  Use '%s' to indicate where the
                      query string should go.
    .tag_re        -- regex defining the appearance of keys in HTML
    """

    # Substitute the ISBN into these templates to form lookup URLs
    url_templates = ('http://www.amazon.com/o/ASIN/%s',    # t: 0061056308
                     'http://www.amazon.co.uk/o/ASIN/%s',  # t: 0571233139
                     'http://www.amazon.de/o/ASIN/%s')     # t: 3938484772

    # Expression matching a tag embedded in the HTML results: group 1
    # is the label, group 2 is the text that follows it.
    # NOTE(review): the label delimiters are assumed to be <b>...</b>;
    # without some delimiter the greedy first group swallows the whole
    # line and the value is always empty.  Confirm against a live page.
    tag_re = re.compile(r'(?ui)<b>\s*(.+):?\s*</b>\s*(.*)$')

    def http_error_404(self, *args):
        """[private] Convert "page not found" responses into
        exceptions for the benefit of the programming interface.

        See FancyURLopener for more information.
        """
        raise KeyError("ASIN not found")

    def http_error_401(self, *args):
        """[private] Convert a response requiring an authentication
        response into an exception for the benefit of the programming
        interface.

        See FancyURLopener for more information.
        """
        raise TypeError("Request requires authentication")

    @staticmethod
    def key_name(rawkey):
        """Convert a string key into a name suitable for use as a
        Python identifier.  This is used to flatten keys for
        processing.

        The key is stripped and lower-cased, runs of hyphens and
        whitespace become a single underscore, and any remaining
        non-word characters are dropped.
        """
        key = re.sub(r'[-\s]+', '_', rawkey.strip().lower())
        return re.sub(r'\W+', '', key)

    def process_field(self, key, value):
        """Given a key and value, call any processing methods that may
        apply.  The key is flattened to <key> and a method named
        .convert_<key>(...) is called if defined; if not, a method
        called .default_convert(...) is called if defined.  If neither
        of these is defined, the key and value are returned unchanged.

        Returns whatever the handler returns: a sequence of
        (key, value) pairs, or None to discard the field.
        """
        candidates = ('convert_' + self.key_name(key), 'default_convert')
        for name in candidates:
            handler = getattr(self, name, None)
            if inspect.ismethod(handler):
                return handler(key, value)
        return [(key, value)]

    def prep_search_query(self, raw):
        """Process a query string for submission to the search
        facility; raises ValueError if the query appears to be
        invalid.  Passing this method does not guarantee the query
        will return any hits, but it should at least be legal.

        Hyphens and spaces are removed first.  A 13-character query is
        passed through convert_to_isbn10(); a 10-character query is
        returned as is.
        """
        asin = re.sub(r'[- ]+', '', raw)
        if len(asin) not in (10, 13) or \
           not re.match(r'(?i)[a-z0-9]{10,13}$', asin):
            raise ValueError("Not a valid ASIN: %s" % raw)

        if len(asin) == 13:
            return convert_to_isbn10(asin)
        return asin

    def get_search_data(self, asin):
        """Return the raw data from an ASIN query.  Raises KeyError if
        the ASIN could not be found; raises TypeError if the query
        failed for another reason (e.g., required authentication).

        Each entry of .url_templates is tried in order and the body of
        the first successful response is returned.  If every attempt
        fails, the error from the last attempt is raised.  If there
        are no templates at all, KeyError is raised.
        """
        query = self.prep_search_query(asin)
        last_error = None
        for template in self.url_templates:
            full_url = template % query
            try:
                handle = self.open(full_url)
                try:
                    return handle.read()
                finally:
                    handle.close()
            except (TypeError, KeyError, IOError) as exc:
                # Keep the failure under a separate name; the name
                # bound by "as" must not be relied on after the
                # handler finishes.
                last_error = exc

        if last_error is None:
            # Nothing was attempted, so nothing can have been found.
            raise KeyError("ASIN not found")
        raise last_error

    def parse_search_data(self, data):
        """Parse the web page returned by an ASIN query to extract
        product information.  Returns a list of (key, value) pairs of
        text.  If no information could be extracted, for example due
        to a format mismatch, raises ValueError.
        """
        parser = HTMLParser(data)
        out = []

        # Extract the author(s) from the title tag: they are taken to
        # be the comma-separated names after the last colon.  I cannot
        # believe that Amazon doesn't make it any easier than this.
        authors = ()
        cur = parser.first('title')
        if cur is not None:
            byline = re.split(r':\s*', cur.renderContents())[-1]
            authors = [s.strip() for s in re.split(r',\s*', byline)]
            out.append(('Author', ', '.join(authors)))

        # Extract the title from the keywords tag: it is taken to be
        # every keyword before the first one that names an author, or
        # just the first keyword if no author appears.
        cur = parser.first('meta', {'name': 'keywords'})
        if cur is not None and cur.has_key('content'):
            fields = re.split(r',\s*', cur.get('content'))
            for pos, elt in enumerate(fields):
                if elt in authors:
                    break
            else:
                pos = 1
            out.append(('Title', ', '.join(fields[:pos])))

        # Look for an anchor named "productDetails", then search its
        # immediate siblings for a table.
        cur = parser.find('a', {'name': 'productDetails'})
        if cur is None:
            raise ValueError("Unable to locate productDetails")

        while True:
            cur = cur.findNextSibling()
            if cur is None:
                raise ValueError("Unable to locate details table")
            if cur.name == 'table':
                break

        # Look for a division containing an unordered list; the list
        # should contain the attributes we want.
        cur = cur.find('div', {'class': 'content'})
        if cur is None:
            raise ValueError("Unable to locate details table content")

        cur = cur.first('ul')
        if cur is None:
            raise ValueError("Unable to locate details table content list")

        for item in cur.findAll('li'):
            match = self.tag_re.search(item.renderContents())
            if match:
                out.append((match.group(1).strip(),
                            match.group(2).strip()))

        return out

    def process_raw_results(self, res):
        """Process the raw field results created by
        .parse_search_data() through class-defined filters, if any;
        returns a new dictionary with the filtered results.

        Fields whose filter returns None are dropped.  When several
        filters emit the same key, the last one wins.
        """
        out = {}
        for key, val in res:
            pairs = self.process_field(key, val)
            if pairs is not None:
                out.update(pairs)
        return out

    def process_search_data(self, data):
        """Given raw HTML data returned from the search engine, convert
        it into raw key/value pairs and then process the results
        according to the standard protocol.
        """
        return self.process_raw_results(self.parse_search_data(data))

    def lookup(self, asin):
        """Primary interface to ASIN lookup; returns a key/value dictionary."""
        return self.process_search_data(self.get_search_data(asin))

    def raw_lookup(self, asin):
        """Look up ASIN; returns raw key/value dictionary, not processed."""
        return dict(self.parse_search_data(self.get_search_data(asin)))
# }}
# {{ class DataLookup
class DataLookup(Lookup):
    """An implementation of Lookup that handles some of the more
    interesting data fields returned for books.

    Every convert_* method follows the Lookup override protocol: it
    takes (key, value) and returns a list of (key, value) pairs, or
    None to discard the field.
    """

    # How to identify the portion of the Publisher field that Amazon
    # uses to store the publication date.  Approximate only.
    date_re = re.compile(r'\(([\w\d, ]+)\)\s*$')

    # How to pick a series name ("tag") and optional volume number out
    # of a parenthetical title comment such as "Some Series, Book 3".
    series_re = re.compile(r'(?P<tag>[^,]+)(?:,\s*'
                           r'(?:b(?:oo)?k|vol(?:ume)?)\.?'
                           r'\s*(?P<volnum>\d+))?$', re.IGNORECASE)

    # All keys not otherwise overridden or on this list are discarded
    keep_keys = set(('author', 'title', 'language', 'sprache', 'isbn_13'))

    def default_convert(self, key, value):
        """Discard all keys not specifically listed in keep_keys."""
        key = self.key_name(key)
        if key in self.keep_keys:
            return [(key, value)]
        return None

    def convert_title(self, key, value):
        """Try to extract series and volume number information from
        the title, if present.

        Parenthetical comments are removed from the title.  The last
        comment is examined: unless it ends in "edition" it is reported
        as 'series', and a trailing "Book N" / "Vol. N" is reported as
        an integer 'volume'.
        """
        title, comments = extract_comments(value)
        out = [(key.lower(), title)]
        if not comments:
            return out

        m = self.series_re.match(comments[-1].strip('()'))
        if m:
            series = re.sub(r'\s+', ' ', m.group('tag').lower())
            if not series.endswith('edition'):
                out.append(('series', series))
            if m.group('volnum'):
                out.append(('volume', int(m.group('volnum'))))
        return out

    def convert_isbn_10(self, key, value):
        """Report the ISBN-10 field under the key 'isbn'."""
        return [('isbn', value)]

    def convert_publisher(self, key, value):
        """Convert publisher to factor out publication date, if
        possible, and to separate edition from publisher.  Result may
        contain fields 'publisher', 'date', and 'edition' as
        appropriate.
        """
        out = []
        m = self.date_re.search(value)
        if m:
            out.append(('date', m.group(1)))
            value = value[:m.start()].strip()

        name, sep, rest = value.partition(';')
        if sep:
            ed = re.sub(r'(?i) +edition', '', rest).strip()
            # Strip off an ordinal marker, if present.  Only do so
            # after a digit, so that "2nd" becomes "2" but words such
            # as "First" or "Second" are left intact.
            ed = re.sub(r'(?<=\d)(?:st|nd|rd|th)$', '', ed)
            out.append(('edition', ed))
            value = name.strip()

        if value:
            out.append(('publisher', value))
        return out

    convert_verlag = convert_publisher  # for German site

    def convert_format(self, key, value):
        """Generate "format" and "pages" fields.

        The format is the flattened key itself (e.g. 'paperback'); the
        page count is taken from a leading "N pages" / "N seiten" in
        the value, when present.
        """
        res = [('format', self.key_name(key))]
        np = re.match(r'(?i)(?P<pages>\d+)\s*(pages|seiten)', value)
        if np:
            res.append(('pages', int(np.group('pages'))))
        return res

    convert_hardcover = convert_format
    convert_paperback = convert_format
    convert_mass_market_paperback = convert_format
    convert_textbook_binding = convert_format
    convert_unknown_binding = convert_format
    convert_gebundene_ausgabe = convert_format  # for German site
    convert_taschenbuch = convert_format        # for German site

    def convert_product_dimensions(self, key, value):
        """Convert product dimensions into a sequence of values
        measured in centimeters.  This may vary in size, as some
        products list a weight here instead of length.  Where
        possible, lengths are converted to centimeters.

        If only one dimension unit is found, e.g., "a x b x c inches",
        it is assumed to apply to all the other unmarked dimensions.

        Returns None if the value cannot be interpreted as lengths
        (unknown or missing unit, malformed number).

        The order of dimensions is assumed to measure the book in its
        shelved orientation, height x depth x width.

            ____
           /   /|
          /___/ |
          |  s| |<-- height
          |  p| |
          |  i| |
          |  n| /
          |  e|/<-- depth
          -----
            ^-- width
        """
        dims = []           # (tag, size, factor or None) per dimension
        unit_factor = None  # most recent explicitly stated unit
        pieces = re.split(r'\s*[xX]\s*', value.strip())
        for tag, piece in zip(('height', 'depth', 'width'), pieces):
            tokens = piece.split()
            if len(tokens) not in (1, 2):
                return None  # expected "<number>" or "<number> <unit>"
            try:
                size = D(tokens[0])
            except ArithmeticError:
                return None  # decimal's conversion errors derive from this

            factor = None
            if len(tokens) == 2:
                if re.match(r'(?i)in(ch(es)?|\.)$', tokens[1]):
                    factor = D('2.54')  # convert inches to cm
                elif re.match(r'(?i)c(m\.?|entimet(er|re)s?)$', tokens[1]):
                    factor = D('1')     # convert cm to cm (noop)
                else:
                    # Unable to determine dimensions, bail out.  This
                    # might not be the best possible solution, but it
                    # will have to do for now.
                    return None
                unit_factor = factor
            dims.append((tag, size, factor))

        if unit_factor is None:
            return None  # no unit anywhere; nothing to convert with

        # Correct dimensions that were missing their units and
        # generate the output
        out = []
        for tag, size, factor in dims:
            if factor is None:
                factor = unit_factor
            out.append((tag, '%.1f' % (size * factor)))
        return out

    def convert_gre_undoder_gewicht(self, key, value):
        """Convert German dimensions."""
        # This is kind of a greasy hack; rather than mess around with
        # locale settings, I'm just going to convert decimal-commas to
        # points, and use the English logic.
        return self.convert_product_dimensions(key, value.replace(',', '.'))

    def convert_shipping_weight(self, key, value):
        """Convert shipping weight into an amount in pounds.

        A trailing parenthetical note is ignored.  Ounces, kilograms
        and grams are converted; any other unit is taken to be pounds
        already.  Returns None if the value is not of the form
        "<number> <unit>".
        """
        base = re.sub(r'\(.+\)\s*$', '', value).strip()
        fields = re.split(r'\s+', base, 1)
        if len(fields) != 2:
            return None
        try:
            amt = D(fields[0])
        except ArithmeticError:
            return None
        unit = fields[1]

        if re.match(r'(?i)o(unces|z\.?)', unit):
            amt /= D('16')
        elif re.match(r'(?i)k(ilo(gram)?s?|g\.?)', unit):
            amt *= D('2.2')
        elif re.match(r'(?i)g(rams?|\.)?', unit):
            amt *= D('0.0022')
        return [('weight', '%.2f' % amt)]

    def convert_author(self, key, value):
        """Convert author string into a list of names.

        The string is split on commas.  A piece that is only a name
        suffix (Jr, Sr, Esq, PhD, with or without a period) is
        re-attached to the preceding name, since the comma before it
        was not a separator between authors.
        """
        names = []
        for elt in (s.strip() for s in re.split(r', *', value)):
            if names and re.match(r'(?i)(jr|sr|esq|ph\.?d)\.?$', elt):
                names[-1] += ', ' + elt
            else:
                names.append(elt)
        return [(self.key_name(key), names)]
# }}
# {{ Public functions
def lookup(asin):
    """Look up asin using a freshly created DataLookup and return
    the result of its .lookup() method.
    """
    return DataLookup().lookup(asin)
def raw_lookup(asin):
    """Look up asin using a freshly created DataLookup and return
    the result of its .raw_lookup() method.
    """
    return DataLookup().raw_lookup(asin)
def process_asin(asin):
    """Run asin through .prep_search_query() of a freshly created
    DataLookup and return the result.
    """
    return DataLookup().prep_search_query(asin)
def extract_comments(input):
    """Extract parenthetical comments from a string, taking account of
    nested parentheses.  Returns a tuple (t, p) where t is the string
    with the parenthetical comments removed, and p is a list (maybe
    empty) of top-level comments.

    Notes:
    Leading and trailing whitespace are removed from t, and each group
    of whitespace is collapsed to a single space character.

    Comments retain their parentheses.  However, if the input ends
    with an open comment still in effect, the function behaves as if a
    close parenthesis were inserted just after the string, and takes
    the rest of the string as a comment.  You can detect this
    situation by examining the last character of the last comment; if
    it is not ')', then the string was unclosed.

    Similarly, any occurrence of ')' in t is unbalanced.  This is not
    reported as an error, but can be detected by scanning t for ')'.
    """
    kept = []        # characters that lie outside every comment
    comments = []    # completed top-level comments, in order
    open_stack = []  # positions of the '(' that are still open

    for pos, ch in enumerate(input):
        if not open_stack:
            # Outside any comment: only '(' is special here, so a
            # stray ')' is kept as ordinary text.
            if ch == '(':
                open_stack.append(pos)
            else:
                kept.append(ch)
        elif ch == '(':
            open_stack.append(pos)
        elif ch == ')':
            start = open_stack.pop()
            if not open_stack:
                # Closed the outermost parenthesis: emit the comment.
                comments.append(input[start:pos + 1])

    if open_stack:
        # Unclosed comment: take everything from the outermost '('.
        comments.append(input[open_stack[0]:])

    return re.sub(r'\s+', ' ', ''.join(kept).strip()), comments
# }}
# Public names exported by a star-import of this module.
__all__ = ('Lookup', 'DataLookup', 'lookup', 'raw_lookup',
'process_asin', 'extract_comments')
# Here there be dragons