#!/usr/bin/env python
##
## Name:     uuextract
## Purpose:  Extract tarballs from e-mail messages.
## Author:   M. J. Fromberger
## Info:     $Id: uuextract,v 1.2 2005/06/28 22:38:19 sting Exp $
##
## Notes:
## Copyright (C) 2003 Michael J. Fromberger, All Rights Reserved
##
## Usage:
## uuextract <mailbox> <directory>
##
## Reads the mbox file <mailbox>, and for every message that carries a
## uuencoded tar archive, unpacks that archive into a subdirectory of
## <directory> named after the local part of the sender's address.
##

import base64
import email
import mailbox
import os
import quopri
import re
import string
import sys
import tarfile
import tempfile
from email.utils import parseaddr
from stat import S_ISDIR

try:
    import uu
except ImportError:
    # The uu module is not available on this interpreter (it was removed
    # from the standard library in Python 3.13).  Messages can still be
    # scanned, but process_message() will report that it cannot decode.
    uu = None

# Matches the "begin <mode> <name>" line that starts a uuencoded file.
# Group 1 is the three-digit octal mode, group 2 is the file name.
begin_tag = re.compile(r'^begin\s+([0-7]{3})\s+(.+)$',
                       re.MULTILINE | re.IGNORECASE)

# create_tempfile(dir=...) returns a tuple (fd, name) with fd open for
# writing.  tempfile.mkstemp() is present on every interpreter able to
# parse this file, so no mktemp()-based fallback is needed.
create_tempfile = tempfile.mkstemp

# Name used for the decoded archive when the "begin" line does not
# yield a usable file name.
_FALLBACK_ARCHIVE_NAME = "uuextract.out"

# --------------------------------------------------------------------

def _is_plain_name(name):
    """Return True if name is a single path component that is neither
    empty nor one of the special entries "." and ".."."""
    if not name or name in (os.curdir, os.pardir):
        return False
    return os.path.basename(name) == name

def _to_bytes(text):
    """Return text as a byte string, encoding it as Latin-1 if it is
    not one already."""
    if isinstance(text, bytes):
        return text
    return text.encode('latin-1', 'replace')

def _payload_text(part):
    """Return the transfer-decoded payload of a non-multipart message
    part as text with carriage returns removed, or None if the part has
    no payload."""
    payload = part.get_payload(decode=True)
    if payload is None:
        return None
    # On Python 3 the decoded payload is a bytes object; Latin-1 maps
    # every byte to one character, so the conversion cannot fail.  On
    # Python 2, bytes and str are the same type and nothing is done.
    if isinstance(payload, bytes) and not isinstance(payload, str):
        payload = payload.decode('latin-1')
    return payload.replace('\r', '')

def _is_safe_member(member, dest):
    """Return True if extracting the tar member below the directory
    dest cannot create or reference anything outside of dest."""
    root = os.path.realpath(dest)

    def inside(path):
        resolved = os.path.realpath(path)
        return resolved == root or resolved.startswith(root + os.sep)

    # os.path.join() discards root when member.name is absolute, so
    # absolute names are rejected by the same test as "../" names.
    target = os.path.join(root, member.name)
    if not inside(target):
        return False
    if member.issym():
        # Symbolic link targets are relative to the link's directory.
        return inside(os.path.join(os.path.dirname(target),
                                   member.linkname))
    if member.islnk():
        # Hard link targets are names of other archive members.
        return inside(os.path.join(root, member.linkname))
    return True

# --------------------------------------------------------------------

def find_body(msg):
    """Look through the given message (possibly multipart) for a part
    containing a uuencoded file.  Every non-multipart part is examined
    after undoing its content transfer encoding.

    If found, return a tuple consisting of the file name taken from the
    "begin" line and the text of that part, broken out into a list of
    lines.  Directory components are removed from the file name, since
    it comes from the sender.  Otherwise, returns (None, None)."""
    for part in msg.walk():
        if part.is_multipart():
            continue

        text = _payload_text(part)
        if text is None:
            continue

        found = begin_tag.search(text)
        if not found:
            continue

        file_name = os.path.basename(found.group(2).strip())
        if not _is_plain_name(file_name):
            file_name = _FALLBACK_ARCHIVE_NAME
        return (file_name, text.split('\n'))

    return (None, None)

# --------------------------------------------------------------------

def check_dir_for(username):
    """If a directory whose name is the same as the given user name is
    found in the current working directory, this function returns True.
    Otherwise, it attempts to create such a directory, and returns
    either True (if created) or False (in case of failure, or if the
    name exists but is not a directory)."""
    try:
        mode = os.stat(username).st_mode
    except OSError:
        try:
            os.mkdir(username)
        except OSError:
            return False
        return True

    return S_ISDIR(mode)

def process_message(who_name, who_username, file_name, body):
    """Extract a uuencoded and possibly compressed tar file from the
    given message body, and unpack the tar file into a directory named
    for the given user name.

    who_name is used only in progress messages.  who_username names the
    output directory, relative to the current directory.  file_name is
    where the decoded archive is written before unpacking, and body is
    the list of message lines containing the uuencoded data.

    Problems are reported on standard output; nothing is returned.  The
    decoded archive is removed only when unpacking completes."""
    # The user name comes from the sender's address, so it must not be
    # able to point outside the current directory.
    if not _is_plain_name(who_username) or not check_dir_for(who_username):
        print(" -- unable to use `%s' as a directory for %s"
              % (who_username, who_name))
        print("")
        return

    if uu is None:
        print(" -- unable to decode `%s': %s"
              % (file_name, "the uu module is not available"))
        print("")
        return

    # Make a safe temp file and dump the uuencoded body into it, then
    # unpack that file and get rid of the temp.
    fd, path = create_tempfile(dir = ".")
    try:
        tmp = os.fdopen(fd, 'wb')
        try:
            for line in body:
                tmp.write(_to_bytes(line))
                tmp.write(b"\n")
        finally:
            tmp.close()

        try:
            uu.decode(path, out_file = file_name)
        except Exception as err:
            # Malformed input can fail in several ways; any failure
            # only means this one message is skipped.
            print(" -- unable to decode `%s': %s" % (file_name, err))
            print("")
            return
    finally:
        os.unlink(path)

    # Try to open the unpacked file as a tar archive, and unpack all
    # its members into the username directory.
    print("Extracting files for %s into `%s':" % (who_name, who_username))
    try:
        tfile = tarfile.open(file_name)
    except (tarfile.TarError, IOError, OSError) as err:
        print(" -- unable to open `%s': %s" % (file_name, err))
        print("")
        return

    try:
        for elt in tfile.getmembers():
            if not _is_safe_member(elt, who_username):
                print(" -- skipped unsafe member `%s'" % elt.name)
                continue
            tfile.extract(elt, path = who_username)
            print(" -- %s" % elt.name)
    except (tarfile.TarError, IOError, OSError) as err:
        print(" -- unable to extract from `%s': %s" % (file_name, err))
        print("")
        return
    finally:
        tfile.close()

    # Finally, get rid of the original archive
    os.unlink(file_name)
    print("")

# --------------------------------------------------------------------

def main(argv = None):
    """Run the extractor.  argv defaults to sys.argv; argv[1] is the
    mailbox file and argv[2] is the target directory.  Returns the
    process exit status: 0 on success, 1 for a usage error or an
    unreadable mailbox, 2 for an unusable target directory."""
    if argv is None:
        argv = sys.argv

    if len(argv) < 3:
        print("Usage: uuextract <mailbox> <directory>")
        return 1

    mbox_path, target = argv[1], argv[2]

    # Open up the input file and enter the target directory, creating
    # it if it does not already exist.  The mailbox is opened first, by
    # absolute path, because a relative name stops being valid once the
    # working directory changes.
    try:
        open(mbox_path, 'rb').close()
    except (IOError, OSError) as err:
        print("Unable to open mailbox `%s': %s" % (mbox_path, err.strerror))
        return 1

    mbox = mailbox.mbox(os.path.abspath(mbox_path), create = False)
    try:
        try:
            mode = os.stat(target).st_mode
        except OSError:
            try:
                os.mkdir(target)
            except OSError as err:
                print("Unable to create target directory `%s': %s"
                      % (target, err.strerror))
                return 2
            print("Created new target directory `%s'" % target)
        else:
            if not S_ISDIR(mode):
                print("The file `%s' exists, but is not a directory"
                      % target)
                return 2
            print("The target directory `%s' exists" % target)

        os.chdir(target)

        # Walk the messages, finding those which contain uuencoded
        # files.
        for msg in mbox:
            # A message without a From: header yields empty names
            # rather than an error.
            who_name, who_email = parseaddr(str(msg.get("from", "")))
            who_username = who_email.split('@', 1)[0]

            # In general, we should find the files in the body of the
            # message, not as an attachment; however, it's possible
            # somebody might not follow directions.
            file_name, body = find_body(msg)

            if file_name is None:
                print(" -- No file found in message from %s" % who_name)
                continue

            process_message(who_name, who_username, file_name, body)
            print("")
    finally:
        mbox.close()

    return 0

if __name__ == "__main__":
    sys.exit(main())

# -- Here there be dragons -------------------------------------------